diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java
new file mode 100644
index 00000000000..353c8345423
--- /dev/null
+++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java
@@ -0,0 +1,41 @@
+package datadog.trace.bootstrap.instrumentation.reactivestreams;
+
+import datadog.context.Context;
+
+/**
+ * Value of the {@code (Publisher, HandoffContext)} store used to hand a context from a publisher's
+ * subscribe to that publisher's own subscriber (or a blocking call).
+ *
+ *
{@link #threadConfined} deposits are only adopted on the producing thread. The reactor-core
+ * bridge uses them because a shared publisher — a multicast/replay {@code Sinks.Many} with several
+ * consumers — is subscribed concurrently, and keying by publisher identity alone would let one
+ * consumer adopt another's context. This is safe because a subscribe chain runs synchronously on
+ * one thread. {@link #anyThread} deposits are adopted anywhere, for producers (resilience4j,
+ * spring-webflux, spring-messaging) that attach to a unique publisher subscribed later, possibly on
+ * another thread.
+ */
+public final class HandoffContext {
+
+ private static final long ANY_THREAD = 0L;
+
+ private final long threadId;
+ private final Context context;
+
+ private HandoffContext(final Context context, final long threadId) {
+ this.context = context;
+ this.threadId = threadId;
+ }
+
+ public static HandoffContext anyThread(final Context context) {
+ return new HandoffContext(context, ANY_THREAD);
+ }
+
+ public static HandoffContext threadConfined(final Context context) {
+ return new HandoffContext(context, Thread.currentThread().getId());
+ }
+
+ /** The context, or {@code null} if this is a thread-confined deposit read on another thread. */
+ public Context contextForCurrentThread() {
+ return threadId == ANY_THREAD || threadId == Thread.currentThread().getId() ? context : null;
+ }
+}
diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java
index 77c59d8799e..0636a144017 100644
--- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java
+++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java
@@ -13,6 +13,7 @@
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -55,7 +56,7 @@ public static ContextScope onSubscribe(
return ReactiveStreamsContextPropagation.captureOnSubscribe(
self,
s,
- InstrumentationContext.get(Publisher.class, Context.class),
+ InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}
diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java
index 22c9ed9ac2b..32bdf7b3299 100644
--- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java
+++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java
@@ -3,6 +3,7 @@
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -13,7 +14,7 @@ private ReactiveStreamsContextPropagation() {}
public static ContextScope captureOnSubscribe(
final Publisher> publisher,
final Subscriber> subscriber,
- final ContextStore publisherContexts,
+ final ContextStore publisherContexts,
final ContextStore subscriberContexts) {
// Don't consume the publisher context until we've verified the subscriber is non-null. For
// subscribe(null), Reactive Streams mandates an NPE after this advice returns. Consuming the
@@ -22,7 +23,8 @@ public static ContextScope captureOnSubscribe(
return null;
}
- final Context contextFromPublisher = publisherContexts.remove(publisher);
+ final HandoffContext handoff = publisherContexts.remove(publisher);
+ final Context contextFromPublisher = handoff == null ? null : handoff.contextForCurrentThread();
final Context activeContext = Context.current();
final Context context = contextFromPublisher != null ? contextFromPublisher : activeContext;
if (context == Context.root()) {
diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java
index 80b92265419..9749b838254 100644
--- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java
+++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java
@@ -6,6 +6,7 @@
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,7 +32,7 @@ public String[] helperClassNames() {
public Map contextStore() {
final Map store = new HashMap<>();
store.put("org.reactivestreams.Subscriber", Context.class.getName());
- store.put("org.reactivestreams.Publisher", Context.class.getName());
+ store.put("org.reactivestreams.Publisher", HandoffContext.class.getName());
return store;
}
diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java
index 177adf279e7..46b0e241e48 100644
--- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java
+++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java
@@ -8,6 +8,7 @@
import datadog.context.ContextKey;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.IdentityHashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
@@ -23,12 +24,12 @@ class ReactiveStreamsContextPropagationTest {
void publisherCapturedContextOverridesActiveContext() {
final Publisher publisher = subscriber -> {};
final Subscriber subscriber = new NoopSubscriber();
- final ContextStore publisherContexts = new MapContextStore<>();
+ final ContextStore publisherContexts = new MapContextStore<>();
final ContextStore subscriberContexts = new MapContextStore<>();
- // A context was captured on the publisher (e.g. at assembly / cross-thread subscribe).
+ // A context was handed off on the publisher, confined to this (the producing) thread.
final Context captured = Context.root().with(KEY, "captured");
- publisherContexts.put(publisher, captured);
+ publisherContexts.put(publisher, HandoffContext.threadConfined(captured));
// The current thread already carries a different, non-root active context.
final Context active = Context.root().with(KEY, "active");
@@ -52,11 +53,66 @@ void publisherCapturedContextOverridesActiveContext() {
assertSame(active, Context.current());
}
- // The captured context is remembered for the subscriber, and removed from the publisher store.
+ // The captured context is remembered for the subscriber, and consumed from the publisher store.
assertSame(captured, subscriberContexts.get(subscriber));
assertNull(publisherContexts.get(publisher));
}
+ @Test
+ void publisherContextFromAnotherThreadIsIgnored() throws InterruptedException {
+ // A thread-confined deposit from another thread (concurrent multicast subscribe) must be
+ // ignored.
+ final Publisher publisher = subscriber -> {};
+ final Subscriber subscriber = new NoopSubscriber();
+ final ContextStore publisherContexts = new MapContextStore<>();
+ final ContextStore subscriberContexts = new MapContextStore<>();
+
+ final Context foreign = Context.root().with(KEY, "foreign");
+ final Thread producer =
+ new Thread(() -> publisherContexts.put(publisher, HandoffContext.threadConfined(foreign)));
+ producer.start();
+ producer.join();
+
+ final Context active = Context.root().with(KEY, "active");
+ try (ContextScope activeScope = active.attach()) {
+ final ContextScope scope =
+ ReactiveStreamsContextPropagation.captureOnSubscribe(
+ publisher, subscriber, publisherContexts, subscriberContexts);
+ if (scope != null) {
+ scope.close();
+ }
+ }
+
+ // The foreign deposit is ignored; the subscriber keeps this thread's active context.
+ assertSame(active, subscriberContexts.get(subscriber));
+ }
+
+ @Test
+ void anyThreadPublisherContextIsAdoptedAcrossThreads() throws InterruptedException {
+ // An any-thread deposit (resilience4j/spring-messaging: attached early, subscribed later) is
+ // adopted even when the subscribe runs on a different thread.
+ final Publisher publisher = subscriber -> {};
+ final Subscriber subscriber = new NoopSubscriber();
+ final ContextStore publisherContexts = new MapContextStore<>();
+ final ContextStore subscriberContexts = new MapContextStore<>();
+
+ final Context captured = Context.root().with(KEY, "captured");
+ final Thread producer =
+ new Thread(() -> publisherContexts.put(publisher, HandoffContext.anyThread(captured)));
+ producer.start();
+ producer.join();
+
+ final ContextScope scope =
+ ReactiveStreamsContextPropagation.captureOnSubscribe(
+ publisher, subscriber, publisherContexts, subscriberContexts);
+ if (scope != null) {
+ scope.close();
+ }
+
+ // The any-thread deposit is adopted despite the cross-thread subscribe.
+ assertSame(captured, subscriberContexts.get(subscriber));
+ }
+
@Test
void signalActivationIsSkippedWhenAnotherContextIsActive() {
final Subscriber subscriber = new NoopSubscriber();
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy
index 260f5af9f9a..09823e83e9e 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy
@@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification {
"immediate" | Schedulers.immediate()
}
+ def "subscribe-time context propagates across threads with #name"() {
+ // Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread
+ // propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still
+ // be children of the subscribe-time parent.
+ when:
+ runUnderTrace("parent") {
+ pipeline.call().collectList().block()
+ }
+
+ then:
+ assertTraces(1) {
+ trace(3) {
+ sortSpansByStart()
+ span {
+ operationName "parent"
+ parent()
+ }
+ span {
+ operationName "addOne"
+ childOf span(0)
+ }
+ span {
+ operationName "addOne"
+ childOf span(0)
+ }
+ }
+ }
+
+ where:
+ name | pipeline
+ "publishOn" | {
+ Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne)
+ }
+ "subscribeOn" | {
+ Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne)
+ }
+ "subscribeOn+publishOn" | {
+ Flux.just(1, 2)
+ .subscribeOn(Schedulers.single())
+ .publishOn(Schedulers.parallel())
+ .map(addOne)
+ }
+ }
+
def "Context propagation through reactor context with span #spanType"() {
when:
runUnderTrace("parent", {
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java
index e68665cc4bd..6e0bb195f9c 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java
@@ -5,10 +5,10 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
-import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -42,7 +42,7 @@ public static class BlockingAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope before(@Advice.This final Publisher self) {
return ReactorContextBridge.activateForBlocking(
- self, InstrumentationContext.get(Publisher.class, Context.class));
+ self, InstrumentationContext.get(Publisher.class, HandoffContext.class));
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java
index f27fc0ebd35..11e025b0978 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java
@@ -13,6 +13,7 @@
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -58,7 +59,7 @@ public static ContextScope before(
return ReactorContextBridge.captureOnSubscribe(
self,
subscriber,
- InstrumentationContext.get(Publisher.class, Context.class),
+ InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java
index af7743c6fff..73c963dd898 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java
@@ -10,6 +10,7 @@
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -54,7 +55,7 @@ public static void onSubscribe(
self,
arg,
s,
- InstrumentationContext.get(Publisher.class, Context.class),
+ InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}
}
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java
index 7d1e3b523af..df2f9a91a0c 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java
@@ -4,6 +4,7 @@
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
@@ -48,38 +49,43 @@ public static ContextScope activateStoredContext(
/**
* On subscribe, hands the explicit context recorded for {@code subscriber} (a context-writing
* subscriber) to the publisher store so the reactive-streams layer can propagate it, and attaches
- * it.
+ * it. The deposit is {@linkplain HandoffContext#threadConfined thread-confined} since the
+ * subscribed publisher may be a concurrently-subscribed shared sink.
*/
public static ContextScope captureOnSubscribe(
final Publisher> publisher,
final Subscriber> subscriber,
- final ContextStore publisherContexts,
+ final ContextStore publisherContexts,
final ContextStore subscriberContexts) {
final Context context = subscriberContexts.get(subscriber);
if (context == null) {
return null;
}
- publisherContexts.put(publisher, context);
+ publisherContexts.put(publisher, HandoffContext.threadConfined(context));
return attachIfRequired(context, Context.current());
}
public static ContextScope activateForBlocking(
- final Publisher> publisher, final ContextStore publisherContexts) {
- return attachIfRequired(publisherContexts.get(publisher), Context.current());
+ final Publisher> publisher,
+ final ContextStore publisherContexts) {
+ final HandoffContext handoff = publisherContexts.get(publisher);
+ return attachIfRequired(
+ handoff == null ? null : handoff.contextForCurrentThread(), Context.current());
}
public static void transferToOptimizedSubscriber(
final Publisher> publisher,
final Subscriber> source,
final Subscriber> target,
- final ContextStore publisherContexts,
+ final ContextStore publisherContexts,
final ContextStore subscriberContexts) {
if (source == null || target == null) {
return;
}
- Context context = publisherContexts.get(publisher);
+ final HandoffContext handoff = publisherContexts.get(publisher);
+ Context context = handoff == null ? null : handoff.contextForCurrentThread();
if (context == null) {
context = subscriberContexts.get(source);
}
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java
index d729c5a7947..14bd109c458 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java
@@ -8,6 +8,7 @@
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -33,7 +34,7 @@ public String[] helperClassNames() {
public Map contextStore() {
final Map store = new HashMap<>();
store.put("org.reactivestreams.Subscriber", Context.class.getName());
- store.put("org.reactivestreams.Publisher", Context.class.getName());
+ store.put("org.reactivestreams.Publisher", HandoffContext.class.getName());
return store;
}
diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy
index 6a2dfcd14e4..f2654e4e1a4 100644
--- a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy
+++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy
@@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification {
"immediate" | Schedulers.immediate()
}
+ def "subscribe-time context propagates across threads with #name"() {
+ // Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread
+ // propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still
+ // be children of the subscribe-time parent.
+ when:
+ runUnderTrace("parent") {
+ pipeline.call().collectList().block()
+ }
+
+ then:
+ assertTraces(1) {
+ trace(3) {
+ sortSpansByStart()
+ span {
+ operationName "parent"
+ parent()
+ }
+ span {
+ operationName "addOne"
+ childOf span(0)
+ }
+ span {
+ operationName "addOne"
+ childOf span(0)
+ }
+ }
+ }
+
+ where:
+ name | pipeline
+ "publishOn" | {
+ Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne)
+ }
+ "subscribeOn" | {
+ Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne)
+ }
+ "subscribeOn+publishOn" | {
+ Flux.just(1, 2)
+ .subscribeOn(Schedulers.single())
+ .publishOn(Schedulers.parallel())
+ .map(addOne)
+ }
+ }
+
def "Context propagation through reactor context with span #spanType"() {
when:
runUnderTrace("parent", {
diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java
index b45d865f34b..873d57f7bce 100644
--- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java
+++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java
@@ -4,9 +4,9 @@
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
@@ -40,7 +40,8 @@ public static void after(
result,
CircuitBreakerDecorator.DECORATE,
circuitBreaker,
- InstrumentationContext.get(Publisher.class, Context.class)::put);
+ ReactorHelper.putInto(
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)));
}
}
}
diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java
index 8a87951497a..33e8d636906 100644
--- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java
+++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java
@@ -5,9 +5,9 @@
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import io.github.resilience4j.core.functions.CheckedSupplier;
import java.util.function.Function;
import net.bytebuddy.asm.Advice;
@@ -40,7 +40,9 @@ public static void after(
result =
ReactorHelper.wrapFunction(
- result, InstrumentationContext.get(Publisher.class, Context.class)::putIfAbsent);
+ result,
+ ReactorHelper.putIfAbsentInto(
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)));
}
// 2.0.0+
diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java
index 84b8c878b5e..bfb2bb7cdbe 100644
--- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java
+++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java
@@ -3,7 +3,9 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import datadog.context.ContextScope;
+import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -19,6 +21,20 @@ public class ReactorHelper {
private static final Logger log = LoggerFactory.getLogger(ReactorHelper.class);
+ // These build the hand-off BiConsumer here rather than in @Advice code on purpose: a lambda
+ // defined in advice desugars to an invokedynamic that cannot be linked once the advice is inlined
+ // into the (third-party) operator, and fails silently. anyThread: the span is attached at
+ // assembly and the publisher may be subscribed later on another thread.
+ public static BiConsumer, AgentSpan> putInto(
+ final ContextStore store) {
+ return (publisher, span) -> store.put(publisher, HandoffContext.anyThread(span));
+ }
+
+ public static BiConsumer, AgentSpan> putIfAbsentInto(
+ final ContextStore store) {
+ return (publisher, span) -> store.putIfAbsent(publisher, HandoffContext.anyThread(span));
+ }
+
public static Function, Publisher>> wrapFunction(
Function, Publisher>> operator,
BiConsumer, AgentSpan> attachContext) {
diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java
index f1d3266c1da..53d90f8919b 100644
--- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java
+++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java
@@ -1,9 +1,9 @@
package datadog.trace.instrumentation.resilience4j;
import com.google.auto.service.AutoService;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -33,7 +33,8 @@ public String[] helperClassNames() {
@Override
public Map contextStore() {
- return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ return Collections.singletonMap(
+ "org.reactivestreams.Publisher", HandoffContext.class.getName());
}
@Override
diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java
index d740571b9ae..6369e683971 100644
--- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java
+++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java
@@ -4,9 +4,9 @@
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import io.github.resilience4j.retry.Retry;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
@@ -39,7 +39,8 @@ public static void after(
result,
RetryDecorator.DECORATE,
retry,
- InstrumentationContext.get(Publisher.class, Context.class)::put);
+ ReactorHelper.putInto(
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)));
}
}
}
diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
index bf43161cce4..dc2020327f2 100644
--- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
+++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java
@@ -8,6 +8,7 @@
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
@@ -38,7 +39,8 @@ public KotlinAwareHandlerInstrumentation() {
@Override
public Map contextStore() {
- return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ return Collections.singletonMap(
+ "org.reactivestreams.Publisher", HandoffContext.class.getName());
}
@Override
@@ -58,8 +60,8 @@ public static class DoInvokeAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return Object result) {
if (result instanceof Publisher) {
- InstrumentationContext.get(Publisher.class, Context.class)
- .put((Publisher>) result, Context.current());
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)
+ .put((Publisher>) result, HandoffContext.anyThread(Context.current()));
}
}
}
diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java
index efcbab555d1..2f5cdcbeae8 100644
--- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java
+++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java
@@ -6,10 +6,10 @@
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DISPATCHER_HANDLE_HANDLER;
-import datadog.context.Context;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.function.Consumer;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
@@ -51,7 +51,8 @@ public static void methodExit(
final AgentSpan span = scope.span();
final Consumer finisher = new AdviceUtils.MonoSpanFinisher(span);
mono = mono.doOnError(finisher).doFinally(finisher);
- InstrumentationContext.get(Publisher.class, Context.class).put(mono, span);
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)
+ .put(mono, HandoffContext.anyThread(span));
}
scope.close();
// span finished in MonoSpanFinisher
diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java
index 9488b2e6950..94a340dbcd7 100644
--- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java
+++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java
@@ -7,9 +7,9 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Collections;
import java.util.Map;
@@ -24,7 +24,8 @@ public String instrumentedType() {
@Override
public Map contextStore() {
- return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ return Collections.singletonMap(
+ "org.reactivestreams.Publisher", HandoffContext.class.getName());
}
@Override
diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java
index 52fe8d67558..6cfd3f903d4 100644
--- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java
+++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java
@@ -1,8 +1,8 @@
package datadog.trace.instrumentation.springwebflux.server;
-import datadog.context.Context;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
import org.springframework.web.server.ServerWebExchange;
@@ -15,7 +15,8 @@ public static void methodExit(
@Advice.Return(readOnly = false) Mono mono) {
final AgentSpan span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
if (span != null && mono != null) {
- InstrumentationContext.get(Publisher.class, Context.class).put(mono, span);
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)
+ .put(mono, HandoffContext.anyThread(span));
}
}
}
diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java
index 592a594e624..326ddce3ff4 100644
--- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java
+++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java
@@ -5,10 +5,10 @@
import static datadog.trace.instrumentation.springwebflux.server.AdviceUtils.constructOperationName;
import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE;
-import datadog.context.Context;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpMethod;
@@ -72,7 +72,8 @@ public static void methodExit(
if (throwable != null) {
DECORATE.onError(scope, throwable);
} else if (mono != null) {
- InstrumentationContext.get(Publisher.class, Context.class).put(mono, scope.span());
+ InstrumentationContext.get(Publisher.class, HandoffContext.class)
+ .put(mono, HandoffContext.anyThread(scope.span()));
}
scope.close();
// span finished in SpanFinishingSubscriber
diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java
index f3196983e0a..7d0c9b48410 100644
--- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java
+++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java
@@ -9,9 +9,9 @@
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
-import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
+import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.description.type.TypeDescription;
@@ -33,7 +33,8 @@ public ElementMatcher hierarchyMatcher() {
@Override
public Map contextStore() {
- return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName());
+ return Collections.singletonMap(
+ "org.reactivestreams.Publisher", HandoffContext.class.getName());
}
@Override