Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions durabletask-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void raiseEvent(String instanceId, String eventName) {
* Waits for an orchestration to start running and returns an {@link OrchestrationMetadata} object that contains
* metadata about the started instance.
*
* <p> A "started" orchestration instance is any instance not in the <code>Pending</code> state. </p>
* <p>A "started" orchestration instance is any instance not in the <code>Pending</code> state. </p>
*
* <p>If an orchestration instance is already running when this method is called, the method will return immediately.
*</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand All @@ -42,7 +43,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);

private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
private final TaskOrchestrationFactories orchestrationFactories;

private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();

private final ManagedChannel managedSidecarChannel;
Expand All @@ -57,7 +59,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private Thread workerThread;

DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
this.orchestrationFactories = builder.orchestrationFactories;
this.activityFactories.putAll(builder.activityFactories);
this.appId = builder.appId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.dapr.durabletask;

import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
import io.grpc.Channel;

import java.time.Duration;
Expand All @@ -24,7 +26,7 @@
*
*/
public final class DurableTaskGrpcWorkerBuilder {
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
int port;
Channel channel;
Expand All @@ -40,17 +42,7 @@ public final class DurableTaskGrpcWorkerBuilder {
* @return this builder object
*/
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
String key = factory.getName();
if (key == null || key.length() == 0) {
throw new IllegalArgumentException("A non-empty task orchestration name is required.");
}

if (this.orchestrationFactories.containsKey(key)) {
throw new IllegalArgumentException(
String.format("A task orchestration factory named %s is already registered.", key));
}

this.orchestrationFactories.put(key, factory);
this.orchestrationFactories.addOrchestration(factory);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public boolean isCustomStatusFetched() {
private <T> T readPayloadAs(Class<T> type, String payload) {
if (!this.requestedInputsAndOutputs) {
throw new IllegalStateException("This method can only be used when instance metadata is fetched with the option "
+ "to include input and output data.");
+ "to include input and output data.");
}

// Note that the Java gRPC implementation converts null protobuf strings into empty Java strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;

import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -134,8 +135,8 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati
}

// Register the passed orchestration as the default ("*") orchestration
HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
orchestrationFactories.put("*", new TaskOrchestrationFactory() {
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
orchestrationFactories.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() {
return "*";
Expand All @@ -145,6 +146,16 @@ public String getName() {
public TaskOrchestration create() {
return orchestration;
}

@Override
public String getVersionName() {
return "";
}

@Override
public Boolean isLatestVersion() {
return false;
}
});

TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ public enum OrchestrationRuntimeStatus {
/**
* The orchestration is in a suspended state.
*/
SUSPENDED;
SUSPENDED,

/**
* The orchestration is in a stalled state.
*/
STALLED;

static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.OrchestrationStatus status) {
switch (status) {
Expand All @@ -88,6 +93,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.Orchestration
return PENDING;
case ORCHESTRATION_STATUS_SUSPENDED:
return SUSPENDED;
case ORCHESTRATION_STATUS_STALLED:
return STALLED;
default:
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
* <pre>
* Task{@literal <}int{@literal >} activityTask = ctx.callActivity("MyActivity", someInput, int.class);
* </pre>
*
* <p>Orchestrator code uses the {@link #await()} method to block on the completion of the task and retrieve the result.
* If the task is not yet complete, the {@code await()} method will throw an {@link OrchestratorBlockedException}, which
* pauses the orchestrator's execution so that it can save its progress into durable storage and schedule any
* outstanding work. When the task is complete, the orchestrator will run again from the beginning and the next time
* the task's {@code await()} method is called, the result will be returned, or a {@link TaskFailedException} will be
* thrown if the result of the task was an unhandled exception.</p>
*
* <p>Note that orchestrator code must never catch {@code OrchestratorBlockedException} because doing so can cause the
* orchestration instance to get permanently stuck.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public interface TaskActivityContext {
*/
<T> T getInput(Class<T> targetType);


/**
* Gets the execution id of the current task activity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
/**
* Exception that gets thrown when awaiting a {@link Task} for an activity or sub-orchestration that fails with an
* unhandled exception.
*
* <p>Detailed information associated with a particular task failure can be retrieved
* using the {@link #getErrorDetails()} method.</p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ default void continueAsNew(Object input) {
*/
void continueAsNew(Object input, boolean preserveUnprocessedEvents);

/**
* Check if the given patch name can be applied to the orchestration.
*
* @param patchName The name of the patch to check.
* @return True if the given patch name can be applied to the orchestration, False otherwise.
*/

boolean isPatched(String patchName);

/**
* Create a new Uuid that is safe for replay within an orchestration or operation.
*
Expand Down
Loading
Loading