diff --git a/build.gradle b/build.gradle index 8e476ad7..4985ea22 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ subprojects { ext { otelVersion = '1.30.1' otelVersionAlpha = "${otelVersion}-alpha" - javaSDKVersion = '1.35.0' + javaSDKVersion = '1.36.0-SNAPSHOT' camelVersion = '3.22.1' jarVersion = '1.0.0' } diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/README.MD b/core/src/main/java/io/temporal/samples/nexusstandalone/README.MD new file mode 100644 index 00000000..bd86a0e9 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/README.MD @@ -0,0 +1,85 @@ +## Standalone Nexus Operations + +> [!WARNING] +> Standalone Nexus operations are experimental and may be subject to backwards-incompatible +> changes. They require a Temporal server that implements and enables them via the dynamic configs +> shown below. +> +This sample shows how to invoke and manage **standalone Nexus operations** — Nexus operations +started directly by a client rather than from within a caller workflow. The long-running operation +(`startGreeting`) is backed by a `GreetingWorkflow` that blocks until it is cancelled or terminated; +the quick operation (`greet`) is synchronous and completes immediately. + +`StandaloneClientStarter` runs each capability in turn: +1. **Execute** an operation and read its result, both directly (`execute`) and via a handle + (`start` then `handle.getResult`). +2. **Cancel** a running operation (`handle.cancel`). +3. **Terminate** a running operation (`handle.terminate`). Operation-terminate is a known gap that + does not stop the backing workflow, so the sample also terminates the backing workflow by ID. +4. **Visibility** — `list` operations with a status filter and `count` them (total and grouped) via + `NexusClient`. + +### Running + +Start a Temporal server (version `1.7.2-standalone-nexus-operations`) with the standalone-Nexus dynamic configs enabled: + +```bash +temporal server start-dev \ + --dynamic-config-value nexusoperation.enableStandalone=true \ + --dynamic-config-value history.enableChasmCallbacks=true +``` + +Create the namespace and the Nexus endpoint: + +```bash +temporal operator nexus endpoint create \ + --name nexus-standalone-operation-endpoint \ + --target-namespace default \ + --target-task-queue nexusstandalone-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusstandalone.handler.HandlerWorker +``` + +In a second terminal, run the starter: + +```bash +./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusstandalone.StandaloneClientStarter +``` + +Expected output (operation IDs and Visibility counts will differ between runs): + +``` +execute() returned: Hello, execute! +start() id=73e77105-f7ec-4a1f-a24a-1f9a9cc87248 then getResult() returned: Hello, execute-via-handle! +Started 'to-cancel' id=12b554b5-d9f8-4f4f-9314-db508fd91999, requesting cancellation +Operation id=12b554b5-d9f8-4f4f-9314-db508fd91999 ended as expected after cancel: Nexus operation failed: operationId='12b554b5-d9f8-4f4f-9314-db508fd91999' +Started 'to-terminate' id=b1dae9d4-2d6b-45d6-ab3b-8725cc2cf6de, terminating +'to-terminate' ended as expected after terminate: Nexus operation failed: operationId='b1dae9d4-2d6b-45d6-ab3b-8725cc2cf6de' +Terminated backing workflow greeting-to-terminate-ef71547a +List filtered to Completed returned 2 operation(s) +Total operation count: 4 +Grouped count total=4, groups: + group values=[[Canceled]] count=1 + group values=[[Completed]] count=2 + group values=[[Terminated]] count=1 +``` + +### Cancellation vs. termination + +A workflow-backed Nexus operation does **not** need any explicit cancel handling to be cancellable. +When you call `handle.cancel(...)`, the server delivers a cancellation request to the backing +workflow, which makes the blocking call (`Workflow.await` in `GreetingWorkflowImpl`) throw a +`CanceledFailure`; letting it propagate out of the workflow method ends both the workflow and the +operation as cancelled. Cancellation is **cooperative**, though: if the backing workflow caught and +ignored `CanceledFailure` (or did all of its waiting inside a detached cancellation scope), the +cancel request would have no effect and the operation would run until it completes or hits its +schedule-to-close timeout. + +`handle.terminate(...)` is different. It forcefully closes the **operation** record, but currently +does **not** propagate to the backing workflow (a known gap) — the workflow keeps running and +nothing appears in its history. Until that gap is closed, terminate the backing workflow directly by +its workflow ID, as `StandaloneClientStarter.terminateBackingWorkflow` does. diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/StandaloneClientStarter.java b/core/src/main/java/io/temporal/samples/nexusstandalone/StandaloneClientStarter.java new file mode 100644 index 00000000..1b15ef70 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/StandaloneClientStarter.java @@ -0,0 +1,216 @@ +package io.temporal.samples.nexusstandalone; + +import io.temporal.client.NexusClient; +import io.temporal.client.NexusClientOptions; +import io.temporal.client.NexusOperationException; +import io.temporal.client.NexusOperationExecutionCount; +import io.temporal.client.NexusOperationExecutionMetadata; +import io.temporal.client.NexusOperationHandle; +import io.temporal.client.NexusServiceClient; +import io.temporal.client.StartNexusOperationOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexusstandalone.service.ClientOptions; +import io.temporal.samples.nexusstandalone.service.GreetingNexusService; +import io.temporal.samples.nexusstandalone.service.GreetingNexusService.GreetingInput; +import io.temporal.samples.nexusstandalone.service.GreetingNexusService.GreetingOutput; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Sample client for standalone Nexus operations — operations started and managed directly by a +// client rather than from within a workflow. Each capability is shown in its own method, called in +// turn from main(): executing an operation and reading its result, cancelling and terminating an +// operation, and querying operations via Visibility. +public class StandaloneClientStarter { + private static final Logger logger = LoggerFactory.getLogger(StandaloneClientStarter.class); + + // Must match the Nexus endpoint configured on the server (see README). + public static final String ENDPOINT_NAME = "nexus-standalone-operation-endpoint"; + + public static void main(String[] args) throws Exception { + WorkflowClient client = ClientOptions.getWorkflowClient(); + WorkflowServiceStubs stubs = client.getWorkflowServiceStubs(); + String namespace = client.getOptions().getNamespace(); + + // A single NexusClient is the entry point: it serves Visibility queries (list/count) and + // produces service-bound clients. + NexusClient nexusClient = NexusClient.newInstance(stubs, clientOptions(namespace)); + // Typed service client: dispatches operations by method reference on the service interface. + NexusServiceClient greetingClient = + nexusClient.newNexusServiceClient(GreetingNexusService.class, ENDPOINT_NAME); + + demonstrateExecuteAndGettingHandleById(nexusClient, greetingClient); + demonstrateStartAndCancel(greetingClient); + demonstrateStartAndTerminate(greetingClient, client); + demonstrateVisibility(nexusClient); + } + + // ───────────────────────────────────────────────────────────────────────────────────────────── + // execute — run a standalone Nexus operation and return its result in one call. + // getHandle — reconnect to an existing operation by its ID and read its result. + // ───────────────────────────────────────────────────────────────────────────────────────────── + private static void demonstrateExecuteAndGettingHandleById( + NexusClient nexusClient, NexusServiceClient greetingClient) + throws Exception { + // execute(...) starts the operation and blocks until it completes, returning the result in one + // call. Used here on the synchronous 'greet' operation. + String operationId = "execute-nexus"; + GreetingOutput executed = + greetingClient.execute( + GreetingNexusService::greet, basicOptions(operationId), new GreetingInput("execute")); + logger.info("execute() returned: {}", executed.getMessage()); + + // Reconnect to that same operation purely by its ID — nothing below references the execute() + // call above. This is how a separate process (or a later run) addresses an operation it did not + // start: NexusClient.getHandle(operationId, runId, resultClass) returns a typed handle (pass a + // null runId to target the latest run). getResult() then blocks until the operation is closed; + // since this one already completed, it returns the stored result immediately. + NexusOperationHandle handle = + nexusClient.getHandle(operationId, null, GreetingOutput.class); + GreetingOutput viaHandle = handle.getResult(); + logger.info( + "getHandle(id={}) then getResult() returned: {}", + handle.getNexusOperationId(), + viaHandle.getMessage()); + } + + // ───────────────────────────────────────────────────────────────────────────────────────────── + // start - launch a Nexus operation and immediately return. Does not wait for the result. + // cancel — cooperative for workflow-backed operations (see GreetingWorkflowImpl comment). + // ───────────────────────────────────────────────────────────────────────────────────────────── + private static void demonstrateStartAndCancel( + NexusServiceClient nexusClient) throws Exception { + // The backing workflow blocks indefinitely — giving cancellation something to act on. + NexusOperationHandle handle = + nexusClient.start( + GreetingNexusService::startGreeting, + basicOptions("start-and-cancel-nexus"), + new GreetingInput("start-and-cancel")); + logger.info("Started 'to-cancel' id={}, requesting cancellation", handle.getNexusOperationId()); + handle.cancel("standalone-nexus sample: cancel demo"); + // getResult() blocks until the operation reaches a terminal state. A cancelled operation + // reports completion by throwing NexusOperationException rather than returning a result. + try { + handle.getResult(); + logger.warn( + "Operation id={} unexpectedly returned a result after cancel", + handle.getNexusOperationId()); + } catch (NexusOperationException e) { + logger.info( + "Operation id={} ended as expected after cancel: {}", + handle.getNexusOperationId(), + e.getMessage()); + } + } + + // ───────────────────────────────────────────────────────────────────────────────────────────── + // start - launch a Nexus operation and immediately return. Does not wait for the result. + // terminate — forcefully closes the operation record. + // + // KNOWN FEATURE GAP: terminating a standalone Nexus operation terminates ONLY the operation + // record — it does NOT propagate to the backing workflow (unlike cancel, which does). The backing + // workflow keeps running and nothing appears in its history. Until the server closes this gap, + // terminate the backing workflow directly by its workflow ID to avoid orphaning it. + // ───────────────────────────────────────────────────────────────────────────────────────────── + private static void demonstrateStartAndTerminate( + NexusServiceClient nexusClient, WorkflowClient client) { + String name = "to-terminate"; + NexusOperationHandle handle = + nexusClient.start( + GreetingNexusService::startGreeting, + basicOptions(name + "-nexus"), + new GreetingInput(name)); + logger.info("Started 'to-terminate' id={}, terminating", handle.getNexusOperationId()); + handle.terminate("standalone-nexus sample: terminate demo"); + // As with cancel, getResult() blocks until the operation record closes; a terminated operation + // reports completion by throwing rather than returning a result. + try { + handle.getResult(); + logger.warn("'to-terminate' unexpectedly returned a result after terminate"); + } catch (NexusOperationException e) { + logger.info("'to-terminate' ended as expected after terminate: {}", e.getMessage()); + } + // Operation-terminate did not stop the backing workflow (see the gap note above), so terminate + // it directly by its ID. + terminateBackingWorkflow(client, name); + } + + // ───────────────────────────────────────────────────────────────────────────────────────────── + // Visibility — list (filtered) and count (total and grouped) standalone operations. + // ───────────────────────────────────────────────────────────────────────────────────────────── + private static void demonstrateVisibility(NexusClient visibilityClient) { + // list accepts a Temporal Visibility query to filter results. Here we filter by the built-in + // ExecutionStatus attribute. Note the value is the SHORT status name ("Completed", "Canceled", + // "Terminated", "Running", ...) — not the full NEXUS_OPERATION_EXECUTION_STATUS_* enum + // constant. + // Visibility query syntax (operators, fields, AND/OR) is documented at + // https://docs.temporal.io/visibility#list-filter . + String completedQuery = "ExecutionStatus = \"Completed\""; + List completed = + visibilityClient.listNexusOperationExecutions(completedQuery).collect(Collectors.toList()); + logger.info("List filtered to Completed returned {} operation(s)", completed.size()); + + // count() with no query returns the total in the namespace. + NexusOperationExecutionCount total = visibilityClient.countNexusOperationExecutions(null); + logger.info("Total operation count: {}", total.getCount()); + + // count() with a GROUP BY query returns aggregation groups (a count per group value). + NexusOperationExecutionCount grouped = + visibilityClient.countNexusOperationExecutions("GROUP BY ExecutionStatus"); + logger.info("Grouped count total={}, groups:", grouped.getCount()); + for (NexusOperationExecutionCount.AggregationGroup group : grouped.getGroups()) { + logger.info(" group values={} count={}", group.getGroupValues(), group.getCount()); + } + } + + // ── helpers ────────────────────────────────────────────────────────────────────────────────── + + private static NexusClientOptions clientOptions(String namespace) { + return NexusClientOptions.newBuilder().setNamespace(namespace).build(); + } + + /** Builds the per-call options used to start a Nexus operation. */ + private static StartNexusOperationOptions basicOptions(String name) { + return StartNexusOperationOptions.newBuilder() + // Required: a namespace-unique operation ID. The SDK never generates one for you, so you + // must supply your own. + .setId(name) + // Total time the caller is willing to wait for the operation to complete, including any + // server-side retries. Defaults to none (bounded only by server limits) if not set. + .setScheduleToCloseTimeout(Duration.ofMinutes(5)) + // Other optional per-call options (not set here, shown for reference): + // .setScheduleToStartTimeout(...) — max time the start request may wait before a handler + // picks it up. Default: unset (no limit). + // .setStartToCloseTimeout(...) — max time for a single start attempt. Default: unset. + // .setTypedSearchAttributes(...) — Visibility search attributes to index the operation + // by; each attribute must be registered on the namespace first. Default: none. + // .setSummary(...) — short text shown in the UI and returned by + // describe().getStaticSummary(). Default: none. + // .setIdReusePolicy(...) — behavior when the ID was used by a previously CLOSED + // operation. Default: ALLOW_DUPLICATE (a new run may reuse the ID). + // .setIdConflictPolicy(...) — behavior when the ID belongs to a currently RUNNING + // operation. Default: FAIL (reject with NexusOperationAlreadyStartedException). + .build(); + } + + /** + * Terminates the backing workflow for {@code name} directly by its workflow ID. Needed because + * terminating a standalone Nexus operation is a known gap that does not propagate to the backing + * workflow. Best-effort: ignores the case where the workflow is already closed. + */ + private static void terminateBackingWorkflow(WorkflowClient client, String name) { + String workflowId = "greeting-" + name; + try { + client + .newUntypedWorkflowStub(workflowId) + .terminate("standalone-nexus sample: terminate orphaned backing workflow"); + logger.info("Terminated backing workflow {}", workflowId); + } catch (Exception e) { + logger.info( + "Backing workflow {} not terminated (already closed?): {}", workflowId, e.getMessage()); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingNexusServiceImpl.java b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingNexusServiceImpl.java new file mode 100644 index 00000000..ea003955 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingNexusServiceImpl.java @@ -0,0 +1,46 @@ +package io.temporal.samples.nexusstandalone.handler; + +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowRunOperation; +import io.temporal.samples.nexusstandalone.service.GreetingNexusService; + +// Implements the GreetingNexusService operations. startGreeting is backed by a workflow that blocks +// (so it runs long enough to be cancelled/terminated); greet is a synchronous handler that +// completes inline. +@ServiceImpl(service = GreetingNexusService.class) +public class GreetingNexusServiceImpl { + + // Workflow-backed asynchronous operation. WorkflowRunOperation.fromWorkflowMethod exposes a + // workflow as a Nexus operation: starting the operation starts the workflow, and the operation + // completes when the workflow returns. The workflow ID is derived deterministically from the + // input name so the client can address the backing workflow directly (the sample uses this to + // terminate it by ID — it is just the word "greeting-" plus a known string from the object). + @OperationImpl + public OperationHandler + startGreeting() { + return WorkflowRunOperation.fromWorkflowMethod( + (ctx, details, input) -> + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId("greeting-" + input.getName()) + .build()) + ::greet); + } + + // Synchronous operation: OperationHandler.sync runs the lambda inline and returns the result + // immediately, so the Nexus operation completes as part of the start call. + @OperationImpl + public OperationHandler + greet() { + return OperationHandler.sync( + (ctx, details, input) -> + new GreetingNexusService.GreetingOutput("Hello, " + input.getName() + "!")); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflow.java b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflow.java new file mode 100644 index 00000000..fc8c2bbb --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflow.java @@ -0,0 +1,14 @@ +package io.temporal.samples.nexusstandalone.handler; + +import io.temporal.samples.nexusstandalone.service.GreetingNexusService; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +// The workflow backing the startGreeting Nexus operation. It blocks indefinitely and never +// completes on its own, which keeps the backing standalone Nexus operation in a running state so +// the sample can demonstrate cancel/terminate against it. +@WorkflowInterface +public interface GreetingWorkflow { + @WorkflowMethod + GreetingNexusService.GreetingOutput greet(GreetingNexusService.GreetingInput input); +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflowImpl.java new file mode 100644 index 00000000..b5079ae4 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/GreetingWorkflowImpl.java @@ -0,0 +1,23 @@ +package io.temporal.samples.nexusstandalone.handler; + +import io.temporal.samples.nexusstandalone.service.GreetingNexusService; +import io.temporal.workflow.Workflow; +import org.slf4j.Logger; + +public class GreetingWorkflowImpl implements GreetingWorkflow { + private static final Logger logger = Workflow.getLogger(GreetingWorkflowImpl.class); + + @Override + public GreetingNexusService.GreetingOutput greet(GreetingNexusService.GreetingInput input) { + logger.info( + "Greeting workflow started for {}; blocking until cancelled or terminated", + input.getName()); + // This workflow exists only to keep the backing standalone Nexus operation in a running state + // long enough for the sample to demonstrate cancel/terminate. It blocks forever and never + // completes on its own. + Workflow.await(() -> false); + + throw Workflow.wrap( + new IllegalStateException("greeting workflow should never complete normally")); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/HandlerWorker.java new file mode 100644 index 00000000..4704e284 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/handler/HandlerWorker.java @@ -0,0 +1,24 @@ +package io.temporal.samples.nexusstandalone.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexusstandalone.service.ClientOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +// Worker that hosts the Nexus service implementation and the workflow backing its operation. The +// task queue must match the Nexus endpoint's target task queue (see README). +public class HandlerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "nexusstandalone-handler-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(); + + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + worker.registerNexusServiceImplementation(new GreetingNexusServiceImpl()); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/service/ClientOptions.java b/core/src/main/java/io/temporal/samples/nexusstandalone/service/ClientOptions.java new file mode 100644 index 00000000..967fca58 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/service/ClientOptions.java @@ -0,0 +1,28 @@ +package io.temporal.samples.nexusstandalone.service; + +import io.temporal.client.WorkflowClient; +import io.temporal.envconfig.ClientConfigProfile; +import io.temporal.serviceclient.WorkflowServiceStubs; + +/** + * Builds a {@link WorkflowClient} from the {@code default} profile loaded by {@link + * ClientConfigProfile#load()}. By default this reads the TOML file at {@code TEMPORAL_CONFIG_FILE}, + * or, if that is unset, {@code [user config dir]/temporalio/temporal.toml}. Point that profile at a + * different server or namespace — or override via {@code TEMPORAL_*} environment variables — to run + * against, for example, a Temporal Cloud namespace with an API key. + */ +public class ClientOptions { + + public static WorkflowClient getWorkflowClient() { + ClientConfigProfile profile; + try { + profile = ClientConfigProfile.load(); + } catch (Exception e) { + throw new RuntimeException("Failed to load client configuration", e); + } + + WorkflowServiceStubs service = + WorkflowServiceStubs.newServiceStubs(profile.toWorkflowServiceStubsOptions()); + return WorkflowClient.newInstance(service, profile.toWorkflowClientOptions()); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusstandalone/service/GreetingNexusService.java b/core/src/main/java/io/temporal/samples/nexusstandalone/service/GreetingNexusService.java new file mode 100644 index 00000000..f234e939 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusstandalone/service/GreetingNexusService.java @@ -0,0 +1,52 @@ +package io.temporal.samples.nexusstandalone.service; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; + +// Shared Nexus service definition for the standalone-Nexus sample. It declares two operations: +// - startGreeting: backed by a workflow that blocks (long-running), so the client can demonstrate +// cancel/terminate against an operation that is still running. +// - greet: synchronous, completes immediately, so the client can demonstrate execute. +@Service +public interface GreetingNexusService { + + class GreetingInput { + private final String name; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GreetingInput(@JsonProperty("name") String name) { + this.name = name; + } + + @JsonProperty("name") + public String getName() { + return name; + } + } + + class GreetingOutput { + private final String message; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GreetingOutput(@JsonProperty("message") String message) { + this.message = message; + } + + @JsonProperty("message") + public String getMessage() { + return message; + } + } + + // An asynchronous operation backed by a workflow that blocks indefinitely, so the operation stays + // running until the caller cancels or terminates it. + @Operation + GreetingOutput startGreeting(GreetingInput input); + + // A synchronous operation that completes immediately. Used to demonstrate execute, which blocks + // on the operation result. + @Operation + GreetingOutput greet(GreetingInput input); +}