Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
import io.temporal.common.interceptors.ActivityClientInterceptor;
import io.temporal.common.interceptors.Header;
import io.temporal.internal.client.ActivityClientInternal;
import io.temporal.internal.client.ActivityHandleImpl;
import io.temporal.internal.client.RootActivityClientInvoker;
import io.temporal.internal.client.external.GenericWorkflowClientImpl;
Expand All @@ -27,7 +28,7 @@
* Implementation of {@link ActivityClient} that delegates calls through the activity interceptor
* chain and ultimately to the Temporal service.
*/
class ActivityClientImpl implements ActivityClient {
class ActivityClientImpl implements ActivityClient, ActivityClientInternal {

private final WorkflowServiceStubs stubs;
private final ActivityClientOptions options;
Expand Down Expand Up @@ -56,6 +57,11 @@ private static ActivityClientCallsInterceptor initializeClientInvoker(
return invoker;
}

@Override
public ActivityClientCallsInterceptor getInvoker() {
return invoker;
}

// ---- Interface-based start (Proc variants) ----

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.temporal.common.Experimental;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -116,19 +118,56 @@ <R> GetActivityResultOutput<R> getActivityResult(GetActivityResultInput<R> input
<R> CompletableFuture<GetActivityResultOutput<R>> getActivityResultAsync(
GetActivityResultInput<R> input);

/**
* Nexus completion-callback metadata propagated through {@link StartActivityInput} when an
* activity is scheduled from a Nexus operation handler.
*/
@Experimental
final class CompletionCallback {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to think if this is the right way to propogate this information

private final String url;
private final Map<String, String> headers;

public CompletionCallback(String url, Map<String, String> headers) {
this.url = Objects.requireNonNull(url);
this.headers = Objects.requireNonNull(headers);
}

public String getUrl() {
return url;
}

public Map<String, String> getHeaders() {
return headers;
}
}

@Experimental
final class StartActivityInput {
private final String activityType;
private final List<Object> args;
private final StartActivityOptions options;
private final Header header;
private final @Nullable CompletionCallback completionCallback;
private final @Nullable List<io.nexusrpc.Link> links;

public StartActivityInput(
String activityType, List<Object> args, StartActivityOptions options, Header header) {
this(activityType, args, options, header, null, null);
}

public StartActivityInput(
String activityType,
List<Object> args,
StartActivityOptions options,
Header header,
@Nullable CompletionCallback completionCallback,
@Nullable List<io.nexusrpc.Link> links) {
this.activityType = activityType;
this.args = args;
this.options = options;
this.header = header;
this.completionCallback = completionCallback;
this.links = links;
}

public String getActivityType() {
Expand All @@ -146,16 +185,38 @@ public StartActivityOptions getOptions() {
public Header getHeader() {
return header;
}

@Nullable
public CompletionCallback getCompletionCallback() {
return completionCallback;
}

@Nullable
public List<io.nexusrpc.Link> getLinks() {
return links;
}
}

@Experimental
final class StartActivityOutput {
private final String activityId;
private final @Nullable String activityRunId;

/**
* Set by the invoker when the start request included a Nexus completion callback; null
* otherwise. Internal use; do not depend on this in user-facing code.
*/
private final @Nullable String nexusOperationToken;

public StartActivityOutput(String activityId, @Nullable String activityRunId) {
this(activityId, activityRunId, null);
}

public StartActivityOutput(
String activityId, @Nullable String activityRunId, @Nullable String nexusOperationToken) {
this.activityId = activityId;
this.activityRunId = activityRunId;
this.nexusOperationToken = nexusOperationToken;
}

public String getActivityId() {
Expand All @@ -166,6 +227,11 @@ public String getActivityId() {
public String getActivityRunId() {
return activityRunId;
}

@Nullable
public String getNexusOperationToken() {
return nexusOperationToken;
}
}

@Experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.temporal.internal.client;

import io.temporal.common.interceptors.ActivityClientCallsInterceptor;

/**
* Internal-only view of an {@code ActivityClient} that exposes the configured interceptor chain.
*
* <p>Lives in {@code io.temporal.internal.client} so that other internal SDK packages (e.g. {@code
* io.temporal.nexus}) can route a fully-constructed {@link
* ActivityClientCallsInterceptor.StartActivityInput} through the chain without bypassing
* user-registered interceptors or the metrics-tagged scope, and without forcing the concrete impl
* class to be public.
*/
public interface ActivityClientInternal {
ActivityClientCallsInterceptor getInvoker();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.temporal.internal.client;

import io.nexusrpc.Link;
import io.temporal.client.StartActivityOptions;
import io.temporal.common.Experimental;
import io.temporal.common.interceptors.Header;
import java.util.List;
import java.util.Map;

/**
* Request used to start an activity from a Nexus operation handler. Mirrors {@link
* NexusStartWorkflowRequest} but carries the activity-specific scheduling payload.
*/
@Experimental
public final class NexusStartActivityRequest {
private final String requestId;
private final String callbackUrl;
private final Map<String, String> callbackHeaders;
private final String taskQueue;
private final List<Link> links;
private final String activityType;
private final List<Object> args;
private final StartActivityOptions options;
private final Header header;

public NexusStartActivityRequest(
String requestId,
String callbackUrl,
Map<String, String> callbackHeaders,
String taskQueue,
List<Link> links,
String activityType,
List<Object> args,
StartActivityOptions options,
Header header) {
this.requestId = requestId;
this.callbackUrl = callbackUrl;
this.callbackHeaders = callbackHeaders;
this.taskQueue = taskQueue;
this.links = links;
this.activityType = activityType;
this.args = args;
this.options = options;
this.header = header;
}

public String getRequestId() {
return requestId;
}

public String getCallbackUrl() {
return callbackUrl;
}

public Map<String, String> getCallbackHeaders() {
return callbackHeaders;
}

public String getTaskQueue() {
return taskQueue;
}

public List<Link> getLinks() {
return links;
}

public String getActivityType() {
return activityType;
}

public List<Object> getArgs() {
return args;
}

public StartActivityOptions getOptions() {
return options;
}

public Header getHeader() {
return header;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.temporal.internal.client;

import io.temporal.common.Experimental;
import javax.annotation.Nullable;

/**
* Response returned from starting an activity via {@link NexusStartActivityRequest}. Mirrors {@link
* NexusStartWorkflowResponse}.
*/
@Experimental
public final class NexusStartActivityResponse {
private final String activityId;
private final @Nullable String runId;
private final String operationToken;

public NexusStartActivityResponse(
String activityId, @Nullable String runId, String operationToken) {
this.activityId = activityId;
this.runId = runId;
this.operationToken = operationToken;
}

public String getActivityId() {
return activityId;
}

@Nullable
public String getRunId() {
return runId;
}

public String getOperationToken() {
return operationToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import io.grpc.StatusRuntimeException;
import io.temporal.api.activity.v1.ActivityExecutionOutcome;
import io.temporal.api.common.v1.ActivityType;
import io.temporal.api.common.v1.Callback;
import io.temporal.api.common.v1.Link;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.errordetails.v1.ActivityExecutionAlreadyStartedFailure;
import io.temporal.api.sdk.v1.UserMetadata;
Expand All @@ -19,6 +21,8 @@
import io.temporal.common.interceptors.ActivityClientCallsInterceptor;
import io.temporal.internal.client.external.GenericWorkflowClient;
import io.temporal.internal.common.HeaderUtils;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.LinkConverter;
import io.temporal.internal.common.ProtoConverters;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SearchAttributesUtil;
Expand All @@ -28,14 +32,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Terminus of the activity interceptor chain. Implements all activity RPCs against the Temporal
* service.
*/
public class RootActivityClientInvoker implements ActivityClientCallsInterceptor {

private static final Logger log = LoggerFactory.getLogger(RootActivityClientInvoker.class);

private final GenericWorkflowClient genericClient;
private final ActivityClientOptions clientOptions;

Expand Down Expand Up @@ -104,6 +113,58 @@ public StartActivityOutput startActivity(StartActivityInput input) {
io.temporal.api.common.v1.Header grpcHeader = HeaderUtils.toHeaderGrpc(input.getHeader(), null);
request.setHeader(grpcHeader);

// Hoisted so it can be returned in StartActivityOutput when a Nexus callback is present.
String nexusOperationToken = null;
if (input.getCompletionCallback() != null) {
List<Link> protoLinks = null;
if (input.getLinks() != null) {
protoLinks =
input.getLinks().stream()
.map(
link -> {
if (io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor()
.getFullName()
.equals(link.getType())) {
io.temporal.api.nexus.v1.Link nexusLink =
io.temporal.api.nexus.v1.Link.newBuilder()
.setType(link.getType())
.setUrl(link.getUri().toString())
.build();
return LinkConverter.nexusLinkToWorkflowEvent(nexusLink);
} else {
log.warn("ignoring unsupported link data type: {}", link.getType());
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (!protoLinks.isEmpty()) {
request.addAllLinks(protoLinks);
} else {
protoLinks = null;
}
}
// Generate the operation token from the user-supplied activity ID and namespace so the
// dual OPERATION_ID + OPERATION_TOKEN headers can be injected before the start RPC fires.
try {
nexusOperationToken =
io.temporal.internal.nexus.OperationTokenUtil.generateActivityExecutionOperationToken(
options.getId(), clientOptions.getNamespace());
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
throw new io.nexusrpc.handler.HandlerException(
io.nexusrpc.handler.HandlerException.ErrorType.BAD_REQUEST,
"failed to generate activity operation token",
e);
}
Callback cb =
InternalUtils.buildNexusCallback(
input.getCompletionCallback().getUrl(),
input.getCompletionCallback().getHeaders(),
nexusOperationToken,
protoLinks);
request.addCompletionCallbacks(cb);
}

StartActivityExecutionResponse response;
try {
response = genericClient.startActivity(request.build());
Expand All @@ -121,7 +182,7 @@ public StartActivityOutput startActivity(StartActivityInput input) {
}

String runId = response.getRunId().isEmpty() ? null : response.getRunId();
return new StartActivityOutput(options.getId(), runId);
return new StartActivityOutput(options.getId(), runId, nexusOperationToken);
}

@Override
Expand Down
Loading
Loading