Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
772725d
First pass at adding links for signalling
Evanthx May 22, 2026
9ce8f8e
Progress
Evanthx May 23, 2026
f15d07b
Added some tests
Evanthx May 26, 2026
3a171b6
First pass at adding links for signalling
Evanthx May 22, 2026
312e5a1
Progress
Evanthx May 23, 2026
a9ea1a0
Added some tests
Evanthx May 26, 2026
e0ac5bf
Merge remote-tracking branch 'refs/remotes/origin/signal-links' into …
Evanthx May 26, 2026
f459d3e
Pin proto submodule to standalone-nexus-op branch for signal-backlink…
Evanthx May 26, 2026
331fcce
Update temporal-sdk/src/main/java/io/temporal/internal/nexus/Internal…
Evanthx Jun 1, 2026
978cebd
Renamed methods to remove the word signal
Evanthx Jun 1, 2026
c5b4b28
Renamed a few more methods from Signal to Backlinks
Evanthx Jun 1, 2026
202e8fd
Adding a Nexus link converter helper method
Evanthx Jun 1, 2026
c77accb
Code review results
Evanthx Jun 1, 2026
68f16aa
Added a comment
Evanthx Jun 1, 2026
6d1b8c3
Some changes for PR comments
Evanthx Jun 2, 2026
ce7ef6e
Restoring try try block
Evanthx Jun 2, 2026
c99efd9
Removed cross-namespace test
Evanthx Jun 2, 2026
2b67010
Removed SANO links
Evanthx Jun 3, 2026
2a180a7
Fixed up a unit test
Evanthx Jun 3, 2026
6c1338c
Addressing PR comments
Evanthx Jun 4, 2026
42040a6
Addressing PR comments
Evanthx Jun 4, 2026
08084cf
Reset proto submodule to v1.62.12
Evanthx Jun 4, 2026
4fe3b60
Reviewing
Evanthx Jun 11, 2026
812d841
Renaming to RequestLink and ResponseLink
Evanthx Jun 15, 2026
ebc8a2e
change CLI server version
Evanthx Jun 18, 2026
37660a3
Adding a dynamic config to fix some tests to work with CHASM
Evanthx Jun 18, 2026
ef153cc
The CLI server has a new WORKFLOW_EXECUTION_SIGNALED entry that the i…
Evanthx Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:

- name: Start CLI server
env:
TEMPORAL_CLI_VERSION: 1.7.1-standalone-nexus-operations
TEMPORAL_CLI_VERSION: 1.7.2-standalone-nexus-operations
run: |
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
tar -xzf temporal_cli.tar.gz
Expand Down Expand Up @@ -112,11 +112,13 @@ jobs:
--dynamic-config-value frontend.ListWorkersEnabled=true \
--dynamic-config-value frontend.enableCancelWorkerPollsOnShutdown=true \
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
--dynamic-config-value 'callback.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
--dynamic-config-value frontend.activityAPIsEnabled=true \
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value activity.startDelayEnabled=true \
--dynamic-config-value nexusoperation.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableCHASMSignalBacklinks=true \
--dynamic-config-value history.enableTransitionHistory=true &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
e);
}
}
// If this start is being issued from inside a Nexus operation handler, stash only the
// forward operation->workflow link from the start response so NexusStartWorkflowHelper can
// attach it to the WorkflowExecutionStarted event. Unlike signal/signalWithStart, start
// deliberately does NOT add a response link here: the operation->workflow relationship is
// already captured by the forward link, so re-adding response.getLink() as a response link
// would duplicate it on the caller's history event. Do not "restore symmetry" by calling
// addResponseLink here.
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
}
Expand All @@ -120,6 +127,13 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
.setRequestId(UUID.randomUUID().toString())
.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));

// If this signal is being issued from inside a Nexus operation handler, forward the inbound
// Nexus task links so the SignalWorkflowExecution history event links back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
request.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks());
}

DataConverter dataConverterWitSignalContext =
clientOptions
.getDataConverter()
Expand All @@ -129,7 +143,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {

Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
inputArgs.ifPresent(request::setInput);
genericClient.signal(request.build());
SignalWorkflowExecutionResponse response = genericClient.signal(request.build());
// Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasLink()) {
CurrentNexusOperationContext.get().addResponseLink(response.getLink());
}
return new WorkflowSignalOutput();
}

Expand All @@ -148,17 +167,28 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
SignalWithStartWorkflowExecutionRequest request =
requestsHelper
.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null))
.build();
SignalWithStartWorkflowExecutionRequest.Builder requestBuilder =
requestsHelper.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null));
// If this signalWithStart is being issued from inside a Nexus operation handler, forward
// the inbound Nexus task links so both the WorkflowExecutionStarted and
// WorkflowExecutionSignaled events on the callee link back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getRequestLinks());
}
SignalWithStartWorkflowExecutionRequest request = requestBuilder.build();
SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
WorkflowExecution execution =
WorkflowExecution.newBuilder()
.setRunId(response.getRunId())
.setWorkflowId(request.getWorkflowId())
.build();
// Server >=1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasSignalLink()) {
CurrentNexusOperationContext.get().addResponseLink(response.getSignalLink());
}
// TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
// We should wire it when it's implemented server-side.
return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface GenericWorkflowClient {

StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request);

void signal(SignalWorkflowExecutionRequest request);
SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request);

SignalWithStartWorkflowExecutionResponse signalWithStart(
SignalWithStartWorkflowExecutionRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ private static Map<String, String> tagsForStartWorkflow(StartWorkflowExecutionRe
}

@Override
public void signal(SignalWorkflowExecutionRequest request) {
public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) {
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1)
.put(MetricsTag.SIGNAL_NAME, request.getSignalName())
.build();
Scope scope = metricsScope.tagged(tags);
grpcRetryer.retry(
return grpcRetryer.retryWithResult(
() ->
service
.blockingStub()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

public class InternalNexusOperationContext {
private final String namespace;
Expand All @@ -14,7 +18,25 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
// Link returned by the StartWorkflowExecution response when the operation is backed by a workflow
// (workflow-run operations). Read by NexusStartWorkflowHelper to attach the forward
// operation->workflow link, fabricating a WORKFLOW_EXECUTION_STARTED link when the server omits
// one. Distinct from the response links below.
Link startWorkflowResponseLink;
// Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the
// workflow client can attach them to the outgoing requests it issues (e.g. signal,
// signalWithStart) via the request's links field.
private List<Link> requestLinks = Collections.emptyList();
// Links returned by outbound RPCs the operation handler issues (such as
// SignalWorkflowExecutionResponse.link or SignalWithStartWorkflowExecutionResponse.signal_link).
// One entry per outbound RPC that returned a link. Drained
// by the task handler when building StartOperationResponse so each RPC the handler issued gets a
// corresponding link on the caller workflow's history event.
//
// A handler may issue RPCs from multiple threads, so every read and write of this list is guarded
// by responseLinksLock and getResponseLinks() returns a defensive copy taken under the lock.
private final Object responseLinksLock = new Object();
private final List<Link> responseLinks = new ArrayList<>();

public InternalNexusOperationContext(
String namespace,
Expand Down Expand Up @@ -60,6 +82,19 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

/**
* Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached
* to RPCs issued by the operation handler.
*/
Comment thread
Evanthx marked this conversation as resolved.
public void setRequestLinks(List<Link> links) {
this.requestLinks = links == null ? Collections.emptyList() : links;
}

/** Links from the inbound Nexus task; empty if none. */
public @Nonnull List<Link> getRequestLinks() {
return Collections.unmodifiableList(requestLinks);
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
}
Expand All @@ -68,6 +103,32 @@ public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
}

/**
* Append a response link returned by an outbound RPC the operation handler issued (e.g. signal,
* signalWithStart, etc). The task handler drains the list when building the operation's
* StartOperationResponse.
*/
public void addResponseLink(Link link) {
if (link != null) {
synchronized (responseLinksLock) {
responseLinks.add(link);
}
}
}

/**
* Response links from every outbound RPC the handler issued. Returned as an unmodifiable view;
* callers must not attempt to mutate. Entries are accumulated while the operation handler runs
* (the call that flows through {@link
* io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor#startOperation}) and are
* drained afterward by the task handler when building the StartOperationResponse.
*/
public @Nonnull List<Link> getResponseLinks() {
synchronized (responseLinksLock) {
return Collections.unmodifiableList(new ArrayList<>(responseLinks));
}
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public NexusOperationInfo getInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.LinkConverter;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
Expand Down Expand Up @@ -284,6 +285,10 @@ private StartOperationResponse handleStartOperation(
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
// Stash the inbound links in common.v1.Link form on the operation context so the RPCs the
// handler issues (e.g. signal, signalWithStart, etc) can attach them to their
// request's links field.
List<io.temporal.api.common.v1.Link> inboundCommonLinks = new ArrayList<>();
task.getLinksList()
.forEach(
link -> {
Expand All @@ -296,7 +301,23 @@ private StartOperationResponse handleStartOperation(
"Invalid link URL: " + link.getUrl(),
e);
}
// LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of
// other shapes (e.g. non-temporal URLs) come back null and are intentionally not
// forwarded onto the RPCs the handler issues, which require the WorkflowEvent
// variant. Log so a debugging session can see what was dropped.
io.temporal.api.common.v1.Link commonLink =
LinkConverter.nexusLinkToWorkflowEvent(link);
if (commonLink != null) {
inboundCommonLinks.add(commonLink);
} else {
log.warn(
Comment thread
Evanthx marked this conversation as resolved.
"Dropping inbound Nexus link from outbound link propagation: type='{}',"
+ " url='{}' (not a parseable temporal WorkflowEvent link)",
link.getType(),
link.getUrl());
}
});
CurrentNexusOperationContext.get().setRequestLinks(inboundCommonLinks);

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -307,10 +328,28 @@ private StartOperationResponse handleStartOperation(
try {
OperationStartResult<HandlerResultContent> result =
startOperation(context, operationStartDetails.build(), input.build());
// If any RPCs the handler issued (e.g. signal, signalWithStart, etc) returned
// response links, propagate them to the caller so the caller workflow's history event links
// to each event on the callee. Same set of response links applies to both sync and async
// response variants.
List<io.temporal.api.nexus.v1.Link> responseLinks = new ArrayList<>();
for (io.temporal.api.common.v1.Link responseLink :
CurrentNexusOperationContext.get().getResponseLinks()) {
if (!responseLink.hasWorkflowEvent()) {
continue;
}
io.temporal.api.nexus.v1.Link converted =
LinkConverter.workflowEventToNexusLink(responseLink.getWorkflowEvent());
if (converted != null) {
responseLinks.add(converted);
}
}

if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.addAllLinks(responseLinks)
.build());
} else {
startResponseBuilder.setAsyncSuccess(
Expand All @@ -326,6 +365,7 @@ private StartOperationResponse handleStartOperation(
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.addAllLinks(responseLinks)
.build());
}
} catch (OperationException e) {
Comment thread
Evanthx marked this conversation as resolved.
Expand Down
Loading
Loading