diff --git a/java/README.md b/java/README.md index 41be6df..7b23554 100644 --- a/java/README.md +++ b/java/README.md @@ -38,6 +38,7 @@ In the `src/main` directory of the project, you will find several sub-directorie * `LOGIN_URL`: Specify the login url of the Salesforce org being used to run the examples. * `USERNAME` & `PASSWORD`: For authentication via username and password, you will need to specify the username and password of the Salesforce org. * `ACCESS_TOKEN` & `TENANT_ID`: For authentication via session token and tenant ID, you will need to specify the sessionToken and tenant ID of the Salesforce org. + * `CONSUMER_KEY` & `CONSUMER_SECRET` & `TENANT_ID`: For authentication via the OAuth2 client credentials flow, specify the consumer key and consumer secret of the org's External Client App along with the tenant ID. When the app is configured to issue JWT-based (orgJWT) access tokens, the `Subscribe` example periodically mints a fresh token and sends it to the server via the `auth_refresh` field of a `FetchRequest` to keep a long-lived subscription stream alive. See the `refreshAuth()` method in `Subscribe.java` and the `scheduleAuthRefresh()`/`getFreshAuthToken()` helpers in `CommonContext.java`. * When using managed event subscriptions (beta), one of these configurations is required. * `MANAGED_SUB_DEVELOPER_NAME`: Specify the developer name of ManagedEventSubscription. This parameter is used in ManagedSubscribe.java. * `MANAGED_SUB_ID`: Specify the ID of the ManagedEventSubscription Tooling API record. This parameter is used in ManagedSubscribe.java. diff --git a/java/pom.xml b/java/pom.xml index 1a88202..d057171 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -9,13 +9,13 @@ 1.0-SNAPSHOT - 11 - 11 - 11 + 17 + 17 + 17 1.11.0 - 1.64.0 - 3.25.3 - 1.64.0 + 1.79.0 + 3.25.8 + 1.79.0 @@ -83,7 +83,7 @@ org.xolstice.maven.plugins protobuf-maven-plugin - 0.5.0 + 0.6.1 com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} grpc-java diff --git a/java/src/main/java/genericpubsub/Subscribe.java b/java/src/main/java/genericpubsub/Subscribe.java index b07bdb3..bd44ad1 100644 --- a/java/src/main/java/genericpubsub/Subscribe.java +++ b/java/src/main/java/genericpubsub/Subscribe.java @@ -22,7 +22,9 @@ * A single-topic subscriber that consumes events using Event Bus API Subscribe RPC. The example demonstrates how to: * - implement a long-lived subscription to a single topic * - a basic flow control strategy - * - a basic retry strategy. + * - a basic retry strategy + * - a periodic auth refresh strategy to keep the stream alive when authenticating via the OAuth2 + * client credentials flow (which may issue short-lived orgJWT access tokens). See {@link #refreshAuth()}. * * Example: * ./run.sh genericpubsub.Subscribe @@ -83,6 +85,10 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver { + Thread thread = new Thread(runnable, "auth-refresh-thread"); + thread.setDaemon(true); + return thread; + }; + private final ScheduledExecutorService authRefreshScheduler = + Executors.newScheduledThreadPool(1, authRefreshThreadFactory); + protected String tenantGuid; protected String busTopicName; protected TopicInfo topicInfo; @@ -119,8 +134,24 @@ public CallCredentials setupCallCredentials(ExampleConfigurations options) { close(); throw new IllegalArgumentException("cannot log in with username/password", e); } + } else if (options.getConsumerKey() != null && options.getConsumerSecret() != null) { + if (options.getTenantId() == null || options.getTenantId().isEmpty()) { + close(); + throw new IllegalArgumentException("Please provide a Tenant ID for the OAuth2 client credentials flow"); + } + try { + oAuthClientCredSessionFlow = new OAuthClientCredSessionFlow(httpClient, + options.getLoginUrl(), + options.getTenantId(), + options.getConsumerKey(), + options.getConsumerSecret()); + return oAuthClientCredSessionFlow.loginWithAccessToken(); + } catch (Exception e) { + close(); + throw new IllegalArgumentException("Unable to obtain OAuth2 token", e); + } } else { - logger.warn("Please use either username/password or session token for authentication"); + logger.warn("Please use username/password, session token, or OAuth2 client credentials for authentication"); close(); return null; } @@ -165,6 +196,60 @@ protected void setupTopicDetails(final String topicName, final boolean pubOrSubM } } + /** + * Schedules a periodic auth refresh to keep a long-lived bidirectional stream alive. + * + * This is necessary when authenticating via the OAuth2 client credentials flow, since the + * External Client App may issue short-lived orgJWT access tokens that expire while the stream + * is open. Subscribers that maintain a stream (e.g. Subscribe) should call this after starting + * the subscription, and override {@link #refreshAuth()} to send the refreshed token to the + * server over their stream. Other auth methods (username/password, session token) do not need + * to refresh, so this is a no-op for them. + */ + protected void scheduleAuthRefresh() { + if (oAuthClientCredSessionFlow != null) { + authRefreshScheduler.scheduleAtFixedRate(this::refreshAuth, 10, 60, TimeUnit.SECONDS); + } + } + + /** + * Stops the periodic auth refresh and waits for any in-flight refreshAuth() to finish. This is + * idempotent. Streaming examples that close their own stream should call this BEFORE completing the + * stream, so a scheduled refreshAuth() cannot call onNext() on an already-completed stream. It is + * also called from {@link #close()} as a safety net. + */ + protected void stopAuthRefresh() { + authRefreshScheduler.shutdownNow(); + try { + authRefreshScheduler.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting to stop auth refresh scheduler", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Mints a fresh auth token for use in {@code auth_refresh} requests on bidirectional streams. + * + * @return a fresh token string, or null if auth refresh is not applicable for the current auth method + */ + protected String getFreshAuthToken() { + if (oAuthClientCredSessionFlow != null) { + APISessionCredentials freshCreds = oAuthClientCredSessionFlow.loginWithAccessToken(); + return freshCreds.getToken(); + } + return null; + } + + /** + * Sends a refreshed auth token to the server over the example's stream. The base implementation + * is a no-op; streaming examples that support auth refresh (e.g. Subscribe) override this to put + * the fresh token on their request stream via {@code FetchRequest.setAuthRefresh}. + */ + protected void refreshAuth() { + // No-op by default. See Subscribe#refreshAuth for an example implementation. + } + /** * Helper function to convert the replayId in long to ByteString type. * @@ -362,6 +447,11 @@ public String getSessionToken() { */ @Override public void close() { + // Stop the auth refresh scheduler first so a scheduled refreshAuth() cannot fire against an + // already-stopped HTTP client or a shutting-down gRPC channel. Idempotent: subclasses that manage + // their own stream (e.g. Subscribe) may have already called stopAuthRefresh() before this. + stopAuthRefresh(); + if (httpClient != null) { try { httpClient.stop(); diff --git a/java/src/main/java/utility/ExampleConfigurations.java b/java/src/main/java/utility/ExampleConfigurations.java index ee0128a..7e020da 100644 --- a/java/src/main/java/utility/ExampleConfigurations.java +++ b/java/src/main/java/utility/ExampleConfigurations.java @@ -21,6 +21,8 @@ public class ExampleConfigurations { private String loginUrl; private String tenantId; private String accessToken; + private String consumerKey; + private String consumerSecret; private String pubsubHost; private Integer pubsubPort; private String topic; @@ -57,6 +59,8 @@ public ExampleConfigurations(String filename) throws IOException { this.topic = obj.get("TOPIC") == null ? "/event/Order_Event__e" : obj.get("TOPIC").toString(); this.tenantId = obj.get("TENANT_ID") == null ? null : obj.get("TENANT_ID").toString(); this.accessToken = obj.get("ACCESS_TOKEN") == null ? null : obj.get("ACCESS_TOKEN").toString(); + this.consumerKey = obj.get("CONSUMER_KEY") == null ? null : obj.get("CONSUMER_KEY").toString(); + this.consumerSecret = obj.get("CONSUMER_SECRET") == null ? null : obj.get("CONSUMER_SECRET").toString(); this.numberOfEventsToPublish = obj.get("NUMBER_OF_EVENTS_TO_PUBLISH") == null ? 5 : Integer.parseInt(obj.get("NUMBER_OF_EVENTS_TO_PUBLISH").toString()); this.singlePublishRequest = obj.get("SINGLE_PUBLISH_REQUEST") == null ? @@ -156,6 +160,22 @@ public void setAccessToken(String accessToken) { this.accessToken = accessToken; } + public String getConsumerKey() { + return consumerKey; + } + + public void setConsumerKey(String consumerKey) { + this.consumerKey = consumerKey; + } + + public String getConsumerSecret() { + return consumerSecret; + } + + public void setConsumerSecret(String consumerSecret) { + this.consumerSecret = consumerSecret; + } + public String getPubsubHost() { return pubsubHost; } diff --git a/java/src/main/java/utility/OAuthClientCredSessionFlow.java b/java/src/main/java/utility/OAuthClientCredSessionFlow.java new file mode 100644 index 0000000..78d5f87 --- /dev/null +++ b/java/src/main/java/utility/OAuthClientCredSessionFlow.java @@ -0,0 +1,109 @@ +package utility; + +import java.net.URL; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.util.FormContentProvider; +import org.eclipse.jetty.util.Fields; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * OAuth2 client credentials flow used to obtain access tokens for Pub/Sub API. + * + * The access token returned by this flow can be either an opaque session token or a JWT-based + * (orgJWT) token. The token type is determined by the External Client App configuration in the + * Salesforce org. JWT-based access tokens are short-lived and must be refreshed periodically while + * a subscription stays open; the {@link CommonContext#scheduleAuthRefresh()} helper drives that + * refresh by minting a fresh token via {@link #loginWithAccessToken()} and sending it back to the + * server over the bidirectional stream as an {@code auth_refresh}. + */ +public class OAuthClientCredSessionFlow { + private static final Logger LOGGER = LoggerFactory.getLogger(OAuthClientCredSessionFlow.class); + + private static final String OAUTH2_TOKEN_ENDPOINT = "/services/oauth2/token"; + private final HttpClient httpClient; + private final String loginEndpoint; + private final String tenantId; + private final String clientId; + private final String clientSecret; + + public OAuthClientCredSessionFlow(HttpClient httpClient, String loginEndpoint, String tenantId, String clientId, + String clientSecret) { + this.httpClient = httpClient; + this.loginEndpoint = loginEndpoint; + this.tenantId = tenantId; + this.clientId = clientId; + this.clientSecret = clientSecret; + } + + /** + * Performs the OAuth2 client credentials exchange and returns gRPC call credentials populated + * with the freshly minted access token and the instance URL returned by the token endpoint. + */ + public APISessionCredentials loginWithAccessToken() { + JSONObject responseJson = requestOAuth2Token(loginEndpoint); + Object accessToken = responseJson.get("access_token"); + Object instanceUrl = responseJson.get("instance_url"); + if (accessToken == null || instanceUrl == null) { + throw new RuntimeException( + "OAuth2 token response is missing access_token or instance_url: " + responseJson); + } + LOGGER.debug("created OAuth2 session token credentials for tenant {} from {}", tenantId, instanceUrl); + return new APISessionCredentials(tenantId, instanceUrl.toString(), accessToken.toString()); + } + + /** + * Makes an HTTP POST request to the OAuth2 token endpoint and returns the parsed JSON response. + * + * @param loginEndpoint The OAuth2 token endpoint base URL + * @return JSONObject containing the OAuth2 token response + * @throws RuntimeException if the request fails or the response cannot be parsed + */ + private JSONObject requestOAuth2Token(String loginEndpoint) { + try { + URL endpoint = new URL(loginEndpoint + OAUTH2_TOKEN_ENDPOINT); + Request post = httpClient.POST(endpoint.toURI()); + + post.header("Content-Type", "application/x-www-form-urlencoded"); + Fields fields = new Fields(); + fields.add("grant_type", "client_credentials"); + fields.add("client_id", clientId); + fields.add("client_secret", clientSecret); + + post.content(new FormContentProvider(fields)); + ContentResponse response = post.send(); + + if (response.getStatus() != 200) { + String errorMessage = parseErrorResponse(response.getContentAsString()); + throw new RuntimeException( + String.format("OAuth2 client credentials error: %d - %s", response.getStatus(), errorMessage)); + } + return (JSONObject) new JSONParser().parse(response.getContentAsString()); + } catch (Exception e) { + throw new RuntimeException("Failed to obtain OAuth2 token", e); + } + } + + private static String parseErrorResponse(String responseBody) { + try { + JSONObject errorResponse = (JSONObject) new JSONParser().parse(responseBody); + + String error = errorResponse.get("error") == null ? null : errorResponse.get("error").toString(); + String errorDescription = errorResponse.get("error_description") == null ? null + : errorResponse.get("error_description").toString(); + if (error != null && errorDescription != null) { + return String.format("%s: %s", error, errorDescription); + } else if (error != null) { + return error; + } + } catch (Exception e) { + LOGGER.debug("Could not parse error response as JSON", e); + } + return responseBody; + } +} diff --git a/java/src/main/proto/pubsub_api.proto b/java/src/main/proto/pubsub_api.proto index 0152e77..337eb39 100644 --- a/java/src/main/proto/pubsub_api.proto +++ b/java/src/main/proto/pubsub_api.proto @@ -155,7 +155,6 @@ * FetchResponses. */ int32 num_requested = 4; - // For internal Salesforce use only. string auth_refresh = 5; } @@ -203,7 +202,6 @@ string topic_name = 1; // Batch of ProducerEvent(s) to send repeated ProducerEvent events = 2; - // For internal Salesforce use only. string auth_refresh = 3; } @@ -251,7 +249,6 @@ * FetchResponses. */ int32 num_requested = 3; - // For internal Salesforce use only. string auth_refresh = 4; CommitReplayRequest commit_replay_id_request = 5; diff --git a/java/src/main/resources/arguments.yaml b/java/src/main/resources/arguments.yaml index 569ce86..2f9fbff 100644 --- a/java/src/main/resources/arguments.yaml +++ b/java/src/main/resources/arguments.yaml @@ -14,16 +14,22 @@ PUBSUB_PORT: 7443 # Your Salesforce Login URL LOGIN_URL: null -# For authentication, you can use either username/password or accessToken/tenantId types. -# Either one of the combinations is required. Please specify `null` values to the unused type. +# For authentication, you can use username/password, accessToken/tenantId, or the OAuth2 client +# credentials flow (consumerKey/consumerSecret/tenantId). Exactly one of the combinations is +# required. Please specify `null` values to the unused types. # Your Salesforce Username USERNAME: null # Your Salesforce Password PASSWORD: null -# Your Salesforce org Tenant ID +# Your Salesforce org Tenant ID. Required for both the session token and OAuth2 client credentials flows. TENANT_ID: null # Your Salesforce Session Token ACCESS_TOKEN: null +# OAuth2 client credentials flow (used together with TENANT_ID). The consumer key and secret of the +# org's External Client App. When the app is configured to issue JWT-based (orgJWT) +# access tokens, the Subscribe example refreshes the token periodically to keep a long-lived stream alive. +CONSUMER_KEY: null +CONSUMER_SECRET: null # ========================= # Optional Configurations: diff --git a/pubsub_api.proto b/pubsub_api.proto index 0152e77..337eb39 100644 --- a/pubsub_api.proto +++ b/pubsub_api.proto @@ -155,7 +155,6 @@ * FetchResponses. */ int32 num_requested = 4; - // For internal Salesforce use only. string auth_refresh = 5; } @@ -203,7 +202,6 @@ string topic_name = 1; // Batch of ProducerEvent(s) to send repeated ProducerEvent events = 2; - // For internal Salesforce use only. string auth_refresh = 3; } @@ -251,7 +249,6 @@ * FetchResponses. */ int32 num_requested = 3; - // For internal Salesforce use only. string auth_refresh = 4; CommitReplayRequest commit_replay_id_request = 5;