Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:

- name: Start CLI server
env:
TEMPORAL_CLI_VERSION: 1.7.2-standalone-nexus-operations
TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down Expand Up @@ -112,13 +112,11 @@ jobs:
--dynamic-config-value frontend.ListWorkersEnabled=true \
--dynamic-config-value frontend.enableCancelWorkerPollsOnShutdown=true \
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
--dynamic-config-value 'callback.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value activity.startDelayEnabled=true \
--dynamic-config-value nexusoperation.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableCHASMSignalBacklinks=true \
--dynamic-config-value history.enableTransitionHistory=true &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
e);
}
}
// If this start is being issued from inside a Nexus operation handler, stash only the
// forward operation->workflow link from the start response so NexusStartWorkflowHelper can
// attach it to the WorkflowExecutionStarted event. Unlike signal/signalWithStart, start
// deliberately does NOT add a response link here: the operation->workflow relationship is
// already captured by the forward link, so re-adding response.getLink() as a response link
// would duplicate it on the caller's history event. Do not "restore symmetry" by calling
// addResponseLink here.
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
}
Expand All @@ -127,13 +120,6 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
.setRequestId(UUID.randomUUID().toString())
.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));

// If this signal is being issued from inside a Nexus operation handler, forward the inbound
// Nexus task links so the SignalWorkflowExecution history event links back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
request.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks());
}

DataConverter dataConverterWitSignalContext =
clientOptions
.getDataConverter()
Expand All @@ -143,12 +129,7 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {

Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
inputArgs.ifPresent(request::setInput);
SignalWorkflowExecutionResponse response = genericClient.signal(request.build());
// Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasLink()) {
CurrentNexusOperationContext.get().addResponseLink(response.getLink());
}
genericClient.signal(request.build());
return new WorkflowSignalOutput();
}

Expand All @@ -167,28 +148,17 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
SignalWithStartWorkflowExecutionRequest.Builder requestBuilder =
requestsHelper.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null));
// If this signalWithStart is being issued from inside a Nexus operation handler, forward
// the inbound Nexus task links so both the WorkflowExecutionStarted and
// WorkflowExecutionSignaled events on the callee link back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks());
}
SignalWithStartWorkflowExecutionRequest request = requestBuilder.build();
SignalWithStartWorkflowExecutionRequest request =
requestsHelper
.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null))
.build();
SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
WorkflowExecution execution =
WorkflowExecution.newBuilder()
.setRunId(response.getRunId())
.setWorkflowId(request.getWorkflowId())
.build();
// Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasSignalLink()) {
CurrentNexusOperationContext.get().addResponseLink(response.getSignalLink());
}
// TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
// We should wire it when it's implemented server-side.
return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface GenericWorkflowClient {

StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request);

SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request);
void signal(SignalWorkflowExecutionRequest request);

SignalWithStartWorkflowExecutionResponse signalWithStart(
SignalWithStartWorkflowExecutionRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ private static Map<String, String> tagsForStartWorkflow(StartWorkflowExecutionRe
}

@Override
public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) {
public void signal(SignalWorkflowExecutionRequest request) {
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1)
.put(MetricsTag.SIGNAL_NAME, request.getSignalName())
.build();
Scope scope = metricsScope.tagged(tags);
return grpcRetryer.retryWithResult(
grpcRetryer.retry(
() ->
service
.blockingStub()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

public class InternalNexusOperationContext {
private final String namespace;
Expand All @@ -18,25 +14,7 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
// Link returned by the StartWorkflowExecution response when the operation is backed by a workflow
// (workflow-run operations). Read by NexusStartWorkflowHelper to attach the forward
// operation->workflow link, fabricating a WORKFLOW_EXECUTION_STARTED link when the server omits
// one. Distinct from the response links below.
Link startWorkflowResponseLink;
// Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the
// workflow client can attach them to the outgoing requests it issues (e.g. signal,
// signalWithStart) via the request's links field.
private List<Link> requestLinks = Collections.emptyList();
// Links returned by outbound RPCs the operation handler issues (such as
// SignalWorkflowExecutionResponse.link or SignalWithStartWorkflowExecutionResponse.signal_link).
// One entry per outbound RPC that returned a link. Drained
// by the task handler when building StartOperationResponse so each RPC the handler issued gets a
// corresponding link on the caller workflow's history event.
//
// A handler may issue RPCs from multiple threads, so every read and write of this list is guarded
// by responseLinksLock and getResponseLinks() returns a defensive copy taken under the lock.
private final Object responseLinksLock = new Object();
private final List<Link> responseLinks = new ArrayList<>();

public InternalNexusOperationContext(
String namespace,
Expand Down Expand Up @@ -82,19 +60,6 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

/**
* Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached
* to RPCs issued by the operation handler.
*/
public void setRequestLinks(List<Link> links) {
this.requestLinks = links == null ? Collections.emptyList() : links;
}

/** Links from the inbound Nexus task; empty if none. */
public @Nonnull List<Link> getRequestLinks() {
return Collections.unmodifiableList(requestLinks);
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
}
Expand All @@ -103,32 +68,6 @@ public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
}

/**
* Append a response link returned by an outbound RPC the operation handler issued (e.g. signal,
* signalWithStart, etc). The task handler drains the list when building the operation's
* StartOperationResponse.
*/
public void addResponseLink(Link link) {
if (link != null) {
synchronized (responseLinksLock) {
responseLinks.add(link);
}
}
}

/**
* Response links from every outbound RPC the handler issued. Returned as an unmodifiable view;
* callers must not attempt to mutate. Entries are accumulated while the operation handler runs
* (the call that flows through {@link
* io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor#startOperation}) and are
* drained afterward by the task handler when building the StartOperationResponse.
*/
public @Nonnull List<Link> getResponseLinks() {
synchronized (responseLinksLock) {
return Collections.unmodifiableList(new ArrayList<>(responseLinks));
}
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public NexusOperationInfo getInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.LinkConverter;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
Expand Down Expand Up @@ -285,10 +284,6 @@ private StartOperationResponse handleStartOperation(
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
// Stash the inbound links in common.v1.Link form on the operation context so the RPCs the
// handler issues (e.g. signal, signalWithStart, etc) can attach them to their
// request's links field.
List<io.temporal.api.common.v1.Link> inboundCommonLinks = new ArrayList<>();
task.getLinksList()
.forEach(
link -> {
Expand All @@ -301,23 +296,7 @@ private StartOperationResponse handleStartOperation(
"Invalid link URL: " + link.getUrl(),
e);
}
// LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of
// other shapes (e.g. non-temporal URLs) come back null and are intentionally not
// forwarded onto the RPCs the handler issues, which require the WorkflowEvent
// variant. Log so a debugging session can see what was dropped.
io.temporal.api.common.v1.Link commonLink =
LinkConverter.nexusLinkToWorkflowEvent(link);
if (commonLink != null) {
inboundCommonLinks.add(commonLink);
} else {
log.warn(
"Dropping inbound Nexus link from outbound link propagation: type='{}',"
+ " url='{}' (not a parseable temporal WorkflowEvent link)",
link.getType(),
link.getUrl());
}
});
CurrentNexusOperationContext.get().setRequestLinks(inboundCommonLinks);

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -328,28 +307,10 @@ private StartOperationResponse handleStartOperation(
try {
OperationStartResult<HandlerResultContent> result =
startOperation(context, operationStartDetails.build(), input.build());
// If any RPCs the handler issued (e.g. signal, signalWithStart, etc) returned
// response links, propagate them to the caller so the caller workflow's history event links
// to each event on the callee. Same set of response links applies to both sync and async
// response variants.
List<io.temporal.api.nexus.v1.Link> responseLinks = new ArrayList<>();
for (io.temporal.api.common.v1.Link responseLink :
CurrentNexusOperationContext.get().getResponseLinks()) {
if (!responseLink.hasWorkflowEvent()) {
continue;
}
io.temporal.api.nexus.v1.Link converted =
LinkConverter.workflowEventToNexusLink(responseLink.getWorkflowEvent());
if (converted != null) {
responseLinks.add(converted);
}
}

if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.addAllLinks(responseLinks)
.build());
} else {
startResponseBuilder.setAsyncSuccess(
Expand All @@ -365,7 +326,6 @@ private StartOperationResponse handleStartOperation(
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.addAllLinks(responseLinks)
.build());
}
} catch (OperationException e) {
Expand Down
Loading
Loading