From 7813a26be7688f1fd48eb3f63cb2e54cf4a2ab71 Mon Sep 17 00:00:00 2001 From: Jason Alaya Date: Fri, 19 Jun 2026 10:47:19 -0700 Subject: [PATCH 1/5] Add OAuth2 client-credentials auth and stream auth-refresh (orgJWT) for Subscribe --- java/README.md | 1 + .../main/java/genericpubsub/Subscribe.java | 78 +- java/src/main/java/utility/CommonContext.java | 92 +- .../java/utility/ExampleConfigurations.java | 20 + .../utility/OAuthClientCredSessionFlow.java | 109 +++ java/src/main/proto/pubsub_api.proto | 813 +++++++++--------- java/src/main/resources/arguments.yaml | 12 +- 7 files changed, 705 insertions(+), 420 deletions(-) create mode 100644 java/src/main/java/utility/OAuthClientCredSessionFlow.java diff --git a/java/README.md b/java/README.md index 41be6df..0ad371d 100644 --- a/java/README.md +++ b/java/README.md @@ -38,6 +38,7 @@ In the `src/main` directory of the project, you will find several sub-directorie * `LOGIN_URL`: Specify the login url of the Salesforce org being used to run the examples. * `USERNAME` & `PASSWORD`: For authentication via username and password, you will need to specify the username and password of the Salesforce org. * `ACCESS_TOKEN` & `TENANT_ID`: For authentication via session token and tenant ID, you will need to specify the sessionToken and tenant ID of the Salesforce org. + * `CONSUMER_KEY` & `CONSUMER_SECRET` & `TENANT_ID`: For authentication via the OAuth2 client credentials flow, specify the consumer key and consumer secret of the org's External Client App (or Connected App) along with the tenant ID. When the app is configured to issue JWT-based (orgJWT) access tokens, the `Subscribe` example periodically mints a fresh token and sends it to the server via the `auth_refresh` field of a `FetchRequest` to keep a long-lived subscription stream alive. See the `refreshAuth()` method in `Subscribe.java` and the `scheduleAuthRefresh()`/`getFreshAuthToken()` helpers in `CommonContext.java`. * When using managed event subscriptions (beta), one of these configurations is required. * `MANAGED_SUB_DEVELOPER_NAME`: Specify the developer name of ManagedEventSubscription. This parameter is used in ManagedSubscribe.java. * `MANAGED_SUB_ID`: Specify the ID of the ManagedEventSubscription Tooling API record. This parameter is used in ManagedSubscribe.java. diff --git a/java/src/main/java/genericpubsub/Subscribe.java b/java/src/main/java/genericpubsub/Subscribe.java index b07bdb3..08e2282 100644 --- a/java/src/main/java/genericpubsub/Subscribe.java +++ b/java/src/main/java/genericpubsub/Subscribe.java @@ -22,7 +22,9 @@ * A single-topic subscriber that consumes events using Event Bus API Subscribe RPC. The example demonstrates how to: * - implement a long-lived subscription to a single topic * - a basic flow control strategy - * - a basic retry strategy. + * - a basic retry strategy + * - a periodic auth refresh strategy to keep the stream alive when authenticating via the OAuth2 + * client credentials flow (which may issue short-lived orgJWT access tokens). See {@link #refreshAuth()}. * * Example: * ./run.sh genericpubsub.Subscribe @@ -40,6 +42,13 @@ public class Subscribe extends CommonContext { public static ExampleConfigurations exampleConfigurations; public static AtomicBoolean isActive = new AtomicBoolean(false); public static AtomicInteger retriesLeft = new AtomicInteger(MAX_RETRIES); + // serverStream is written/read from multiple threads: the main thread and retry scheduler thread + // (fetch()), the gRPC response-callback thread (fetchMore()), and the auth-refresh scheduler thread + // (refreshAuth()). gRPC forbids concurrent onNext()/onCompleted() on the same call, so ALL access is + // serialized on streamLock. streamActive (guarded by streamLock) tracks whether the current stream is + // open, so we never call onNext() after onCompleted(). + private final Object streamLock = new Object(); + private boolean streamActive = false; private StreamObserver serverStream; private Map schemaCache = new ConcurrentHashMap<>(); private AtomicInteger receivedEvents = new AtomicInteger(0); @@ -83,6 +92,10 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver { + Thread thread = new Thread(runnable, "auth-refresh-thread"); + thread.setDaemon(true); + return thread; + }; + private final ScheduledExecutorService authRefreshScheduler = + Executors.newScheduledThreadPool(1, authRefreshThreadFactory); + protected String tenantGuid; protected String busTopicName; protected TopicInfo topicInfo; @@ -119,8 +134,24 @@ public CallCredentials setupCallCredentials(ExampleConfigurations options) { close(); throw new IllegalArgumentException("cannot log in with username/password", e); } + } else if (options.getConsumerKey() != null && options.getConsumerSecret() != null) { + if (options.getTenantId() == null || options.getTenantId().isEmpty()) { + close(); + throw new IllegalArgumentException("Please provide a Tenant ID for the OAuth2 client credentials flow"); + } + try { + oAuthClientCredSessionFlow = new OAuthClientCredSessionFlow(httpClient, + options.getLoginUrl(), + options.getTenantId(), + options.getConsumerKey(), + options.getConsumerSecret()); + return oAuthClientCredSessionFlow.loginWithAccessToken(); + } catch (Exception e) { + close(); + throw new IllegalArgumentException("Unable to obtain OAuth2 token", e); + } } else { - logger.warn("Please use either username/password or session token for authentication"); + logger.warn("Please use username/password, session token, or OAuth2 client credentials for authentication"); close(); return null; } @@ -165,6 +196,60 @@ protected void setupTopicDetails(final String topicName, final boolean pubOrSubM } } + /** + * Schedules a periodic auth refresh to keep a long-lived bidirectional stream alive. + * + * This is necessary when authenticating via the OAuth2 client credentials flow, since the + * External Client App may issue short-lived orgJWT access tokens that expire while the stream + * is open. Subscribers that maintain a stream (e.g. Subscribe) should call this after starting + * the subscription, and override {@link #refreshAuth()} to send the refreshed token to the + * server over their stream. Other auth methods (username/password, session token) do not need + * to refresh, so this is a no-op for them. + */ + protected void scheduleAuthRefresh() { + if (oAuthClientCredSessionFlow != null) { + authRefreshScheduler.scheduleAtFixedRate(this::refreshAuth, 10, 60, TimeUnit.SECONDS); + } + } + + /** + * Stops the periodic auth refresh and waits for any in-flight refreshAuth() to finish. This is + * idempotent. Streaming examples that close their own stream should call this BEFORE completing the + * stream, so a scheduled refreshAuth() cannot call onNext() on an already-completed stream. It is + * also called from {@link #close()} as a safety net. + */ + protected void stopAuthRefresh() { + authRefreshScheduler.shutdownNow(); + try { + authRefreshScheduler.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting to stop auth refresh scheduler", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Mints a fresh auth token for use in {@code auth_refresh} requests on bidirectional streams. + * + * @return a fresh token string, or null if auth refresh is not applicable for the current auth method + */ + protected String getFreshAuthToken() { + if (oAuthClientCredSessionFlow != null) { + APISessionCredentials freshCreds = oAuthClientCredSessionFlow.loginWithAccessToken(); + return freshCreds.getToken(); + } + return null; + } + + /** + * Sends a refreshed auth token to the server over the example's stream. The base implementation + * is a no-op; streaming examples that support auth refresh (e.g. Subscribe) override this to put + * the fresh token on their request stream via {@code FetchRequest.setAuthRefresh}. + */ + protected void refreshAuth() { + // No-op by default. See Subscribe#refreshAuth for an example implementation. + } + /** * Helper function to convert the replayId in long to ByteString type. * @@ -362,6 +447,11 @@ public String getSessionToken() { */ @Override public void close() { + // Stop the auth refresh scheduler first so a scheduled refreshAuth() cannot fire against an + // already-stopped HTTP client or a shutting-down gRPC channel. Idempotent: subclasses that manage + // their own stream (e.g. Subscribe) may have already called stopAuthRefresh() before this. + stopAuthRefresh(); + if (httpClient != null) { try { httpClient.stop(); diff --git a/java/src/main/java/utility/ExampleConfigurations.java b/java/src/main/java/utility/ExampleConfigurations.java index ee0128a..7e020da 100644 --- a/java/src/main/java/utility/ExampleConfigurations.java +++ b/java/src/main/java/utility/ExampleConfigurations.java @@ -21,6 +21,8 @@ public class ExampleConfigurations { private String loginUrl; private String tenantId; private String accessToken; + private String consumerKey; + private String consumerSecret; private String pubsubHost; private Integer pubsubPort; private String topic; @@ -57,6 +59,8 @@ public ExampleConfigurations(String filename) throws IOException { this.topic = obj.get("TOPIC") == null ? "/event/Order_Event__e" : obj.get("TOPIC").toString(); this.tenantId = obj.get("TENANT_ID") == null ? null : obj.get("TENANT_ID").toString(); this.accessToken = obj.get("ACCESS_TOKEN") == null ? null : obj.get("ACCESS_TOKEN").toString(); + this.consumerKey = obj.get("CONSUMER_KEY") == null ? null : obj.get("CONSUMER_KEY").toString(); + this.consumerSecret = obj.get("CONSUMER_SECRET") == null ? null : obj.get("CONSUMER_SECRET").toString(); this.numberOfEventsToPublish = obj.get("NUMBER_OF_EVENTS_TO_PUBLISH") == null ? 5 : Integer.parseInt(obj.get("NUMBER_OF_EVENTS_TO_PUBLISH").toString()); this.singlePublishRequest = obj.get("SINGLE_PUBLISH_REQUEST") == null ? @@ -156,6 +160,22 @@ public void setAccessToken(String accessToken) { this.accessToken = accessToken; } + public String getConsumerKey() { + return consumerKey; + } + + public void setConsumerKey(String consumerKey) { + this.consumerKey = consumerKey; + } + + public String getConsumerSecret() { + return consumerSecret; + } + + public void setConsumerSecret(String consumerSecret) { + this.consumerSecret = consumerSecret; + } + public String getPubsubHost() { return pubsubHost; } diff --git a/java/src/main/java/utility/OAuthClientCredSessionFlow.java b/java/src/main/java/utility/OAuthClientCredSessionFlow.java new file mode 100644 index 0000000..78d5f87 --- /dev/null +++ b/java/src/main/java/utility/OAuthClientCredSessionFlow.java @@ -0,0 +1,109 @@ +package utility; + +import java.net.URL; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.FormContentProvider; +import org.eclipse.jetty.util.Fields; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OAuth2 client credentials flow used to obtain access tokens for Pub/Sub API. + * + * The access token returned by this flow can be either an opaque session token or a JWT-based + * (orgJWT) token. The token type is determined by the External Client App configuration in the + * Salesforce org. JWT-based access tokens are short-lived and must be refreshed periodically while + * a subscription stays open; the {@link CommonContext#scheduleAuthRefresh()} helper drives that + * refresh by minting a fresh token via {@link #loginWithAccessToken()} and sending it back to the + * server over the bidirectional stream as an {@code auth_refresh}. + */ +public class OAuthClientCredSessionFlow { + private static final Logger LOGGER = LoggerFactory.getLogger(OAuthClientCredSessionFlow.class); + + private static final String OAUTH2_TOKEN_ENDPOINT = "/services/oauth2/token"; + private final HttpClient httpClient; + private final String loginEndpoint; + private final String tenantId; + private final String clientId; + private final String clientSecret; + + public OAuthClientCredSessionFlow(HttpClient httpClient, String loginEndpoint, String tenantId, String clientId, + String clientSecret) { + this.httpClient = httpClient; + this.loginEndpoint = loginEndpoint; + this.tenantId = tenantId; + this.clientId = clientId; + this.clientSecret = clientSecret; + } + + /** + * Performs the OAuth2 client credentials exchange and returns gRPC call credentials populated + * with the freshly minted access token and the instance URL returned by the token endpoint. + */ + public APISessionCredentials loginWithAccessToken() { + JSONObject responseJson = requestOAuth2Token(loginEndpoint); + Object accessToken = responseJson.get("access_token"); + Object instanceUrl = responseJson.get("instance_url"); + if (accessToken == null || instanceUrl == null) { + throw new RuntimeException( + "OAuth2 token response is missing access_token or instance_url: " + responseJson); + } + LOGGER.debug("created OAuth2 session token credentials for tenant {} from {}", tenantId, instanceUrl); + return new APISessionCredentials(tenantId, instanceUrl.toString(), accessToken.toString()); + } + + /** + * Makes an HTTP POST request to the OAuth2 token endpoint and returns the parsed JSON response. + * + * @param loginEndpoint The OAuth2 token endpoint base URL + * @return JSONObject containing the OAuth2 token response + * @throws RuntimeException if the request fails or the response cannot be parsed + */ + private JSONObject requestOAuth2Token(String loginEndpoint) { + try { + URL endpoint = new URL(loginEndpoint + OAUTH2_TOKEN_ENDPOINT); + Request post = httpClient.POST(endpoint.toURI()); + + post.header("Content-Type", "application/x-www-form-urlencoded"); + Fields fields = new Fields(); + fields.add("grant_type", "client_credentials"); + fields.add("client_id", clientId); + fields.add("client_secret", clientSecret); + + post.content(new FormContentProvider(fields)); + ContentResponse response = post.send(); + + if (response.getStatus() != 200) { + String errorMessage = parseErrorResponse(response.getContentAsString()); + throw new RuntimeException( + String.format("OAuth2 client credentials error: %d - %s", response.getStatus(), errorMessage)); + } + return (JSONObject) new JSONParser().parse(response.getContentAsString()); + } catch (Exception e) { + throw new RuntimeException("Failed to obtain OAuth2 token", e); + } + } + + private static String parseErrorResponse(String responseBody) { + try { + JSONObject errorResponse = (JSONObject) new JSONParser().parse(responseBody); + + String error = errorResponse.get("error") == null ? null : errorResponse.get("error").toString(); + String errorDescription = errorResponse.get("error_description") == null ? null + : errorResponse.get("error_description").toString(); + if (error != null && errorDescription != null) { + return String.format("%s: %s", error, errorDescription); + } else if (error != null) { + return error; + } + } catch (Exception e) { + LOGGER.debug("Could not parse error response as JSON", e); + } + return responseBody; + } +} diff --git a/java/src/main/proto/pubsub_api.proto b/java/src/main/proto/pubsub_api.proto index 0152e77..f200016 100644 --- a/java/src/main/proto/pubsub_api.proto +++ b/java/src/main/proto/pubsub_api.proto @@ -2,412 +2,409 @@ * Salesforce Pub/Sub API Version 1. */ - syntax = "proto3"; - package eventbus.v1; - - option java_multiple_files = true; - option java_package = "com.salesforce.eventbus.protobuf"; - option java_outer_classname = "PubSubProto"; - - option go_package = "github.com/developerforce/pub-sub-api/go/proto"; - - /* - * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. - */ - message TopicInfo { - // Topic name - string topic_name = 1; - // Tenant/org GUID - string tenant_guid = 2; - // Is publishing allowed? - bool can_publish = 3; - // Is subscription allowed? - bool can_subscribe = 4; - /* ID of the current topic schema, which can be used for - * publishing of generically serialized events. - */ - string schema_id = 5; - // RPC ID used to trace errors. - string rpc_id = 6; - } - - /* - * A request message for GetTopic. Note that the tenant/org is not directly referenced - * in the request, but is implicitly identified by the authentication headers. - */ - message TopicRequest { - // The name of the topic to retrieve. - string topic_name = 1; - } - - /* - * Reserved for future use. - * Header that contains information for distributed tracing, filtering, routing, etc. - * For example, X-B3-* headers assigned by a publisher are stored with the event and - * can provide a full distributed trace of the event across its entire lifecycle. - */ - message EventHeader { - string key = 1; - bytes value = 2; - } - - /* - * Represents an event that an event publishing app creates. - */ - message ProducerEvent { - // Either a user-provided ID or a system generated guid - string id = 1; - // Schema fingerprint for this event which is hash of the schema - string schema_id = 2; - // The message data field - bytes payload = 3; - // Reserved for future use. Key-value pairs of headers. - repeated EventHeader headers = 4; - } - - /* - * Represents an event that is consumed in a subscriber client. - * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. - */ - message ConsumerEvent { - // The event with fields identical to ProducerEvent - ProducerEvent event = 1; - /* The replay ID of the event. - * A subscriber app can store the replay ID. When the app restarts, it can resume subscription - * starting from events in the event bus after the event with that replay ID. - */ - bytes replay_id = 2; - } - - /* - * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. - */ - message PublishResult { - // Replay ID of the event - bytes replay_id = 1; - // Publish error if any - Error error = 2; - // Correlation key of the ProducerEvent - string correlation_key = 3; - } - - // Contains error information for an error that an RPC method returns. - message Error { - // Error code - ErrorCode code = 1; - // Error message - string msg = 2; - } - - // Supported error codes - enum ErrorCode { - UNKNOWN = 0; - PUBLISH = 1; - // ErrorCode for unrecoverable commit errors. - COMMIT = 2; - } - - /* - * Supported subscription replay start values. - * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. - */ - enum ReplayPreset { - // Start the subscription at the tip of the stream. - LATEST = 0; - // Start the subscription at the earliest point in the stream. - EARLIEST = 1; - // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. - CUSTOM = 2; - } - - /* - * Request for the Subscribe streaming RPC method. This request is used to: - * 1. Establish the initial subscribe stream. - * 2. Request more events from the subscription stream. - * Flow Control is handled by the subscriber via num_requested. - * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. - * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). - * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If - * a client needs to start at another point in the stream, it must start a new subscription. - */ - message FetchRequest { - /* - * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change - * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. - */ - string topic_name = 1; - - /* - * Subscription starting point. This is consumed only as part of the first FetchRequest - * when the subscription is set up. - */ - ReplayPreset replay_preset = 2; - /* - * If replay_preset of CUSTOM is selected, specify the subscription point to start after. - * This is consumed only as part of the first FetchRequest when the subscription is set up. - */ - bytes replay_id = 3; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 4; - // For internal Salesforce use only. - string auth_refresh = 5; - } - - /* - * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). - * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The - * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the - * server and the latest replay ID. - */ - message FetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track - // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - } - - /* - * Request for the GetSchema RPC method. The schema request is based on the event schema ID. - */ - message SchemaRequest { - // Schema fingerprint for this event, which is a hash of the schema. - string schema_id = 1; - } - - /* - * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. - */ - message SchemaInfo { - // Avro schema in JSON format - string schema_json = 1; - // Schema fingerprint - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - } - - // Request for the Publish and PublishStream RPC method. - message PublishRequest { - // Topic to publish on - string topic_name = 1; - // Batch of ProducerEvent(s) to send - repeated ProducerEvent events = 2; - // For internal Salesforce use only. - string auth_refresh = 3; - } - - /* - * Response for the Publish and PublishStream RPC methods. This returns - * a list of PublishResults for each event that the client attempted to - * publish. PublishResult indicates if publish succeeded or not - * for each event. It also returns the schema ID that was used to create - * the ProducerEvents in the PublishRequest. - */ - message PublishResponse { - // Publish results - repeated PublishResult results = 1; - // Schema fingerprint for this event, which is a hash of the schema - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request for the ManagedSubscribe streaming RPC method. This request is used to: - * 1. Establish the initial managed subscribe stream. - * 2. Request more events from the subscription stream. - * 3. Commit a Replay ID using CommitReplayRequest. - */ - message ManagedFetchRequest { - /* - * Managed subscription ID or developer name. This value corresponds to the - * ID or developer name of the ManagedEventSubscription Tooling API record. - * This value is consumed as part of the first ManagedFetchRequest only. - * The subscription_id cannot change in subsequent ManagedFetchRequests - * within the same subscribe stream, but can be omitted for efficiency. - */ - string subscription_id = 1; - string developer_name = 2; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 3; - // For internal Salesforce use only. - string auth_refresh = 4; - CommitReplayRequest commit_replay_id_request = 5; - - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Response for the ManagedSubscribe streaming RPC method. This can return - * ConsumerEvent(s) or CommitReplayResponse along with other metadata. - */ - message ManagedFetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - // commit response - CommitReplayResponse commit_response = 5; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request to commit a Replay ID for the last processed event or for the latest - * replay ID received in an empty batch of events. - */ - message CommitReplayRequest { - // commit_request_id to identify commit responses - string commit_request_id = 1; - // replayId to commit - bytes replay_id = 2; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. - * N CommitReplayRequest(s) can get compressed in a batch resulting in a single - * CommitReplayResponse which reflects the latest values of last - * CommitReplayRequest in that batch. - */ - message CommitReplayResponse { - // commit_request_id to identify commit responses. - string commit_request_id = 1; - // replayId that may have been committed - bytes replay_id = 2; - // for failed commits - Error error = 3; - // time when server received request in epoch ms - int64 process_time = 4; - } - - /* - * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time - * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. - * - * A session token is needed to authenticate. Any of the Salesforce supported - * OAuth flows can be used to obtain a session token: - * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 - * - * For each RPC, a client needs to pass authentication information - * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. - * - * For Salesforce session token authentication, use: - * accesstoken : access token - * instanceurl : Salesforce instance URL - * tenantid : tenant/org id of the client - * - * StatusException is thrown in case of response failure for any request. - */ - service PubSub { - /* - * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request - * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. - * - * Typical flow: - * 1. Client requests for X number of events via FetchRequest. - * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. - * 3. Client consumes the FetchResponse messages as they come. - * 4. Client issues new FetchRequest for Y more number of events. This request can - * come before the server has delivered the earlier requested X number of events - * so the client gets a continuous stream of events if any. - * - * If a client requests more events before the server finishes the last - * requested amount, the server appends the new amount to the current amount of - * events it still needs to fetch and deliver. - * - * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. - * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a - * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription - * at a new point in the stream. - * - * The first FetchRequest of the stream identifies the topic to subscribe to. - * If any subsequent FetchRequest provides topic_name, it must match what - * was provided in the first FetchRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - */ - rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); - - // Get the event schema for a topic based on a schema ID. - rpc GetSchema (SchemaRequest) returns (SchemaInfo); - - /* - * Get the topic Information related to the specified topic. - */ - rpc GetTopic (TopicRequest) returns (TopicInfo); - - /* - * Send a publish request to synchronously publish events to a topic. - */ - rpc Publish (PublishRequest) returns (PublishResponse); - - /* - * Bidirectional Streaming RPC to publish events to the event bus. - * PublishRequest contains the batch of events to publish. - * - * The first PublishRequest of the stream identifies the topic to publish on. - * If any subsequent PublishRequest provides topic_name, it must match what - * was provided in the first PublishRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - * - * The server returns a PublishResponse for each PublishRequest when publish is - * complete for the batch. A client does not have to wait for a PublishResponse - * before sending a new PublishRequest, i.e. multiple publish batches can be queued - * up, which allows for higher publish rate as a client can asynchronously - * publish more events while publishes are still in flight on the server side. - * - * PublishResponse holds a PublishResult for each event published that indicates success - * or failure of the publish. A client can then retry the publish as needed before sending - * more PublishRequests for new events to publish. - * - * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. - * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, - * it must make a new PublishStream call to resume publishing. - */ - rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Same as Subscribe, but for Managed Subscription clients. - * This feature is part of an open beta release. +syntax = "proto3"; +package eventbus.v1; + +option java_multiple_files = true; +option java_package = "com.salesforce.eventbus.protobuf"; +option java_outer_classname = "PubSubProto"; + +option go_package = "github.com/developerforce/pub-sub-api/go/proto"; + +/* + * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. + */ +message TopicInfo { + // Topic name + string topic_name = 1; + // Tenant/org GUID + string tenant_guid = 2; + // Is publishing allowed? + bool can_publish = 3; + // Is subscription allowed? + bool can_subscribe = 4; + /* ID of the current topic schema, which can be used for + * publishing of generically serialized events. + */ + string schema_id = 5; + // RPC ID used to trace errors. + string rpc_id = 6; +} + +/* + * A request message for GetTopic. Note that the tenant/org is not directly referenced + * in the request, but is implicitly identified by the authentication headers. + */ +message TopicRequest { + // The name of the topic to retrieve. + string topic_name = 1; +} + +/* + * Reserved for future use. + * Header that contains information for distributed tracing, filtering, routing, etc. + * For example, X-B3-* headers assigned by a publisher are stored with the event and + * can provide a full distributed trace of the event across its entire lifecycle. + */ +message EventHeader { + string key = 1; + bytes value = 2; +} + +/* + * Represents an event that an event publishing app creates. + */ +message ProducerEvent { + // Either a user-provided ID or a system generated guid + string id = 1; + // Schema fingerprint for this event which is hash of the schema + string schema_id = 2; + // The message data field + bytes payload = 3; + // Reserved for future use. Key-value pairs of headers. + repeated EventHeader headers = 4; +} + +/* + * Represents an event that is consumed in a subscriber client. + * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. + */ +message ConsumerEvent { + // The event with fields identical to ProducerEvent + ProducerEvent event = 1; + /* The replay ID of the event. + * A subscriber app can store the replay ID. When the app restarts, it can resume subscription + * starting from events in the event bus after the event with that replay ID. + */ + bytes replay_id = 2; +} + +/* + * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. + */ +message PublishResult { + // Replay ID of the event + bytes replay_id = 1; + // Publish error if any + Error error = 2; + // Correlation key of the ProducerEvent + string correlation_key = 3; +} + +// Contains error information for an error that an RPC method returns. +message Error { + // Error code + ErrorCode code = 1; + // Error message + string msg = 2; +} + +// Supported error codes +enum ErrorCode { + UNKNOWN = 0; + PUBLISH = 1; + // ErrorCode for unrecoverable commit errors. + COMMIT = 2; +} + +/* + * Supported subscription replay start values. + * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. + */ +enum ReplayPreset { + // Start the subscription at the tip of the stream. + LATEST = 0; + // Start the subscription at the earliest point in the stream. + EARLIEST = 1; + // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. + CUSTOM = 2; +} + +/* + * Request for the Subscribe streaming RPC method. This request is used to: + * 1. Establish the initial subscribe stream. + * 2. Request more events from the subscription stream. + * Flow Control is handled by the subscriber via num_requested. + * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. + * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). + * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If + * a client needs to start at another point in the stream, it must start a new subscription. + */ +message FetchRequest { + /* + * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change + * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. + */ + string topic_name = 1; + + /* + * Subscription starting point. This is consumed only as part of the first FetchRequest + * when the subscription is set up. + */ + ReplayPreset replay_preset = 2; + /* + * If replay_preset of CUSTOM is selected, specify the subscription point to start after. + * This is consumed only as part of the first FetchRequest when the subscription is set up. + */ + bytes replay_id = 3; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 4; + string auth_refresh = 5; +} + +/* + * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). + * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The + * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the + * server and the latest replay ID. + */ +message FetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track + // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; +} + +/* + * Request for the GetSchema RPC method. The schema request is based on the event schema ID. + */ +message SchemaRequest { + // Schema fingerprint for this event, which is a hash of the schema. + string schema_id = 1; +} + +/* + * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. + */ +message SchemaInfo { + // Avro schema in JSON format + string schema_json = 1; + // Schema fingerprint + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; +} + +// Request for the Publish and PublishStream RPC method. +message PublishRequest { + // Topic to publish on + string topic_name = 1; + // Batch of ProducerEvent(s) to send + repeated ProducerEvent events = 2; + string auth_refresh = 3; +} + +/* + * Response for the Publish and PublishStream RPC methods. This returns + * a list of PublishResults for each event that the client attempted to + * publish. PublishResult indicates if publish succeeded or not + * for each event. It also returns the schema ID that was used to create + * the ProducerEvents in the PublishRequest. + */ +message PublishResponse { + // Publish results + repeated PublishResult results = 1; + // Schema fingerprint for this event, which is a hash of the schema + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request for the ManagedSubscribe streaming RPC method. This request is used to: + * 1. Establish the initial managed subscribe stream. + * 2. Request more events from the subscription stream. + * 3. Commit a Replay ID using CommitReplayRequest. + */ +message ManagedFetchRequest { + /* + * Managed subscription ID or developer name. This value corresponds to the + * ID or developer name of the ManagedEventSubscription Tooling API record. + * This value is consumed as part of the first ManagedFetchRequest only. + * The subscription_id cannot change in subsequent ManagedFetchRequests + * within the same subscribe stream, but can be omitted for efficiency. */ - rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); - } - - // Style guide: https://developers.google.com/protocol-buffers/docs/style \ No newline at end of file + string subscription_id = 1; + string developer_name = 2; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 3; + string auth_refresh = 4; + CommitReplayRequest commit_replay_id_request = 5; + +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Response for the ManagedSubscribe streaming RPC method. This can return + * ConsumerEvent(s) or CommitReplayResponse along with other metadata. + */ +message ManagedFetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + // commit response + CommitReplayResponse commit_response = 5; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request to commit a Replay ID for the last processed event or for the latest + * replay ID received in an empty batch of events. + */ +message CommitReplayRequest { + // commit_request_id to identify commit responses + string commit_request_id = 1; + // replayId to commit + bytes replay_id = 2; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. + * N CommitReplayRequest(s) can get compressed in a batch resulting in a single + * CommitReplayResponse which reflects the latest values of last + * CommitReplayRequest in that batch. + */ +message CommitReplayResponse { + // commit_request_id to identify commit responses. + string commit_request_id = 1; + // replayId that may have been committed + bytes replay_id = 2; + // for failed commits + Error error = 3; + // time when server received request in epoch ms + int64 process_time = 4; +} + +/* + * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time + * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. + * + * A session token is needed to authenticate. Any of the Salesforce supported + * OAuth flows can be used to obtain a session token: + * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 + * + * For each RPC, a client needs to pass authentication information + * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. + * + * For Salesforce session token authentication, use: + * accesstoken : access token + * instanceurl : Salesforce instance URL + * tenantid : tenant/org id of the client + * + * StatusException is thrown in case of response failure for any request. + */ +service PubSub { + /* + * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request + * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. + * + * Typical flow: + * 1. Client requests for X number of events via FetchRequest. + * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. + * 3. Client consumes the FetchResponse messages as they come. + * 4. Client issues new FetchRequest for Y more number of events. This request can + * come before the server has delivered the earlier requested X number of events + * so the client gets a continuous stream of events if any. + * + * If a client requests more events before the server finishes the last + * requested amount, the server appends the new amount to the current amount of + * events it still needs to fetch and deliver. + * + * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. + * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a + * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription + * at a new point in the stream. + * + * The first FetchRequest of the stream identifies the topic to subscribe to. + * If any subsequent FetchRequest provides topic_name, it must match what + * was provided in the first FetchRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + */ + rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); + + // Get the event schema for a topic based on a schema ID. + rpc GetSchema (SchemaRequest) returns (SchemaInfo); + + /* + * Get the topic Information related to the specified topic. + */ + rpc GetTopic (TopicRequest) returns (TopicInfo); + + /* + * Send a publish request to synchronously publish events to a topic. + */ + rpc Publish (PublishRequest) returns (PublishResponse); + + /* + * Bidirectional Streaming RPC to publish events to the event bus. + * PublishRequest contains the batch of events to publish. + * + * The first PublishRequest of the stream identifies the topic to publish on. + * If any subsequent PublishRequest provides topic_name, it must match what + * was provided in the first PublishRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + * + * The server returns a PublishResponse for each PublishRequest when publish is + * complete for the batch. A client does not have to wait for a PublishResponse + * before sending a new PublishRequest, i.e. multiple publish batches can be queued + * up, which allows for higher publish rate as a client can asynchronously + * publish more events while publishes are still in flight on the server side. + * + * PublishResponse holds a PublishResult for each event published that indicates success + * or failure of the publish. A client can then retry the publish as needed before sending + * more PublishRequests for new events to publish. + * + * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. + * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, + * it must make a new PublishStream call to resume publishing. + */ + rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Same as Subscribe, but for Managed Subscription clients. + * This feature is part of an open beta release. + */ + rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); +} + +// Style guide: https://developers.google.com/protocol-buffers/docs/style diff --git a/java/src/main/resources/arguments.yaml b/java/src/main/resources/arguments.yaml index 569ce86..1fb38f5 100644 --- a/java/src/main/resources/arguments.yaml +++ b/java/src/main/resources/arguments.yaml @@ -14,16 +14,22 @@ PUBSUB_PORT: 7443 # Your Salesforce Login URL LOGIN_URL: null -# For authentication, you can use either username/password or accessToken/tenantId types. -# Either one of the combinations is required. Please specify `null` values to the unused type. +# For authentication, you can use username/password, accessToken/tenantId, or the OAuth2 client +# credentials flow (consumerKey/consumerSecret/tenantId). Exactly one of the combinations is +# required. Please specify `null` values to the unused types. # Your Salesforce Username USERNAME: null # Your Salesforce Password PASSWORD: null -# Your Salesforce org Tenant ID +# Your Salesforce org Tenant ID. Required for both the session token and OAuth2 client credentials flows. TENANT_ID: null # Your Salesforce Session Token ACCESS_TOKEN: null +# OAuth2 client credentials flow (used together with TENANT_ID). The consumer key and secret of the +# org's External Client App / Connected App. When the app is configured to issue JWT-based (orgJWT) +# access tokens, the Subscribe example refreshes the token periodically to keep a long-lived stream alive. +CONSUMER_KEY: null +CONSUMER_SECRET: null # ========================= # Optional Configurations: From 10e215f2ebf3ba922f8cf408a07a89e98186f971 Mon Sep 17 00:00:00 2001 From: Jason Alaya Date: Fri, 19 Jun 2026 10:47:41 -0700 Subject: [PATCH 2/5] Add OAuth2 client-credentials auth and stream auth-refresh (orgJWT) for Subscribe --- pubsub_api.proto | 813 +++++++++++++++++++++++------------------------ 1 file changed, 405 insertions(+), 408 deletions(-) diff --git a/pubsub_api.proto b/pubsub_api.proto index 0152e77..f200016 100644 --- a/pubsub_api.proto +++ b/pubsub_api.proto @@ -2,412 +2,409 @@ * Salesforce Pub/Sub API Version 1. */ - syntax = "proto3"; - package eventbus.v1; - - option java_multiple_files = true; - option java_package = "com.salesforce.eventbus.protobuf"; - option java_outer_classname = "PubSubProto"; - - option go_package = "github.com/developerforce/pub-sub-api/go/proto"; - - /* - * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. - */ - message TopicInfo { - // Topic name - string topic_name = 1; - // Tenant/org GUID - string tenant_guid = 2; - // Is publishing allowed? - bool can_publish = 3; - // Is subscription allowed? - bool can_subscribe = 4; - /* ID of the current topic schema, which can be used for - * publishing of generically serialized events. - */ - string schema_id = 5; - // RPC ID used to trace errors. - string rpc_id = 6; - } - - /* - * A request message for GetTopic. Note that the tenant/org is not directly referenced - * in the request, but is implicitly identified by the authentication headers. - */ - message TopicRequest { - // The name of the topic to retrieve. - string topic_name = 1; - } - - /* - * Reserved for future use. - * Header that contains information for distributed tracing, filtering, routing, etc. - * For example, X-B3-* headers assigned by a publisher are stored with the event and - * can provide a full distributed trace of the event across its entire lifecycle. - */ - message EventHeader { - string key = 1; - bytes value = 2; - } - - /* - * Represents an event that an event publishing app creates. - */ - message ProducerEvent { - // Either a user-provided ID or a system generated guid - string id = 1; - // Schema fingerprint for this event which is hash of the schema - string schema_id = 2; - // The message data field - bytes payload = 3; - // Reserved for future use. Key-value pairs of headers. - repeated EventHeader headers = 4; - } - - /* - * Represents an event that is consumed in a subscriber client. - * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. - */ - message ConsumerEvent { - // The event with fields identical to ProducerEvent - ProducerEvent event = 1; - /* The replay ID of the event. - * A subscriber app can store the replay ID. When the app restarts, it can resume subscription - * starting from events in the event bus after the event with that replay ID. - */ - bytes replay_id = 2; - } - - /* - * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. - */ - message PublishResult { - // Replay ID of the event - bytes replay_id = 1; - // Publish error if any - Error error = 2; - // Correlation key of the ProducerEvent - string correlation_key = 3; - } - - // Contains error information for an error that an RPC method returns. - message Error { - // Error code - ErrorCode code = 1; - // Error message - string msg = 2; - } - - // Supported error codes - enum ErrorCode { - UNKNOWN = 0; - PUBLISH = 1; - // ErrorCode for unrecoverable commit errors. - COMMIT = 2; - } - - /* - * Supported subscription replay start values. - * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. - */ - enum ReplayPreset { - // Start the subscription at the tip of the stream. - LATEST = 0; - // Start the subscription at the earliest point in the stream. - EARLIEST = 1; - // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. - CUSTOM = 2; - } - - /* - * Request for the Subscribe streaming RPC method. This request is used to: - * 1. Establish the initial subscribe stream. - * 2. Request more events from the subscription stream. - * Flow Control is handled by the subscriber via num_requested. - * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. - * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). - * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If - * a client needs to start at another point in the stream, it must start a new subscription. - */ - message FetchRequest { - /* - * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change - * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. - */ - string topic_name = 1; - - /* - * Subscription starting point. This is consumed only as part of the first FetchRequest - * when the subscription is set up. - */ - ReplayPreset replay_preset = 2; - /* - * If replay_preset of CUSTOM is selected, specify the subscription point to start after. - * This is consumed only as part of the first FetchRequest when the subscription is set up. - */ - bytes replay_id = 3; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 4; - // For internal Salesforce use only. - string auth_refresh = 5; - } - - /* - * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). - * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The - * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the - * server and the latest replay ID. - */ - message FetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track - // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - } - - /* - * Request for the GetSchema RPC method. The schema request is based on the event schema ID. - */ - message SchemaRequest { - // Schema fingerprint for this event, which is a hash of the schema. - string schema_id = 1; - } - - /* - * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. - */ - message SchemaInfo { - // Avro schema in JSON format - string schema_json = 1; - // Schema fingerprint - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - } - - // Request for the Publish and PublishStream RPC method. - message PublishRequest { - // Topic to publish on - string topic_name = 1; - // Batch of ProducerEvent(s) to send - repeated ProducerEvent events = 2; - // For internal Salesforce use only. - string auth_refresh = 3; - } - - /* - * Response for the Publish and PublishStream RPC methods. This returns - * a list of PublishResults for each event that the client attempted to - * publish. PublishResult indicates if publish succeeded or not - * for each event. It also returns the schema ID that was used to create - * the ProducerEvents in the PublishRequest. - */ - message PublishResponse { - // Publish results - repeated PublishResult results = 1; - // Schema fingerprint for this event, which is a hash of the schema - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request for the ManagedSubscribe streaming RPC method. This request is used to: - * 1. Establish the initial managed subscribe stream. - * 2. Request more events from the subscription stream. - * 3. Commit a Replay ID using CommitReplayRequest. - */ - message ManagedFetchRequest { - /* - * Managed subscription ID or developer name. This value corresponds to the - * ID or developer name of the ManagedEventSubscription Tooling API record. - * This value is consumed as part of the first ManagedFetchRequest only. - * The subscription_id cannot change in subsequent ManagedFetchRequests - * within the same subscribe stream, but can be omitted for efficiency. - */ - string subscription_id = 1; - string developer_name = 2; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 3; - // For internal Salesforce use only. - string auth_refresh = 4; - CommitReplayRequest commit_replay_id_request = 5; - - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Response for the ManagedSubscribe streaming RPC method. This can return - * ConsumerEvent(s) or CommitReplayResponse along with other metadata. - */ - message ManagedFetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - // commit response - CommitReplayResponse commit_response = 5; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request to commit a Replay ID for the last processed event or for the latest - * replay ID received in an empty batch of events. - */ - message CommitReplayRequest { - // commit_request_id to identify commit responses - string commit_request_id = 1; - // replayId to commit - bytes replay_id = 2; - } - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. - * N CommitReplayRequest(s) can get compressed in a batch resulting in a single - * CommitReplayResponse which reflects the latest values of last - * CommitReplayRequest in that batch. - */ - message CommitReplayResponse { - // commit_request_id to identify commit responses. - string commit_request_id = 1; - // replayId that may have been committed - bytes replay_id = 2; - // for failed commits - Error error = 3; - // time when server received request in epoch ms - int64 process_time = 4; - } - - /* - * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time - * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. - * - * A session token is needed to authenticate. Any of the Salesforce supported - * OAuth flows can be used to obtain a session token: - * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 - * - * For each RPC, a client needs to pass authentication information - * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. - * - * For Salesforce session token authentication, use: - * accesstoken : access token - * instanceurl : Salesforce instance URL - * tenantid : tenant/org id of the client - * - * StatusException is thrown in case of response failure for any request. - */ - service PubSub { - /* - * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request - * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. - * - * Typical flow: - * 1. Client requests for X number of events via FetchRequest. - * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. - * 3. Client consumes the FetchResponse messages as they come. - * 4. Client issues new FetchRequest for Y more number of events. This request can - * come before the server has delivered the earlier requested X number of events - * so the client gets a continuous stream of events if any. - * - * If a client requests more events before the server finishes the last - * requested amount, the server appends the new amount to the current amount of - * events it still needs to fetch and deliver. - * - * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. - * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a - * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription - * at a new point in the stream. - * - * The first FetchRequest of the stream identifies the topic to subscribe to. - * If any subsequent FetchRequest provides topic_name, it must match what - * was provided in the first FetchRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - */ - rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); - - // Get the event schema for a topic based on a schema ID. - rpc GetSchema (SchemaRequest) returns (SchemaInfo); - - /* - * Get the topic Information related to the specified topic. - */ - rpc GetTopic (TopicRequest) returns (TopicInfo); - - /* - * Send a publish request to synchronously publish events to a topic. - */ - rpc Publish (PublishRequest) returns (PublishResponse); - - /* - * Bidirectional Streaming RPC to publish events to the event bus. - * PublishRequest contains the batch of events to publish. - * - * The first PublishRequest of the stream identifies the topic to publish on. - * If any subsequent PublishRequest provides topic_name, it must match what - * was provided in the first PublishRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - * - * The server returns a PublishResponse for each PublishRequest when publish is - * complete for the batch. A client does not have to wait for a PublishResponse - * before sending a new PublishRequest, i.e. multiple publish batches can be queued - * up, which allows for higher publish rate as a client can asynchronously - * publish more events while publishes are still in flight on the server side. - * - * PublishResponse holds a PublishResult for each event published that indicates success - * or failure of the publish. A client can then retry the publish as needed before sending - * more PublishRequests for new events to publish. - * - * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. - * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, - * it must make a new PublishStream call to resume publishing. - */ - rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Same as Subscribe, but for Managed Subscription clients. - * This feature is part of an open beta release. +syntax = "proto3"; +package eventbus.v1; + +option java_multiple_files = true; +option java_package = "com.salesforce.eventbus.protobuf"; +option java_outer_classname = "PubSubProto"; + +option go_package = "github.com/developerforce/pub-sub-api/go/proto"; + +/* + * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. + */ +message TopicInfo { + // Topic name + string topic_name = 1; + // Tenant/org GUID + string tenant_guid = 2; + // Is publishing allowed? + bool can_publish = 3; + // Is subscription allowed? + bool can_subscribe = 4; + /* ID of the current topic schema, which can be used for + * publishing of generically serialized events. + */ + string schema_id = 5; + // RPC ID used to trace errors. + string rpc_id = 6; +} + +/* + * A request message for GetTopic. Note that the tenant/org is not directly referenced + * in the request, but is implicitly identified by the authentication headers. + */ +message TopicRequest { + // The name of the topic to retrieve. + string topic_name = 1; +} + +/* + * Reserved for future use. + * Header that contains information for distributed tracing, filtering, routing, etc. + * For example, X-B3-* headers assigned by a publisher are stored with the event and + * can provide a full distributed trace of the event across its entire lifecycle. + */ +message EventHeader { + string key = 1; + bytes value = 2; +} + +/* + * Represents an event that an event publishing app creates. + */ +message ProducerEvent { + // Either a user-provided ID or a system generated guid + string id = 1; + // Schema fingerprint for this event which is hash of the schema + string schema_id = 2; + // The message data field + bytes payload = 3; + // Reserved for future use. Key-value pairs of headers. + repeated EventHeader headers = 4; +} + +/* + * Represents an event that is consumed in a subscriber client. + * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. + */ +message ConsumerEvent { + // The event with fields identical to ProducerEvent + ProducerEvent event = 1; + /* The replay ID of the event. + * A subscriber app can store the replay ID. When the app restarts, it can resume subscription + * starting from events in the event bus after the event with that replay ID. + */ + bytes replay_id = 2; +} + +/* + * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. + */ +message PublishResult { + // Replay ID of the event + bytes replay_id = 1; + // Publish error if any + Error error = 2; + // Correlation key of the ProducerEvent + string correlation_key = 3; +} + +// Contains error information for an error that an RPC method returns. +message Error { + // Error code + ErrorCode code = 1; + // Error message + string msg = 2; +} + +// Supported error codes +enum ErrorCode { + UNKNOWN = 0; + PUBLISH = 1; + // ErrorCode for unrecoverable commit errors. + COMMIT = 2; +} + +/* + * Supported subscription replay start values. + * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. + */ +enum ReplayPreset { + // Start the subscription at the tip of the stream. + LATEST = 0; + // Start the subscription at the earliest point in the stream. + EARLIEST = 1; + // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. + CUSTOM = 2; +} + +/* + * Request for the Subscribe streaming RPC method. This request is used to: + * 1. Establish the initial subscribe stream. + * 2. Request more events from the subscription stream. + * Flow Control is handled by the subscriber via num_requested. + * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. + * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). + * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If + * a client needs to start at another point in the stream, it must start a new subscription. + */ +message FetchRequest { + /* + * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change + * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. + */ + string topic_name = 1; + + /* + * Subscription starting point. This is consumed only as part of the first FetchRequest + * when the subscription is set up. + */ + ReplayPreset replay_preset = 2; + /* + * If replay_preset of CUSTOM is selected, specify the subscription point to start after. + * This is consumed only as part of the first FetchRequest when the subscription is set up. + */ + bytes replay_id = 3; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 4; + string auth_refresh = 5; +} + +/* + * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). + * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The + * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the + * server and the latest replay ID. + */ +message FetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track + // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; +} + +/* + * Request for the GetSchema RPC method. The schema request is based on the event schema ID. + */ +message SchemaRequest { + // Schema fingerprint for this event, which is a hash of the schema. + string schema_id = 1; +} + +/* + * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. + */ +message SchemaInfo { + // Avro schema in JSON format + string schema_json = 1; + // Schema fingerprint + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; +} + +// Request for the Publish and PublishStream RPC method. +message PublishRequest { + // Topic to publish on + string topic_name = 1; + // Batch of ProducerEvent(s) to send + repeated ProducerEvent events = 2; + string auth_refresh = 3; +} + +/* + * Response for the Publish and PublishStream RPC methods. This returns + * a list of PublishResults for each event that the client attempted to + * publish. PublishResult indicates if publish succeeded or not + * for each event. It also returns the schema ID that was used to create + * the ProducerEvents in the PublishRequest. + */ +message PublishResponse { + // Publish results + repeated PublishResult results = 1; + // Schema fingerprint for this event, which is a hash of the schema + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request for the ManagedSubscribe streaming RPC method. This request is used to: + * 1. Establish the initial managed subscribe stream. + * 2. Request more events from the subscription stream. + * 3. Commit a Replay ID using CommitReplayRequest. + */ +message ManagedFetchRequest { + /* + * Managed subscription ID or developer name. This value corresponds to the + * ID or developer name of the ManagedEventSubscription Tooling API record. + * This value is consumed as part of the first ManagedFetchRequest only. + * The subscription_id cannot change in subsequent ManagedFetchRequests + * within the same subscribe stream, but can be omitted for efficiency. */ - rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); - } - - // Style guide: https://developers.google.com/protocol-buffers/docs/style \ No newline at end of file + string subscription_id = 1; + string developer_name = 2; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 3; + string auth_refresh = 4; + CommitReplayRequest commit_replay_id_request = 5; + +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Response for the ManagedSubscribe streaming RPC method. This can return + * ConsumerEvent(s) or CommitReplayResponse along with other metadata. + */ +message ManagedFetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + // commit response + CommitReplayResponse commit_response = 5; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request to commit a Replay ID for the last processed event or for the latest + * replay ID received in an empty batch of events. + */ +message CommitReplayRequest { + // commit_request_id to identify commit responses + string commit_request_id = 1; + // replayId to commit + bytes replay_id = 2; +} + +/* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. + * N CommitReplayRequest(s) can get compressed in a batch resulting in a single + * CommitReplayResponse which reflects the latest values of last + * CommitReplayRequest in that batch. + */ +message CommitReplayResponse { + // commit_request_id to identify commit responses. + string commit_request_id = 1; + // replayId that may have been committed + bytes replay_id = 2; + // for failed commits + Error error = 3; + // time when server received request in epoch ms + int64 process_time = 4; +} + +/* + * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time + * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. + * + * A session token is needed to authenticate. Any of the Salesforce supported + * OAuth flows can be used to obtain a session token: + * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 + * + * For each RPC, a client needs to pass authentication information + * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. + * + * For Salesforce session token authentication, use: + * accesstoken : access token + * instanceurl : Salesforce instance URL + * tenantid : tenant/org id of the client + * + * StatusException is thrown in case of response failure for any request. + */ +service PubSub { + /* + * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request + * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. + * + * Typical flow: + * 1. Client requests for X number of events via FetchRequest. + * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. + * 3. Client consumes the FetchResponse messages as they come. + * 4. Client issues new FetchRequest for Y more number of events. This request can + * come before the server has delivered the earlier requested X number of events + * so the client gets a continuous stream of events if any. + * + * If a client requests more events before the server finishes the last + * requested amount, the server appends the new amount to the current amount of + * events it still needs to fetch and deliver. + * + * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. + * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a + * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription + * at a new point in the stream. + * + * The first FetchRequest of the stream identifies the topic to subscribe to. + * If any subsequent FetchRequest provides topic_name, it must match what + * was provided in the first FetchRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + */ + rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); + + // Get the event schema for a topic based on a schema ID. + rpc GetSchema (SchemaRequest) returns (SchemaInfo); + + /* + * Get the topic Information related to the specified topic. + */ + rpc GetTopic (TopicRequest) returns (TopicInfo); + + /* + * Send a publish request to synchronously publish events to a topic. + */ + rpc Publish (PublishRequest) returns (PublishResponse); + + /* + * Bidirectional Streaming RPC to publish events to the event bus. + * PublishRequest contains the batch of events to publish. + * + * The first PublishRequest of the stream identifies the topic to publish on. + * If any subsequent PublishRequest provides topic_name, it must match what + * was provided in the first PublishRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + * + * The server returns a PublishResponse for each PublishRequest when publish is + * complete for the batch. A client does not have to wait for a PublishResponse + * before sending a new PublishRequest, i.e. multiple publish batches can be queued + * up, which allows for higher publish rate as a client can asynchronously + * publish more events while publishes are still in flight on the server side. + * + * PublishResponse holds a PublishResult for each event published that indicates success + * or failure of the publish. A client can then retry the publish as needed before sending + * more PublishRequests for new events to publish. + * + * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. + * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, + * it must make a new PublishStream call to resume publishing. + */ + rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Same as Subscribe, but for Managed Subscription clients. + * This feature is part of an open beta release. + */ + rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); +} + +// Style guide: https://developers.google.com/protocol-buffers/docs/style From 59457823c1d26b9537e45c3895b43c68ec31aaa7 Mon Sep 17 00:00:00 2001 From: Jason Alaya Date: Fri, 19 Jun 2026 15:35:22 -0700 Subject: [PATCH 3/5] Add OAuth2 client-credentials for Subscribe --- java/README.md | 2 +- .../main/java/genericpubsub/Subscribe.java | 44 +++---------------- java/src/main/resources/arguments.yaml | 2 +- 3 files changed, 9 insertions(+), 39 deletions(-) diff --git a/java/README.md b/java/README.md index 0ad371d..7b23554 100644 --- a/java/README.md +++ b/java/README.md @@ -38,7 +38,7 @@ In the `src/main` directory of the project, you will find several sub-directorie * `LOGIN_URL`: Specify the login url of the Salesforce org being used to run the examples. * `USERNAME` & `PASSWORD`: For authentication via username and password, you will need to specify the username and password of the Salesforce org. * `ACCESS_TOKEN` & `TENANT_ID`: For authentication via session token and tenant ID, you will need to specify the sessionToken and tenant ID of the Salesforce org. - * `CONSUMER_KEY` & `CONSUMER_SECRET` & `TENANT_ID`: For authentication via the OAuth2 client credentials flow, specify the consumer key and consumer secret of the org's External Client App (or Connected App) along with the tenant ID. When the app is configured to issue JWT-based (orgJWT) access tokens, the `Subscribe` example periodically mints a fresh token and sends it to the server via the `auth_refresh` field of a `FetchRequest` to keep a long-lived subscription stream alive. See the `refreshAuth()` method in `Subscribe.java` and the `scheduleAuthRefresh()`/`getFreshAuthToken()` helpers in `CommonContext.java`. + * `CONSUMER_KEY` & `CONSUMER_SECRET` & `TENANT_ID`: For authentication via the OAuth2 client credentials flow, specify the consumer key and consumer secret of the org's External Client App along with the tenant ID. When the app is configured to issue JWT-based (orgJWT) access tokens, the `Subscribe` example periodically mints a fresh token and sends it to the server via the `auth_refresh` field of a `FetchRequest` to keep a long-lived subscription stream alive. See the `refreshAuth()` method in `Subscribe.java` and the `scheduleAuthRefresh()`/`getFreshAuthToken()` helpers in `CommonContext.java`. * When using managed event subscriptions (beta), one of these configurations is required. * `MANAGED_SUB_DEVELOPER_NAME`: Specify the developer name of ManagedEventSubscription. This parameter is used in ManagedSubscribe.java. * `MANAGED_SUB_ID`: Specify the ID of the ManagedEventSubscription Tooling API record. This parameter is used in ManagedSubscribe.java. diff --git a/java/src/main/java/genericpubsub/Subscribe.java b/java/src/main/java/genericpubsub/Subscribe.java index 08e2282..bd44ad1 100644 --- a/java/src/main/java/genericpubsub/Subscribe.java +++ b/java/src/main/java/genericpubsub/Subscribe.java @@ -42,13 +42,6 @@ public class Subscribe extends CommonContext { public static ExampleConfigurations exampleConfigurations; public static AtomicBoolean isActive = new AtomicBoolean(false); public static AtomicInteger retriesLeft = new AtomicInteger(MAX_RETRIES); - // serverStream is written/read from multiple threads: the main thread and retry scheduler thread - // (fetch()), the gRPC response-callback thread (fetchMore()), and the auth-refresh scheduler thread - // (refreshAuth()). gRPC forbids concurrent onNext()/onCompleted() on the same call, so ALL access is - // serialized on streamLock. streamActive (guarded by streamLock) tracks whether the current stream is - // open, so we never call onNext() after onCompleted(). - private final Object streamLock = new Object(); - private boolean streamActive = false; private StreamObserver serverStream; private Map schemaCache = new ConcurrentHashMap<>(); private AtomicInteger receivedEvents = new AtomicInteger(0); @@ -110,6 +103,7 @@ public void startSubscription() { * @param providedReplayId */ public void fetch(int providedBatchSize, String providedTopicName, ReplayPreset providedReplayPreset, ByteString providedReplayId) { + serverStream = asyncStub.subscribe(this.responseStreamObserver); FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder() .setNumRequested(providedBatchSize) .setTopicName(providedTopicName) @@ -118,13 +112,7 @@ public void fetch(int providedBatchSize, String providedTopicName, ReplayPreset logger.info("Subscription has Replay Preset set to CUSTOM. In this case, the events will be delivered from provided ReplayId."); fetchRequestBuilder.setReplayId(providedReplayId); } - // Establish the new stream and send the first request under streamLock so concurrent senders - // (fetchMore on the gRPC thread, refreshAuth on the auth-refresh thread) never race the swap. - synchronized (streamLock) { - serverStream = asyncStub.subscribe(this.responseStreamObserver); - streamActive = true; - serverStream.onNext(fetchRequestBuilder.build()); - } + serverStream.onNext(fetchRequestBuilder.build()); } /** @@ -200,12 +188,7 @@ public void onError(Throwable t) { String errorCode = (trailers != null && trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)) != null) ? trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)) : null; - // Closing the old stream for sanity. Mark inactive under streamLock so a concurrent - // refreshAuth()/fetchMore() does not call onNext() on the completed stream. - synchronized (streamLock) { - streamActive = false; - serverStream.onCompleted(); - } + serverStream.onCompleted(); ReplayPreset retryReplayPreset = ReplayPreset.LATEST; ByteString retryReplayId = null; @@ -299,11 +282,7 @@ public Schema getSchema(String schemaId) { public void fetchMore(int numEvents) { FetchRequest fetchRequest = FetchRequest.newBuilder().setTopicName(this.busTopicName) .setNumRequested(numEvents).build(); - synchronized (streamLock) { - if (streamActive && serverStream != null) { - serverStream.onNext(fetchRequest); - } - } + serverStream.onNext(fetchRequest); } /** @@ -324,13 +303,7 @@ protected void refreshAuth() { return; } FetchRequest fetchRequest = FetchRequest.newBuilder().setAuthRefresh(freshToken).build(); - // Send under streamLock and only while the stream is open, so we never call onNext() - // concurrently with fetchMore()/fetch() or after onCompleted(). - synchronized (streamLock) { - if (streamActive && serverStream != null) { - serverStream.onNext(fetchRequest); - } - } + serverStream.onNext(fetchRequest); } catch (Exception e) { logger.warn("Failed to refresh auth; will retry on next scheduled interval.", e); } @@ -367,11 +340,8 @@ public synchronized void close() { // onNext() on the stream we are about to (or have just) completed. stopAuthRefresh(); try { - synchronized (streamLock) { - if (serverStream != null && streamActive) { - streamActive = false; - serverStream.onCompleted(); - } + if (serverStream != null) { + serverStream.onCompleted(); } if (retryScheduler != null) { retryScheduler.shutdown(); diff --git a/java/src/main/resources/arguments.yaml b/java/src/main/resources/arguments.yaml index 1fb38f5..2f9fbff 100644 --- a/java/src/main/resources/arguments.yaml +++ b/java/src/main/resources/arguments.yaml @@ -26,7 +26,7 @@ TENANT_ID: null # Your Salesforce Session Token ACCESS_TOKEN: null # OAuth2 client credentials flow (used together with TENANT_ID). The consumer key and secret of the -# org's External Client App / Connected App. When the app is configured to issue JWT-based (orgJWT) +# org's External Client App. When the app is configured to issue JWT-based (orgJWT) # access tokens, the Subscribe example refreshes the token periodically to keep a long-lived stream alive. CONSUMER_KEY: null CONSUMER_SECRET: null From 5257ffacd7009212aa300f1ba68412ee9c9770b8 Mon Sep 17 00:00:00 2001 From: Jason Alaya Date: Mon, 22 Jun 2026 17:35:33 -0700 Subject: [PATCH 4/5] update proto file --- java/src/main/proto/pubsub_api.proto | 810 +++++++++++++-------------- pubsub_api.proto | 810 +++++++++++++-------------- 2 files changed, 810 insertions(+), 810 deletions(-) diff --git a/java/src/main/proto/pubsub_api.proto b/java/src/main/proto/pubsub_api.proto index f200016..337eb39 100644 --- a/java/src/main/proto/pubsub_api.proto +++ b/java/src/main/proto/pubsub_api.proto @@ -2,409 +2,409 @@ * Salesforce Pub/Sub API Version 1. */ -syntax = "proto3"; -package eventbus.v1; - -option java_multiple_files = true; -option java_package = "com.salesforce.eventbus.protobuf"; -option java_outer_classname = "PubSubProto"; - -option go_package = "github.com/developerforce/pub-sub-api/go/proto"; - -/* - * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. - */ -message TopicInfo { - // Topic name - string topic_name = 1; - // Tenant/org GUID - string tenant_guid = 2; - // Is publishing allowed? - bool can_publish = 3; - // Is subscription allowed? - bool can_subscribe = 4; - /* ID of the current topic schema, which can be used for - * publishing of generically serialized events. - */ - string schema_id = 5; - // RPC ID used to trace errors. - string rpc_id = 6; -} - -/* - * A request message for GetTopic. Note that the tenant/org is not directly referenced - * in the request, but is implicitly identified by the authentication headers. - */ -message TopicRequest { - // The name of the topic to retrieve. - string topic_name = 1; -} - -/* - * Reserved for future use. - * Header that contains information for distributed tracing, filtering, routing, etc. - * For example, X-B3-* headers assigned by a publisher are stored with the event and - * can provide a full distributed trace of the event across its entire lifecycle. - */ -message EventHeader { - string key = 1; - bytes value = 2; -} - -/* - * Represents an event that an event publishing app creates. - */ -message ProducerEvent { - // Either a user-provided ID or a system generated guid - string id = 1; - // Schema fingerprint for this event which is hash of the schema - string schema_id = 2; - // The message data field - bytes payload = 3; - // Reserved for future use. Key-value pairs of headers. - repeated EventHeader headers = 4; -} - -/* - * Represents an event that is consumed in a subscriber client. - * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. - */ -message ConsumerEvent { - // The event with fields identical to ProducerEvent - ProducerEvent event = 1; - /* The replay ID of the event. - * A subscriber app can store the replay ID. When the app restarts, it can resume subscription - * starting from events in the event bus after the event with that replay ID. - */ - bytes replay_id = 2; -} - -/* - * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. - */ -message PublishResult { - // Replay ID of the event - bytes replay_id = 1; - // Publish error if any - Error error = 2; - // Correlation key of the ProducerEvent - string correlation_key = 3; -} - -// Contains error information for an error that an RPC method returns. -message Error { - // Error code - ErrorCode code = 1; - // Error message - string msg = 2; -} - -// Supported error codes -enum ErrorCode { - UNKNOWN = 0; - PUBLISH = 1; - // ErrorCode for unrecoverable commit errors. - COMMIT = 2; -} - -/* - * Supported subscription replay start values. - * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. - */ -enum ReplayPreset { - // Start the subscription at the tip of the stream. - LATEST = 0; - // Start the subscription at the earliest point in the stream. - EARLIEST = 1; - // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. - CUSTOM = 2; -} - -/* - * Request for the Subscribe streaming RPC method. This request is used to: - * 1. Establish the initial subscribe stream. - * 2. Request more events from the subscription stream. - * Flow Control is handled by the subscriber via num_requested. - * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. - * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). - * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If - * a client needs to start at another point in the stream, it must start a new subscription. - */ -message FetchRequest { - /* - * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change - * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. - */ - string topic_name = 1; - - /* - * Subscription starting point. This is consumed only as part of the first FetchRequest - * when the subscription is set up. - */ - ReplayPreset replay_preset = 2; - /* - * If replay_preset of CUSTOM is selected, specify the subscription point to start after. - * This is consumed only as part of the first FetchRequest when the subscription is set up. - */ - bytes replay_id = 3; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 4; - string auth_refresh = 5; -} - -/* - * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). - * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The - * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the - * server and the latest replay ID. - */ -message FetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track - // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; -} - -/* - * Request for the GetSchema RPC method. The schema request is based on the event schema ID. - */ -message SchemaRequest { - // Schema fingerprint for this event, which is a hash of the schema. - string schema_id = 1; -} - -/* - * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. - */ -message SchemaInfo { - // Avro schema in JSON format - string schema_json = 1; - // Schema fingerprint - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; -} - -// Request for the Publish and PublishStream RPC method. -message PublishRequest { - // Topic to publish on - string topic_name = 1; - // Batch of ProducerEvent(s) to send - repeated ProducerEvent events = 2; - string auth_refresh = 3; -} - -/* - * Response for the Publish and PublishStream RPC methods. This returns - * a list of PublishResults for each event that the client attempted to - * publish. PublishResult indicates if publish succeeded or not - * for each event. It also returns the schema ID that was used to create - * the ProducerEvents in the PublishRequest. - */ -message PublishResponse { - // Publish results - repeated PublishResult results = 1; - // Schema fingerprint for this event, which is a hash of the schema - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request for the ManagedSubscribe streaming RPC method. This request is used to: - * 1. Establish the initial managed subscribe stream. - * 2. Request more events from the subscription stream. - * 3. Commit a Replay ID using CommitReplayRequest. - */ -message ManagedFetchRequest { - /* - * Managed subscription ID or developer name. This value corresponds to the - * ID or developer name of the ManagedEventSubscription Tooling API record. - * This value is consumed as part of the first ManagedFetchRequest only. - * The subscription_id cannot change in subsequent ManagedFetchRequests - * within the same subscribe stream, but can be omitted for efficiency. + syntax = "proto3"; + package eventbus.v1; + + option java_multiple_files = true; + option java_package = "com.salesforce.eventbus.protobuf"; + option java_outer_classname = "PubSubProto"; + + option go_package = "github.com/developerforce/pub-sub-api/go/proto"; + + /* + * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. + */ + message TopicInfo { + // Topic name + string topic_name = 1; + // Tenant/org GUID + string tenant_guid = 2; + // Is publishing allowed? + bool can_publish = 3; + // Is subscription allowed? + bool can_subscribe = 4; + /* ID of the current topic schema, which can be used for + * publishing of generically serialized events. */ - string subscription_id = 1; - string developer_name = 2; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 3; - string auth_refresh = 4; - CommitReplayRequest commit_replay_id_request = 5; - -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Response for the ManagedSubscribe streaming RPC method. This can return - * ConsumerEvent(s) or CommitReplayResponse along with other metadata. - */ -message ManagedFetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - // commit response - CommitReplayResponse commit_response = 5; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request to commit a Replay ID for the last processed event or for the latest - * replay ID received in an empty batch of events. - */ -message CommitReplayRequest { - // commit_request_id to identify commit responses - string commit_request_id = 1; - // replayId to commit - bytes replay_id = 2; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. - * N CommitReplayRequest(s) can get compressed in a batch resulting in a single - * CommitReplayResponse which reflects the latest values of last - * CommitReplayRequest in that batch. - */ -message CommitReplayResponse { - // commit_request_id to identify commit responses. - string commit_request_id = 1; - // replayId that may have been committed - bytes replay_id = 2; - // for failed commits - Error error = 3; - // time when server received request in epoch ms - int64 process_time = 4; -} - -/* - * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time - * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. - * - * A session token is needed to authenticate. Any of the Salesforce supported - * OAuth flows can be used to obtain a session token: - * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 - * - * For each RPC, a client needs to pass authentication information - * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. - * - * For Salesforce session token authentication, use: - * accesstoken : access token - * instanceurl : Salesforce instance URL - * tenantid : tenant/org id of the client - * - * StatusException is thrown in case of response failure for any request. - */ -service PubSub { - /* - * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request - * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. - * - * Typical flow: - * 1. Client requests for X number of events via FetchRequest. - * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. - * 3. Client consumes the FetchResponse messages as they come. - * 4. Client issues new FetchRequest for Y more number of events. This request can - * come before the server has delivered the earlier requested X number of events - * so the client gets a continuous stream of events if any. - * - * If a client requests more events before the server finishes the last - * requested amount, the server appends the new amount to the current amount of - * events it still needs to fetch and deliver. - * - * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. - * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a - * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription - * at a new point in the stream. - * - * The first FetchRequest of the stream identifies the topic to subscribe to. - * If any subsequent FetchRequest provides topic_name, it must match what - * was provided in the first FetchRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - */ - rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); - - // Get the event schema for a topic based on a schema ID. - rpc GetSchema (SchemaRequest) returns (SchemaInfo); - - /* - * Get the topic Information related to the specified topic. - */ - rpc GetTopic (TopicRequest) returns (TopicInfo); - - /* - * Send a publish request to synchronously publish events to a topic. - */ - rpc Publish (PublishRequest) returns (PublishResponse); - - /* - * Bidirectional Streaming RPC to publish events to the event bus. - * PublishRequest contains the batch of events to publish. - * - * The first PublishRequest of the stream identifies the topic to publish on. - * If any subsequent PublishRequest provides topic_name, it must match what - * was provided in the first PublishRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - * - * The server returns a PublishResponse for each PublishRequest when publish is - * complete for the batch. A client does not have to wait for a PublishResponse - * before sending a new PublishRequest, i.e. multiple publish batches can be queued - * up, which allows for higher publish rate as a client can asynchronously - * publish more events while publishes are still in flight on the server side. - * - * PublishResponse holds a PublishResult for each event published that indicates success - * or failure of the publish. A client can then retry the publish as needed before sending - * more PublishRequests for new events to publish. - * - * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. - * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, - * it must make a new PublishStream call to resume publishing. - */ - rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Same as Subscribe, but for Managed Subscription clients. - * This feature is part of an open beta release. - */ - rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); -} - -// Style guide: https://developers.google.com/protocol-buffers/docs/style + string schema_id = 5; + // RPC ID used to trace errors. + string rpc_id = 6; + } + + /* + * A request message for GetTopic. Note that the tenant/org is not directly referenced + * in the request, but is implicitly identified by the authentication headers. + */ + message TopicRequest { + // The name of the topic to retrieve. + string topic_name = 1; + } + + /* + * Reserved for future use. + * Header that contains information for distributed tracing, filtering, routing, etc. + * For example, X-B3-* headers assigned by a publisher are stored with the event and + * can provide a full distributed trace of the event across its entire lifecycle. + */ + message EventHeader { + string key = 1; + bytes value = 2; + } + + /* + * Represents an event that an event publishing app creates. + */ + message ProducerEvent { + // Either a user-provided ID or a system generated guid + string id = 1; + // Schema fingerprint for this event which is hash of the schema + string schema_id = 2; + // The message data field + bytes payload = 3; + // Reserved for future use. Key-value pairs of headers. + repeated EventHeader headers = 4; + } + + /* + * Represents an event that is consumed in a subscriber client. + * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. + */ + message ConsumerEvent { + // The event with fields identical to ProducerEvent + ProducerEvent event = 1; + /* The replay ID of the event. + * A subscriber app can store the replay ID. When the app restarts, it can resume subscription + * starting from events in the event bus after the event with that replay ID. + */ + bytes replay_id = 2; + } + + /* + * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. + */ + message PublishResult { + // Replay ID of the event + bytes replay_id = 1; + // Publish error if any + Error error = 2; + // Correlation key of the ProducerEvent + string correlation_key = 3; + } + + // Contains error information for an error that an RPC method returns. + message Error { + // Error code + ErrorCode code = 1; + // Error message + string msg = 2; + } + + // Supported error codes + enum ErrorCode { + UNKNOWN = 0; + PUBLISH = 1; + // ErrorCode for unrecoverable commit errors. + COMMIT = 2; + } + + /* + * Supported subscription replay start values. + * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. + */ + enum ReplayPreset { + // Start the subscription at the tip of the stream. + LATEST = 0; + // Start the subscription at the earliest point in the stream. + EARLIEST = 1; + // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. + CUSTOM = 2; + } + + /* + * Request for the Subscribe streaming RPC method. This request is used to: + * 1. Establish the initial subscribe stream. + * 2. Request more events from the subscription stream. + * Flow Control is handled by the subscriber via num_requested. + * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. + * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). + * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If + * a client needs to start at another point in the stream, it must start a new subscription. + */ + message FetchRequest { + /* + * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change + * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. + */ + string topic_name = 1; + + /* + * Subscription starting point. This is consumed only as part of the first FetchRequest + * when the subscription is set up. + */ + ReplayPreset replay_preset = 2; + /* + * If replay_preset of CUSTOM is selected, specify the subscription point to start after. + * This is consumed only as part of the first FetchRequest when the subscription is set up. + */ + bytes replay_id = 3; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 4; + string auth_refresh = 5; + } + + /* + * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). + * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The + * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the + * server and the latest replay ID. + */ + message FetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track + // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + } + + /* + * Request for the GetSchema RPC method. The schema request is based on the event schema ID. + */ + message SchemaRequest { + // Schema fingerprint for this event, which is a hash of the schema. + string schema_id = 1; + } + + /* + * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. + */ + message SchemaInfo { + // Avro schema in JSON format + string schema_json = 1; + // Schema fingerprint + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + } + + // Request for the Publish and PublishStream RPC method. + message PublishRequest { + // Topic to publish on + string topic_name = 1; + // Batch of ProducerEvent(s) to send + repeated ProducerEvent events = 2; + string auth_refresh = 3; + } + + /* + * Response for the Publish and PublishStream RPC methods. This returns + * a list of PublishResults for each event that the client attempted to + * publish. PublishResult indicates if publish succeeded or not + * for each event. It also returns the schema ID that was used to create + * the ProducerEvents in the PublishRequest. + */ + message PublishResponse { + // Publish results + repeated PublishResult results = 1; + // Schema fingerprint for this event, which is a hash of the schema + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request for the ManagedSubscribe streaming RPC method. This request is used to: + * 1. Establish the initial managed subscribe stream. + * 2. Request more events from the subscription stream. + * 3. Commit a Replay ID using CommitReplayRequest. + */ + message ManagedFetchRequest { + /* + * Managed subscription ID or developer name. This value corresponds to the + * ID or developer name of the ManagedEventSubscription Tooling API record. + * This value is consumed as part of the first ManagedFetchRequest only. + * The subscription_id cannot change in subsequent ManagedFetchRequests + * within the same subscribe stream, but can be omitted for efficiency. + */ + string subscription_id = 1; + string developer_name = 2; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 3; + string auth_refresh = 4; + CommitReplayRequest commit_replay_id_request = 5; + + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Response for the ManagedSubscribe streaming RPC method. This can return + * ConsumerEvent(s) or CommitReplayResponse along with other metadata. + */ + message ManagedFetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + // commit response + CommitReplayResponse commit_response = 5; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request to commit a Replay ID for the last processed event or for the latest + * replay ID received in an empty batch of events. + */ + message CommitReplayRequest { + // commit_request_id to identify commit responses + string commit_request_id = 1; + // replayId to commit + bytes replay_id = 2; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. + * N CommitReplayRequest(s) can get compressed in a batch resulting in a single + * CommitReplayResponse which reflects the latest values of last + * CommitReplayRequest in that batch. + */ + message CommitReplayResponse { + // commit_request_id to identify commit responses. + string commit_request_id = 1; + // replayId that may have been committed + bytes replay_id = 2; + // for failed commits + Error error = 3; + // time when server received request in epoch ms + int64 process_time = 4; + } + + /* + * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time + * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. + * + * A session token is needed to authenticate. Any of the Salesforce supported + * OAuth flows can be used to obtain a session token: + * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 + * + * For each RPC, a client needs to pass authentication information + * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. + * + * For Salesforce session token authentication, use: + * accesstoken : access token + * instanceurl : Salesforce instance URL + * tenantid : tenant/org id of the client + * + * StatusException is thrown in case of response failure for any request. + */ + service PubSub { + /* + * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request + * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. + * + * Typical flow: + * 1. Client requests for X number of events via FetchRequest. + * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. + * 3. Client consumes the FetchResponse messages as they come. + * 4. Client issues new FetchRequest for Y more number of events. This request can + * come before the server has delivered the earlier requested X number of events + * so the client gets a continuous stream of events if any. + * + * If a client requests more events before the server finishes the last + * requested amount, the server appends the new amount to the current amount of + * events it still needs to fetch and deliver. + * + * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. + * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a + * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription + * at a new point in the stream. + * + * The first FetchRequest of the stream identifies the topic to subscribe to. + * If any subsequent FetchRequest provides topic_name, it must match what + * was provided in the first FetchRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + */ + rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); + + // Get the event schema for a topic based on a schema ID. + rpc GetSchema (SchemaRequest) returns (SchemaInfo); + + /* + * Get the topic Information related to the specified topic. + */ + rpc GetTopic (TopicRequest) returns (TopicInfo); + + /* + * Send a publish request to synchronously publish events to a topic. + */ + rpc Publish (PublishRequest) returns (PublishResponse); + + /* + * Bidirectional Streaming RPC to publish events to the event bus. + * PublishRequest contains the batch of events to publish. + * + * The first PublishRequest of the stream identifies the topic to publish on. + * If any subsequent PublishRequest provides topic_name, it must match what + * was provided in the first PublishRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + * + * The server returns a PublishResponse for each PublishRequest when publish is + * complete for the batch. A client does not have to wait for a PublishResponse + * before sending a new PublishRequest, i.e. multiple publish batches can be queued + * up, which allows for higher publish rate as a client can asynchronously + * publish more events while publishes are still in flight on the server side. + * + * PublishResponse holds a PublishResult for each event published that indicates success + * or failure of the publish. A client can then retry the publish as needed before sending + * more PublishRequests for new events to publish. + * + * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. + * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, + * it must make a new PublishStream call to resume publishing. + */ + rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Same as Subscribe, but for Managed Subscription clients. + * This feature is part of an open beta release. + */ + rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); + } + + // Style guide: https://developers.google.com/protocol-buffers/docs/style \ No newline at end of file diff --git a/pubsub_api.proto b/pubsub_api.proto index f200016..337eb39 100644 --- a/pubsub_api.proto +++ b/pubsub_api.proto @@ -2,409 +2,409 @@ * Salesforce Pub/Sub API Version 1. */ -syntax = "proto3"; -package eventbus.v1; - -option java_multiple_files = true; -option java_package = "com.salesforce.eventbus.protobuf"; -option java_outer_classname = "PubSubProto"; - -option go_package = "github.com/developerforce/pub-sub-api/go/proto"; - -/* - * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. - */ -message TopicInfo { - // Topic name - string topic_name = 1; - // Tenant/org GUID - string tenant_guid = 2; - // Is publishing allowed? - bool can_publish = 3; - // Is subscription allowed? - bool can_subscribe = 4; - /* ID of the current topic schema, which can be used for - * publishing of generically serialized events. - */ - string schema_id = 5; - // RPC ID used to trace errors. - string rpc_id = 6; -} - -/* - * A request message for GetTopic. Note that the tenant/org is not directly referenced - * in the request, but is implicitly identified by the authentication headers. - */ -message TopicRequest { - // The name of the topic to retrieve. - string topic_name = 1; -} - -/* - * Reserved for future use. - * Header that contains information for distributed tracing, filtering, routing, etc. - * For example, X-B3-* headers assigned by a publisher are stored with the event and - * can provide a full distributed trace of the event across its entire lifecycle. - */ -message EventHeader { - string key = 1; - bytes value = 2; -} - -/* - * Represents an event that an event publishing app creates. - */ -message ProducerEvent { - // Either a user-provided ID or a system generated guid - string id = 1; - // Schema fingerprint for this event which is hash of the schema - string schema_id = 2; - // The message data field - bytes payload = 3; - // Reserved for future use. Key-value pairs of headers. - repeated EventHeader headers = 4; -} - -/* - * Represents an event that is consumed in a subscriber client. - * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. - */ -message ConsumerEvent { - // The event with fields identical to ProducerEvent - ProducerEvent event = 1; - /* The replay ID of the event. - * A subscriber app can store the replay ID. When the app restarts, it can resume subscription - * starting from events in the event bus after the event with that replay ID. - */ - bytes replay_id = 2; -} - -/* - * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. - */ -message PublishResult { - // Replay ID of the event - bytes replay_id = 1; - // Publish error if any - Error error = 2; - // Correlation key of the ProducerEvent - string correlation_key = 3; -} - -// Contains error information for an error that an RPC method returns. -message Error { - // Error code - ErrorCode code = 1; - // Error message - string msg = 2; -} - -// Supported error codes -enum ErrorCode { - UNKNOWN = 0; - PUBLISH = 1; - // ErrorCode for unrecoverable commit errors. - COMMIT = 2; -} - -/* - * Supported subscription replay start values. - * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. - */ -enum ReplayPreset { - // Start the subscription at the tip of the stream. - LATEST = 0; - // Start the subscription at the earliest point in the stream. - EARLIEST = 1; - // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. - CUSTOM = 2; -} - -/* - * Request for the Subscribe streaming RPC method. This request is used to: - * 1. Establish the initial subscribe stream. - * 2. Request more events from the subscription stream. - * Flow Control is handled by the subscriber via num_requested. - * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. - * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). - * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If - * a client needs to start at another point in the stream, it must start a new subscription. - */ -message FetchRequest { - /* - * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change - * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. - */ - string topic_name = 1; - - /* - * Subscription starting point. This is consumed only as part of the first FetchRequest - * when the subscription is set up. - */ - ReplayPreset replay_preset = 2; - /* - * If replay_preset of CUSTOM is selected, specify the subscription point to start after. - * This is consumed only as part of the first FetchRequest when the subscription is set up. - */ - bytes replay_id = 3; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 4; - string auth_refresh = 5; -} - -/* - * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). - * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The - * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the - * server and the latest replay ID. - */ -message FetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track - // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; -} - -/* - * Request for the GetSchema RPC method. The schema request is based on the event schema ID. - */ -message SchemaRequest { - // Schema fingerprint for this event, which is a hash of the schema. - string schema_id = 1; -} - -/* - * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. - */ -message SchemaInfo { - // Avro schema in JSON format - string schema_json = 1; - // Schema fingerprint - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; -} - -// Request for the Publish and PublishStream RPC method. -message PublishRequest { - // Topic to publish on - string topic_name = 1; - // Batch of ProducerEvent(s) to send - repeated ProducerEvent events = 2; - string auth_refresh = 3; -} - -/* - * Response for the Publish and PublishStream RPC methods. This returns - * a list of PublishResults for each event that the client attempted to - * publish. PublishResult indicates if publish succeeded or not - * for each event. It also returns the schema ID that was used to create - * the ProducerEvents in the PublishRequest. - */ -message PublishResponse { - // Publish results - repeated PublishResult results = 1; - // Schema fingerprint for this event, which is a hash of the schema - string schema_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request for the ManagedSubscribe streaming RPC method. This request is used to: - * 1. Establish the initial managed subscribe stream. - * 2. Request more events from the subscription stream. - * 3. Commit a Replay ID using CommitReplayRequest. - */ -message ManagedFetchRequest { - /* - * Managed subscription ID or developer name. This value corresponds to the - * ID or developer name of the ManagedEventSubscription Tooling API record. - * This value is consumed as part of the first ManagedFetchRequest only. - * The subscription_id cannot change in subsequent ManagedFetchRequests - * within the same subscribe stream, but can be omitted for efficiency. + syntax = "proto3"; + package eventbus.v1; + + option java_multiple_files = true; + option java_package = "com.salesforce.eventbus.protobuf"; + option java_outer_classname = "PubSubProto"; + + option go_package = "github.com/developerforce/pub-sub-api/go/proto"; + + /* + * Contains information about a topic and uniquely identifies it. TopicInfo is returned by the GetTopic RPC method. + */ + message TopicInfo { + // Topic name + string topic_name = 1; + // Tenant/org GUID + string tenant_guid = 2; + // Is publishing allowed? + bool can_publish = 3; + // Is subscription allowed? + bool can_subscribe = 4; + /* ID of the current topic schema, which can be used for + * publishing of generically serialized events. */ - string subscription_id = 1; - string developer_name = 2; - /* - * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server - * of additional processing capacity available on the client side. There is no guarantee of equal number of - * FetchResponse messages to be sent back. There is not necessarily a correspondence between - * number of requested events in FetchRequest and the number of events returned in subsequent - * FetchResponses. - */ - int32 num_requested = 3; - string auth_refresh = 4; - CommitReplayRequest commit_replay_id_request = 5; - -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Response for the ManagedSubscribe streaming RPC method. This can return - * ConsumerEvent(s) or CommitReplayResponse along with other metadata. - */ -message ManagedFetchResponse { - // Received events for subscription for client consumption - repeated ConsumerEvent events = 1; - // Latest replay ID of a subscription. - bytes latest_replay_id = 2; - // RPC ID used to trace errors. - string rpc_id = 3; - // Number of remaining events to be delivered to the client for a Subscribe RPC call. - int32 pending_num_requested = 4; - // commit response - CommitReplayResponse commit_response = 5; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Request to commit a Replay ID for the last processed event or for the latest - * replay ID received in an empty batch of events. - */ -message CommitReplayRequest { - // commit_request_id to identify commit responses - string commit_request_id = 1; - // replayId to commit - bytes replay_id = 2; -} - -/* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. - * N CommitReplayRequest(s) can get compressed in a batch resulting in a single - * CommitReplayResponse which reflects the latest values of last - * CommitReplayRequest in that batch. - */ -message CommitReplayResponse { - // commit_request_id to identify commit responses. - string commit_request_id = 1; - // replayId that may have been committed - bytes replay_id = 2; - // for failed commits - Error error = 3; - // time when server received request in epoch ms - int64 process_time = 4; -} - -/* - * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time - * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. - * - * A session token is needed to authenticate. Any of the Salesforce supported - * OAuth flows can be used to obtain a session token: - * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 - * - * For each RPC, a client needs to pass authentication information - * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. - * - * For Salesforce session token authentication, use: - * accesstoken : access token - * instanceurl : Salesforce instance URL - * tenantid : tenant/org id of the client - * - * StatusException is thrown in case of response failure for any request. - */ -service PubSub { - /* - * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request - * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. - * - * Typical flow: - * 1. Client requests for X number of events via FetchRequest. - * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. - * 3. Client consumes the FetchResponse messages as they come. - * 4. Client issues new FetchRequest for Y more number of events. This request can - * come before the server has delivered the earlier requested X number of events - * so the client gets a continuous stream of events if any. - * - * If a client requests more events before the server finishes the last - * requested amount, the server appends the new amount to the current amount of - * events it still needs to fetch and deliver. - * - * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. - * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a - * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription - * at a new point in the stream. - * - * The first FetchRequest of the stream identifies the topic to subscribe to. - * If any subsequent FetchRequest provides topic_name, it must match what - * was provided in the first FetchRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - */ - rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); - - // Get the event schema for a topic based on a schema ID. - rpc GetSchema (SchemaRequest) returns (SchemaInfo); - - /* - * Get the topic Information related to the specified topic. - */ - rpc GetTopic (TopicRequest) returns (TopicInfo); - - /* - * Send a publish request to synchronously publish events to a topic. - */ - rpc Publish (PublishRequest) returns (PublishResponse); - - /* - * Bidirectional Streaming RPC to publish events to the event bus. - * PublishRequest contains the batch of events to publish. - * - * The first PublishRequest of the stream identifies the topic to publish on. - * If any subsequent PublishRequest provides topic_name, it must match what - * was provided in the first PublishRequest; otherwise, the RPC returns an error - * with INVALID_ARGUMENT status. - * - * The server returns a PublishResponse for each PublishRequest when publish is - * complete for the batch. A client does not have to wait for a PublishResponse - * before sending a new PublishRequest, i.e. multiple publish batches can be queued - * up, which allows for higher publish rate as a client can asynchronously - * publish more events while publishes are still in flight on the server side. - * - * PublishResponse holds a PublishResult for each event published that indicates success - * or failure of the publish. A client can then retry the publish as needed before sending - * more PublishRequests for new events to publish. - * - * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. - * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, - * it must make a new PublishStream call to resume publishing. - */ - rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); - - /* - * This feature is part of an open beta release and is subject to the applicable - * Beta Services Terms provided at Agreements and Terms - * (https://www.salesforce.com/company/legal/agreements/). - * - * Same as Subscribe, but for Managed Subscription clients. - * This feature is part of an open beta release. - */ - rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); -} - -// Style guide: https://developers.google.com/protocol-buffers/docs/style + string schema_id = 5; + // RPC ID used to trace errors. + string rpc_id = 6; + } + + /* + * A request message for GetTopic. Note that the tenant/org is not directly referenced + * in the request, but is implicitly identified by the authentication headers. + */ + message TopicRequest { + // The name of the topic to retrieve. + string topic_name = 1; + } + + /* + * Reserved for future use. + * Header that contains information for distributed tracing, filtering, routing, etc. + * For example, X-B3-* headers assigned by a publisher are stored with the event and + * can provide a full distributed trace of the event across its entire lifecycle. + */ + message EventHeader { + string key = 1; + bytes value = 2; + } + + /* + * Represents an event that an event publishing app creates. + */ + message ProducerEvent { + // Either a user-provided ID or a system generated guid + string id = 1; + // Schema fingerprint for this event which is hash of the schema + string schema_id = 2; + // The message data field + bytes payload = 3; + // Reserved for future use. Key-value pairs of headers. + repeated EventHeader headers = 4; + } + + /* + * Represents an event that is consumed in a subscriber client. + * In addition to the fields in ProducerEvent, ConsumerEvent has the replay_id field. + */ + message ConsumerEvent { + // The event with fields identical to ProducerEvent + ProducerEvent event = 1; + /* The replay ID of the event. + * A subscriber app can store the replay ID. When the app restarts, it can resume subscription + * starting from events in the event bus after the event with that replay ID. + */ + bytes replay_id = 2; + } + + /* + * Event publish result that the Publish RPC method returns. The result contains replay_id or a publish error. + */ + message PublishResult { + // Replay ID of the event + bytes replay_id = 1; + // Publish error if any + Error error = 2; + // Correlation key of the ProducerEvent + string correlation_key = 3; + } + + // Contains error information for an error that an RPC method returns. + message Error { + // Error code + ErrorCode code = 1; + // Error message + string msg = 2; + } + + // Supported error codes + enum ErrorCode { + UNKNOWN = 0; + PUBLISH = 1; + // ErrorCode for unrecoverable commit errors. + COMMIT = 2; + } + + /* + * Supported subscription replay start values. + * By default, the subscription will start at the tip of the stream if ReplayPreset is not specified. + */ + enum ReplayPreset { + // Start the subscription at the tip of the stream. + LATEST = 0; + // Start the subscription at the earliest point in the stream. + EARLIEST = 1; + // Start the subscription after a custom point in the stream. This must be set with a valid replay_id in the FetchRequest. + CUSTOM = 2; + } + + /* + * Request for the Subscribe streaming RPC method. This request is used to: + * 1. Establish the initial subscribe stream. + * 2. Request more events from the subscription stream. + * Flow Control is handled by the subscriber via num_requested. + * A client can specify a starting point for the subscription with replay_preset and replay_id combinations. + * If no replay_preset is specified, the subscription starts at LATEST (tip of the stream). + * replay_preset and replay_id values are only consumed as part of the first FetchRequest. If + * a client needs to start at another point in the stream, it must start a new subscription. + */ + message FetchRequest { + /* + * Identifies a topic for subscription in the very first FetchRequest of the stream. The topic cannot change + * in subsequent FetchRequests within the same subscribe stream, but can be omitted for efficiency. + */ + string topic_name = 1; + + /* + * Subscription starting point. This is consumed only as part of the first FetchRequest + * when the subscription is set up. + */ + ReplayPreset replay_preset = 2; + /* + * If replay_preset of CUSTOM is selected, specify the subscription point to start after. + * This is consumed only as part of the first FetchRequest when the subscription is set up. + */ + bytes replay_id = 3; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 4; + string auth_refresh = 5; + } + + /* + * Response for the Subscribe streaming RPC method. This returns ConsumerEvent(s). + * If there are no events to deliver, the server sends an empty batch fetch response with the latest replay ID. The + * empty fetch response is sent within 270 seconds. An empty fetch response provides a periodic keepalive from the + * server and the latest replay ID. + */ + message FetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. Enables clients with an updated replay value so that they can keep track + // of their last consumed replay. Clients will not have to start a subscription at a very old replay in the case where a resubscribe is necessary. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + } + + /* + * Request for the GetSchema RPC method. The schema request is based on the event schema ID. + */ + message SchemaRequest { + // Schema fingerprint for this event, which is a hash of the schema. + string schema_id = 1; + } + + /* + * Response for the GetSchema RPC method. This returns the schema ID and schema of an event. + */ + message SchemaInfo { + // Avro schema in JSON format + string schema_json = 1; + // Schema fingerprint + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + } + + // Request for the Publish and PublishStream RPC method. + message PublishRequest { + // Topic to publish on + string topic_name = 1; + // Batch of ProducerEvent(s) to send + repeated ProducerEvent events = 2; + string auth_refresh = 3; + } + + /* + * Response for the Publish and PublishStream RPC methods. This returns + * a list of PublishResults for each event that the client attempted to + * publish. PublishResult indicates if publish succeeded or not + * for each event. It also returns the schema ID that was used to create + * the ProducerEvents in the PublishRequest. + */ + message PublishResponse { + // Publish results + repeated PublishResult results = 1; + // Schema fingerprint for this event, which is a hash of the schema + string schema_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request for the ManagedSubscribe streaming RPC method. This request is used to: + * 1. Establish the initial managed subscribe stream. + * 2. Request more events from the subscription stream. + * 3. Commit a Replay ID using CommitReplayRequest. + */ + message ManagedFetchRequest { + /* + * Managed subscription ID or developer name. This value corresponds to the + * ID or developer name of the ManagedEventSubscription Tooling API record. + * This value is consumed as part of the first ManagedFetchRequest only. + * The subscription_id cannot change in subsequent ManagedFetchRequests + * within the same subscribe stream, but can be omitted for efficiency. + */ + string subscription_id = 1; + string developer_name = 2; + /* + * Number of events a client is ready to accept. Each subsequent FetchRequest informs the server + * of additional processing capacity available on the client side. There is no guarantee of equal number of + * FetchResponse messages to be sent back. There is not necessarily a correspondence between + * number of requested events in FetchRequest and the number of events returned in subsequent + * FetchResponses. + */ + int32 num_requested = 3; + string auth_refresh = 4; + CommitReplayRequest commit_replay_id_request = 5; + + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Response for the ManagedSubscribe streaming RPC method. This can return + * ConsumerEvent(s) or CommitReplayResponse along with other metadata. + */ + message ManagedFetchResponse { + // Received events for subscription for client consumption + repeated ConsumerEvent events = 1; + // Latest replay ID of a subscription. + bytes latest_replay_id = 2; + // RPC ID used to trace errors. + string rpc_id = 3; + // Number of remaining events to be delivered to the client for a Subscribe RPC call. + int32 pending_num_requested = 4; + // commit response + CommitReplayResponse commit_response = 5; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Request to commit a Replay ID for the last processed event or for the latest + * replay ID received in an empty batch of events. + */ + message CommitReplayRequest { + // commit_request_id to identify commit responses + string commit_request_id = 1; + // replayId to commit + bytes replay_id = 2; + } + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * There is no guaranteed 1:1 CommitReplayRequest to CommitReplayResponse. + * N CommitReplayRequest(s) can get compressed in a batch resulting in a single + * CommitReplayResponse which reflects the latest values of last + * CommitReplayRequest in that batch. + */ + message CommitReplayResponse { + // commit_request_id to identify commit responses. + string commit_request_id = 1; + // replayId that may have been committed + bytes replay_id = 2; + // for failed commits + Error error = 3; + // time when server received request in epoch ms + int64 process_time = 4; + } + + /* + * The Pub/Sub API provides a single interface for publishing and subscribing to platform events, including real-time + * event monitoring events, and change data capture events. The Pub/Sub API is a gRPC API that is based on HTTP/2. + * + * A session token is needed to authenticate. Any of the Salesforce supported + * OAuth flows can be used to obtain a session token: + * https://help.salesforce.com/articleView?id=sf.remoteaccess_oauth_flows.htm&type=5 + * + * For each RPC, a client needs to pass authentication information + * as metadata headers (https://www.grpc.io/docs/guides/concepts/#metadata) with their method call. + * + * For Salesforce session token authentication, use: + * accesstoken : access token + * instanceurl : Salesforce instance URL + * tenantid : tenant/org id of the client + * + * StatusException is thrown in case of response failure for any request. + */ + service PubSub { + /* + * Bidirectional streaming RPC to subscribe to a Topic. The subscription is pull-based. A client can request + * for more events as it consumes events. This enables a client to handle flow control based on the client's processing speed. + * + * Typical flow: + * 1. Client requests for X number of events via FetchRequest. + * 2. Server receives request and delivers events until X events are delivered to the client via one or more FetchResponse messages. + * 3. Client consumes the FetchResponse messages as they come. + * 4. Client issues new FetchRequest for Y more number of events. This request can + * come before the server has delivered the earlier requested X number of events + * so the client gets a continuous stream of events if any. + * + * If a client requests more events before the server finishes the last + * requested amount, the server appends the new amount to the current amount of + * events it still needs to fetch and deliver. + * + * A client can subscribe at any point in the stream by providing a replay option in the first FetchRequest. + * The replay option is honored for the first FetchRequest received from a client. Any subsequent FetchRequests with a + * new replay option are ignored. A client needs to call the Subscribe RPC again to restart the subscription + * at a new point in the stream. + * + * The first FetchRequest of the stream identifies the topic to subscribe to. + * If any subsequent FetchRequest provides topic_name, it must match what + * was provided in the first FetchRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + */ + rpc Subscribe (stream FetchRequest) returns (stream FetchResponse); + + // Get the event schema for a topic based on a schema ID. + rpc GetSchema (SchemaRequest) returns (SchemaInfo); + + /* + * Get the topic Information related to the specified topic. + */ + rpc GetTopic (TopicRequest) returns (TopicInfo); + + /* + * Send a publish request to synchronously publish events to a topic. + */ + rpc Publish (PublishRequest) returns (PublishResponse); + + /* + * Bidirectional Streaming RPC to publish events to the event bus. + * PublishRequest contains the batch of events to publish. + * + * The first PublishRequest of the stream identifies the topic to publish on. + * If any subsequent PublishRequest provides topic_name, it must match what + * was provided in the first PublishRequest; otherwise, the RPC returns an error + * with INVALID_ARGUMENT status. + * + * The server returns a PublishResponse for each PublishRequest when publish is + * complete for the batch. A client does not have to wait for a PublishResponse + * before sending a new PublishRequest, i.e. multiple publish batches can be queued + * up, which allows for higher publish rate as a client can asynchronously + * publish more events while publishes are still in flight on the server side. + * + * PublishResponse holds a PublishResult for each event published that indicates success + * or failure of the publish. A client can then retry the publish as needed before sending + * more PublishRequests for new events to publish. + * + * A client must send a valid publish request with one or more events every 70 seconds to hold on to the stream. + * Otherwise, the server closes the stream and notifies the client. Once the client is notified of the stream closure, + * it must make a new PublishStream call to resume publishing. + */ + rpc PublishStream (stream PublishRequest) returns (stream PublishResponse); + + /* + * This feature is part of an open beta release and is subject to the applicable + * Beta Services Terms provided at Agreements and Terms + * (https://www.salesforce.com/company/legal/agreements/). + * + * Same as Subscribe, but for Managed Subscription clients. + * This feature is part of an open beta release. + */ + rpc ManagedSubscribe (stream ManagedFetchRequest) returns (stream ManagedFetchResponse); + } + + // Style guide: https://developers.google.com/protocol-buffers/docs/style \ No newline at end of file From 2eb4df209f8355641f5a5dc8e9dd9beecd4f3a04 Mon Sep 17 00:00:00 2001 From: Jason Alaya Date: Tue, 23 Jun 2026 12:19:32 -0700 Subject: [PATCH 5/5] Update build dependencies to match EventBusAPI versions Bump Java 11->17, grpc 1.64.0->1.79.0, protoc 3.25.3->3.25.8, protoc-gen-grpc-java 1.64.0->1.79.0, and protobuf-maven-plugin 0.5.0->0.6.1 to align with EventBusAPI/pom.xml. --- java/pom.xml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/java/pom.xml b/java/pom.xml index 1a88202..d057171 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -9,13 +9,13 @@ 1.0-SNAPSHOT - 11 - 11 - 11 + 17 + 17 + 17 1.11.0 - 1.64.0 - 3.25.3 - 1.64.0 + 1.79.0 + 3.25.8 + 1.79.0 @@ -83,7 +83,7 @@ org.xolstice.maven.plugins protobuf-maven-plugin - 0.5.0 + 0.6.1 com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} grpc-java