-
Notifications
You must be signed in to change notification settings - Fork 185
Sample code for Nexus Standalone #782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Evanthx
wants to merge
8
commits into
main
Choose a base branch
from
sano
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
656e254
Sample code for Nexus Standalone
Evanthx e5a7d50
Changed the SDK interface a bit, this changed to match
Evanthx c7ab51d
Updated to match SDK changes
Evanthx be6f58a
Update core/src/main/java/io/temporal/samples/nexusstandalone/Standal…
Evanthx 1df6eca
Updates from PR
Evanthx d61a229
Removed whitespace change
Evanthx 0528e90
Fixing IDs
Evanthx 402e014
Added getting a handle by ID
Evanthx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
85 changes: 85 additions & 0 deletions
85
core/src/main/java/io/temporal/samples/nexusstandalone/README.MD
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| ## Standalone Nexus Operations | ||
|
|
||
| > [!WARNING] | ||
| > Standalone Nexus operations are experimental and may be subject to backwards-incompatible | ||
| > changes. They require a Temporal server that implements and enables them via the dynamic configs | ||
| > shown below. | ||
| > | ||
| This sample shows how to invoke and manage **standalone Nexus operations** — Nexus operations | ||
| started directly by a client rather than from within a caller workflow. The long-running operation | ||
| (`startGreeting`) is backed by a `GreetingWorkflow` that blocks until it is cancelled or terminated; | ||
| the quick operation (`greet`) is synchronous and completes immediately. | ||
|
|
||
| `StandaloneClientStarter` runs each capability in turn: | ||
| 1. **Execute** an operation and read its result, both directly (`execute`) and via a handle | ||
| (`start` then `handle.getResult`). | ||
| 2. **Cancel** a running operation (`handle.cancel`). | ||
| 3. **Terminate** a running operation (`handle.terminate`). Operation-terminate is a known gap that | ||
| does not stop the backing workflow, so the sample also terminates the backing workflow by ID. | ||
| 4. **Visibility** — `list` operations with a status filter and `count` them (total and grouped) via | ||
| `NexusClient`. | ||
|
|
||
| ### Running | ||
|
|
||
| Start a Temporal server (version `1.7.2-standalone-nexus-operations`) with the standalone-Nexus dynamic configs enabled: | ||
|
|
||
| ```bash | ||
| temporal server start-dev \ | ||
| --dynamic-config-value nexusoperation.enableStandalone=true \ | ||
| --dynamic-config-value history.enableChasmCallbacks=true | ||
| ``` | ||
|
|
||
| Create the namespace and the Nexus endpoint: | ||
|
|
||
| ```bash | ||
| temporal operator nexus endpoint create \ | ||
| --name nexus-standalone-operation-endpoint \ | ||
| --target-namespace default \ | ||
| --target-task-queue nexusstandalone-handler-task-queue | ||
| ``` | ||
|
|
||
| In one terminal, start the handler worker: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusstandalone.handler.HandlerWorker | ||
| ``` | ||
|
|
||
| In a second terminal, run the starter: | ||
|
|
||
| ```bash | ||
| ./gradlew -q :core:execute -PmainClass=io.temporal.samples.nexusstandalone.StandaloneClientStarter | ||
| ``` | ||
|
|
||
| Expected output (operation IDs and Visibility counts will differ between runs): | ||
|
|
||
| ``` | ||
| execute() returned: Hello, execute! | ||
| start() id=73e77105-f7ec-4a1f-a24a-1f9a9cc87248 then getResult() returned: Hello, execute-via-handle! | ||
| Started 'to-cancel' id=12b554b5-d9f8-4f4f-9314-db508fd91999, requesting cancellation | ||
| Operation id=12b554b5-d9f8-4f4f-9314-db508fd91999 ended as expected after cancel: Nexus operation failed: operationId='12b554b5-d9f8-4f4f-9314-db508fd91999' | ||
| Started 'to-terminate' id=b1dae9d4-2d6b-45d6-ab3b-8725cc2cf6de, terminating | ||
| 'to-terminate' ended as expected after terminate: Nexus operation failed: operationId='b1dae9d4-2d6b-45d6-ab3b-8725cc2cf6de' | ||
| Terminated backing workflow greeting-to-terminate-ef71547a | ||
| List filtered to Completed returned 2 operation(s) | ||
| Total operation count: 4 | ||
| Grouped count total=4, groups: | ||
| group values=[[Canceled]] count=1 | ||
| group values=[[Completed]] count=2 | ||
| group values=[[Terminated]] count=1 | ||
| ``` | ||
|
|
||
| ### Cancellation vs. termination | ||
|
|
||
| A workflow-backed Nexus operation does **not** need any explicit cancel handling to be cancellable. | ||
| When you call `handle.cancel(...)`, the server delivers a cancellation request to the backing | ||
| workflow, which makes the blocking call (`Workflow.await` in `GreetingWorkflowImpl`) throw a | ||
| `CanceledFailure`; letting it propagate out of the workflow method ends both the workflow and the | ||
| operation as cancelled. Cancellation is **cooperative**, though: if the backing workflow caught and | ||
| ignored `CanceledFailure` (or did all of its waiting inside a detached cancellation scope), the | ||
| cancel request would have no effect and the operation would run until it completes or hits its | ||
| schedule-to-close timeout. | ||
|
|
||
| `handle.terminate(...)` is different. It forcefully closes the **operation** record, but currently | ||
| does **not** propagate to the backing workflow (a known gap) — the workflow keeps running and | ||
| nothing appears in its history. Until that gap is closed, terminate the backing workflow directly by | ||
| its workflow ID, as `StandaloneClientStarter.terminateBackingWorkflow` does. |
216 changes: 216 additions & 0 deletions
216
core/src/main/java/io/temporal/samples/nexusstandalone/StandaloneClientStarter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,216 @@ | ||
| package io.temporal.samples.nexusstandalone; | ||
|
|
||
| import io.temporal.client.NexusClient; | ||
| import io.temporal.client.NexusClientOptions; | ||
| import io.temporal.client.NexusOperationException; | ||
| import io.temporal.client.NexusOperationExecutionCount; | ||
| import io.temporal.client.NexusOperationExecutionMetadata; | ||
| import io.temporal.client.NexusOperationHandle; | ||
| import io.temporal.client.NexusServiceClient; | ||
| import io.temporal.client.StartNexusOperationOptions; | ||
| import io.temporal.client.WorkflowClient; | ||
| import io.temporal.samples.nexusstandalone.service.ClientOptions; | ||
| import io.temporal.samples.nexusstandalone.service.GreetingNexusService; | ||
| import io.temporal.samples.nexusstandalone.service.GreetingNexusService.GreetingInput; | ||
| import io.temporal.samples.nexusstandalone.service.GreetingNexusService.GreetingOutput; | ||
| import io.temporal.serviceclient.WorkflowServiceStubs; | ||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| // Sample client for standalone Nexus operations — operations started and managed directly by a | ||
| // client rather than from within a workflow. Each capability is shown in its own method, called in | ||
| // turn from main(): executing an operation and reading its result, cancelling and terminating an | ||
| // operation, and querying operations via Visibility. | ||
| public class StandaloneClientStarter { | ||
| private static final Logger logger = LoggerFactory.getLogger(StandaloneClientStarter.class); | ||
|
|
||
| // Must match the Nexus endpoint configured on the server (see README). | ||
| public static final String ENDPOINT_NAME = "nexus-standalone-operation-endpoint"; | ||
|
|
||
| public static void main(String[] args) throws Exception { | ||
| WorkflowClient client = ClientOptions.getWorkflowClient(); | ||
| WorkflowServiceStubs stubs = client.getWorkflowServiceStubs(); | ||
| String namespace = client.getOptions().getNamespace(); | ||
|
|
||
| // A single NexusClient is the entry point: it serves Visibility queries (list/count) and | ||
| // produces service-bound clients. | ||
| NexusClient nexusClient = NexusClient.newInstance(stubs, clientOptions(namespace)); | ||
| // Typed service client: dispatches operations by method reference on the service interface. | ||
| NexusServiceClient<GreetingNexusService> greetingClient = | ||
| nexusClient.newNexusServiceClient(GreetingNexusService.class, ENDPOINT_NAME); | ||
|
|
||
| demonstrateExecuteAndGettingHandleById(nexusClient, greetingClient); | ||
| demonstrateStartAndCancel(greetingClient); | ||
| demonstrateStartAndTerminate(greetingClient, client); | ||
| demonstrateVisibility(nexusClient); | ||
| } | ||
|
|
||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| // execute — run a standalone Nexus operation and return its result in one call. | ||
| // getHandle — reconnect to an existing operation by its ID and read its result. | ||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| private static void demonstrateExecuteAndGettingHandleById( | ||
| NexusClient nexusClient, NexusServiceClient<GreetingNexusService> greetingClient) | ||
| throws Exception { | ||
| // execute(...) starts the operation and blocks until it completes, returning the result in one | ||
| // call. Used here on the synchronous 'greet' operation. | ||
| String operationId = "execute-nexus"; | ||
| GreetingOutput executed = | ||
| greetingClient.execute( | ||
| GreetingNexusService::greet, basicOptions(operationId), new GreetingInput("execute")); | ||
| logger.info("execute() returned: {}", executed.getMessage()); | ||
|
|
||
| // Reconnect to that same operation purely by its ID — nothing below references the execute() | ||
| // call above. This is how a separate process (or a later run) addresses an operation it did not | ||
| // start: NexusClient.getHandle(operationId, runId, resultClass) returns a typed handle (pass a | ||
| // null runId to target the latest run). getResult() then blocks until the operation is closed; | ||
| // since this one already completed, it returns the stored result immediately. | ||
| NexusOperationHandle<GreetingOutput> handle = | ||
| nexusClient.getHandle(operationId, null, GreetingOutput.class); | ||
| GreetingOutput viaHandle = handle.getResult(); | ||
| logger.info( | ||
| "getHandle(id={}) then getResult() returned: {}", | ||
| handle.getNexusOperationId(), | ||
| viaHandle.getMessage()); | ||
| } | ||
|
|
||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| // start - launch a Nexus operation and immediately return. Does not wait for the result. | ||
| // cancel — cooperative for workflow-backed operations (see GreetingWorkflowImpl comment). | ||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| private static void demonstrateStartAndCancel( | ||
| NexusServiceClient<GreetingNexusService> nexusClient) throws Exception { | ||
| // The backing workflow blocks indefinitely — giving cancellation something to act on. | ||
| NexusOperationHandle<GreetingOutput> handle = | ||
| nexusClient.start( | ||
| GreetingNexusService::startGreeting, | ||
| basicOptions("start-and-cancel-nexus"), | ||
| new GreetingInput("start-and-cancel")); | ||
| logger.info("Started 'to-cancel' id={}, requesting cancellation", handle.getNexusOperationId()); | ||
| handle.cancel("standalone-nexus sample: cancel demo"); | ||
| // getResult() blocks until the operation reaches a terminal state. A cancelled operation | ||
| // reports completion by throwing NexusOperationException rather than returning a result. | ||
| try { | ||
| handle.getResult(); | ||
| logger.warn( | ||
| "Operation id={} unexpectedly returned a result after cancel", | ||
| handle.getNexusOperationId()); | ||
| } catch (NexusOperationException e) { | ||
| logger.info( | ||
| "Operation id={} ended as expected after cancel: {}", | ||
| handle.getNexusOperationId(), | ||
| e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| // start - launch a Nexus operation and immediately return. Does not wait for the result. | ||
| // terminate — forcefully closes the operation record. | ||
| // | ||
| // KNOWN FEATURE GAP: terminating a standalone Nexus operation terminates ONLY the operation | ||
| // record — it does NOT propagate to the backing workflow (unlike cancel, which does). The backing | ||
| // workflow keeps running and nothing appears in its history. Until the server closes this gap, | ||
| // terminate the backing workflow directly by its workflow ID to avoid orphaning it. | ||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| private static void demonstrateStartAndTerminate( | ||
| NexusServiceClient<GreetingNexusService> nexusClient, WorkflowClient client) { | ||
| String name = "to-terminate"; | ||
| NexusOperationHandle<GreetingOutput> handle = | ||
| nexusClient.start( | ||
| GreetingNexusService::startGreeting, | ||
| basicOptions(name + "-nexus"), | ||
| new GreetingInput(name)); | ||
| logger.info("Started 'to-terminate' id={}, terminating", handle.getNexusOperationId()); | ||
| handle.terminate("standalone-nexus sample: terminate demo"); | ||
| // As with cancel, getResult() blocks until the operation record closes; a terminated operation | ||
| // reports completion by throwing rather than returning a result. | ||
| try { | ||
| handle.getResult(); | ||
| logger.warn("'to-terminate' unexpectedly returned a result after terminate"); | ||
| } catch (NexusOperationException e) { | ||
| logger.info("'to-terminate' ended as expected after terminate: {}", e.getMessage()); | ||
| } | ||
| // Operation-terminate did not stop the backing workflow (see the gap note above), so terminate | ||
| // it directly by its ID. | ||
| terminateBackingWorkflow(client, name); | ||
| } | ||
|
|
||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| // Visibility — list (filtered) and count (total and grouped) standalone operations. | ||
| // ───────────────────────────────────────────────────────────────────────────────────────────── | ||
| private static void demonstrateVisibility(NexusClient visibilityClient) { | ||
| // list accepts a Temporal Visibility query to filter results. Here we filter by the built-in | ||
| // ExecutionStatus attribute. Note the value is the SHORT status name ("Completed", "Canceled", | ||
| // "Terminated", "Running", ...) — not the full NEXUS_OPERATION_EXECUTION_STATUS_* enum | ||
| // constant. | ||
| // Visibility query syntax (operators, fields, AND/OR) is documented at | ||
| // https://docs.temporal.io/visibility#list-filter . | ||
| String completedQuery = "ExecutionStatus = \"Completed\""; | ||
| List<NexusOperationExecutionMetadata> completed = | ||
| visibilityClient.listNexusOperationExecutions(completedQuery).collect(Collectors.toList()); | ||
| logger.info("List filtered to Completed returned {} operation(s)", completed.size()); | ||
|
|
||
| // count() with no query returns the total in the namespace. | ||
| NexusOperationExecutionCount total = visibilityClient.countNexusOperationExecutions(null); | ||
| logger.info("Total operation count: {}", total.getCount()); | ||
|
|
||
| // count() with a GROUP BY query returns aggregation groups (a count per group value). | ||
| NexusOperationExecutionCount grouped = | ||
| visibilityClient.countNexusOperationExecutions("GROUP BY ExecutionStatus"); | ||
| logger.info("Grouped count total={}, groups:", grouped.getCount()); | ||
| for (NexusOperationExecutionCount.AggregationGroup group : grouped.getGroups()) { | ||
| logger.info(" group values={} count={}", group.getGroupValues(), group.getCount()); | ||
| } | ||
| } | ||
|
|
||
| // ── helpers ────────────────────────────────────────────────────────────────────────────────── | ||
|
|
||
| private static NexusClientOptions clientOptions(String namespace) { | ||
| return NexusClientOptions.newBuilder().setNamespace(namespace).build(); | ||
| } | ||
|
|
||
| /** Builds the per-call options used to start a Nexus operation. */ | ||
| private static StartNexusOperationOptions basicOptions(String name) { | ||
| return StartNexusOperationOptions.newBuilder() | ||
| // Required: a namespace-unique operation ID. The SDK never generates one for you, so you | ||
| // must supply your own. | ||
| .setId(name) | ||
| // Total time the caller is willing to wait for the operation to complete, including any | ||
| // server-side retries. Defaults to none (bounded only by server limits) if not set. | ||
| .setScheduleToCloseTimeout(Duration.ofMinutes(5)) | ||
| // Other optional per-call options (not set here, shown for reference): | ||
| // .setScheduleToStartTimeout(...) — max time the start request may wait before a handler | ||
| // picks it up. Default: unset (no limit). | ||
| // .setStartToCloseTimeout(...) — max time for a single start attempt. Default: unset. | ||
| // .setTypedSearchAttributes(...) — Visibility search attributes to index the operation | ||
| // by; each attribute must be registered on the namespace first. Default: none. | ||
| // .setSummary(...) — short text shown in the UI and returned by | ||
| // describe().getStaticSummary(). Default: none. | ||
| // .setIdReusePolicy(...) — behavior when the ID was used by a previously CLOSED | ||
| // operation. Default: ALLOW_DUPLICATE (a new run may reuse the ID). | ||
| // .setIdConflictPolicy(...) — behavior when the ID belongs to a currently RUNNING | ||
| // operation. Default: FAIL (reject with NexusOperationAlreadyStartedException). | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
| * Terminates the backing workflow for {@code name} directly by its workflow ID. Needed because | ||
| * terminating a standalone Nexus operation is a known gap that does not propagate to the backing | ||
| * workflow. Best-effort: ignores the case where the workflow is already closed. | ||
| */ | ||
| private static void terminateBackingWorkflow(WorkflowClient client, String name) { | ||
| String workflowId = "greeting-" + name; | ||
| try { | ||
| client | ||
| .newUntypedWorkflowStub(workflowId) | ||
| .terminate("standalone-nexus sample: terminate orphaned backing workflow"); | ||
| logger.info("Terminated backing workflow {}", workflowId); | ||
| } catch (Exception e) { | ||
| logger.info( | ||
| "Backing workflow {} not terminated (already closed?): {}", workflowId, e.getMessage()); | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't merge this PR until
1.36.0is released