diff --git a/azure-blob-payloads/build.gradle b/azure-blob-payloads/build.gradle deleted file mode 100644 index 691d0931..00000000 --- a/azure-blob-payloads/build.gradle +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Build file for the azure-blob-payloads module. - * Provides a BlobPayloadStore implementation for externalizing large payloads to Azure Blob Storage. - */ - -plugins { - id 'java-library' - id 'maven-publish' - id 'signing' - id 'com.github.spotbugs' version '6.4.8' -} - -group 'com.microsoft' -version = '1.7.0' -archivesBaseName = 'durabletask-azure-blob-payloads' - -def azureStorageBlobVersion = '12.30.0' -def azureCoreVersion = '1.57.1' -def azureIdentityVersion = '1.18.1' - -dependencies { - api project(':client') - implementation "com.azure:azure-storage-blob:${azureStorageBlobVersion}" - implementation "com.azure:azure-core:${azureCoreVersion}" - implementation "com.azure:azure-identity:${azureIdentityVersion}" - - testImplementation(platform('org.junit:junit-bom:5.14.2')) - testImplementation('org.junit.jupiter:junit-jupiter') - testRuntimeOnly('org.junit.platform:junit-platform-launcher') - testImplementation 'org.mockito:mockito-core:5.21.0' - testImplementation 'org.mockito:mockito-junit-jupiter:5.21.0' -} - -sourceCompatibility = JavaVersion.VERSION_1_8 -targetCompatibility = JavaVersion.VERSION_1_8 - -test { - useJUnitPlatform() -} - -publishing { - repositories { - maven { - url "file://$project.rootDir/repo" - } - } - publications { - mavenJava(MavenPublication) { - from components.java - artifactId = archivesBaseName - pom { - name = 'Durable Task Azure Blob Payloads for Java' - description = 'This package provides an Azure Blob Storage implementation of PayloadStore for externalizing large payloads in the Durable Task Java SDK.' - url = "https://github.com/microsoft/durabletask-java/tree/main/azure-blob-payloads" - licenses { - license { - name = "MIT License" - url = "https://opensource.org/licenses/MIT" - distribution = "repo" - } - } - developers { - developer { - id = "Microsoft" - name = "Microsoft Corporation" - } - } - scm { - connection = "scm:git:https://github.com/microsoft/durabletask-java" - developerConnection = "scm:git:git@github.com:microsoft/durabletask-java" - url = "https://github.com/microsoft/durabletask-java/tree/main/azure-blob-payloads" - } - } - } - } -} - -signing { - required { !project.hasProperty("skipSigning") && gradle.taskGraph.hasTask("publish") } - sign publishing.publications.mavenJava -} - -spotbugs { - ignoreFailures = false - effort = 'max' - reportLevel = 'medium' - excludeFilter = file('spotbugs-exclude.xml') -} diff --git a/azure-blob-payloads/spotbugs-exclude.xml b/azure-blob-payloads/spotbugs-exclude.xml deleted file mode 100644 index d3221dc9..00000000 --- a/azure-blob-payloads/spotbugs-exclude.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - - - - diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java deleted file mode 100644 index df8fb841..00000000 --- a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask.azureblobpayloads; - -import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; -import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; -import com.microsoft.durabletask.LargePayloadOptions; - -/** - * Extension methods for configuring Azure Blob Storage-based payload externalization - * on Durable Task builders. - *

- * Example: - *

{@code
- * BlobPayloadStoreOptions storeOptions = new BlobPayloadStoreOptions.Builder()
- *     .setConnectionString("DefaultEndpointsProtocol=https;...")
- *     .build();
- *
- * DurableTaskGrpcWorkerBuilder workerBuilder = new DurableTaskGrpcWorkerBuilder();
- * AzureBlobPayloadsExtensions.useBlobStoragePayloads(workerBuilder, storeOptions);
- *
- * DurableTaskGrpcClientBuilder clientBuilder = new DurableTaskGrpcClientBuilder();
- * AzureBlobPayloadsExtensions.useBlobStoragePayloads(clientBuilder, storeOptions);
- * }
- * - * @see BlobPayloadStore - * @see BlobPayloadStoreOptions - */ -public final class AzureBlobPayloadsExtensions { - - private AzureBlobPayloadsExtensions() { - } - - /** - * Configures the worker builder to use Azure Blob Storage for large payload externalization - * with default {@link LargePayloadOptions}. - * - * @param builder the worker builder to configure - * @param storeOptions the blob payload store configuration - */ - public static void useBlobStoragePayloads( - DurableTaskGrpcWorkerBuilder builder, - BlobPayloadStoreOptions storeOptions) { - if (builder == null) { - throw new IllegalArgumentException("builder must not be null"); - } - if (storeOptions == null) { - throw new IllegalArgumentException("storeOptions must not be null"); - } - builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions)); - } - - /** - * Configures the worker builder to use Azure Blob Storage for large payload externalization - * with custom {@link LargePayloadOptions}. - * - * @param builder the worker builder to configure - * @param storeOptions the blob payload store configuration - * @param payloadOptions the large payload threshold options - */ - public static void useBlobStoragePayloads( - DurableTaskGrpcWorkerBuilder builder, - BlobPayloadStoreOptions storeOptions, - LargePayloadOptions payloadOptions) { - if (builder == null) { - throw new IllegalArgumentException("builder must not be null"); - } - if (storeOptions == null) { - throw new IllegalArgumentException("storeOptions must not be null"); - } - builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions), payloadOptions); - } - - /** - * Configures the client builder to use Azure Blob Storage for large payload externalization - * with default {@link LargePayloadOptions}. - * - * @param builder the client builder to configure - * @param storeOptions the blob payload store configuration - */ - public static void useBlobStoragePayloads( - DurableTaskGrpcClientBuilder builder, - BlobPayloadStoreOptions storeOptions) { - if (builder == null) { - throw new IllegalArgumentException("builder must not be null"); - } - if (storeOptions == null) { - throw new IllegalArgumentException("storeOptions must not be null"); - } - builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions)); - } - - /** - * Configures the client builder to use Azure Blob Storage for large payload externalization - * with custom {@link LargePayloadOptions}. - * - * @param builder the client builder to configure - * @param storeOptions the blob payload store configuration - * @param payloadOptions the large payload threshold options - */ - public static void useBlobStoragePayloads( - DurableTaskGrpcClientBuilder builder, - BlobPayloadStoreOptions storeOptions, - LargePayloadOptions payloadOptions) { - if (builder == null) { - throw new IllegalArgumentException("builder must not be null"); - } - if (storeOptions == null) { - throw new IllegalArgumentException("storeOptions must not be null"); - } - builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions), payloadOptions); - } -} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java deleted file mode 100644 index c6716a47..00000000 --- a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask.azureblobpayloads; - -import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobDownloadContentResponse; -import com.azure.storage.blob.models.BlobHttpHeaders; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.options.BlobParallelUploadOptions; -import com.microsoft.durabletask.PayloadStore; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.UUID; -import java.util.logging.Logger; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -/** - * Azure Blob Storage implementation of {@link PayloadStore}. - *

- * This class uploads large payloads to Azure Blob Storage and returns tokens - * in the format {@code blob:v1::} that can be recognized - * and resolved by this store. - *

- * The store automatically creates the container if it does not exist. - * Optionally compresses payloads with gzip (enabled by default). - * - * @see BlobPayloadStoreOptions - * @see PayloadStore - */ -public final class BlobPayloadStore implements PayloadStore { - - private static final Logger logger = Logger.getLogger(BlobPayloadStore.class.getName()); - private static final String BLOB_EXTENSION = ".json"; - private static final String TOKEN_PREFIX = "blob:v1:"; - private static final String GZIP_CONTENT_ENCODING = "gzip"; - - private final BlobContainerClient containerClient; - private final String blobPrefix; - private final String containerName; - private final boolean compressPayloads; - private volatile boolean containerEnsured; - - /** - * Creates a new BlobPayloadStore with the given options. - * - * @param options the blob payload store configuration - */ - public BlobPayloadStore(BlobPayloadStoreOptions options) { - if (options == null) { - throw new IllegalArgumentException("options must not be null"); - } - - BlobServiceClient serviceClient; - if (options.getBlobServiceClient() != null) { - serviceClient = options.getBlobServiceClient(); - } else if (options.getConnectionString() != null) { - serviceClient = new BlobServiceClientBuilder() - .connectionString(options.getConnectionString()) - .buildClient(); - } else { - serviceClient = new BlobServiceClientBuilder() - .endpoint(options.getBlobServiceEndpoint()) - .credential(options.getCredential()) - .buildClient(); - } - - this.containerClient = serviceClient.getBlobContainerClient(options.getContainerName()); - this.blobPrefix = options.getBlobPrefix(); - this.containerName = options.getContainerName(); - this.compressPayloads = options.isCompressPayloads(); - } - - @Override - public String upload(String payload) { - if (payload == null) { - throw new IllegalArgumentException("payload must not be null"); - } - - ensureContainerExists(); - - String blobName = this.blobPrefix + UUID.randomUUID().toString().replace("-", "") + BLOB_EXTENSION; - BlobClient blobClient = this.containerClient.getBlobClient(blobName); - - byte[] rawData = payload.getBytes(StandardCharsets.UTF_8); - byte[] data; - - if (this.compressPayloads) { - data = gzipCompress(rawData); - BlobHttpHeaders headers = new BlobHttpHeaders().setContentEncoding(GZIP_CONTENT_ENCODING); - BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions( - new ByteArrayInputStream(data), data.length) - .setHeaders(headers); - blobClient.uploadWithResponse(uploadOptions, null, null); - } else { - data = rawData; - blobClient.upload(new ByteArrayInputStream(data), data.length, true); - } - - String token = TOKEN_PREFIX + this.containerName + ":" + blobName; - logger.fine(() -> String.format("Uploaded payload (%d bytes, compressed=%s) to %s", - rawData.length, this.compressPayloads, token)); - return token; - } - - @Override - public String download(String token) { - if (token == null || token.isEmpty()) { - throw new IllegalArgumentException("token must not be null or empty"); - } - - String blobName = extractBlobName(token); - BlobClient blobClient = this.containerClient.getBlobClient(blobName); - - BlobDownloadContentResponse response = blobClient.downloadContentWithResponse(null, null, null, null); - byte[] rawBytes = response.getValue().toBytes(); - String contentEncoding = response.getDeserializedHeaders().getContentEncoding(); - if (GZIP_CONTENT_ENCODING.equalsIgnoreCase(contentEncoding)) { - rawBytes = gzipDecompress(rawBytes); - } - - String payload = new String(rawBytes, StandardCharsets.UTF_8); - logger.fine(() -> String.format("Downloaded payload (%d bytes) from %s", payload.length(), token)); - return payload; - } - - @Override - public boolean isKnownPayloadToken(String value) { - if (value == null || value.isEmpty()) { - return false; - } - return value.startsWith(TOKEN_PREFIX); - } - - /** - * Ensures the blob container exists, creating it if necessary. - * The check-then-act on {@code containerEnsured} is intentionally non-atomic: - * concurrent callers may race through to {@code create()}, but the 409 Conflict - * handler makes this benign. - */ - private void ensureContainerExists() { - if (this.containerEnsured) { - return; - } - try { - if (!this.containerClient.exists()) { - this.containerClient.create(); - logger.info(() -> String.format("Created blob container: %s", this.containerClient.getBlobContainerName())); - } - this.containerEnsured = true; - } catch (BlobStorageException e) { - // Container might have been created concurrently (409 Conflict) - if (e.getStatusCode() != 409) { - throw e; - } - this.containerEnsured = true; - } - } - - /** - * Extracts the blob name from a {@code blob:v1::} token - * and validates that the container matches the configured container. - */ - private String extractBlobName(String token) { - if (!token.startsWith(TOKEN_PREFIX)) { - throw new IllegalArgumentException( - "Token does not have the expected format (blob:v1:...): " + token); - } - // Format: blob:v1:: - String remainder = token.substring(TOKEN_PREFIX.length()); - int colonIndex = remainder.indexOf(':'); - if (colonIndex < 0) { - throw new IllegalArgumentException( - "Token does not have the expected format (blob:v1::): " + token); - } - String tokenContainer = remainder.substring(0, colonIndex); - if (!this.containerName.equals(tokenContainer)) { - throw new IllegalArgumentException(String.format( - "Token container '%s' does not match configured container '%s'", - tokenContainer, this.containerName)); - } - return remainder.substring(colonIndex + 1); - } - - private static byte[] gzipCompress(byte[] data) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) { - gzipOut.write(data); - } - return baos.toByteArray(); - } catch (IOException e) { - throw new RuntimeException("Failed to gzip compress payload", e); - } - } - - /** - * Maximum decompressed size (20 MiB) to guard against decompression bombs. - * This is 2x the default max externalized payload size of 10 MiB. - */ - private static final int MAX_DECOMPRESSED_BYTES = 20 * 1024 * 1024; - - private static byte[] gzipDecompress(byte[] compressed) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed))) { - byte[] buffer = new byte[8192]; - int len; - int totalRead = 0; - while ((len = gzipIn.read(buffer)) != -1) { - totalRead += len; - if (totalRead > MAX_DECOMPRESSED_BYTES) { - throw new IOException( - "Decompressed payload exceeds safety limit of " + MAX_DECOMPRESSED_BYTES + " bytes"); - } - baos.write(buffer, 0, len); - } - } - return baos.toByteArray(); - } catch (IOException e) { - throw new RuntimeException("Failed to gzip decompress payload", e); - } - } -} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java deleted file mode 100644 index 5b5246d7..00000000 --- a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java +++ /dev/null @@ -1,276 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask.azureblobpayloads; - -import com.azure.core.credential.TokenCredential; -import com.azure.storage.blob.BlobServiceClient; - -/** - * Configuration options for {@link BlobPayloadStore}. - *

- * Use the {@link Builder} to construct instances. Either a connection string or a - * blob service endpoint with a credential must be provided. - *

- * Example using connection string: - *

{@code
- * BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
- *     .setConnectionString("DefaultEndpointsProtocol=https;...")
- *     .setContainerName("large-payloads")
- *     .build();
- * }
- *

- * Example using endpoint with managed identity: - *

{@code
- * BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
- *     .setBlobServiceEndpoint("https://myaccount.blob.core.windows.net")
- *     .setCredential(new DefaultAzureCredentialBuilder().build())
- *     .setContainerName("large-payloads")
- *     .build();
- * }
- * - * @see BlobPayloadStore - */ -public final class BlobPayloadStoreOptions { - - /** - * Default container name for storing externalized payloads. - */ - static final String DEFAULT_CONTAINER_NAME = "durabletask-payloads"; - - /** - * Default prefix for blob names. - */ - static final String DEFAULT_BLOB_PREFIX = "payloads/"; - - private final String connectionString; - private final String blobServiceEndpoint; - private final TokenCredential credential; - private final BlobServiceClient blobServiceClient; - private final String containerName; - private final String blobPrefix; - private final boolean compressPayloads; - - private BlobPayloadStoreOptions(Builder builder) { - this.connectionString = builder.connectionString; - this.blobServiceEndpoint = builder.blobServiceEndpoint; - this.credential = builder.credential; - this.blobServiceClient = builder.blobServiceClient; - this.containerName = builder.containerName; - this.blobPrefix = builder.blobPrefix; - this.compressPayloads = builder.compressPayloads; - } - - /** - * Gets the Azure Storage connection string, if configured. - * - * @return the connection string, or null if endpoint-based auth is used - */ - public String getConnectionString() { - return this.connectionString; - } - - /** - * Gets the Azure Blob Storage service endpoint, if configured. - * - * @return the blob service endpoint, or null if connection string auth is used - */ - public String getBlobServiceEndpoint() { - return this.blobServiceEndpoint; - } - - /** - * Gets the token credential for authenticating to Azure Blob Storage, if configured. - * - * @return the token credential, or null - */ - public TokenCredential getCredential() { - return this.credential; - } - - /** - * Gets a pre-configured BlobServiceClient, if provided. - * - * @return the blob service client, or null - */ - public BlobServiceClient getBlobServiceClient() { - return this.blobServiceClient; - } - - /** - * Gets the container name for storing externalized payloads. - * - * @return the container name - */ - public String getContainerName() { - return this.containerName; - } - - /** - * Gets the blob name prefix for externalized payloads. - * - * @return the blob prefix - */ - public String getBlobPrefix() { - return this.blobPrefix; - } - - /** - * Gets whether payloads should be compressed with gzip before uploading. - * - * @return true if compression is enabled, false otherwise - */ - public boolean isCompressPayloads() { - return this.compressPayloads; - } - - /** - * Builder for constructing {@link BlobPayloadStoreOptions} instances. - */ - public static final class Builder { - private String connectionString; - private String blobServiceEndpoint; - private TokenCredential credential; - private BlobServiceClient blobServiceClient; - private String containerName = DEFAULT_CONTAINER_NAME; - private String blobPrefix = DEFAULT_BLOB_PREFIX; - private boolean compressPayloads = true; - - /** - * Sets the Azure Storage connection string. Mutually exclusive with - * {@link #setBlobServiceEndpoint(String)} and {@link #setBlobServiceClient(BlobServiceClient)}. - * Setting this clears any previously set endpoint or pre-configured client. - * - * @param connectionString the connection string - * @return this builder - */ - public Builder setConnectionString(String connectionString) { - if (connectionString == null || connectionString.isEmpty()) { - throw new IllegalArgumentException("connectionString must not be null or empty"); - } - this.connectionString = connectionString; - this.blobServiceEndpoint = null; - this.credential = null; - this.blobServiceClient = null; - return this; - } - - /** - * Sets the Azure Blob Storage service endpoint. Use with {@link #setCredential(TokenCredential)}. - * Mutually exclusive with {@link #setConnectionString(String)} and {@link #setBlobServiceClient(BlobServiceClient)}. - * Setting this clears any previously set connection string or pre-configured client. - * - * @param blobServiceEndpoint the blob service endpoint URL - * @return this builder - */ - public Builder setBlobServiceEndpoint(String blobServiceEndpoint) { - if (blobServiceEndpoint == null || blobServiceEndpoint.isEmpty()) { - throw new IllegalArgumentException("blobServiceEndpoint must not be null or empty"); - } - this.blobServiceEndpoint = blobServiceEndpoint; - this.connectionString = null; - this.blobServiceClient = null; - return this; - } - - /** - * Sets the token credential for authenticating to Azure Blob Storage. - * Used with {@link #setBlobServiceEndpoint(String)}. - * - * @param credential the token credential - * @return this builder - */ - public Builder setCredential(TokenCredential credential) { - if (credential == null) { - throw new IllegalArgumentException("credential must not be null"); - } - this.credential = credential; - return this; - } - - /** - * Sets a pre-configured BlobServiceClient. Mutually exclusive with - * {@link #setConnectionString(String)} and {@link #setBlobServiceEndpoint(String)}. - * Setting this clears any previously set connection string or endpoint. - * - * @param blobServiceClient the pre-configured client - * @return this builder - */ - public Builder setBlobServiceClient(BlobServiceClient blobServiceClient) { - if (blobServiceClient == null) { - throw new IllegalArgumentException("blobServiceClient must not be null"); - } - this.blobServiceClient = blobServiceClient; - this.connectionString = null; - this.blobServiceEndpoint = null; - this.credential = null; - return this; - } - - /** - * Sets the container name. Defaults to {@value DEFAULT_CONTAINER_NAME}. - * - * @param containerName the container name - * @return this builder - */ - public Builder setContainerName(String containerName) { - if (containerName == null || containerName.isEmpty()) { - throw new IllegalArgumentException("containerName must not be null or empty"); - } - this.containerName = containerName; - return this; - } - - /** - * Sets the blob name prefix. Defaults to {@value DEFAULT_BLOB_PREFIX}. - * - * @param blobPrefix the blob name prefix - * @return this builder - */ - public Builder setBlobPrefix(String blobPrefix) { - if (blobPrefix == null) { - throw new IllegalArgumentException("blobPrefix must not be null"); - } - this.blobPrefix = blobPrefix; - return this; - } - - /** - * Sets whether payloads should be compressed with gzip before uploading. - * Defaults to {@code true}. - * - * @param compressPayloads true to enable gzip compression - * @return this builder - */ - public Builder setCompressPayloads(boolean compressPayloads) { - this.compressPayloads = compressPayloads; - return this; - } - - /** - * Builds a new {@link BlobPayloadStoreOptions} instance. - * - * @return the options instance - * @throws IllegalStateException if no authentication method is configured - */ - public BlobPayloadStoreOptions build() { - int authMethods = 0; - if (connectionString != null) authMethods++; - if (blobServiceEndpoint != null) authMethods++; - if (blobServiceClient != null) authMethods++; - - if (authMethods == 0) { - throw new IllegalStateException( - "One of connectionString, blobServiceEndpoint, or blobServiceClient must be set"); - } - if (authMethods > 1) { - throw new IllegalStateException( - "Only one of connectionString, blobServiceEndpoint, or blobServiceClient may be set"); - } - if (blobServiceEndpoint != null && credential == null) { - throw new IllegalStateException( - "credential must be set when using blobServiceEndpoint"); - } - return new BlobPayloadStoreOptions(this); - } - } -} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java deleted file mode 100644 index 92bd6588..00000000 --- a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask.azureblobpayloads; - -import com.microsoft.durabletask.PayloadStore; -import com.microsoft.durabletask.PayloadStoreProvider; - -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * {@link PayloadStoreProvider} implementation that creates a {@link BlobPayloadStore} - * when the {@code DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING} environment variable is set. - *

- * This provider is discovered automatically via {@link java.util.ServiceLoader}. - */ -public final class BlobPayloadStoreProvider implements PayloadStoreProvider { - - private static final Logger logger = Logger.getLogger(BlobPayloadStoreProvider.class.getName()); - private static final String ENV_STORAGE_CONNECTION_STRING = "DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING"; - - @Override - public PayloadStore create() { - String connectionString = System.getenv(ENV_STORAGE_CONNECTION_STRING); - if (connectionString == null || connectionString.isEmpty()) { - return null; - } - - try { - BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() - .setConnectionString(connectionString) - .build(); - logger.info("Large payload externalization enabled using Azure Blob Storage"); - return new BlobPayloadStore(options); - } catch (Exception e) { - logger.log(Level.WARNING, - "Failed to initialize BlobPayloadStore; large payloads will not be externalized", e); - return null; - } - } -} diff --git a/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider b/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider deleted file mode 100644 index 573fb7ff..00000000 --- a/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider +++ /dev/null @@ -1 +0,0 @@ -com.microsoft.durabletask.azureblobpayloads.BlobPayloadStoreProvider diff --git a/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java b/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java deleted file mode 100644 index dba0e1ca..00000000 --- a/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask.azureblobpayloads; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Unit tests for BlobPayloadStoreOptions. - */ -public class BlobPayloadStoreOptionsTest { - - @Test - void connectionString_setsCorrectly() { - BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() - .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") - .build(); - assertNotNull(options.getConnectionString()); - assertEquals("durabletask-payloads", options.getContainerName()); - assertEquals("payloads/", options.getBlobPrefix()); - assertTrue(options.isCompressPayloads()); - } - - @Test - void customContainerAndPrefix() { - BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() - .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") - .setContainerName("my-container") - .setBlobPrefix("my-prefix/") - .build(); - assertEquals("my-container", options.getContainerName()); - assertEquals("my-prefix/", options.getBlobPrefix()); - } - - @Test - void noAuthMethod_throws() { - assertThrows(IllegalStateException.class, - () -> new BlobPayloadStoreOptions.Builder().build()); - } - - @Test - void multipleAuthMethods_throws() { - assertThrows(IllegalStateException.class, - () -> new BlobPayloadStoreOptions.Builder() - .setConnectionString("conn-string") - .setBlobServiceEndpoint("https://test.blob.core.windows.net") - .build()); - } - - @Test - void endpointWithoutCredential_throws() { - assertThrows(IllegalStateException.class, - () -> new BlobPayloadStoreOptions.Builder() - .setBlobServiceEndpoint("https://test.blob.core.windows.net") - .build()); - } - - @Test - void nullConnectionString_throws() { - assertThrows(IllegalArgumentException.class, - () -> new BlobPayloadStoreOptions.Builder().setConnectionString(null)); - } - - @Test - void emptyContainerName_throws() { - assertThrows(IllegalArgumentException.class, - () -> new BlobPayloadStoreOptions.Builder().setContainerName("")); - } - - @Test - void nullBlobPrefix_throws() { - assertThrows(IllegalArgumentException.class, - () -> new BlobPayloadStoreOptions.Builder().setBlobPrefix(null)); - } - - @Test - void compressPayloads_defaultsToTrue() { - BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() - .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") - .build(); - assertTrue(options.isCompressPayloads()); - } - - @Test - void compressPayloads_canBeDisabled() { - BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() - .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") - .setCompressPayloads(false) - .build(); - assertFalse(options.isCompressPayloads()); - } -} diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java index f1ed2d68..02be55f4 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java @@ -12,16 +12,10 @@ import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.DataConverter; import com.microsoft.durabletask.OrchestrationRunner; -import com.microsoft.durabletask.PayloadStore; -import com.microsoft.durabletask.PayloadStoreProvider; import com.microsoft.durabletask.TaskFailedException; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.OrchestratorBlockedException; -import java.util.ServiceLoader; -import java.util.logging.Level; -import java.util.logging.Logger; - /** * Durable Function Orchestration Middleware * @@ -31,13 +25,6 @@ public class OrchestrationMiddleware implements Middleware { private static final String ORCHESTRATION_TRIGGER = "DurableOrchestrationTrigger"; - private static final Logger logger = Logger.getLogger(OrchestrationMiddleware.class.getName()); - - private final PayloadStore payloadStore; - - public OrchestrationMiddleware() { - this.payloadStore = initializePayloadStore(); - } @Override public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exception { @@ -83,24 +70,7 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce // requires update on OrchestratorFunction API. throw new RuntimeException("Unexpected failure in the task execution", e); } - }, this.payloadStore); + }); context.updateReturnValue(orchestratorOutputEncodedProtoBytes); } - - private static PayloadStore initializePayloadStore() { - ServiceLoader loader = ServiceLoader.load(PayloadStoreProvider.class); - for (PayloadStoreProvider provider : loader) { - try { - PayloadStore store = provider.create(); - if (store != null) { - return store; - } - } catch (Exception e) { - logger.log(Level.WARNING, - "PayloadStoreProvider " + provider.getClass().getName() + " failed to create store", e); - } - } - logger.fine("No PayloadStoreProvider found or configured; large payload externalization is disabled"); - return null; - } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index b3a86c95..14955120 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -32,14 +32,10 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceBlockingStub sidecarClient; private final String defaultVersion; - private final PayloadHelper payloadHelper; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.defaultVersion = builder.defaultVersion; - this.payloadHelper = builder.payloadStore != null - ? new PayloadHelper(builder.payloadStore, builder.largePayloadOptions) - : null; Channel sidecarGrpcChannel; if (builder.channel != null) { @@ -67,7 +63,6 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { DurableTaskGrpcClient(int port, String defaultVersion) { this.dataConverter = new JacksonDataConverter(); this.defaultVersion = defaultVersion; - this.payloadHelper = null; // Need to keep track of this channel so we can dispose it on close() this.managedSidecarChannel = ManagedChannelBuilder @@ -130,9 +125,6 @@ public String scheduleNewOrchestrationInstance( Object input = options.getInput(); if (input != null) { String serializedInput = this.dataConverter.serialize(input); - if (this.payloadHelper != null) { - serializedInput = this.payloadHelper.maybeExternalize(serializedInput); - } builder.setInput(StringValue.of(serializedInput)); } @@ -185,9 +177,6 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) .setName(eventName); if (eventPayload != null) { String serializedPayload = this.dataConverter.serialize(eventPayload); - if (this.payloadHelper != null) { - serializedPayload = this.payloadHelper.maybeExternalize(serializedPayload); - } builder.setInput(StringValue.of(serializedPayload)); } @@ -202,9 +191,6 @@ public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getI .setGetInputsAndOutputs(getInputsAndOutputs) .build(); GetInstanceResponse response = this.sidecarClient.getInstance(request); - if (this.payloadHelper != null) { - response = resolveGetInstanceResponsePayloads(response); - } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -232,9 +218,6 @@ public OrchestrationMetadata waitForInstanceStart(String instanceId, Duration ti } throw e; } - if (this.payloadHelper != null) { - response = resolveGetInstanceResponsePayloads(response); - } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -262,9 +245,6 @@ public OrchestrationMetadata waitForInstanceCompletion(String instanceId, Durati } throw e; } - if (this.payloadHelper != null) { - response = resolveGetInstanceResponsePayloads(response); - } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -278,11 +258,7 @@ public void terminate(String instanceId, @Nullable Object output) { serializeOutput != null ? serializeOutput : "(null)")); TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId); if (serializeOutput != null){ - String outputToSend = serializeOutput; - if (this.payloadHelper != null) { - outputToSend = this.payloadHelper.maybeExternalize(outputToSend); - } - builder.setOutput(StringValue.of(outputToSend)); + builder.setOutput(StringValue.of(serializeOutput)); } this.sidecarClient.terminateInstance(builder.build()); } @@ -305,11 +281,7 @@ public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery qu private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs){ List metadataList = new ArrayList<>(); queryInstancesResponse.getOrchestrationStateList().forEach(state -> { - OrchestrationState resolvedState = state; - if (this.payloadHelper != null) { - resolvedState = resolveOrchestrationStatePayloads(state); - } - metadataList.add(new OrchestrationMetadata(resolvedState, this.dataConverter, fetchInputsAndOutputs)); + metadataList.add(new OrchestrationMetadata(state, this.dataConverter, fetchInputsAndOutputs)); }); return new OrchestrationStatusQueryResult(metadataList, queryInstancesResponse.getContinuationToken().getValue()); } @@ -368,11 +340,7 @@ public void suspendInstance(String instanceId, @Nullable String reason) { SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); suspendRequestBuilder.setInstanceId(instanceId); if (reason != null) { - String reasonToSend = reason; - if (this.payloadHelper != null) { - reasonToSend = this.payloadHelper.maybeExternalize(reasonToSend); - } - suspendRequestBuilder.setReason(StringValue.of(reasonToSend)); + suspendRequestBuilder.setReason(StringValue.of(reason)); } this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); } @@ -382,11 +350,7 @@ public void resumeInstance(String instanceId, @Nullable String reason) { ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder(); resumeRequestBuilder.setInstanceId(instanceId); if (reason != null) { - String reasonToSend = reason; - if (this.payloadHelper != null) { - reasonToSend = this.payloadHelper.maybeExternalize(reasonToSend); - } - resumeRequestBuilder.setReason(StringValue.of(reasonToSend)); + resumeRequestBuilder.setReason(StringValue.of(reason)); } this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } @@ -435,53 +399,4 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } - - /** - * Resolves externalized payload URI tokens in a GetInstanceResponse. - */ - private GetInstanceResponse resolveGetInstanceResponsePayloads(GetInstanceResponse response) { - if (!response.getExists()) { - return response; - } - OrchestrationState state = response.getOrchestrationState(); - OrchestrationState resolvedState = resolveOrchestrationStatePayloads(state); - if (resolvedState == state) { - return response; - } - return response.toBuilder().setOrchestrationState(resolvedState).build(); - } - - /** - * Resolves externalized payload URI tokens in an OrchestrationState. - */ - private OrchestrationState resolveOrchestrationStatePayloads(OrchestrationState state) { - boolean changed = false; - OrchestrationState.Builder builder = state.toBuilder(); - - if (state.hasInput() && !state.getInput().getValue().isEmpty()) { - String resolved = this.payloadHelper.maybeResolve(state.getInput().getValue()); - if (!resolved.equals(state.getInput().getValue())) { - builder.setInput(StringValue.of(resolved)); - changed = true; - } - } - - if (state.hasOutput() && !state.getOutput().getValue().isEmpty()) { - String resolved = this.payloadHelper.maybeResolve(state.getOutput().getValue()); - if (!resolved.equals(state.getOutput().getValue())) { - builder.setOutput(StringValue.of(resolved)); - changed = true; - } - } - - if (state.hasCustomStatus() && !state.getCustomStatus().getValue().isEmpty()) { - String resolved = this.payloadHelper.maybeResolve(state.getCustomStatus().getValue()); - if (!resolved.equals(state.getCustomStatus().getValue())) { - builder.setCustomStatus(StringValue.of(resolved)); - changed = true; - } - } - - return changed ? builder.build() : state; - } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java index 97b49919..1a1cb6f2 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java @@ -13,8 +13,6 @@ public final class DurableTaskGrpcClientBuilder { int port; Channel channel; String defaultVersion; - PayloadStore payloadStore; - LargePayloadOptions largePayloadOptions; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -67,41 +65,6 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) { return this; } - /** - * Enables large payload externalization with default options. - *

- * When enabled, payloads exceeding the default threshold will be uploaded to the - * provided {@link PayloadStore} and replaced with opaque token references. - * - * @param payloadStore the store to use for externalizing large payloads - * @return this builder object - */ - public DurableTaskGrpcClientBuilder useExternalizedPayloads(PayloadStore payloadStore) { - return this.useExternalizedPayloads(payloadStore, new LargePayloadOptions.Builder().build()); - } - - /** - * Enables large payload externalization with custom options. - *

- * When enabled, payloads exceeding the configured threshold will be uploaded to the - * provided {@link PayloadStore} and replaced with opaque token references. - * - * @param payloadStore the store to use for externalizing large payloads - * @param options the large payload configuration options - * @return this builder object - */ - public DurableTaskGrpcClientBuilder useExternalizedPayloads(PayloadStore payloadStore, LargePayloadOptions options) { - if (payloadStore == null) { - throw new IllegalArgumentException("payloadStore must not be null"); - } - if (options == null) { - throw new IllegalArgumentException("options must not be null"); - } - this.payloadStore = payloadStore; - this.largePayloadOptions = options; - return this; - } - /** * Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object. * @return a new {@link DurableTaskClient} object diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 5462478d..1824f195 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -43,9 +43,6 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final WorkItemFilter workItemFilter; private final GetWorkItemsRequest getWorkItemsRequest; - private final PayloadHelper payloadHelper; - private final int chunkSizeBytes; - private final TaskHubSidecarServiceBlockingStub sidecarClient; DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder, WorkItemFilter workItemFilter) { @@ -78,10 +75,6 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.versioningOptions = builder.versioningOptions; this.workItemFilter = workItemFilter; this.getWorkItemsRequest = buildGetWorkItemsRequest(); - this.payloadHelper = builder.payloadStore != null - ? new PayloadHelper(builder.payloadStore, builder.largePayloadOptions) - : null; - this.chunkSizeBytes = builder.chunkSizeBytes; } /** @@ -152,11 +145,6 @@ public void startAndBlock() { if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); - // Resolve externalized payload URI tokens in history events - if (this.payloadHelper != null) { - orchestratorRequest = resolveOrchestratorRequestPayloads(orchestratorRequest); - } - // If versioning is set, process it first to see if the orchestration should be executed. boolean versioningFailed = false; if (versioningOptions != null && versioningOptions.getVersion() != null) { @@ -306,34 +294,7 @@ public void startAndBlock() { .setCompletionToken(workItem.getCompletionToken()) .build(); - // Externalize large payloads and send (with optional chunking). - // If externalization or chunking fails, report as orchestration failure. - try { - if (this.payloadHelper != null) { - response = externalizeOrchestratorResponsePayloads(response); - } - sendOrchestratorResponse(response); - } catch (PayloadTooLargeException e) { - logger.log(Level.WARNING, - "Failed to send orchestrator response for instance '" + - orchestratorRequest.getInstanceId() + "': " + e.getMessage(), e); - CompleteOrchestrationAction failAction = CompleteOrchestrationAction.newBuilder() - .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) - .setFailureDetails(TaskFailureDetails.newBuilder() - .setErrorType(e.getClass().getName()) - .setErrorMessage(e.getMessage()) - .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) - .build()) - .build(); - OrchestratorResponse failResponse = OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .setCompletionToken(workItem.getCompletionToken()) - .addActions(OrchestratorAction.newBuilder() - .setCompleteOrchestration(failAction) - .build()) - .build(); - this.sidecarClient.completeOrchestratorTask(failResponse); - } + this.sidecarClient.completeOrchestratorTask(response); } else { switch(versioningOptions.getFailureStrategy()) { case FAIL: @@ -355,7 +316,7 @@ public void startAndBlock() { .addActions(action) .build(); - sendOrchestratorResponse(response); + this.sidecarClient.completeOrchestratorTask(response); break; // Reject and default share the same behavior as it does not change the orchestration to a terminal state. case REJECT: @@ -367,12 +328,6 @@ public void startAndBlock() { } } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); - - // Resolve externalized payload URI token in activity input - if (this.payloadHelper != null) { - activityRequest = resolveActivityRequestPayloads(activityRequest); - } - String activityInstanceId = activityRequest.getOrchestrationInstance().getInstanceId(); // Start a tracing span for this activity execution @@ -417,10 +372,6 @@ public void startAndBlock() { .setCompletionToken(workItem.getCompletionToken()); if (output != null) { - // Externalize activity output if it exceeds threshold - if (this.payloadHelper != null) { - output = this.payloadHelper.maybeExternalize(output); - } responseBuilder.setResult(StringValue.of(output)); } @@ -475,9 +426,6 @@ private GetWorkItemsRequest buildGetWorkItemsRequest() { if (this.workItemFilter != null) { builder.setWorkItemFilters(toProtoWorkItemFilters(this.workItemFilter)); } - if (this.payloadHelper != null) { - builder.addCapabilities(WorkerCapability.WORKER_CAPABILITY_LARGE_PAYLOADS); - } return builder.build(); } @@ -499,101 +447,4 @@ static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) { } return builder.build(); } - - /** - * Sends an orchestrator response, chunking it if it exceeds the configured chunk size. - */ - private void sendOrchestratorResponse(OrchestratorResponse response) { - int serializedSize = response.getSerializedSize(); - if (serializedSize <= this.chunkSizeBytes) { - this.sidecarClient.completeOrchestratorTask(response); - return; - } - - List allActions = response.getActionsList(); - if (allActions.isEmpty()) { - // No actions to chunk and the serialized response already exceeds the configured - // chunk size. Sending this response as-is would likely exceed the gRPC message - // size limit and fail at runtime, so fail fast with a clear error message. - throw new PayloadTooLargeException( - "OrchestratorResponse without actions exceeds the configured chunk size (" + - this.chunkSizeBytes + " bytes). Enable large-payload externalization to Azure " + - "Blob Storage or reduce the size of non-action fields."); - } - - // Compute envelope overhead (response without actions, but with chunk metadata fields). - // Include isPartial and chunkIndex since those are added to every chunk. - OrchestratorResponse envelope = response.toBuilder() - .clearActions() - .setIsPartial(true) - .setChunkIndex(com.google.protobuf.Int32Value.of(0)) - .build(); - int envelopeSize = envelope.getSerializedSize(); - int maxActionsSize = this.chunkSizeBytes - envelopeSize; - - if (maxActionsSize <= 0) { - throw new PayloadTooLargeException( - "OrchestratorResponse envelope exceeds gRPC message size limit. Cannot chunk."); - } - - // Build chunks - List> chunks = new ArrayList<>(); - List currentChunk = new ArrayList<>(); - int currentChunkSize = 0; - - for (OrchestratorAction action : allActions) { - int actionSize = action.getSerializedSize(); - // Account for protobuf framing overhead per repeated field entry: - // field tag (1 byte) + length varint - int framingOverhead = 1 + com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(actionSize); - int actionWireSize = actionSize + framingOverhead; - if (actionWireSize > maxActionsSize) { - throw new PayloadTooLargeException( - "A single orchestrator action exceeds the gRPC message size limit (" + - actionWireSize + " bytes). Enable large-payload externalization to Azure Blob Storage " + - "to handle payloads of this size."); - } - - if (currentChunkSize + actionWireSize > maxActionsSize && !currentChunk.isEmpty()) { - chunks.add(currentChunk); - currentChunk = new ArrayList<>(); - currentChunkSize = 0; - } - currentChunk.add(action); - currentChunkSize += actionWireSize; - } - if (!currentChunk.isEmpty()) { - chunks.add(currentChunk); - } - - // Send chunks - for (int i = 0; i < chunks.size(); i++) { - boolean isLast = (i == chunks.size() - 1); - OrchestratorResponse.Builder chunkBuilder = response.toBuilder() - .clearActions() - .addAllActions(chunks.get(i)) - .setIsPartial(!isLast) - .setChunkIndex(com.google.protobuf.Int32Value.of(i)); - - if (i > 0) { - // Only the first chunk carries numEventsProcessed; subsequent chunks - // leave it unset (matching .NET behavior) - chunkBuilder.clearNumEventsProcessed(); - } - - this.sidecarClient.completeOrchestratorTask(chunkBuilder.build()); - } - } - - private OrchestratorRequest resolveOrchestratorRequestPayloads(OrchestratorRequest request) { - return PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, this.payloadHelper); - } - - private ActivityRequest resolveActivityRequestPayloads(ActivityRequest request) { - return PayloadInterceptionHelper.resolveActivityRequestPayloads(request, this.payloadHelper); - } - - private OrchestratorResponse externalizeOrchestratorResponsePayloads(OrchestratorResponse response) { - return PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, this.payloadHelper); - } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index 42256eab..19d5d059 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -14,25 +14,6 @@ * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. */ public final class DurableTaskGrpcWorkerBuilder { - - /** - * Minimum allowed chunk size for orchestrator response messages (1 MiB). - */ - static final int MIN_CHUNK_SIZE_BYTES = 1_048_576; - - /** - * Maximum allowed chunk size for orchestrator response messages (~3.9 MiB). - * This is the largest payload that can fit within a 4 MiB gRPC message after accounting - * for protobuf overhead. - */ - static final int MAX_CHUNK_SIZE_BYTES = 4_089_446; - - /** - * Default chunk size for orchestrator response messages. - * Matches {@link #MAX_CHUNK_SIZE_BYTES}. - */ - static final int DEFAULT_CHUNK_SIZE_BYTES = MAX_CHUNK_SIZE_BYTES; - final HashMap orchestrationFactories = new HashMap<>(); final HashMap activityFactories = new HashMap<>(); int port; @@ -42,9 +23,6 @@ public final class DurableTaskGrpcWorkerBuilder { DurableTaskGrpcWorkerVersioningOptions versioningOptions; private WorkItemFilter workItemFilter; private boolean autoGenerateWorkItemFilters; - PayloadStore payloadStore; - LargePayloadOptions largePayloadOptions; - int chunkSizeBytes = DEFAULT_CHUNK_SIZE_BYTES; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -170,43 +148,6 @@ public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFi return this; } - /** - * Enables large payload externalization with default options. - *

- * When enabled, payloads exceeding the default threshold will be uploaded to the - * provided {@link PayloadStore} and replaced with opaque token references. The worker - * will also announce {@code WORKER_CAPABILITY_LARGE_PAYLOADS} to the sidecar. - * - * @param payloadStore the store to use for externalizing large payloads - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder useExternalizedPayloads(PayloadStore payloadStore) { - return this.useExternalizedPayloads(payloadStore, new LargePayloadOptions.Builder().build()); - } - - /** - * Enables large payload externalization with custom options. - *

- * When enabled, payloads exceeding the configured threshold will be uploaded to the - * provided {@link PayloadStore} and replaced with opaque token references. The worker - * will also announce {@code WORKER_CAPABILITY_LARGE_PAYLOADS} to the sidecar. - * - * @param payloadStore the store to use for externalizing large payloads - * @param options the large payload configuration options - * @return this builder object - */ - public DurableTaskGrpcWorkerBuilder useExternalizedPayloads(PayloadStore payloadStore, LargePayloadOptions options) { - if (payloadStore == null) { - throw new IllegalArgumentException("payloadStore must not be null"); - } - if (options == null) { - throw new IllegalArgumentException("options must not be null"); - } - this.payloadStore = payloadStore; - this.largePayloadOptions = options; - return this; - } - /** * Enables automatic work item filtering by generating filters from the registered * orchestrations and activities. When enabled, the backend will only dispatch work items @@ -228,36 +169,6 @@ public DurableTaskGrpcWorkerBuilder useWorkItemFilters() { return this; } - /** - * Sets the maximum size in bytes for a single orchestrator response chunk sent over gRPC. - * Responses larger than this will be automatically split into multiple chunks. - *

- * The value must be between {@value #MIN_CHUNK_SIZE_BYTES} and {@value #MAX_CHUNK_SIZE_BYTES} bytes. - * Defaults to {@value #DEFAULT_CHUNK_SIZE_BYTES} bytes. - * - * @param chunkSizeBytes the maximum chunk size in bytes - * @return this builder object - * @throws IllegalArgumentException if the value is outside the allowed range - */ - public DurableTaskGrpcWorkerBuilder setCompleteOrchestratorResponseChunkSizeBytes(int chunkSizeBytes) { - if (chunkSizeBytes < MIN_CHUNK_SIZE_BYTES || chunkSizeBytes > MAX_CHUNK_SIZE_BYTES) { - throw new IllegalArgumentException(String.format( - "chunkSizeBytes must be between %d and %d, but was %d", - MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES, chunkSizeBytes)); - } - this.chunkSizeBytes = chunkSizeBytes; - return this; - } - - /** - * Gets the current chunk size setting. - * - * @return the chunk size in bytes - */ - int getChunkSizeBytes() { - return this.chunkSizeBytes; - } - /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object diff --git a/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java b/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java deleted file mode 100644 index 8bec1145..00000000 --- a/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -/** - * Configuration options for large payload externalization. - *

- * This class defines the size thresholds that control when payloads are externalized - * to a {@link PayloadStore}. It is a pure configuration class and does not hold a - * reference to any {@link PayloadStore} implementation. - *

- * Use the {@link Builder} to create instances: - *

{@code
- * LargePayloadOptions options = new LargePayloadOptions.Builder()
- *     .setThresholdBytes(900_000)
- *     .setMaxExternalizedPayloadBytes(10 * 1024 * 1024)
- *     .build();
- * }
- * - * @see PayloadStore - */ -public final class LargePayloadOptions { - - /** - * Default externalization threshold in bytes (900,000 bytes, matching .NET SDK). - */ - public static final int DEFAULT_THRESHOLD_BYTES = 900_000; - - /** - * Default maximum externalized payload size in bytes (10 MiB, matching .NET SDK). - */ - public static final int DEFAULT_MAX_EXTERNALIZED_PAYLOAD_BYTES = 10 * 1024 * 1024; - - private final int thresholdBytes; - private final int maxExternalizedPayloadBytes; - - private LargePayloadOptions(Builder builder) { - this.thresholdBytes = builder.thresholdBytes; - this.maxExternalizedPayloadBytes = builder.maxExternalizedPayloadBytes; - } - - /** - * Gets the size threshold in bytes at or above which payloads will be externalized. - * Payloads below this size are sent inline. The comparison uses UTF-8 byte length. - * - * @return the externalization threshold in bytes - */ - public int getThresholdBytes() { - return this.thresholdBytes; - } - - /** - * Gets the maximum payload size in bytes that can be externalized. - * Payloads exceeding this size will cause an error to be thrown. - * - * @return the maximum externalized payload size in bytes - */ - public int getMaxExternalizedPayloadBytes() { - return this.maxExternalizedPayloadBytes; - } - - /** - * Builder for constructing {@link LargePayloadOptions} instances. - */ - public static final class Builder { - private int thresholdBytes = DEFAULT_THRESHOLD_BYTES; - private int maxExternalizedPayloadBytes = DEFAULT_MAX_EXTERNALIZED_PAYLOAD_BYTES; - - /** - * Sets the size threshold in bytes above which payloads will be externalized. - * Must not exceed 1 MiB (1,048,576 bytes). - * - * @param thresholdBytes the externalization threshold in bytes - * @return this builder - * @throws IllegalArgumentException if thresholdBytes is negative or exceeds 1 MiB - */ - public Builder setThresholdBytes(int thresholdBytes) { - if (thresholdBytes < 0) { - throw new IllegalArgumentException("thresholdBytes must not be negative"); - } - if (thresholdBytes > 1_048_576) { - throw new IllegalArgumentException("thresholdBytes must not exceed 1 MiB (1,048,576 bytes)"); - } - this.thresholdBytes = thresholdBytes; - return this; - } - - /** - * Sets the maximum payload size in bytes that can be externalized. - * Payloads exceeding this size will cause an error. - * - * @param maxExternalizedPayloadBytes the maximum externalized payload size in bytes - * @return this builder - * @throws IllegalArgumentException if maxExternalizedPayloadBytes is not positive - */ - public Builder setMaxExternalizedPayloadBytes(int maxExternalizedPayloadBytes) { - if (maxExternalizedPayloadBytes <= 0) { - throw new IllegalArgumentException("maxExternalizedPayloadBytes must be positive"); - } - this.maxExternalizedPayloadBytes = maxExternalizedPayloadBytes; - return this; - } - - /** - * Builds a new {@link LargePayloadOptions} instance from the current builder settings. - * - * @return a new {@link LargePayloadOptions} instance - * @throws IllegalStateException if thresholdBytes is not less than maxExternalizedPayloadBytes - */ - public LargePayloadOptions build() { - if (this.thresholdBytes >= this.maxExternalizedPayloadBytes) { - throw new IllegalStateException( - "thresholdBytes (" + this.thresholdBytes + - ") must be less than maxExternalizedPayloadBytes (" + this.maxExternalizedPayloadBytes + ")"); - } - return new LargePayloadOptions(this); - } - } -} diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java index 36bf4ec8..e5819b23 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java @@ -44,44 +44,9 @@ private OrchestrationRunner() { public static String loadAndRun( String base64EncodedOrchestratorRequest, OrchestratorFunction orchestratorFunc) { - return loadAndRun(base64EncodedOrchestratorRequest, orchestratorFunc, null, null); - } - - /** - * Loads orchestration history from {@code base64EncodedOrchestratorRequest} and uses it to execute the - * orchestrator function with large payload externalization support. - * - * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload - * @param orchestratorFunc a function that implements the orchestrator logic - * @param store the payload store for externalizing/resolving large payloads - * @param the type of the orchestrator function output - * @return a base64-encoded protobuf payload of orchestrator actions - */ - public static String loadAndRun( - String base64EncodedOrchestratorRequest, - OrchestratorFunction orchestratorFunc, - PayloadStore store) { - return loadAndRun(base64EncodedOrchestratorRequest, orchestratorFunc, store, new LargePayloadOptions.Builder().build()); - } - - /** - * Loads orchestration history from {@code base64EncodedOrchestratorRequest} and uses it to execute the - * orchestrator function with large payload externalization support and custom options. - * - * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload - * @param orchestratorFunc a function that implements the orchestrator logic - * @param store the payload store for externalizing/resolving large payloads - * @param options the large payload configuration options - * @param the type of the orchestrator function output - * @return a base64-encoded protobuf payload of orchestrator actions - */ - public static String loadAndRun( - String base64EncodedOrchestratorRequest, - OrchestratorFunction orchestratorFunc, - PayloadStore store, - LargePayloadOptions options) { + // Example string: CiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YSIaCP///////////wESCwi254qRBhDk+rgocgAicgj///////////8BEgwIs+eKkQYQzMXjnQMaVwoLSGVsbG9DaXRpZXMSACJGCiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YRIiCiA3ODEwOTA2N2Q4Y2Q0ODg1YWU4NjQ0OTNlMmRlMGQ3OA== byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest); - byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc, store, options); + byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc); return Base64.getEncoder().encodeToString(resultBytes); } @@ -98,42 +63,6 @@ public static String loadAndRun( public static byte[] loadAndRun( byte[] orchestratorRequestBytes, OrchestratorFunction orchestratorFunc) { - return loadAndRun(orchestratorRequestBytes, orchestratorFunc, null, null); - } - - /** - * Loads orchestration history from {@code orchestratorRequestBytes} and uses it to execute the - * orchestrator function with large payload externalization support. - * - * @param orchestratorRequestBytes the protobuf payload - * @param orchestratorFunc a function that implements the orchestrator logic - * @param store the payload store for externalizing/resolving large payloads - * @param the type of the orchestrator function output - * @return a protobuf-encoded payload of orchestrator actions - */ - public static byte[] loadAndRun( - byte[] orchestratorRequestBytes, - OrchestratorFunction orchestratorFunc, - PayloadStore store) { - return loadAndRun(orchestratorRequestBytes, orchestratorFunc, store, new LargePayloadOptions.Builder().build()); - } - - /** - * Loads orchestration history from {@code orchestratorRequestBytes} and uses it to execute the - * orchestrator function with large payload externalization support and custom options. - * - * @param orchestratorRequestBytes the protobuf payload - * @param orchestratorFunc a function that implements the orchestrator logic - * @param store the payload store for externalizing/resolving large payloads - * @param options the large payload configuration options - * @param the type of the orchestrator function output - * @return a protobuf-encoded payload of orchestrator actions - */ - public static byte[] loadAndRun( - byte[] orchestratorRequestBytes, - OrchestratorFunction orchestratorFunc, - PayloadStore store, - LargePayloadOptions options) { if (orchestratorFunc == null) { throw new IllegalArgumentException("orchestratorFunc must not be null"); } @@ -144,7 +73,7 @@ public static byte[] loadAndRun( ctx.complete(output); }; - return loadAndRun(orchestratorRequestBytes, orchestration, store, options); + return loadAndRun(orchestratorRequestBytes, orchestration); } /** @@ -159,41 +88,8 @@ public static byte[] loadAndRun( public static String loadAndRun( String base64EncodedOrchestratorRequest, TaskOrchestration orchestration) { - return loadAndRun(base64EncodedOrchestratorRequest, orchestration, null, null); - } - - /** - * Loads orchestration history and executes the orchestration with large payload externalization support. - * - * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload - * @param orchestration the orchestration to execute - * @param store the payload store for externalizing/resolving large payloads - * @return a base64-encoded protobuf payload of orchestrator actions - */ - public static String loadAndRun( - String base64EncodedOrchestratorRequest, - TaskOrchestration orchestration, - PayloadStore store) { - return loadAndRun(base64EncodedOrchestratorRequest, orchestration, store, new LargePayloadOptions.Builder().build()); - } - - /** - * Loads orchestration history and executes the orchestration with large payload externalization - * support and custom options. - * - * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload - * @param orchestration the orchestration to execute - * @param store the payload store for externalizing/resolving large payloads - * @param options the large payload configuration options - * @return a base64-encoded protobuf payload of orchestrator actions - */ - public static String loadAndRun( - String base64EncodedOrchestratorRequest, - TaskOrchestration orchestration, - PayloadStore store, - LargePayloadOptions options) { byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest); - byte[] resultBytes = loadAndRun(decodedBytes, orchestration, store, options); + byte[] resultBytes = loadAndRun(decodedBytes, orchestration); return Base64.getEncoder().encodeToString(resultBytes); } @@ -207,33 +103,6 @@ public static String loadAndRun( * @throws IllegalArgumentException if either parameter is {@code null} or if {@code orchestratorRequestBytes} is not valid protobuf */ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration) { - return loadAndRun(orchestratorRequestBytes, orchestration, null, null); - } - - /** - * Loads orchestration history and executes the orchestration with large payload externalization support. - * - * @param orchestratorRequestBytes the protobuf payload - * @param orchestration the orchestration to execute - * @param store the payload store for externalizing/resolving large payloads - * @return a protobuf-encoded payload of orchestrator actions - */ - public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration, PayloadStore store) { - return loadAndRun(orchestratorRequestBytes, orchestration, store, new LargePayloadOptions.Builder().build()); - } - - /** - * Loads orchestration history and executes the orchestration with large payload externalization - * support and custom options. - * - * @param orchestratorRequestBytes the protobuf payload - * @param orchestration the orchestration to execute - * @param store the payload store for externalizing/resolving large payloads - * @param options the large payload configuration options - * @return a protobuf-encoded payload of orchestrator actions - */ - public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration, - PayloadStore store, LargePayloadOptions options) { if (orchestratorRequestBytes == null || orchestratorRequestBytes.length == 0) { throw new IllegalArgumentException("triggerStateProtoBytes must not be null or empty"); } @@ -249,15 +118,6 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati throw new IllegalArgumentException("triggerStateProtoBytes was not valid protobuf", e); } - // Resolve externalized payload URI tokens in incoming request - PayloadHelper payloadHelper = null; - if (store != null) { - LargePayloadOptions resolvedOptions = options != null ? options : new LargePayloadOptions.Builder().build(); - payloadHelper = new PayloadHelper(store, resolvedOptions); - orchestratorRequest = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads( - orchestratorRequest, payloadHelper); - } - // Register the passed orchestration as the default ("*") orchestration HashMap orchestrationFactories = new HashMap<>(); orchestrationFactories.put("*", new TaskOrchestrationFactory() { @@ -336,13 +196,6 @@ public TaskOrchestration create() { .addAllActions(taskOrchestratorResult.getActions()) .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) .build(); - - // Externalize large payloads in outgoing response - if (payloadHelper != null) { - response = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads( - response, payloadHelper); - } - return response.toByteArray(); } } diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java b/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java deleted file mode 100644 index adae0067..00000000 --- a/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -/** - * Internal utility for externalizing and resolving payloads using a {@link PayloadStore}. - *

- * This class encapsulates the threshold-checking and store-delegation logic that is shared - * across the worker, client, and orchestration runner code paths. - */ -final class PayloadHelper { - - private final PayloadStore store; - private final LargePayloadOptions options; - - /** - * Creates a new PayloadHelper. - * - * @param store the payload store to use for upload/download operations - * @param options the large payload configuration options - */ - PayloadHelper(PayloadStore store, LargePayloadOptions options) { - if (store == null) { - throw new IllegalArgumentException("store must not be null"); - } - if (options == null) { - throw new IllegalArgumentException("options must not be null"); - } - this.store = store; - this.options = options; - } - - /** - * Externalizes the given value if it exceeds the configured threshold. - *

- * The check order matches .NET: (1) null/empty guard, (2) below-threshold guard, - * (3) above-max-cap rejection, (4) upload. - * - * @param value the payload string to potentially externalize - * @return the original value if below threshold, or an opaque token if externalized - * @throws PayloadTooLargeException if the payload exceeds the maximum externalized payload size - */ - String maybeExternalize(String value) { - // (1) null/empty guard - if (value == null || value.isEmpty()) { - return value; - } - - // Fast path: each Java char contributes at least 1 UTF-8 byte, - // so length() is always <= UTF-8 byte length. - if (value.length() < this.options.getThresholdBytes()) { - return value; - } - - int byteSize = utf8ByteLength(value); - - // (2) below-threshold guard (strict less-than, matching .NET) - if (byteSize < this.options.getThresholdBytes()) { - return value; - } - - // (3) above-max-cap rejection - if (byteSize > this.options.getMaxExternalizedPayloadBytes()) { - throw new PayloadTooLargeException(String.format( - "Payload size %d KB exceeds maximum of %d KB. " + - "Reduce the payload size or increase maxExternalizedPayloadBytes.", - byteSize / 1024, - this.options.getMaxExternalizedPayloadBytes() / 1024)); - } - - // (4) upload - return this.store.upload(value); - } - - /** - * Resolves the given value if it is a known payload token. - * - * @param value the string to potentially resolve - * @return the resolved payload data if the value was a token, or the original value otherwise - */ - String maybeResolve(String value) { - if (value == null || value.isEmpty()) { - return value; - } - - if (!this.store.isKnownPayloadToken(value)) { - return value; - } - - String resolved = this.store.download(value); - if (resolved == null) { - throw new IllegalStateException( - "PayloadStore.download() returned null for token: " + value); - } - return resolved; - } - - /** - * Counts the number of UTF-8 bytes needed to encode the given string, - * without allocating a byte array (unlike {@code String.getBytes(UTF_8).length}). - */ - private static int utf8ByteLength(String s) { - int count = 0; - for (int i = 0; i < s.length(); i++) { - char c = s.charAt(i); - if (c <= 0x7F) { - count++; - } else if (c <= 0x7FF) { - count += 2; - } else if (Character.isHighSurrogate(c)) { - count += 4; - i++; // skip low surrogate - } else { - count += 3; - } - } - return count; - } -} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java b/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java deleted file mode 100644 index 591dba51..00000000 --- a/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java +++ /dev/null @@ -1,326 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -import com.google.protobuf.StringValue; -import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; - -import java.util.ArrayList; -import java.util.List; - -/** - * Internal helper for intercepting protobuf messages to resolve/externalize payload fields. - * Shared between {@link DurableTaskGrpcWorker} and {@link OrchestrationRunner}. - */ -final class PayloadInterceptionHelper { - - private PayloadInterceptionHelper() { - } - - /** - * Resolves externalized payload URI tokens in an OrchestratorRequest's history events. - */ - static OrchestratorRequest resolveOrchestratorRequestPayloads( - OrchestratorRequest request, PayloadHelper payloadHelper) { - List resolvedPastEvents = resolveHistoryEvents(request.getPastEventsList(), payloadHelper); - List resolvedNewEvents = resolveHistoryEvents(request.getNewEventsList(), payloadHelper); - - boolean pastChanged = resolvedPastEvents != request.getPastEventsList(); - boolean newChanged = resolvedNewEvents != request.getNewEventsList(); - - if (!pastChanged && !newChanged) { - return request; - } - - OrchestratorRequest.Builder builder = request.toBuilder(); - if (pastChanged) { - builder.clearPastEvents().addAllPastEvents(resolvedPastEvents); - } - if (newChanged) { - builder.clearNewEvents().addAllNewEvents(resolvedNewEvents); - } - return builder.build(); - } - - /** - * Resolves externalized payload URI token in an ActivityRequest's input. - */ - static ActivityRequest resolveActivityRequestPayloads( - ActivityRequest request, PayloadHelper payloadHelper) { - if (!request.hasInput() || request.getInput().getValue().isEmpty()) { - return request; - } - String resolved = payloadHelper.maybeResolve(request.getInput().getValue()); - if (resolved.equals(request.getInput().getValue())) { - return request; - } - return request.toBuilder().setInput(StringValue.of(resolved)).build(); - } - - /** - * Externalizes large payloads in an OrchestratorResponse's actions and custom status. - */ - static OrchestratorResponse externalizeOrchestratorResponsePayloads( - OrchestratorResponse response, PayloadHelper payloadHelper) { - boolean changed = false; - OrchestratorResponse.Builder responseBuilder = response.toBuilder(); - - // Externalize customStatus - if (response.hasCustomStatus() && !response.getCustomStatus().getValue().isEmpty()) { - String externalized = payloadHelper.maybeExternalize(response.getCustomStatus().getValue()); - if (!externalized.equals(response.getCustomStatus().getValue())) { - responseBuilder.setCustomStatus(StringValue.of(externalized)); - changed = true; - } - } - - // Externalize action payloads - List actions = response.getActionsList(); - List newActions = null; - for (int i = 0; i < actions.size(); i++) { - OrchestratorAction action = actions.get(i); - OrchestratorAction externalizedAction = externalizeAction(action, payloadHelper); - if (externalizedAction != action) { - if (newActions == null) { - newActions = new ArrayList<>(actions); - } - newActions.set(i, externalizedAction); - } - } - - if (newActions != null) { - responseBuilder.clearActions().addAllActions(newActions); - changed = true; - } - - return changed ? responseBuilder.build() : response; - } - - // --- Private helpers --- - - private static List resolveHistoryEvents( - List events, PayloadHelper payloadHelper) { - List resolved = null; - for (int i = 0; i < events.size(); i++) { - HistoryEvent event = events.get(i); - HistoryEvent resolvedEvent = resolveHistoryEvent(event, payloadHelper); - if (resolvedEvent != event) { - if (resolved == null) { - resolved = new ArrayList<>(events); - } - resolved.set(i, resolvedEvent); - } - } - return resolved != null ? resolved : events; - } - - private static HistoryEvent resolveHistoryEvent(HistoryEvent event, PayloadHelper payloadHelper) { - switch (event.getEventTypeCase()) { - case EXECUTIONSTARTED: - return resolveStringValueField(event, event.getExecutionStarted().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionStarted( - e.getExecutionStarted().toBuilder().setInput(v)).build()); - case EXECUTIONCOMPLETED: - return resolveStringValueField(event, event.getExecutionCompleted().getResult(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionCompleted( - e.getExecutionCompleted().toBuilder().setResult(v)).build()); - case TASKCOMPLETED: - return resolveStringValueField(event, event.getTaskCompleted().getResult(), payloadHelper, - (e, v) -> e.toBuilder().setTaskCompleted( - e.getTaskCompleted().toBuilder().setResult(v)).build()); - case TASKSCHEDULED: - return resolveStringValueField(event, event.getTaskScheduled().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setTaskScheduled( - e.getTaskScheduled().toBuilder().setInput(v)).build()); - case SUBORCHESTRATIONINSTANCECREATED: - return resolveStringValueField(event, event.getSubOrchestrationInstanceCreated().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setSubOrchestrationInstanceCreated( - e.getSubOrchestrationInstanceCreated().toBuilder().setInput(v)).build()); - case SUBORCHESTRATIONINSTANCECOMPLETED: - return resolveStringValueField(event, event.getSubOrchestrationInstanceCompleted().getResult(), payloadHelper, - (e, v) -> e.toBuilder().setSubOrchestrationInstanceCompleted( - e.getSubOrchestrationInstanceCompleted().toBuilder().setResult(v)).build()); - case EVENTRAISED: - return resolveStringValueField(event, event.getEventRaised().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setEventRaised( - e.getEventRaised().toBuilder().setInput(v)).build()); - case EVENTSENT: - return resolveStringValueField(event, event.getEventSent().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setEventSent( - e.getEventSent().toBuilder().setInput(v)).build()); - case GENERICEVENT: - return resolveStringValueField(event, event.getGenericEvent().getData(), payloadHelper, - (e, v) -> e.toBuilder().setGenericEvent( - e.getGenericEvent().toBuilder().setData(v)).build()); - case CONTINUEASNEW: - return resolveStringValueField(event, event.getContinueAsNew().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setContinueAsNew( - e.getContinueAsNew().toBuilder().setInput(v)).build()); - case EXECUTIONTERMINATED: - return resolveStringValueField(event, event.getExecutionTerminated().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionTerminated( - e.getExecutionTerminated().toBuilder().setInput(v)).build()); - case EXECUTIONSUSPENDED: - return resolveStringValueField(event, event.getExecutionSuspended().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionSuspended( - e.getExecutionSuspended().toBuilder().setInput(v)).build()); - case EXECUTIONRESUMED: - return resolveStringValueField(event, event.getExecutionResumed().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionResumed( - e.getExecutionResumed().toBuilder().setInput(v)).build()); - case EXECUTIONREWOUND: - return resolveStringValueField(event, event.getExecutionRewound().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setExecutionRewound( - e.getExecutionRewound().toBuilder().setInput(v)).build()); - default: - return event; - } - } - - @FunctionalInterface - interface HistoryEventUpdater { - HistoryEvent apply(HistoryEvent event, StringValue newValue); - } - - private static HistoryEvent resolveStringValueField(HistoryEvent event, StringValue field, - PayloadHelper payloadHelper, - HistoryEventUpdater updater) { - if (field.getValue().isEmpty()) { - return event; - } - String resolved = payloadHelper.maybeResolve(field.getValue()); - if (resolved.equals(field.getValue())) { - return event; - } - return updater.apply(event, StringValue.of(resolved)); - } - - private static OrchestratorAction externalizeAction(OrchestratorAction action, PayloadHelper payloadHelper) { - switch (action.getOrchestratorActionTypeCase()) { - case SCHEDULETASK: { - ScheduleTaskAction inner = action.getScheduleTask(); - if (inner.hasInput()) { - String ext = payloadHelper.maybeExternalize(inner.getInput().getValue()); - if (!ext.equals(inner.getInput().getValue())) { - return action.toBuilder().setScheduleTask( - inner.toBuilder().setInput(StringValue.of(ext))).build(); - } - } - return action; - } - case CREATESUBORCHESTRATION: { - CreateSubOrchestrationAction inner = action.getCreateSubOrchestration(); - if (inner.hasInput()) { - String ext = payloadHelper.maybeExternalize(inner.getInput().getValue()); - if (!ext.equals(inner.getInput().getValue())) { - return action.toBuilder().setCreateSubOrchestration( - inner.toBuilder().setInput(StringValue.of(ext))).build(); - } - } - return action; - } - case COMPLETEORCHESTRATION: { - CompleteOrchestrationAction inner = action.getCompleteOrchestration(); - CompleteOrchestrationAction.Builder innerBuilder = null; - if (inner.hasResult()) { - String ext = payloadHelper.maybeExternalize(inner.getResult().getValue()); - if (!ext.equals(inner.getResult().getValue())) { - innerBuilder = inner.toBuilder().setResult(StringValue.of(ext)); - } - } - if (inner.hasDetails()) { - String ext = payloadHelper.maybeExternalize(inner.getDetails().getValue()); - if (!ext.equals(inner.getDetails().getValue())) { - if (innerBuilder == null) innerBuilder = inner.toBuilder(); - innerBuilder.setDetails(StringValue.of(ext)); - } - } - // Externalize carryover events - List carryoverEvents = inner.getCarryoverEventsList(); - if (!carryoverEvents.isEmpty()) { - List externalizedEvents = externalizeHistoryEvents(carryoverEvents, payloadHelper); - if (externalizedEvents != carryoverEvents) { - if (innerBuilder == null) innerBuilder = inner.toBuilder(); - innerBuilder.clearCarryoverEvents().addAllCarryoverEvents(externalizedEvents); - } - } - if (innerBuilder != null) { - return action.toBuilder().setCompleteOrchestration(innerBuilder).build(); - } - return action; - } - case TERMINATEORCHESTRATION: { - TerminateOrchestrationAction inner = action.getTerminateOrchestration(); - if (inner.hasReason()) { - String ext = payloadHelper.maybeExternalize(inner.getReason().getValue()); - if (!ext.equals(inner.getReason().getValue())) { - return action.toBuilder().setTerminateOrchestration( - inner.toBuilder().setReason(StringValue.of(ext))).build(); - } - } - return action; - } - case SENDEVENT: { - SendEventAction inner = action.getSendEvent(); - if (inner.hasData()) { - String ext = payloadHelper.maybeExternalize(inner.getData().getValue()); - if (!ext.equals(inner.getData().getValue())) { - return action.toBuilder().setSendEvent( - inner.toBuilder().setData(StringValue.of(ext))).build(); - } - } - return action; - } - default: - return action; - } - } - - private static List externalizeHistoryEvents( - List events, PayloadHelper payloadHelper) { - List result = null; - for (int i = 0; i < events.size(); i++) { - HistoryEvent event = events.get(i); - HistoryEvent externalized = externalizeHistoryEvent(event, payloadHelper); - if (externalized != event) { - if (result == null) { - result = new ArrayList<>(events); - } - result.set(i, externalized); - } - } - return result != null ? result : events; - } - - private static HistoryEvent externalizeHistoryEvent(HistoryEvent event, PayloadHelper payloadHelper) { - switch (event.getEventTypeCase()) { - case EVENTRAISED: - return externalizeStringValueField(event, event.getEventRaised().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setEventRaised( - e.getEventRaised().toBuilder().setInput(v)).build()); - case EVENTSENT: - return externalizeStringValueField(event, event.getEventSent().getInput(), payloadHelper, - (e, v) -> e.toBuilder().setEventSent( - e.getEventSent().toBuilder().setInput(v)).build()); - case GENERICEVENT: - return externalizeStringValueField(event, event.getGenericEvent().getData(), payloadHelper, - (e, v) -> e.toBuilder().setGenericEvent( - e.getGenericEvent().toBuilder().setData(v)).build()); - default: - return event; - } - } - - private static HistoryEvent externalizeStringValueField(HistoryEvent event, StringValue field, - PayloadHelper payloadHelper, - HistoryEventUpdater updater) { - if (field.getValue().isEmpty()) { - return event; - } - String externalized = payloadHelper.maybeExternalize(field.getValue()); - if (externalized.equals(field.getValue())) { - return event; - } - return updater.apply(event, StringValue.of(externalized)); - } -} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadStore.java b/client/src/main/java/com/microsoft/durabletask/PayloadStore.java deleted file mode 100644 index 148bdf91..00000000 --- a/client/src/main/java/com/microsoft/durabletask/PayloadStore.java +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -/** - * Interface for externalizing and resolving large payloads to/from an external store. - *

- * Implementations of this interface handle uploading large payloads to external storage - * (e.g., Azure Blob Storage) and returning opaque token references that can be resolved - * back to the original data. This enables the Durable Task framework to handle payloads - * that exceed gRPC message size limits. - *

- * The store implementation is solely responsible for generating blob names/keys and - * defining the token format. The core framework treats tokens as opaque strings and - * delegates token recognition to {@link #isKnownPayloadToken(String)}. - *

- * Payload retention: This interface does not define a deletion mechanism. - * Externalized payloads persist until removed by external means. When using Azure - * Blob Storage, configure - * - * lifecycle management policies to automatically expire old payloads. - *

- * Performance note: All methods on this interface are synchronous. Implementations - * that perform network I/O (e.g., Azure Blob Storage) should ensure that latency is - * acceptable on the calling thread. The framework calls these methods on the worker's - * processing thread. - * - * @see LargePayloadOptions - */ -public interface PayloadStore { - /** - * Uploads a payload string to external storage and returns an opaque token reference. - * - * @param payload the payload data to upload - * @return an opaque token string that can be used to retrieve the payload via {@link #download(String)} - */ - String upload(String payload); - - /** - * Downloads a payload from external storage using the given token reference. - * - * @param token the opaque token returned by a previous {@link #upload(String)} call - * @return the original payload string - */ - String download(String token); - - /** - * Determines whether the given value is a token that was produced by this store. - *

- * This method is used by the framework to distinguish between regular payload data - * and externalized payload references. Only values recognized as tokens will be - * resolved via {@link #download(String)}. - * - * @param value the string value to check - * @return {@code true} if the value is a known payload token from this store; {@code false} otherwise - */ - boolean isKnownPayloadToken(String value); -} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java b/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java deleted file mode 100644 index 99e358a2..00000000 --- a/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -/** - * Service provider interface for discovering {@link PayloadStore} implementations at runtime. - *

- * Implementations are discovered via {@link java.util.ServiceLoader}. To register a provider, - * create a file {@code META-INF/services/com.microsoft.durabletask.PayloadStoreProvider} - * containing the fully qualified class name of the implementation. - *

- * The provider is responsible for reading its own configuration (e.g., environment variables) - * and determining whether it can create a functional {@link PayloadStore}. - * - * @see PayloadStore - */ -public interface PayloadStoreProvider { - - /** - * Attempts to create a {@link PayloadStore} based on available configuration. - *

- * Implementations should inspect environment variables or other configuration sources - * to determine if they can provide a store. If the required configuration is not present, - * this method should return {@code null} rather than throwing an exception. - * - * @return a configured {@link PayloadStore}, or {@code null} if the required configuration - * is not available - */ - PayloadStore create(); -} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java b/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java deleted file mode 100644 index ba385121..00000000 --- a/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -/** - * Thrown when a payload exceeds the maximum allowed size for externalization. - * - * @see LargePayloadOptions#getMaxExternalizedPayloadBytes() - */ -public class PayloadTooLargeException extends RuntimeException { - - /** - * Creates a new PayloadTooLargeException with the specified message. - * - * @param message the detail message - */ - public PayloadTooLargeException(String message) { - super(message); - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java b/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java deleted file mode 100644 index 36da8f7f..00000000 --- a/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Unit tests for DurableTaskGrpcWorkerBuilder — chunk size validation - * and worker capability announcement. - */ -public class DurableTaskGrpcWorkerBuilderTest { - - // ---- Chunk size validation tests (matches .NET GrpcDurableTaskWorkerOptionsTests) ---- - - @Test - void defaultChunkSize_isWithinRange() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - int chunkSize = builder.getChunkSizeBytes(); - assertTrue(chunkSize >= DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES, - "Default chunk size should be >= MIN"); - assertTrue(chunkSize <= DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES, - "Default chunk size should be <= MAX"); - } - - @Test - void chunkSize_belowMin_throws() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - assertThrows(IllegalArgumentException.class, - () -> builder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES - 1)); - } - - @Test - void chunkSize_aboveMax_throws() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - assertThrows(IllegalArgumentException.class, - () -> builder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES + 1)); - } - - @Test - void chunkSize_atMinBoundary_succeeds() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - builder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); - assertEquals(DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES, builder.getChunkSizeBytes()); - } - - @Test - void chunkSize_atMaxBoundary_succeeds() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - builder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES); - assertEquals(DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES, builder.getChunkSizeBytes()); - } - - @Test - void chunkSize_withinRange_succeeds() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - int midpoint = 2_000_000; - builder.setCompleteOrchestratorResponseChunkSizeBytes(midpoint); - assertEquals(midpoint, builder.getChunkSizeBytes()); - } - - // ---- Worker capability tests (matches .NET WorkerCapabilitiesTests) ---- - - @Test - void useExternalizedPayloads_setsPayloadStore() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - PayloadStore store = new InMemoryPayloadStore(); - builder.useExternalizedPayloads(store); - assertNotNull(builder.payloadStore, "payloadStore should be set after useExternalizedPayloads"); - assertNotNull(builder.largePayloadOptions, "largePayloadOptions should be set with defaults"); - } - - @Test - void useExternalizedPayloads_withOptions_setsPayloadStoreAndOptions() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - PayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(500) - .setMaxExternalizedPayloadBytes(5000) - .build(); - builder.useExternalizedPayloads(store, options); - assertSame(store, builder.payloadStore); - assertSame(options, builder.largePayloadOptions); - } - - @Test - void useExternalizedPayloads_nullStore_throws() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - assertThrows(IllegalArgumentException.class, - () -> builder.useExternalizedPayloads(null)); - } - - @Test - void useExternalizedPayloads_nullOptions_throws() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - PayloadStore store = new InMemoryPayloadStore(); - assertThrows(IllegalArgumentException.class, - () -> builder.useExternalizedPayloads(store, null)); - } - - @Test - void withoutExternalizedPayloads_payloadStoreIsNull() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - assertNull(builder.payloadStore, "payloadStore should be null by default"); - } - - @Test - void useExternalizedPayloads_preservesOtherSettings() { - DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); - PayloadStore store = new InMemoryPayloadStore(); - - builder.setCompleteOrchestratorResponseChunkSizeBytes(2_000_000); - builder.useExternalizedPayloads(store); - - assertEquals(2_000_000, builder.getChunkSizeBytes(), - "Chunk size should be preserved after useExternalizedPayloads"); - assertNotNull(builder.payloadStore); - } - - @Test - void settingsAreIndependentPerBuilder() { - DurableTaskGrpcWorkerBuilder builder1 = new DurableTaskGrpcWorkerBuilder(); - DurableTaskGrpcWorkerBuilder builder2 = new DurableTaskGrpcWorkerBuilder(); - - PayloadStore store = new InMemoryPayloadStore(); - builder1.useExternalizedPayloads(store); - builder2.setCompleteOrchestratorResponseChunkSizeBytes(2_000_000); - - assertNotNull(builder1.payloadStore, "Builder1 should have payloadStore"); - assertNull(builder2.payloadStore, "Builder2 should NOT have payloadStore"); - assertEquals(DurableTaskGrpcWorkerBuilder.DEFAULT_CHUNK_SIZE_BYTES, builder1.getChunkSizeBytes(), - "Builder1 should have default chunk size"); - assertEquals(2_000_000, builder2.getChunkSizeBytes(), - "Builder2 should have custom chunk size"); - } - - /** - * Simple in-memory PayloadStore for builder-level tests. - */ - private static class InMemoryPayloadStore implements PayloadStore { - @Override - public String upload(String payload) { - return "test://token"; - } - - @Override - public String download(String token) { - return "payload"; - } - - @Override - public boolean isKnownPayloadToken(String value) { - return value != null && value.startsWith("test://"); - } - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java deleted file mode 100644 index f6a03eaf..00000000 --- a/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java +++ /dev/null @@ -1,853 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -package com.microsoft.durabletask; - -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Integration tests for the large payload externalization feature. - *

- * These tests require a DTS emulator to be running on localhost:4001. - * They verify end-to-end round-trip of large payloads through externalization. - */ -@Tag("integration") -public class LargePayloadIntegrationTests extends IntegrationTestBase { - - static final Duration defaultTimeout = Duration.ofSeconds(100); - - /** - * In-memory implementation of {@link PayloadStore} for integration testing. - * Stores payloads in a thread-safe map with token format {@code test:}. - */ - static class InMemoryPayloadStore implements PayloadStore { - private static final String TOKEN_PREFIX = "test:"; - private final ConcurrentHashMap payloads = new ConcurrentHashMap<>(); - private final AtomicInteger uploadCount = new AtomicInteger(); - private final AtomicInteger downloadCount = new AtomicInteger(); - - @Override - public String upload(String payload) { - String key = TOKEN_PREFIX + uploadCount.incrementAndGet(); - payloads.put(key, payload); - return key; - } - - @Override - public String download(String token) { - downloadCount.incrementAndGet(); - String payload = payloads.get(token); - if (payload == null) { - throw new IllegalArgumentException("Unknown token: " + token); - } - return payload; - } - - @Override - public boolean isKnownPayloadToken(String value) { - return value != null && value.startsWith(TOKEN_PREFIX); - } - - int getUploadCount() { - return uploadCount.get(); - } - - int getDownloadCount() { - return downloadCount.get(); - } - } - - private static String generateLargeString(int sizeBytes) { - StringBuilder sb = new StringBuilder(sizeBytes + 2); - sb.append('"'); - for (int i = 0; i < sizeBytes - 2; i++) { - sb.append('A'); - } - sb.append('"'); - return sb.toString(); - } - - @Test - void largeOrchestrationInput_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeInputOrch"; - // Create a payload larger than the default 900KB threshold - final String largeInput = generateLargeString(1_000_000); - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - String input = ctx.getInput(String.class); - ctx.complete(input.length()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, largeInput); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(largeInput.length(), instance.readOutputAs(Integer.class)); - assertTrue(store.getUploadCount() > 0, "Should have externalized at least one payload"); - } - } - - @Test - void largeActivityOutput_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeActivityOutputOrch"; - final String activityName = "GenerateLargeOutput"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - String result = ctx.callActivity(activityName, null, String.class).await(); - ctx.complete(result.length()); - }); - workerBuilder.addActivity(activityName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('B'); - } - return sb.toString(); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(payloadSize, instance.readOutputAs(Integer.class)); - assertTrue(store.getUploadCount() > 0, "Should have externalized at least one payload"); - } - } - - @Test - void smallPayload_isNotExternalized() throws TimeoutException { - final String orchestratorName = "SmallPayloadOrch"; - final String smallInput = "Hello, World!"; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - ctx.complete(ctx.getInput(String.class)); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, smallInput); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(smallInput, instance.readOutputAs(String.class)); - assertEquals(0, store.getUploadCount(), "Small payloads should not be externalized"); - } - } - - @Test - void largeOrchestrationOutput_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeOutputOrch"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('C'); - } - ctx.complete(sb.toString()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - String output = instance.readOutputAs(String.class); - assertEquals(payloadSize, output.length()); - assertTrue(store.getUploadCount() > 0, "Large output should be externalized"); - } - } - - @Test - void largeTerminateReason_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeTerminateOrch"; - // Create a reason larger than the threshold - final String largeReason = generateLargeString(1_000_000); - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - // Wait indefinitely — this will be terminated - ctx.createTimer(Duration.ofHours(1)).await(); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - client.waitForInstanceStart(instanceId, defaultTimeout); - - client.terminate(instanceId, largeReason); - - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus()); - assertTrue(store.getUploadCount() > 0, "Large terminate reason should be externalized"); - } - } - - @Test - void largeExternalEvent_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeEventOrch"; - final String eventName = "MyEvent"; - final String largeEventData = generateLargeString(1_000_000); - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - String eventData = ctx.waitForExternalEvent(eventName, String.class).await(); - ctx.complete(eventData.length()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - client.waitForInstanceStart(instanceId, defaultTimeout); - - client.raiseEvent(instanceId, eventName, largeEventData); - - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(largeEventData.length(), instance.readOutputAs(Integer.class)); - assertTrue(store.getUploadCount() > 0, "Large event data should be externalized"); - } - } - - @Test - void largeSubOrchestrationInput_isExternalized() throws TimeoutException { - final String parentOrchName = "ParentOrch"; - final String childOrchName = "ChildOrch"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(parentOrchName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('D'); - } - String result = ctx.callSubOrchestrator(childOrchName, sb.toString(), String.class).await(); - ctx.complete(result); - }); - workerBuilder.addOrchestrator(childOrchName, ctx -> { - String input = ctx.getInput(String.class); - ctx.complete("length=" + input.length()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(parentOrchName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals("length=" + payloadSize, instance.readOutputAs(String.class)); - assertTrue(store.getUploadCount() > 0, "Large sub-orchestration payload should be externalized"); - } - } - - @Test - void largeActivityInput_isExternalized() throws TimeoutException { - final String orchestratorName = "LargeActivityInputOrch"; - final String activityName = "ProcessLargeInput"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('E'); - } - Integer result = ctx.callActivity(activityName, sb.toString(), Integer.class).await(); - ctx.complete(result); - }); - workerBuilder.addActivity(activityName, ctx -> { - String input = ctx.getInput(String.class); - return input.length(); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(payloadSize, instance.readOutputAs(Integer.class)); - assertTrue(store.getUploadCount() > 0, "Large activity input should be externalized"); - } - } - - @Test - void continueAsNew_withLargeInput_isExternalized() throws TimeoutException { - final String orchestratorName = "ContinueAsNewOrch"; - final int maxIterations = 3; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - int iteration = ctx.getInput(Integer.class); - if (iteration >= maxIterations) { - ctx.complete("done-" + iteration); - return; - } - ctx.continueAsNew(iteration + 1); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 1); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals("done-" + maxIterations, instance.readOutputAs(String.class)); - } - } - - @Test - void queryInstances_resolvesLargePayloads() throws TimeoutException { - final String orchestratorName = "QueryLargePayloadOrch"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('F'); - } - ctx.complete(sb.toString()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata completedInstance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - assertNotNull(completedInstance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, completedInstance.getRuntimeStatus()); - - // Query the instance to verify payload resolution - OrchestrationMetadata queried = client.getInstanceMetadata(instanceId, true); - assertNotNull(queried); - String output = queried.readOutputAs(String.class); - assertEquals(payloadSize, output.length()); - } - } - - @Test - void suspendAndResume_withLargeReason_works() throws TimeoutException { - final String orchestratorName = "SuspendResumeOrch"; - final String largeReason = generateLargeString(1_000_000); - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - ctx.waitForExternalEvent("continue").await(); - ctx.complete("resumed"); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - client.waitForInstanceStart(instanceId, defaultTimeout); - - // Suspend with large reason - client.suspendInstance(instanceId, largeReason); - Thread.sleep(2000); // allow time for suspend to take effect - - OrchestrationMetadata suspended = client.getInstanceMetadata(instanceId, false); - assertNotNull(suspended); - assertEquals(OrchestrationRuntimeStatus.SUSPENDED, suspended.getRuntimeStatus()); - - // Resume with large reason - client.resumeInstance(instanceId, largeReason); - Thread.sleep(2000); - - // Send event to complete - client.raiseEvent(instanceId, "continue", null); - - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertTrue(store.getUploadCount() > 0, "Large suspend/resume reasons should be externalized"); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Test interrupted"); - } - } - - // ---- Autochunk tests (matches .NET AutochunkTests) ---- - - @Test - void autochunk_multipleChunks_completesSuccessfully() throws TimeoutException { - final String orchestratorName = "AutochunkMultipleOrch"; - final String activityName = "GeneratePayload"; - // Use 36 activities, each returning ~30KB. At 1MB chunk size this forces multiple chunks. - final int activityCount = 36; - final int payloadSizePerActivity = 30_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - // Set a small chunk size to force chunking - workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); // 1 MiB - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - List> tasks = IntStream.range(0, activityCount) - .mapToObj(i -> ctx.callActivity(activityName, i, String.class)) - .collect(Collectors.toList()); - ctx.allOf(tasks).await(); - ctx.complete(activityCount); - }); - workerBuilder.addActivity(activityName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSizePerActivity); - for (int i = 0; i < payloadSizePerActivity; i++) { - sb.append('X'); - } - return sb.toString(); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - assertEquals(activityCount, instance.readOutputAs(Integer.class)); - } - } - - @Test - void autochunk_mixedActions_completesSuccessfully() throws TimeoutException { - final String orchestratorName = "AutochunkMixedOrch"; - final String activityName = "MixedActivity"; - final String childOrchName = "MixedChildOrch"; - // 30 activities + many timers + sub-orchestrations - final int activityCount = 30; - final int timerCount = 20; - final int subOrchCount = 10; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - int total = 0; - - // Schedule activities - List> activityTasks = IntStream.range(0, activityCount) - .mapToObj(i -> ctx.callActivity(activityName, i, Integer.class)) - .collect(Collectors.toList()); - - // Schedule timers - for (int i = 0; i < timerCount; i++) { - ctx.createTimer(Duration.ofMillis(1)).await(); - } - - // Schedule sub-orchestrations - List> subOrchTasks = IntStream.range(0, subOrchCount) - .mapToObj(i -> ctx.callSubOrchestrator(childOrchName, i, Integer.class)) - .collect(Collectors.toList()); - - // Await all activities - ctx.allOf(activityTasks).await(); - for (Task t : activityTasks) { - total += t.await(); - } - - // Await all sub-orchestrations - ctx.allOf(subOrchTasks).await(); - for (Task t : subOrchTasks) { - total += t.await(); - } - - ctx.complete(total); - }); - workerBuilder.addOrchestrator(childOrchName, ctx -> { - int input = ctx.getInput(Integer.class); - ctx.complete(input); - }); - workerBuilder.addActivity(activityName, ctx -> { - return ctx.getInput(Integer.class); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - - int expectedTotal = 0; - for (int i = 0; i < activityCount; i++) expectedTotal += i; - for (int i = 0; i < subOrchCount; i++) expectedTotal += i; - assertEquals(expectedTotal, instance.readOutputAs(Integer.class)); - } - } - - @Test - void autochunk_singleActionExceedsChunkSize_failsWithClearError() throws TimeoutException { - final String orchestratorName = "AutochunkOversizedOrch"; - // Create an orchestrator that completes with a payload larger than 1MB chunk size. - // Externalization is NOT configured so the large payload stays inline in the - // CompleteOrchestration action, which exceeds the chunk size. - final int payloadSize = 1_200_000; - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( - DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - StringBuilder sb = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - sb.append('Z'); - } - ctx.complete(sb.toString()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - // The orchestration should fail because a single action exceeds the chunk size - assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - } - } - - // ---- Combined scenario tests (matches .NET combined tests) ---- - - @Test - void largeInputOutputAndCustomStatus_allExternalized() throws TimeoutException { - final String orchestratorName = "LargeAllFieldsOrch"; - final int payloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - String input = ctx.getInput(String.class); - - // Set a large custom status - StringBuilder customStatus = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - customStatus.append('S'); - } - ctx.setCustomStatus(customStatus.toString()); - - // Return large output - ctx.complete(input + input); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String largeInput = generateLargeString(payloadSize); - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, largeInput); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - String output = instance.readOutputAs(String.class); - assertEquals(largeInput + largeInput, output); - assertTrue(store.getUploadCount() >= 1, "Should have externalized input, output, and/or custom status"); - assertTrue(store.getDownloadCount() >= 1, "Should have resolved at least one payload"); - } - } - - @Test - void continueAsNew_withLargeCustomStatusAndFinalOutput() throws TimeoutException { - final String orchestratorName = "ContinueAsNewAllOrch"; - final int payloadSize = 1_000_000; - final int iterations = 3; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - int iteration = ctx.getInput(Integer.class); - - // Set large custom status on every iteration - StringBuilder status = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - status.append((char) ('A' + (iteration % 26))); - } - ctx.setCustomStatus(status.toString()); - - if (iteration >= iterations) { - // Large final output - StringBuilder finalOutput = new StringBuilder(payloadSize); - for (int i = 0; i < payloadSize; i++) { - finalOutput.append('F'); - } - ctx.complete(finalOutput.toString()); - return; - } - ctx.continueAsNew(iteration + 1); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 1); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - String output = instance.readOutputAs(String.class); - assertEquals(payloadSize, output.length()); - assertTrue(store.getUploadCount() > 0, "Should have externalized custom status and/or final output"); - } - } - - @Test - void largeSubOrchestrationAndActivityOutput_combined() throws TimeoutException { - final String parentOrchName = "CombinedParentOrch"; - final String childOrchName = "CombinedChildOrch"; - final String activityName = "CombinedActivity"; - final int subOrchPayloadSize = 1_000_000; - final int activityPayloadSize = 1_000_000; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(parentOrchName, ctx -> { - // Large sub-orchestration input - StringBuilder subInput = new StringBuilder(subOrchPayloadSize); - for (int i = 0; i < subOrchPayloadSize; i++) { - subInput.append('G'); - } - String subResult = ctx.callSubOrchestrator(childOrchName, subInput.toString(), String.class).await(); - - // Activity that returns large output - String actResult = ctx.callActivity(activityName, null, String.class).await(); - - ctx.complete(subResult.length() + actResult.length()); - }); - workerBuilder.addOrchestrator(childOrchName, ctx -> { - String input = ctx.getInput(String.class); - ctx.complete("child-" + input.length()); - }); - workerBuilder.addActivity(activityName, ctx -> { - StringBuilder sb = new StringBuilder(activityPayloadSize); - for (int i = 0; i < activityPayloadSize; i++) { - sb.append('H'); - } - return sb.toString(); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(parentOrchName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); - int expectedLength = ("child-" + subOrchPayloadSize).length() + activityPayloadSize; - assertEquals(expectedLength, instance.readOutputAs(Integer.class)); - assertTrue(store.getUploadCount() >= 1, "Should have externalized large payloads"); - assertTrue(store.getDownloadCount() >= 1, "Should have resolved large payloads"); - } - } - - // ---- Max payload rejection integration test ---- - - @Test - void exceedingMaxPayload_isRejected() throws TimeoutException { - final String orchestratorName = "MaxPayloadRejectionOrch"; - // Use a very small max so we can trigger rejection without massive strings - final int threshold = 100; - final int maxPayload = 200; - - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(threshold) - .setMaxExternalizedPayloadBytes(maxPayload) - .build(); - - TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); - workerBuilder.innerBuilder.useExternalizedPayloads(store, options); - workerBuilder.addOrchestrator(orchestratorName, ctx -> { - // Generate output that exceeds max externalized payload size - StringBuilder sb = new StringBuilder(500); - for (int i = 0; i < 500; i++) { - sb.append('R'); - } - ctx.complete(sb.toString()); - }); - DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); - - DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); - clientBuilder.useExternalizedPayloads(store, options); - DurableTaskClient client = clientBuilder.build(); - - try (worker; client) { - String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); - OrchestrationMetadata instance = client.waitForInstanceCompletion( - instanceId, defaultTimeout, true); - - assertNotNull(instance); - // The orchestration should fail because the payload exceeds max - assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); - } - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java b/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java deleted file mode 100644 index ca9be95b..00000000 --- a/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Unit tests for LargePayloadOptions. - */ -public class LargePayloadOptionsTest { - - @Test - void defaults_matchExpectedValues() { - LargePayloadOptions options = new LargePayloadOptions.Builder().build(); - assertEquals(900_000, options.getThresholdBytes()); - assertEquals(10 * 1024 * 1024, options.getMaxExternalizedPayloadBytes()); - } - - @Test - void customValues_areRespected() { - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(500) - .setMaxExternalizedPayloadBytes(5000) - .build(); - assertEquals(500, options.getThresholdBytes()); - assertEquals(5000, options.getMaxExternalizedPayloadBytes()); - } - - @Test - void negativeThreshold_throws() { - assertThrows(IllegalArgumentException.class, - () -> new LargePayloadOptions.Builder().setThresholdBytes(-1)); - } - - @Test - void thresholdExceeds1MiB_throws() { - assertThrows(IllegalArgumentException.class, - () -> new LargePayloadOptions.Builder().setThresholdBytes(1_048_577)); - } - - @Test - void nonPositiveMax_throws() { - assertThrows(IllegalArgumentException.class, - () -> new LargePayloadOptions.Builder().setMaxExternalizedPayloadBytes(0)); - } - - @Test - void thresholdEqualToMax_throws() { - assertThrows(IllegalStateException.class, - () -> new LargePayloadOptions.Builder() - .setThresholdBytes(100) - .setMaxExternalizedPayloadBytes(100) - .build()); - } - - @Test - void thresholdGreaterThanMax_throws() { - assertThrows(IllegalStateException.class, - () -> new LargePayloadOptions.Builder() - .setThresholdBytes(200) - .setMaxExternalizedPayloadBytes(100) - .build()); - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java b/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java deleted file mode 100644 index 29daf4ad..00000000 --- a/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask; - -import com.google.protobuf.StringValue; -import com.google.protobuf.Timestamp; -import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; -import org.junit.jupiter.api.Test; - -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Tests for OrchestrationRunner with large payload externalization. - */ -public class OrchestrationRunnerPayloadTest { - - private static final String TOKEN_PREFIX = "test://payload/"; - - private static class TestPayloadStore implements PayloadStore { - private final Map blobs = new HashMap<>(); - - @Override - public String upload(String payload) { - String token = TOKEN_PREFIX + UUID.randomUUID().toString(); - blobs.put(token, payload); - return token; - } - - @Override - public String download(String token) { - String value = blobs.get(token); - if (value == null) { - throw new IllegalArgumentException("Unknown token: " + token); - } - return value; - } - - @Override - public boolean isKnownPayloadToken(String value) { - return value != null && value.startsWith(TOKEN_PREFIX); - } - - void seed(String token, String payload) { - blobs.put(token, payload); - } - } - - @Test - void loadAndRun_withoutStore_worksNormally() { - // Build a minimal orchestration request - byte[] requestBytes = buildSimpleOrchestrationRequest("test-input"); - - byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { - String input = ctx.getInput(String.class); - ctx.complete("output: " + input); - }); - - assertNotNull(resultBytes); - assertTrue(resultBytes.length > 0); - } - - @Test - void loadAndRun_withStore_resolvesInputAndExternalizes() { - TestPayloadStore store = new TestPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(10) - .setMaxExternalizedPayloadBytes(100_000) - .build(); - - // Seed a token for the input so it gets resolved - String largeInput = "\"large-input-data-exceeding-threshold-for-test\""; - String inputToken = TOKEN_PREFIX + "input-token"; - store.seed(inputToken, largeInput); - - // Build request with token as the input - byte[] requestBytes = buildSimpleOrchestrationRequest(inputToken); - - byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { - // The token should have been resolved before the orchestration runs - String input = ctx.getInput(String.class); - assertEquals("large-input-data-exceeding-threshold-for-test", input); - ctx.complete("done"); - }, store, options); - - assertNotNull(resultBytes); - assertTrue(resultBytes.length > 0); - - // Parse the response to verify it was externalized - OrchestratorResponse response; - try { - response = OrchestratorResponse.parseFrom(resultBytes); - } catch (Exception e) { - throw new RuntimeException(e); - } - - // The output "done" (6 bytes including quotes) is below our 10 byte threshold, - // so it should NOT be externalized. This verifies the threshold logic works. - OrchestratorAction completeAction = response.getActionsList().stream() - .filter(OrchestratorAction::hasCompleteOrchestration) - .findFirst() - .orElseThrow(() -> new AssertionError("Expected CompleteOrchestration action")); - - String resultValue = completeAction.getCompleteOrchestration().getResult().getValue(); - assertFalse(store.isKnownPayloadToken(resultValue), - "Small result should not be externalized"); - } - - @Test - void loadAndRun_base64_withStore_works() { - TestPayloadStore store = new TestPayloadStore(); - - byte[] requestBytes = buildSimpleOrchestrationRequest("\"hello\""); - String base64 = Base64.getEncoder().encodeToString(requestBytes); - - String resultBase64 = OrchestrationRunner.loadAndRun(base64, ctx -> { - ctx.complete("world"); - }, store); - - assertNotNull(resultBase64); - assertFalse(resultBase64.isEmpty()); - } - - @Test - void loadAndRun_nullStore_treatedAsNoExternalization() { - byte[] requestBytes = buildSimpleOrchestrationRequest("\"test\""); - - byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { - ctx.complete("result"); - }, null, null); - - assertNotNull(resultBytes); - assertTrue(resultBytes.length > 0); - } - - private byte[] buildSimpleOrchestrationRequest(String input) { - HistoryEvent orchestratorStarted = HistoryEvent.newBuilder() - .setEventId(-1) - .setTimestamp(Timestamp.getDefaultInstance()) - .setOrchestratorStarted(OrchestratorStartedEvent.getDefaultInstance()) - .build(); - - HistoryEvent executionStarted = HistoryEvent.newBuilder() - .setEventId(-1) - .setTimestamp(Timestamp.getDefaultInstance()) - .setExecutionStarted(ExecutionStartedEvent.newBuilder() - .setName("TestOrchestration") - .setVersion(StringValue.of("")) - .setInput(StringValue.of(input)) - .setOrchestrationInstance(OrchestrationInstance.newBuilder() - .setInstanceId("test-" + UUID.randomUUID()) - .build()) - .build()) - .build(); - - HistoryEvent orchestratorCompleted = HistoryEvent.newBuilder() - .setEventId(-1) - .setTimestamp(Timestamp.getDefaultInstance()) - .setOrchestratorCompleted(OrchestratorCompletedEvent.getDefaultInstance()) - .build(); - - return OrchestratorRequest.newBuilder() - .setInstanceId("test-" + UUID.randomUUID()) - .addNewEvents(orchestratorStarted) - .addNewEvents(executionStarted) - .addNewEvents(orchestratorCompleted) - .build() - .toByteArray(); - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java b/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java deleted file mode 100644 index 117825cf..00000000 --- a/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask; - -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Unit tests for PayloadHelper. - */ -public class PayloadHelperTest { - - /** - * A simple in-memory PayloadStore for testing. - */ - private static class InMemoryPayloadStore implements PayloadStore { - private static final String TOKEN_PREFIX = "inmemory://"; - private final java.util.Map blobs = new java.util.HashMap<>(); - private int uploadCount = 0; - private int downloadCount = 0; - - @Override - public String upload(String payload) { - uploadCount++; - String token = TOKEN_PREFIX + java.util.UUID.randomUUID().toString(); - blobs.put(token, payload); - return token; - } - - @Override - public String download(String token) { - downloadCount++; - String value = blobs.get(token); - if (value == null) { - throw new IllegalArgumentException("Unknown token: " + token); - } - return value; - } - - @Override - public boolean isKnownPayloadToken(String value) { - return value != null && value.startsWith(TOKEN_PREFIX); - } - - int getUploadCount() { return uploadCount; } - int getDownloadCount() { return downloadCount; } - } - - @Test - void maybeExternalize_nullValue_returnsNull() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - PayloadHelper helper = new PayloadHelper(store, defaultOptions()); - assertNull(helper.maybeExternalize(null)); - assertEquals(0, store.getUploadCount()); - } - - @Test - void maybeExternalize_emptyValue_returnsEmpty() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - PayloadHelper helper = new PayloadHelper(store, defaultOptions()); - assertEquals("", helper.maybeExternalize("")); - assertEquals(0, store.getUploadCount()); - } - - @Test - void maybeExternalize_belowThreshold_returnsOriginal() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(100) - .setMaxExternalizedPayloadBytes(1000) - .build(); - PayloadHelper helper = new PayloadHelper(store, options); - - String smallPayload = "hello"; - assertEquals(smallPayload, helper.maybeExternalize(smallPayload)); - assertEquals(0, store.getUploadCount()); - } - - @Test - void maybeExternalize_aboveThreshold_uploadsAndReturnsToken() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(10) - .setMaxExternalizedPayloadBytes(10_000) - .build(); - PayloadHelper helper = new PayloadHelper(store, options); - - String largePayload = "a]".repeat(100); // 200 chars - String result = helper.maybeExternalize(largePayload); - - assertNotEquals(largePayload, result); - assertTrue(store.isKnownPayloadToken(result)); - assertEquals(1, store.getUploadCount()); - } - - @Test - void maybeExternalize_exceedsMaxCap_throwsException() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(10) - .setMaxExternalizedPayloadBytes(50) - .build(); - PayloadHelper helper = new PayloadHelper(store, options); - - // Create a payload larger than 50 bytes - String hugePayload = "x".repeat(100); - assertThrows(PayloadTooLargeException.class, () -> helper.maybeExternalize(hugePayload)); - assertEquals(0, store.getUploadCount()); - } - - @Test - void maybeResolve_nullValue_returnsNull() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - PayloadHelper helper = new PayloadHelper(store, defaultOptions()); - assertNull(helper.maybeResolve(null)); - assertEquals(0, store.getDownloadCount()); - } - - @Test - void maybeResolve_regularValue_returnsOriginal() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - PayloadHelper helper = new PayloadHelper(store, defaultOptions()); - String regularValue = "just some data"; - assertEquals(regularValue, helper.maybeResolve(regularValue)); - assertEquals(0, store.getDownloadCount()); - } - - @Test - void maybeResolve_knownToken_downloadsAndReturnsPayload() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(10) - .setMaxExternalizedPayloadBytes(10_000) - .build(); - PayloadHelper helper = new PayloadHelper(store, options); - - // First upload, then resolve - String originalPayload = "x".repeat(100); - String token = helper.maybeExternalize(originalPayload); - String resolved = helper.maybeResolve(token); - - assertEquals(originalPayload, resolved); - assertEquals(1, store.getDownloadCount()); - } - - @Test - void roundTrip_externalizeAndResolve() { - InMemoryPayloadStore store = new InMemoryPayloadStore(); - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(5) - .setMaxExternalizedPayloadBytes(10_000) - .build(); - PayloadHelper helper = new PayloadHelper(store, options); - - String payload = "This is a test payload that is long enough"; - String token = helper.maybeExternalize(payload); - assertNotEquals(payload, token); - - String resolved = helper.maybeResolve(token); - assertEquals(payload, resolved); - } - - @Test - void constructor_nullStore_throws() { - assertThrows(IllegalArgumentException.class, - () -> new PayloadHelper(null, defaultOptions())); - } - - @Test - void constructor_nullOptions_throws() { - assertThrows(IllegalArgumentException.class, - () -> new PayloadHelper(new InMemoryPayloadStore(), null)); - } - - private static LargePayloadOptions defaultOptions() { - return new LargePayloadOptions.Builder().build(); - } -} diff --git a/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java b/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java deleted file mode 100644 index bb2b0ce3..00000000 --- a/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java +++ /dev/null @@ -1,328 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.microsoft.durabletask; - -import com.google.protobuf.StringValue; -import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * Unit tests for PayloadInterceptionHelper. - */ -public class PayloadInterceptionHelperTest { - - private static final String TOKEN_PREFIX = "test://token/"; - - /** - * A simple in-memory PayloadStore for testing. - */ - private static class TestPayloadStore implements PayloadStore { - private final Map blobs = new HashMap<>(); - - @Override - public String upload(String payload) { - String token = TOKEN_PREFIX + UUID.randomUUID().toString(); - blobs.put(token, payload); - return token; - } - - @Override - public String download(String token) { - String value = blobs.get(token); - if (value == null) { - throw new IllegalArgumentException("Unknown token: " + token); - } - return value; - } - - @Override - public boolean isKnownPayloadToken(String value) { - return value != null && value.startsWith(TOKEN_PREFIX); - } - - /** Pre-load a token->payload mapping for resolution tests. */ - void seed(String token, String payload) { - blobs.put(token, payload); - } - } - - private PayloadHelper createHelper(TestPayloadStore store) { - LargePayloadOptions options = new LargePayloadOptions.Builder() - .setThresholdBytes(10) - .setMaxExternalizedPayloadBytes(100_000) - .build(); - return new PayloadHelper(store, options); - } - - // ---- Resolve tests ---- - - @Test - void resolveOrchestratorRequest_noTokens_returnsOriginal() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - HistoryEvent event = buildExecutionStartedEvent("regular-input"); - OrchestratorRequest request = OrchestratorRequest.newBuilder() - .setInstanceId("test-1") - .addNewEvents(event) - .build(); - - OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); - assertSame(request, result, "Should return same object when no tokens found"); - } - - @Test - void resolveOrchestratorRequest_withToken_resolvesInput() { - TestPayloadStore store = new TestPayloadStore(); - String token = TOKEN_PREFIX + "abc"; - String originalPayload = "large-payload-data"; - store.seed(token, originalPayload); - PayloadHelper helper = createHelper(store); - - HistoryEvent event = buildExecutionStartedEvent(token); - OrchestratorRequest request = OrchestratorRequest.newBuilder() - .setInstanceId("test-2") - .addNewEvents(event) - .build(); - - OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); - assertNotSame(request, result); - assertEquals(originalPayload, result.getNewEvents(0).getExecutionStarted().getInput().getValue()); - } - - @Test - void resolveOrchestratorRequest_taskCompleted_resolvesResult() { - TestPayloadStore store = new TestPayloadStore(); - String token = TOKEN_PREFIX + "task-result"; - String originalPayload = "{\"key\":\"value\"}"; - store.seed(token, originalPayload); - PayloadHelper helper = createHelper(store); - - HistoryEvent event = HistoryEvent.newBuilder() - .setEventId(1) - .setTaskCompleted(TaskCompletedEvent.newBuilder() - .setTaskScheduledId(1) - .setResult(StringValue.of(token)) - .build()) - .build(); - - OrchestratorRequest request = OrchestratorRequest.newBuilder() - .setInstanceId("test-3") - .addPastEvents(event) - .build(); - - OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); - assertNotSame(request, result); - assertEquals(originalPayload, result.getPastEvents(0).getTaskCompleted().getResult().getValue()); - } - - @Test - void resolveOrchestratorRequest_eventRaised_resolvesInput() { - TestPayloadStore store = new TestPayloadStore(); - String token = TOKEN_PREFIX + "event-data"; - String originalPayload = "event payload content"; - store.seed(token, originalPayload); - PayloadHelper helper = createHelper(store); - - HistoryEvent event = HistoryEvent.newBuilder() - .setEventId(2) - .setEventRaised(EventRaisedEvent.newBuilder() - .setName("TestEvent") - .setInput(StringValue.of(token)) - .build()) - .build(); - - OrchestratorRequest request = OrchestratorRequest.newBuilder() - .setInstanceId("test-4") - .addNewEvents(event) - .build(); - - OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); - assertEquals(originalPayload, result.getNewEvents(0).getEventRaised().getInput().getValue()); - } - - @Test - void resolveActivityRequest_withToken_resolvesInput() { - TestPayloadStore store = new TestPayloadStore(); - String token = TOKEN_PREFIX + "activity-input"; - String originalPayload = "activity input data"; - store.seed(token, originalPayload); - PayloadHelper helper = createHelper(store); - - ActivityRequest request = ActivityRequest.newBuilder() - .setName("TestActivity") - .setInput(StringValue.of(token)) - .build(); - - ActivityRequest result = PayloadInterceptionHelper.resolveActivityRequestPayloads(request, helper); - assertNotSame(request, result); - assertEquals(originalPayload, result.getInput().getValue()); - } - - @Test - void resolveActivityRequest_noToken_returnsOriginal() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - ActivityRequest request = ActivityRequest.newBuilder() - .setName("TestActivity") - .setInput(StringValue.of("regular data")) - .build(); - - ActivityRequest result = PayloadInterceptionHelper.resolveActivityRequestPayloads(request, helper); - assertSame(request, result, "Should return same object when no token found"); - } - - // ---- Externalize tests ---- - - @Test - void externalizeOrchestratorResponse_smallPayloads_returnsOriginal() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId("test-5") - .setCustomStatus(StringValue.of("ok")) - .addActions(OrchestratorAction.newBuilder() - .setId(1) - .setScheduleTask(ScheduleTaskAction.newBuilder() - .setName("SmallTask") - .setInput(StringValue.of("tiny")) - .build()) - .build()) - .build(); - - OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); - assertSame(response, result, "Small payloads should not be externalized"); - } - - @Test - void externalizeOrchestratorResponse_largeScheduleTaskInput_externalizes() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - String largeInput = "x".repeat(100); // > 10 byte threshold - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId("test-6") - .addActions(OrchestratorAction.newBuilder() - .setId(1) - .setScheduleTask(ScheduleTaskAction.newBuilder() - .setName("LargeTask") - .setInput(StringValue.of(largeInput)) - .build()) - .build()) - .build(); - - OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); - assertNotSame(response, result); - - String externalizedInput = result.getActions(0).getScheduleTask().getInput().getValue(); - assertTrue(store.isKnownPayloadToken(externalizedInput), "Input should be externalized to a token"); - } - - @Test - void externalizeOrchestratorResponse_largeCompleteResult_externalizes() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - String largeResult = "y".repeat(200); - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId("test-7") - .addActions(OrchestratorAction.newBuilder() - .setId(1) - .setCompleteOrchestration(CompleteOrchestrationAction.newBuilder() - .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED) - .setResult(StringValue.of(largeResult)) - .build()) - .build()) - .build(); - - OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); - assertNotSame(response, result); - - String externalizedResult = result.getActions(0).getCompleteOrchestration().getResult().getValue(); - assertTrue(store.isKnownPayloadToken(externalizedResult)); - } - - @Test - void externalizeOrchestratorResponse_largeCustomStatus_externalizes() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - String largeStatus = "s".repeat(100); - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId("test-8") - .setCustomStatus(StringValue.of(largeStatus)) - .build(); - - OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); - assertNotSame(response, result); - - String externalizedStatus = result.getCustomStatus().getValue(); - assertTrue(store.isKnownPayloadToken(externalizedStatus)); - } - - @Test - void roundTrip_externalizeAndResolve_scheduleTaskInput() { - TestPayloadStore store = new TestPayloadStore(); - PayloadHelper helper = createHelper(store); - - String originalInput = "large-task-input-data-that-exceeds-threshold"; - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId("test-9") - .addActions(OrchestratorAction.newBuilder() - .setId(1) - .setScheduleTask(ScheduleTaskAction.newBuilder() - .setName("Task") - .setInput(StringValue.of(originalInput)) - .build()) - .build()) - .build(); - - // Externalize - OrchestratorResponse externalized = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); - String token = externalized.getActions(0).getScheduleTask().getInput().getValue(); - assertTrue(store.isKnownPayloadToken(token)); - - // Simulate the payload arriving as a TaskScheduled history event - HistoryEvent taskScheduled = HistoryEvent.newBuilder() - .setEventId(1) - .setTaskScheduled(TaskScheduledEvent.newBuilder() - .setName("Task") - .setInput(StringValue.of(token)) - .build()) - .build(); - - OrchestratorRequest request = OrchestratorRequest.newBuilder() - .setInstanceId("test-9") - .addPastEvents(taskScheduled) - .build(); - - // Resolve - OrchestratorRequest resolved = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); - assertEquals(originalInput, resolved.getPastEvents(0).getTaskScheduled().getInput().getValue()); - } - - // ---- Helper methods ---- - - private static HistoryEvent buildExecutionStartedEvent(String input) { - return HistoryEvent.newBuilder() - .setEventId(-1) - .setExecutionStarted(ExecutionStartedEvent.newBuilder() - .setName("TestOrchestration") - .setVersion(StringValue.of("")) - .setInput(StringValue.of(input)) - .setOrchestrationInstance(OrchestrationInstance.newBuilder() - .setInstanceId("test-instance") - .build()) - .build()) - .build(); - } -} diff --git a/settings.gradle b/settings.gradle index afe0838e..3813e3ae 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,7 +3,6 @@ rootProject.name = 'durabletask-java' include ":client" include ":azurefunctions" include ":azuremanaged" -include ":azure-blob-payloads" include ":samples" include ":samples-azure-functions" include ":endtoendtests"