Hi guys, this is the continuation of Connect Salesforce Pub/Sub API with .NET – Part I.
In this part of the tutorial I am going to show you how to connect and subscribe to the Salesforce Pub/Sub API. Before we start, here are the requirements for this task:
- Configure a connected app in Salesforce – Salesforce Documentation
- Configure a platform event in Salesforce – Salesforce Documentation
The connected app is needed to get an access token from the Salesforce OAuth endpoint.
The platform event is needed so that we have something to subscribe to when testing the connection and subscription from .NET.
I’m not going to delve into these topics because they are beyond the scope of this tutorial.
How to get an access token from Salesforce
Once the connected app is configured, we can get an access token from C# by following the Salesforce Documentation. I created a DTO to work with the Salesforce auth data structure.
AuthResponse.cs
using Newtonsoft.Json;

// Maps the JSON body returned by the Salesforce OAuth token endpoint.
public class AuthResponse
{
    [JsonProperty("access_token")]
    public string AccessToken { get; set; }

    [JsonProperty("instance_url")]
    public string InstanceUrl { get; set; }

    [JsonProperty("id")]
    public string Id { get; set; }

    [JsonProperty("token_type")]
    public string TokenType { get; set; }

    [JsonProperty("issued_at")]
    public string IssuedAt { get; set; }

    [JsonProperty("signature")]
    public string Signature { get; set; }
}
With the DTO for the Salesforce auth response in place, we can build the HTTP client that requests the access token:
SalesforceHttpClient.cs
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Newtonsoft.Json;

public class SalesforceHttpClient
{
    public string LoginEndpoint = "https://login.salesforce.com/services/oauth2/token";
    public string Username = "";      // change with your data
    public string Password = "";      // change with your data
    public string Token = "";         // security token; leave empty if your IP range is trusted
    public string ClientId = "";      // change with your data
    public string ClientSecret = "";  // change with your data
    public string GrantType = "password";
    public readonly HttpClient client;

    public SalesforceHttpClient()
    {
        client = new HttpClient();
    }

    public async Task<AuthResponse> GetToken()
    {
        // The username-password flow expects the security token appended to the password.
        HttpContent formData = new FormUrlEncodedContent(new Dictionary<string, string>
        {
            { "username", Username },
            { "password", Password + Token },
            { "client_id", ClientId },
            { "client_secret", ClientSecret },
            { "grant_type", GrantType }
        });

        HttpRequestMessage requestMessage = new HttpRequestMessage();
        requestMessage.Content = formData;
        requestMessage.Method = HttpMethod.Post;
        requestMessage.RequestUri = new Uri(LoginEndpoint);

        HttpResponseMessage httpResponse = await client.SendAsync(requestMessage);
        var content = await httpResponse.Content.ReadAsStringAsync();

        AuthResponse authResponse = JsonConvert.DeserializeObject<AuthResponse>(content)!;
        return authResponse;
    }
}
With these classes we can call the Salesforce auth endpoint and get a valid access token:
SalesforceHttpClient salesforceClient = new SalesforceHttpClient();
var auth = await salesforceClient.GetToken();
Executing the code above, we should get something similar to this:
Preparing data to connect with Salesforce Pub/Sub API
In this step we need to look at the .proto file from the previous tutorial to see what we have to send to the Pub/Sub client.
As shown in the next image, the Pub/Sub client needs three values: accesstoken, instanceurl and tenantid. They must be sent as gRPC metadata headers.
So, we need to create a Metadata object and set this information so the client can use it:
string tenantId = "";
Metadata metadata = new Metadata
{
    { "accesstoken", auth.AccessToken },
    { "instanceurl", auth.InstanceUrl },
    { "tenantid", tenantId }
};
We can get the tenantId following these steps:
- For Classic: Click on Setup | Under Administer | Company Profile | Company Information
- For Lightning: Click on Gear Icon | Setup | Company Settings | Company information
After following the steps above, look for the Salesforce.com Organization ID field.
Alternatively, your Organization ID can be found via the Salesforce Help Portal.
1. Log in to Salesforce Help
2. Scroll to the Support & Services tile and view your Organization ID
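The token response also carries an identity URL in its id field, so the organization ID could be read from there instead of being copied by hand. This is a minimal sketch, assuming the identity URL has the form https://<host>/id/<orgId>/<userId>. ExtractOrgId is a hypothetical helper that is not part of the original code, and the IDs shown are placeholders:

```csharp
using System;

// Hypothetical helper (not in the original tutorial): reads the organization ID
// from an identity URL assumed to look like https://<host>/id/<orgId>/<userId>.
static string ExtractOrgId(string identityUrl)
{
    string[] segments = new Uri(identityUrl).AbsolutePath
        .Split('/', StringSplitOptions.RemoveEmptyEntries);

    // Expect exactly: "id", the organization ID, the user ID.
    if (segments.Length != 3 || segments[0] != "id")
    {
        throw new FormatException($"Unexpected identity URL: {identityUrl}");
    }

    return segments[1];
}

// Placeholder IDs for illustration only.
string sampleIdentityUrl = "https://login.salesforce.com/id/00Dxx0000000001EAA/005xx0000000001AAA";
Console.WriteLine(ExtractOrgId(sampleIdentityUrl));
```

With the classes above, this would be called as ExtractOrgId(auth.Id). It is worth comparing the result once against the value shown in Company Information before relying on it.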
Creating the Pub/Sub client
First we need to create a new class. In my case I created a class called SalesforcePubSubClient. It holds the fields the client needs and initializes them in the constructor.
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
// PubSub.PubSubClient comes from the code generated from the .proto file in Part I.

public class SalesforcePubSubClient
{
    private readonly PubSub.PubSubClient _client;
    private readonly Metadata _metadata;
    private readonly ILogger _logger;

    public SalesforcePubSubClient(string address, Metadata metadata, ILogger logger)
    {
        var channelSalesforce = GrpcChannel.ForAddress(address);
        _client = new PubSub.PubSubClient(channelSalesforce);
        _metadata = metadata;
        _logger = logger;
    }
}
_client is an instance of the PubSubClient generated in Part I of this tutorial.
_metadata is an instance of the gRPC Metadata class.
_logger is a logger instance used to log messages to the console.
The class constructor receives three parameters:
address: the address of the Salesforce Pub/Sub server – Global Endpoint (https://api.pubsub.salesforce.com:7443)
metadata: it contains the auth info from Salesforce OAuth
logger: logger instance
To initialize the PubSubClient instance we create a GrpcChannel with the input address and pass it to the PubSubClient constructor.
GetTopicByName and GetSchemaById
To subscribe to the Salesforce Pub/Sub API we first need to get the topic from the PubSubClient. The topic gives us the SchemaId, which we use to retrieve the schema as JSON. That schema is needed later to deserialize the Avro payloads returned by the subscription. So I created two methods:
public TopicInfo GetTopicByName(string platformEventName)
{
    TopicRequest topicRequest = new TopicRequest
    {
        TopicName = platformEventName
    };
    return _client.GetTopic(request: topicRequest, headers: _metadata);
}

public SchemaInfo GetSchemaById(string schemaId)
{
    SchemaRequest schemaRequest = new SchemaRequest
    {
        SchemaId = schemaId
    };
    return _client.GetSchema(request: schemaRequest, headers: _metadata);
}
Subscribe Method
Once we have the topic and schema, we can build our subscribe method to listen for new incoming events from the Salesforce Pub/Sub API.
First of all, we should know that the Salesforce Pub/Sub API is a gRPC server, so we are going to work with gRPC concepts.
If we look at the definition of the Subscribe method in the proto file:
rpc Subscribe (stream FetchRequest) returns (stream FetchResponse);
We can see that the Subscribe method uses bidirectional streaming: we send a FetchRequest and, once we are subscribed, we receive a FetchResponse with the new events coming from the Salesforce Pub/Sub API.
gRPC supports duplex communication by means of streams. We can mark either a request or a response as a stream and write to or read from the stream until it is closed.
In this case, to subscribe we are going to use full duplex communication.
Full duplex communication – both the client and the server send a continuous flow of data over time.
With that concept in mind, here is the subscribe method:
public async Task Subscribe(string platformEventName, string jsonSchema)
{
    var source = new CancellationTokenSource();
    try
    {
        // Open the bidirectional stream, sending the auth headers with the call.
        using AsyncDuplexStreamingCall<FetchRequest, FetchResponse> stream =
            _client.Subscribe(headers: _metadata, cancellationToken: source.Token);

        // Ask for up to 2 events, starting from the latest position of the topic.
        FetchRequest fetchRequest = new FetchRequest
        {
            TopicName = platformEventName,
            ReplayPreset = ReplayPreset.Latest,
            NumRequested = 2
        };

        await WriteToStream(stream.RequestStream, fetchRequest);
        await ReadFromStream(stream.ResponseStream, jsonSchema, source);
    }
    catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
    {
        _logger.LogError($"Operation Cancelled: {e.Message} Source {e.Source} {e.StackTrace}");
        throw;
    }
}
To subscribe we create a FetchRequest with the platformEventName. To understand the FetchRequest object, see the official documentation.
In addition to this, we need to create two more methods:
WriteToStream: writes the FetchRequest to the request stream
ReadFromStream: reads each FetchResponse from the response stream
public async Task WriteToStream(IClientStreamWriter<FetchRequest> requestStream, FetchRequest fetchRequest)
{
    await requestStream.WriteAsync(fetchRequest);
}

public async Task ReadFromStream(IAsyncStreamReader<FetchResponse> responseStream, string jsonSchema, CancellationTokenSource source = null)
{
    while (await responseStream.MoveNext())
    {
        _logger.LogInformation($"Time: {DateTime.Now} RPC ID: {responseStream.Current.RpcId}");
        if (responseStream.Current.Events != null)
        {
            Console.WriteLine($"Number of events received: {responseStream.Current.Events.Count}");
            foreach (var item in responseStream.Current.Events)
            {
                byte[] bytePayload = item.Event.Payload.ToByteArray();
                string jsonPayload = AvroConvert.Avro2Json(bytePayload, jsonSchema);
                _logger.LogInformation($"response: {jsonPayload}");
            }
        }
        else
        {
            Console.WriteLine($"{DateTime.Now} Subscription is active");
        }
    }
}
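Each item received in ReadFromStream also carries a ReplayId as raw bytes. These bytes are not UTF-8 text, so decoding them with Encoding.UTF8 produces unreadable output. The sketch below shows two ways to render them for logging: as a hex string and as an unsigned big-endian number. ReplayIdToHex and ReplayIdToNumber are hypothetical helper names, the sample bytes are for illustration only, and reading the value as a number is an assumption made for display purposes. As far as I know the replay ID is meant to be treated as an opaque value and stored as bytes.

```csharp
using System;
using System.Numerics;

// Hypothetical helpers (not in the original tutorial) for logging a replay ID.

// Renders the raw bytes as an uppercase hex string (requires .NET 5 or later).
static string ReplayIdToHex(byte[] replayId) => Convert.ToHexString(replayId);

// Interprets the raw bytes as an unsigned, big-endian integer of any length.
static BigInteger ReplayIdToNumber(byte[] replayId) =>
    new BigInteger(replayId, isUnsigned: true, isBigEndian: true);

// Sample bytes for illustration only.
byte[] sampleReplayId = { 0, 0, 0, 0, 0, 0x1F, 0x45, 0x76, 0, 0 };

Console.WriteLine($"Replay ID (hex): {ReplayIdToHex(sampleReplayId)}");
Console.WriteLine($"Replay ID (number): {ReplayIdToNumber(sampleReplayId)}");
```

Inside the foreach loop this would be called with item.ReplayId.ToByteArray(). If the replay ID is going to be sent back in a later FetchRequest, keeping the original bytes avoids any loss from converting back and forth.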
Client Execution
Finally, here is the code to run the subscribe method:
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SalesforcePubSub.Client.Utils;
using System;

#region Add Console Logger
ServiceProvider serviceProvider = new ServiceCollection()
    .AddLogging((loggingBuilder) => loggingBuilder
        .SetMinimumLevel(LogLevel.Trace)
        .AddConsole()
    )
    .BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger<Program>();
logger.LogInformation("Testing Salesforce Pub/Sub API with .NET");
#endregion

#region Add Json Settings
var builder = new ConfigurationBuilder()
    .AddJsonFile("local.settings.json");
var config = builder.Build();
#endregion

#region Get Salesforce Access Token
// This version of SalesforceHttpClient receives the configuration in its constructor,
// an improvement over the parameterless class shown earlier.
SalesforceHttpClient salesforceClient = new SalesforceHttpClient(config);
var auth = await salesforceClient.GetToken();
string tenantId = config.GetSection("salesforce:tenant_id").Value!;
#endregion

#region Call Salesforce Pub/Sub Client
Metadata metadata = new Metadata
{
    { "accesstoken", auth.AccessToken },
    { "instanceurl", auth.InstanceUrl },
    { "tenantid", tenantId }
};
string pubSubEndpoint = config.GetSection("salesforce:pub_sub_endpoint").Value!;
string platformEventName = config.GetSection("salesforce:platform_event_name").Value!;

SalesforcePubSubClient salesforcePubSubClient = new SalesforcePubSubClient(pubSubEndpoint, metadata, logger);
var topic = salesforcePubSubClient.GetTopicByName(platformEventName);
var schema = salesforcePubSubClient.GetSchemaById(topic.SchemaId);

// Subscribe returns once the response stream ends, so loop to subscribe again.
while (true)
{
    await salesforcePubSubClient.Subscribe(platformEventName, schema.SchemaJson);
}
#endregion
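The program above reads three keys from local.settings.json: salesforce:tenant_id, salesforce:pub_sub_endpoint and salesforce:platform_event_name. The file itself is not shown, so the sketch below is an assumption about its shape based only on those keys. All values are placeholders, and whatever keys SalesforceHttpClient reads from the configuration are unknown and left out. It loads the same JSON from an in-memory stream to check that the keys resolve. It uses a raw string literal, so it assumes C# 11 or later.

```csharp
using System;
using System.IO;
using System.Text;
using Microsoft.Extensions.Configuration;

// Assumed shape of local.settings.json; every value here is a placeholder.
const string sampleSettingsJson = """
{
  "salesforce": {
    "tenant_id": "00Dxx0000000001EAA",
    "pub_sub_endpoint": "https://api.pubsub.salesforce.com:7443",
    "platform_event_name": "/event/My_Event__e"
  }
}
""";

// Load the JSON from memory instead of from disk, using the same JSON provider as AddJsonFile.
using var settingsStream = new MemoryStream(Encoding.UTF8.GetBytes(sampleSettingsJson));
IConfiguration sampleConfig = new ConfigurationBuilder()
    .AddJsonStream(settingsStream)
    .Build();

// Nested JSON objects are flattened into keys separated by ':'.
Console.WriteLine(sampleConfig["salesforce:tenant_id"]);
Console.WriteLine(sampleConfig["salesforce:pub_sub_endpoint"]);
Console.WriteLine(sampleConfig["salesforce:platform_event_name"]);
```

When the file lives on disk, AddJsonFile looks for it relative to the configuration builder's base path, so it usually needs to be copied to the output directory of the project.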
The result of this execution is shown in the next image.
I hope this post helps you in your work. Don’t forget to check my new posts.
If you want to download the code, you can help me with a donation and I will send you the code. Donations help me pay for the infrastructure of this blog.
Best Wishes!
Great article. This is exactly what I have been trying to unsuccessfully do for a while. I have now successfully subscribed to a Salesforce Platform event, and debugged the results.
I have one question though. In the ReadFromStream() method, item.ReplayId is of type Google.Protobuf.ByteString. I haven’t been able to convert this to a string. I have tried many ways, including the ones below, but they just result in something like “\0\0\0\0\0\u001fEv\0\0”.
Encoding.UTF8.GetString(item.ReplayId.ToArray())
Encoding.UTF8.GetString(item.ReplayId.ToByteArray(), 0, 10)
Any ideas?
Hi John, you can use:
byte[] bRepId = item.ReplayId.ToByteArray();
BigInteger repId = new BigInteger(bRepId, true, true );
Hello Juan, excellent and helpful article. Strangely, after following all of your suggestions in this article, doing some additional reading and some trial & error, I still get the same outcome and message. Whether I use a standard default topic of “/data/ChangeEvents” or “/data/AccountChangeEvent”, or even a custom event I’ve configured, “/event/TestMe__e”, it always yields an exception and status similar to “The topic information for org CORE/prod/00D780000008xUr and topic /event/data/ChangeEvents is unavailable. Try again later. rpcId: f51da66a-9237-42dc-8e97-c7f2ce055fd9”
Authentication and all the usual easy to fix issues have been solved. Even watching the conversation with a protocol analyzer shows connectivity to the server at https://api.pubsub.salesforce.com:7443 is OK. What kind of donation are you looking for to take a look at your complete source? 🙂
Hi Brian, the donation is voluntary, you can donate the value you want using the paypal button, and then I send you the compressed code to the email of your choice.
Hi Juan,
You’re a legend!
I’ve been working with CometD, and now Pub/Sub for Salesforce using gRPC is something I have been really trying to get working. Thank you for your great article, I’m almost there.
I have everything working up to the point of obtaining a schemaId. The error I receive is ->
The schema ID cannot be blank. rpcId: bdd0b51a-7a6c-488a-a5c0-e8ceeb4f8118
I am trying to subscribe to all ChangeEvents. I am using a developer instance of Salesforce (trial period) so I can learn how this all works.
string pubSubEndpoint = "https://api.pubsub.salesforce.com:443";
string platformEventName = "/data/ChangeEvents";
Any assistance on why I receive a null schemaId would be greatly appreciated. Thank you in advance.
Ned
Try checking the event name and the response from the GetTopicByName method. I think you are getting a null or empty response from that method. It is possible that change data capture is not configured correctly for your objects, so check this configuration: https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_select_objects.htm
In your final section you have
SalesforceHttpClient salesforceClient = new SalesforceHttpClient(config);
But your SalesforceHttpClient has no constructor that takes a parameter?
Hi Cliff, yes, that was an improvement to receive the config in SalesforceHttpClient
#region Add Json Settings
var builder = new ConfigurationBuilder()
.AddJsonFile($"local.settings.json");
var config = builder.Build();