Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ In the `src/main` directory of the project, you will find several sub-directorie
* `LOGIN_URL`: Specify the login url of the Salesforce org being used to run the examples.
* `USERNAME` & `PASSWORD`: For authentication via username and password, you will need to specify the username and password of the Salesforce org.
* `ACCESS_TOKEN` & `TENANT_ID`: For authentication via session token and tenant ID, you will need to specify the sessionToken and tenant ID of the Salesforce org.
* `CONSUMER_KEY` & `CONSUMER_SECRET` & `TENANT_ID`: For authentication via the OAuth2 client credentials flow, specify the consumer key and consumer secret of the org's External Client App along with the tenant ID. When the app is configured to issue JWT-based (orgJWT) access tokens, the `Subscribe` example periodically mints a fresh token and sends it to the server via the `auth_refresh` field of a `FetchRequest` to keep a long-lived subscription stream alive. See the `refreshAuth()` method in `Subscribe.java` and the `scheduleAuthRefresh()`/`getFreshAuthToken()` helpers in `CommonContext.java`.
* When using managed event subscriptions (beta), one of these configurations is required.
* `MANAGED_SUB_DEVELOPER_NAME`: Specify the developer name of ManagedEventSubscription. This parameter is used in ManagedSubscribe.java.
* `MANAGED_SUB_ID`: Specify the ID of the ManagedEventSubscription Tooling API record. This parameter is used in ManagedSubscribe.java.
Expand Down
14 changes: 7 additions & 7 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
<version>1.0-SNAPSHOT</version>

<properties>
<java.version>11</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<avro.version>1.11.0</avro.version>
<grpc.version>1.64.0</grpc.version>
<protoc.version>3.25.3</protoc.version>
<protocJava.version>1.64.0</protocJava.version>
<grpc.version>1.79.0</grpc.version>
<protoc.version>3.25.8</protoc.version>
<protocJava.version>1.79.0</protocJava.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -83,7 +83,7 @@
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
Expand Down
36 changes: 34 additions & 2 deletions java/src/main/java/genericpubsub/Subscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
* A single-topic subscriber that consumes events using Event Bus API Subscribe RPC. The example demonstrates how to:
* - implement a long-lived subscription to a single topic
* - a basic flow control strategy
* - a basic retry strategy.
* - a basic retry strategy
* - a periodic auth refresh strategy to keep the stream alive when authenticating via the OAuth2
* client credentials flow (which may issue short-lived orgJWT access tokens). See {@link #refreshAuth()}.
*
* Example:
* ./run.sh genericpubsub.Subscribe
Expand Down Expand Up @@ -83,6 +85,10 @@ public Subscribe(ExampleConfigurations exampleConfigurations, StreamObserver<Fet
public void startSubscription() {
logger.info("Subscription started for topic: " + busTopicName + ".");
fetch(BATCH_SIZE, busTopicName, replayPreset, customReplayId);
// Schedule periodic auth refresh to keep the stream alive when using the OAuth2 client
// credentials flow, which may issue short-lived orgJWT access tokens. This is a no-op for
// username/password or session token auth. Must be called after fetch() sets up serverStream.
scheduleAuthRefresh();
// Thread being blocked here for demonstration of this specific example. Blocking the thread in production is not recommended.
while(isActive.get()) {
waitInMillis(5_000);
Expand Down Expand Up @@ -182,7 +188,6 @@ public void onError(Throwable t) {
String errorCode = (trailers != null && trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)) != null) ?
trailers.get(Metadata.Key.of("error-code", Metadata.ASCII_STRING_MARSHALLER)) : null;

// Closing the old stream for sanity
serverStream.onCompleted();

ReplayPreset retryReplayPreset = ReplayPreset.LATEST;
Expand Down Expand Up @@ -280,6 +285,30 @@ public void fetchMore(int numEvents) {
serverStream.onNext(fetchRequest);
}

/**
* Mints a fresh auth token and sends it to the server over the subscribe stream as an
* {@code auth_refresh}. This keeps a long-lived subscription alive when using the OAuth2 client
* credentials flow with short-lived (orgJWT) access tokens. Scheduled by {@link #startSubscription()}
* via {@link utility.CommonContext#scheduleAuthRefresh()} and only active for OAuth2 auth.
*/
@Override
protected void refreshAuth() {
// Runs on the auth-refresh scheduler thread. Any uncaught exception here would cancel all future
// scheduled refreshes (scheduleAtFixedRate semantics) and let the orgJWT expire, silently dropping
// the subscription. So we catch everything and simply retry on the next tick.
try {
logger.info("Refreshing auth");
String freshToken = getFreshAuthToken();
if (freshToken == null) {
return;
}
FetchRequest fetchRequest = FetchRequest.newBuilder().setAuthRefresh(freshToken).build();
serverStream.onNext(fetchRequest);
} catch (Exception e) {
logger.warn("Failed to refresh auth; will retry on next scheduled interval.", e);
}
}

/**
* General getters and setters.
*/
Expand Down Expand Up @@ -307,6 +336,9 @@ public void setStoredReplay(ByteString storedReplay) {
*/
@Override
public synchronized void close() {
// Stop auth refresh before completing the stream so a scheduled refreshAuth() cannot call
// onNext() on the stream we are about to (or have just) completed.
stopAuthRefresh();
try {
if (serverStream != null) {
serverStream.onCompleted();
Expand Down
92 changes: 91 additions & 1 deletion java/src/main/java/utility/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -45,6 +48,18 @@ public class CommonContext implements AutoCloseable {
protected final SessionTokenService sessionTokenService;
protected final CallCredentials callCredentials;

// Set only when authenticating via the OAuth2 client credentials flow. Used to mint fresh
// access tokens (which may be short-lived orgJWTs) for periodic auth refresh on long-lived streams.
protected OAuthClientCredSessionFlow oAuthClientCredSessionFlow;

private final ThreadFactory authRefreshThreadFactory = runnable -> {
Thread thread = new Thread(runnable, "auth-refresh-thread");
thread.setDaemon(true);
return thread;
};
private final ScheduledExecutorService authRefreshScheduler =
Executors.newScheduledThreadPool(1, authRefreshThreadFactory);

protected String tenantGuid;
protected String busTopicName;
protected TopicInfo topicInfo;
Expand Down Expand Up @@ -119,8 +134,24 @@ public CallCredentials setupCallCredentials(ExampleConfigurations options) {
close();
throw new IllegalArgumentException("cannot log in with username/password", e);
}
} else if (options.getConsumerKey() != null && options.getConsumerSecret() != null) {
if (options.getTenantId() == null || options.getTenantId().isEmpty()) {
close();
throw new IllegalArgumentException("Please provide a Tenant ID for the OAuth2 client credentials flow");
}
try {
oAuthClientCredSessionFlow = new OAuthClientCredSessionFlow(httpClient,
options.getLoginUrl(),
options.getTenantId(),
options.getConsumerKey(),
options.getConsumerSecret());
return oAuthClientCredSessionFlow.loginWithAccessToken();
} catch (Exception e) {
close();
throw new IllegalArgumentException("Unable to obtain OAuth2 token", e);
}
} else {
logger.warn("Please use either username/password or session token for authentication");
logger.warn("Please use username/password, session token, or OAuth2 client credentials for authentication");
close();
return null;
}
Expand Down Expand Up @@ -165,6 +196,60 @@ protected void setupTopicDetails(final String topicName, final boolean pubOrSubM
}
}

/**
* Schedules a periodic auth refresh to keep a long-lived bidirectional stream alive.
*
* This is necessary when authenticating via the OAuth2 client credentials flow, since the
* External Client App may issue short-lived orgJWT access tokens that expire while the stream
* is open. Subscribers that maintain a stream (e.g. Subscribe) should call this after starting
* the subscription, and override {@link #refreshAuth()} to send the refreshed token to the
* server over their stream. Other auth methods (username/password, session token) do not need
* to refresh, so this is a no-op for them.
*/
protected void scheduleAuthRefresh() {
if (oAuthClientCredSessionFlow != null) {
authRefreshScheduler.scheduleAtFixedRate(this::refreshAuth, 10, 60, TimeUnit.SECONDS);
}
}

/**
* Stops the periodic auth refresh and waits for any in-flight refreshAuth() to finish. This is
* idempotent. Streaming examples that close their own stream should call this BEFORE completing the
* stream, so a scheduled refreshAuth() cannot call onNext() on an already-completed stream. It is
* also called from {@link #close()} as a safety net.
*/
protected void stopAuthRefresh() {
authRefreshScheduler.shutdownNow();
try {
authRefreshScheduler.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting to stop auth refresh scheduler", e);
Thread.currentThread().interrupt();
}
}

/**
* Mints a fresh auth token for use in {@code auth_refresh} requests on bidirectional streams.
*
* @return a fresh token string, or null if auth refresh is not applicable for the current auth method
*/
protected String getFreshAuthToken() {
if (oAuthClientCredSessionFlow != null) {
APISessionCredentials freshCreds = oAuthClientCredSessionFlow.loginWithAccessToken();
return freshCreds.getToken();
}
return null;
}

/**
* Sends a refreshed auth token to the server over the example's stream. The base implementation
* is a no-op; streaming examples that support auth refresh (e.g. Subscribe) override this to put
* the fresh token on their request stream via {@code FetchRequest.setAuthRefresh}.
*/
protected void refreshAuth() {
// No-op by default. See Subscribe#refreshAuth for an example implementation.
}

/**
* Helper function to convert the replayId in long to ByteString type.
*
Expand Down Expand Up @@ -362,6 +447,11 @@ public String getSessionToken() {
*/
@Override
public void close() {
// Stop the auth refresh scheduler first so a scheduled refreshAuth() cannot fire against an
// already-stopped HTTP client or a shutting-down gRPC channel. Idempotent: subclasses that manage
// their own stream (e.g. Subscribe) may have already called stopAuthRefresh() before this.
stopAuthRefresh();

if (httpClient != null) {
try {
httpClient.stop();
Expand Down
20 changes: 20 additions & 0 deletions java/src/main/java/utility/ExampleConfigurations.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class ExampleConfigurations {
private String loginUrl;
private String tenantId;
private String accessToken;
private String consumerKey;
private String consumerSecret;
private String pubsubHost;
private Integer pubsubPort;
private String topic;
Expand Down Expand Up @@ -57,6 +59,8 @@ public ExampleConfigurations(String filename) throws IOException {
this.topic = obj.get("TOPIC") == null ? "/event/Order_Event__e" : obj.get("TOPIC").toString();
this.tenantId = obj.get("TENANT_ID") == null ? null : obj.get("TENANT_ID").toString();
this.accessToken = obj.get("ACCESS_TOKEN") == null ? null : obj.get("ACCESS_TOKEN").toString();
this.consumerKey = obj.get("CONSUMER_KEY") == null ? null : obj.get("CONSUMER_KEY").toString();
this.consumerSecret = obj.get("CONSUMER_SECRET") == null ? null : obj.get("CONSUMER_SECRET").toString();
this.numberOfEventsToPublish = obj.get("NUMBER_OF_EVENTS_TO_PUBLISH") == null ?
5 : Integer.parseInt(obj.get("NUMBER_OF_EVENTS_TO_PUBLISH").toString());
this.singlePublishRequest = obj.get("SINGLE_PUBLISH_REQUEST") == null ?
Expand Down Expand Up @@ -156,6 +160,22 @@ public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}

public String getConsumerKey() {
return consumerKey;
}

public void setConsumerKey(String consumerKey) {
this.consumerKey = consumerKey;
}

public String getConsumerSecret() {
return consumerSecret;
}

public void setConsumerSecret(String consumerSecret) {
this.consumerSecret = consumerSecret;
}

public String getPubsubHost() {
return pubsubHost;
}
Expand Down
109 changes: 109 additions & 0 deletions java/src/main/java/utility/OAuthClientCredSessionFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package utility;

import java.net.URL;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.FormContentProvider;
import org.eclipse.jetty.util.Fields;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* OAuth2 client credentials flow used to obtain access tokens for Pub/Sub API.
*
* The access token returned by this flow can be either an opaque session token or a JWT-based
* (orgJWT) token. The token type is determined by the External Client App configuration in the
* Salesforce org. JWT-based access tokens are short-lived and must be refreshed periodically while
* a subscription stays open; the {@link CommonContext#scheduleAuthRefresh()} helper drives that
* refresh by minting a fresh token via {@link #loginWithAccessToken()} and sending it back to the
* server over the bidirectional stream as an {@code auth_refresh}.
*/
public class OAuthClientCredSessionFlow {
private static final Logger LOGGER = LoggerFactory.getLogger(OAuthClientCredSessionFlow.class);

private static final String OAUTH2_TOKEN_ENDPOINT = "/services/oauth2/token";
private final HttpClient httpClient;
private final String loginEndpoint;
private final String tenantId;
private final String clientId;
private final String clientSecret;

public OAuthClientCredSessionFlow(HttpClient httpClient, String loginEndpoint, String tenantId, String clientId,
String clientSecret) {
this.httpClient = httpClient;
this.loginEndpoint = loginEndpoint;
this.tenantId = tenantId;
this.clientId = clientId;
this.clientSecret = clientSecret;
}

/**
* Performs the OAuth2 client credentials exchange and returns gRPC call credentials populated
* with the freshly minted access token and the instance URL returned by the token endpoint.
*/
public APISessionCredentials loginWithAccessToken() {
JSONObject responseJson = requestOAuth2Token(loginEndpoint);
Object accessToken = responseJson.get("access_token");
Object instanceUrl = responseJson.get("instance_url");
if (accessToken == null || instanceUrl == null) {
throw new RuntimeException(
"OAuth2 token response is missing access_token or instance_url: " + responseJson);
}
LOGGER.debug("created OAuth2 session token credentials for tenant {} from {}", tenantId, instanceUrl);
return new APISessionCredentials(tenantId, instanceUrl.toString(), accessToken.toString());
}

/**
* Makes an HTTP POST request to the OAuth2 token endpoint and returns the parsed JSON response.
*
* @param loginEndpoint The OAuth2 token endpoint base URL
* @return JSONObject containing the OAuth2 token response
* @throws RuntimeException if the request fails or the response cannot be parsed
*/
private JSONObject requestOAuth2Token(String loginEndpoint) {
try {
URL endpoint = new URL(loginEndpoint + OAUTH2_TOKEN_ENDPOINT);
Request post = httpClient.POST(endpoint.toURI());

post.header("Content-Type", "application/x-www-form-urlencoded");
Fields fields = new Fields();
fields.add("grant_type", "client_credentials");
fields.add("client_id", clientId);
fields.add("client_secret", clientSecret);

post.content(new FormContentProvider(fields));
ContentResponse response = post.send();

if (response.getStatus() != 200) {
String errorMessage = parseErrorResponse(response.getContentAsString());
throw new RuntimeException(
String.format("OAuth2 client credentials error: %d - %s", response.getStatus(), errorMessage));
}
return (JSONObject) new JSONParser().parse(response.getContentAsString());
} catch (Exception e) {
throw new RuntimeException("Failed to obtain OAuth2 token", e);
}
}

private static String parseErrorResponse(String responseBody) {
try {
JSONObject errorResponse = (JSONObject) new JSONParser().parse(responseBody);

String error = errorResponse.get("error") == null ? null : errorResponse.get("error").toString();
String errorDescription = errorResponse.get("error_description") == null ? null
: errorResponse.get("error_description").toString();
if (error != null && errorDescription != null) {
return String.format("%s: %s", error, errorDescription);
} else if (error != null) {
return error;
}
} catch (Exception e) {
LOGGER.debug("Could not parse error response as JSON", e);
}
return responseBody;
}
}
Loading