Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ jobs:
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableCHASMSignalBacklinks=true \
--dynamic-config-value history.enableTransitionHistory=true &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.api.common.v1.*;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure;
Expand Down Expand Up @@ -103,7 +104,28 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
}
}
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
// Auto-capture the start-workflow backlink so the task handler drains it onto the
// StartOperationResponse, the same path used for signal/signalWithStart responses.
if (response.hasLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getLink());
} else {
// Older servers (pre-1.31) don't return a link on the start response. Fabricate one
// pointing at the started workflow's WorkflowExecutionStarted event so the caller still
// gets a backlink.
CurrentNexusOperationContext.get()
.addBacklink(
Link.newBuilder()
.setWorkflowEvent(
Link.WorkflowEvent.newBuilder()
.setNamespace(clientOptions.getNamespace())
.setWorkflowId(execution.getWorkflowId())
.setRunId(execution.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
.build());
}
}
return new WorkflowStartOutput(execution);
}
Expand All @@ -120,6 +142,13 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
.setRequestId(UUID.randomUUID().toString())
.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));

// If this signal is being issued from inside a Nexus operation handler, forward the inbound
// Nexus task links so the SignalWorkflowExecution history event links back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
request.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
}

DataConverter dataConverterWitSignalContext =
clientOptions
.getDataConverter()
Expand All @@ -129,7 +158,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {

Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
inputArgs.ifPresent(request::setInput);
genericClient.signal(request.build());
SignalWorkflowExecutionResponse response = genericClient.signal(request.build());
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getLink());
}
return new WorkflowSignalOutput();
}

Expand All @@ -148,17 +182,28 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
SignalWithStartWorkflowExecutionRequest request =
requestsHelper
.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null))
.build();
SignalWithStartWorkflowExecutionRequest.Builder requestBuilder =
requestsHelper.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null));
// If this signalWithStart is being issued from inside a Nexus operation handler, forward
// the inbound Nexus task links so both the WorkflowExecutionStarted and
// WorkflowExecutionSignaled events on the callee link back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
}
SignalWithStartWorkflowExecutionRequest request = requestBuilder.build();
SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
WorkflowExecution execution =
WorkflowExecution.newBuilder()
.setRunId(response.getRunId())
.setWorkflowId(request.getWorkflowId())
.build();
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasSignalLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getSignalLink());
}
// TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
// We should wire it when it's implemented server-side.
return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface GenericWorkflowClient {

StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request);

void signal(SignalWorkflowExecutionRequest request);
SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request);

SignalWithStartWorkflowExecutionResponse signalWithStart(
SignalWithStartWorkflowExecutionRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ private static Map<String, String> tagsForStartWorkflow(StartWorkflowExecutionRe
}

@Override
public void signal(SignalWorkflowExecutionRequest request) {
public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) {
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1)
.put(MetricsTag.SIGNAL_NAME, request.getSignalName())
.build();
Scope scope = metricsScope.tagged(tags);
grpcRetryer.retry(
return grpcRetryer.retryWithResult(
() ->
service
.blockingStub()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

public class InternalNexusOperationContext {
private final String namespace;
Expand All @@ -14,7 +18,20 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
Link startWorkflowResponseLink;
// Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the
// workflow client can attach them to the outgoing requests it issues (e.g. signal,
// signalWithStart) via the request's links field.
private List<Link> nexusOperationLinks = Collections.emptyList();
// Backlinks returned by outbound RPCs the operation handler issues (such as
// SignalWorkflowExecutionResponse.link or SignalWithStartWorkflowExecutionResponse.signal_link).
// One entry per outbound RPC that returned a link. Drained
// by the task handler when building StartOperationResponse so each RPC the handler issued gets a
// corresponding link on the caller workflow's history event.
//
// This context is only safe for use from the single thread that runs the operation handler (the
// Nexus task executor's thread); the backing ArrayList is not synchronized. Handlers must not
// mutate it from other threads.
private final List<Link> responseBacklinks = new ArrayList<>();

public InternalNexusOperationContext(
String namespace,
Expand Down Expand Up @@ -60,12 +77,39 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
/**
* Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached
* to RPCs issued by the operation handler.
*/
Comment thread
Evanthx marked this conversation as resolved.
public void setNexusOperationLinks(List<Link> links) {
this.nexusOperationLinks = links == null ? Collections.emptyList() : links;
}

public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
/** Links from the inbound Nexus task; empty if none. */
public @Nonnull List<Link> getNexusOperationLinks() {
return Collections.unmodifiableList(nexusOperationLinks);
}

/**
* Append a backlink returned by an outbound RPC the operation handler issued (e.g. signal,
* signalWithStart, etc). The task handler drains the list when building the operation's
* StartOperationResponse.
*/
public void addBacklink(Link link) {
if (link != null) {
this.responseBacklinks.add(link);
}
}

/**
* Backlinks from every outbound RPC the handler issued. Returned as an unmodifiable view; callers
* must not attempt to mutate. Entries are accumulated while the operation handler runs (the call
* that flows through {@link
* io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor#startOperation}) and are
* drained afterward by the task handler when building the StartOperationResponse.
*/
public @Nonnull List<Link> getBacklinks() {
return Collections.unmodifiableList(responseBacklinks);
}

private class NexusOperationContextImpl implements NexusOperationContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.LinkConverter;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
Expand Down Expand Up @@ -284,6 +285,10 @@ private StartOperationResponse handleStartOperation(
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
// Stash the inbound links in common.v1.Link form on the operation context so the RPCs the
// handler issues (e.g. signal, signalWithStart, etc) can attach them to their
// request's links field.
List<io.temporal.api.common.v1.Link> inboundCommonLinks = new ArrayList<>();
task.getLinksList()
.forEach(
link -> {
Expand All @@ -296,7 +301,23 @@ private StartOperationResponse handleStartOperation(
"Invalid link URL: " + link.getUrl(),
e);
}
// LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of
// other shapes (e.g. non-temporal URLs) come back null and are intentionally not
// forwarded onto the RPCs the handler issues, which require the WorkflowEvent
// variant. Log so a debugging session can see what was dropped.
io.temporal.api.common.v1.Link commonLink =
LinkConverter.nexusLinkToWorkflowEvent(link);
if (commonLink != null) {
inboundCommonLinks.add(commonLink);
} else {
log.warn(
Comment thread
Evanthx marked this conversation as resolved.
"Dropping inbound Nexus link from outbound link propagation: type='{}',"
+ " url='{}' (not a parseable temporal WorkflowEvent link)",
link.getType(),
link.getUrl());
}
});
CurrentNexusOperationContext.get().setNexusOperationLinks(inboundCommonLinks);

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -307,10 +328,28 @@ private StartOperationResponse handleStartOperation(
try {
OperationStartResult<HandlerResultContent> result =
startOperation(context, operationStartDetails.build(), input.build());
// If any RPCs the handler issued (e.g. signal, signalWithStart, etc) returned
// backlinks, propagate them to the caller so the caller workflow's history event links to
// each event on the callee. Same set of backlinks applies to both sync and async response
// variants.
List<io.temporal.api.nexus.v1.Link> backlinks = new ArrayList<>();
for (io.temporal.api.common.v1.Link backlink :
CurrentNexusOperationContext.get().getBacklinks()) {
if (!backlink.hasWorkflowEvent()) {
continue;
}
io.temporal.api.nexus.v1.Link converted =
LinkConverter.workflowEventToNexusLink(backlink.getWorkflowEvent());
if (converted != null) {
backlinks.add(converted);
}
}

if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.addAllLinks(backlinks)
.build());
} else {
startResponseBuilder.setAsyncSuccess(
Expand All @@ -326,6 +365,7 @@ private StartOperationResponse handleStartOperation(
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.addAllLinks(backlinks)
.build());
}
} catch (OperationException e) {
Comment thread
Evanthx marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package io.temporal.nexus;

import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;

import io.nexusrpc.handler.*;
import io.nexusrpc.handler.OperationHandler;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.client.WorkflowClient;
import io.temporal.internal.client.NexusStartWorkflowRequest;
import io.temporal.internal.client.NexusStartWorkflowResponse;
import io.temporal.internal.nexus.CurrentNexusOperationContext;
import io.temporal.internal.nexus.InternalNexusOperationContext;
import io.temporal.internal.nexus.OperationTokenUtil;
import java.net.URISyntaxException;

class WorkflowRunOperationImpl<T, R> implements OperationHandler<T, R> {
private final WorkflowHandleFactory<T, R> handleFactory;
Expand All @@ -40,38 +33,9 @@ public OperationStartResult<R> start(

NexusStartWorkflowResponse nexusStartWorkflowResponse =
handle.getInvoker().invoke(nexusRequest);
WorkflowExecution workflowExec = nexusStartWorkflowResponse.getWorkflowExecution();

// If the start workflow response returned a link use it, otherwise
Comment thread
Evanthx marked this conversation as resolved.
// create the link information about the new workflow and return to the caller.
Link.WorkflowEvent workflowEventLink =
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
: null;
if (workflowEventLink == null) {
workflowEventLink =
Link.WorkflowEvent.newBuilder()
.setNamespace(nexusCtx.getNamespace())
.setWorkflowId(workflowExec.getWorkflowId())
.setRunId(workflowExec.getRunId())
.setEventRef(
Link.WorkflowEvent.EventReference.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
.build();
}
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
// Attach the link to the operation result.
OperationStartResult.Builder<R> result =
OperationStartResult.newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken());
if (nexusLink != null) {
try {
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
} catch (URISyntaxException e) {
// Not expected as the link is constructed by the SDK.
throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e);
}
}
return result.build();
return OperationStartResult.<R>newAsyncBuilder(nexusStartWorkflowResponse.getOperationToken())
.build();
}

@Override
Expand Down
Loading
Loading