From c77fa1087f2da09597bfcdbb282dfc16c7980772 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 1 Jul 2026 11:19:16 +0200 Subject: [PATCH] Fix multicast context propagation race in reactor/reactive-streams --- .../reactivestreams/HandoffContext.java | 41 ++++++++++++ .../PublisherInstrumentation.java | 3 +- .../ReactiveStreamsContextPropagation.java | 6 +- .../ReactiveStreamsModule.java | 3 +- ...ReactiveStreamsContextPropagationTest.java | 64 +++++++++++++++++-- .../groovy/ReactorCoreTest.groovy | 44 +++++++++++++ .../BlockingPublisherInstrumentation.java | 4 +- .../core/CorePublisherInstrumentation.java | 3 +- .../OptimizableOperatorInstrumentation.java | 3 +- .../reactor/core/ReactorContextBridge.java | 20 ++++-- .../reactor/core/ReactorCoreModule.java | 3 +- .../src/test/groovy/ReactorCoreTest.groovy | 44 +++++++++++++ ...CircuitBreakerOperatorInstrumentation.java | 5 +- .../FallbackOperatorInstrumentation.java | 6 +- .../resilience4j/ReactorHelper.java | 16 +++++ .../Resilience4jReactorModule.java | 5 +- .../RetryOperatorInstrumentation.java | 5 +- .../KotlinAwareHandlerInstrumentation.java | 8 ++- .../server/DispatcherHandlerAdvice.java | 5 +- .../DispatcherHandlerInstrumentation.java | 5 +- .../server/HandleResultAdvice.java | 5 +- .../server/HandlerAdapterAdvice.java | 5 +- .../server/HandlerAdapterInstrumentation.java | 5 +- 23 files changed, 267 insertions(+), 41 deletions(-) create mode 100644 dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java new file mode 100644 index 00000000000..353c8345423 --- /dev/null +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/reactivestreams/HandoffContext.java @@ -0,0 +1,41 @@ +package datadog.trace.bootstrap.instrumentation.reactivestreams; + +import datadog.context.Context; + +/** + * Value of the {@code (Publisher, HandoffContext)} store used to hand a context from a publisher's + * subscribe to that publisher's own subscriber (or a blocking call). + * + *

{@link #threadConfined} deposits are only adopted on the producing thread. The reactor-core + * bridge uses them because a shared publisher — a multicast/replay {@code Sinks.Many} with several + * consumers — is subscribed concurrently, and keying by publisher identity alone would let one + * consumer adopt another's context. This is safe because a subscribe chain runs synchronously on + * one thread. {@link #anyThread} deposits are adopted anywhere, for producers (resilience4j, + * spring-webflux, spring-messaging) that attach to a unique publisher subscribed later, possibly on + * another thread. + */ +public final class HandoffContext { + + private static final long ANY_THREAD = 0L; + + private final long threadId; + private final Context context; + + private HandoffContext(final Context context, final long threadId) { + this.context = context; + this.threadId = threadId; + } + + public static HandoffContext anyThread(final Context context) { + return new HandoffContext(context, ANY_THREAD); + } + + public static HandoffContext threadConfined(final Context context) { + return new HandoffContext(context, Thread.currentThread().getId()); + } + + /** The context, or {@code null} if this is a thread-confined deposit read on another thread. */ + public Context contextForCurrentThread() { + return threadId == ANY_THREAD || threadId == Thread.currentThread().getId() ? context : null; + } +} diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java index 77c59d8799e..0636a144017 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java @@ -13,6 +13,7 @@ import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -55,7 +56,7 @@ public static ContextScope onSubscribe( return ReactiveStreamsContextPropagation.captureOnSubscribe( self, s, - InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Publisher.class, HandoffContext.class), InstrumentationContext.get(Subscriber.class, Context.class)); } diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java index 22c9ed9ac2b..32bdf7b3299 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java @@ -3,6 +3,7 @@ import datadog.context.Context; import datadog.context.ContextScope; import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -13,7 +14,7 @@ private ReactiveStreamsContextPropagation() {} public static ContextScope captureOnSubscribe( final Publisher publisher, final Subscriber subscriber, - final ContextStore publisherContexts, + final ContextStore publisherContexts, final ContextStore subscriberContexts) { // Don't consume the publisher context until we've verified the subscriber is non-null. For // subscribe(null), Reactive Streams mandates an NPE after this advice returns. Consuming the @@ -22,7 +23,8 @@ public static ContextScope captureOnSubscribe( return null; } - final Context contextFromPublisher = publisherContexts.remove(publisher); + final HandoffContext handoff = publisherContexts.remove(publisher); + final Context contextFromPublisher = handoff == null ? null : handoff.contextForCurrentThread(); final Context activeContext = Context.current(); final Context context = contextFromPublisher != null ? contextFromPublisher : activeContext; if (context == Context.root()) { diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java index 80b92265419..9749b838254 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java @@ -6,6 +6,7 @@ import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,7 +32,7 @@ public String[] helperClassNames() { public Map contextStore() { final Map store = new HashMap<>(); store.put("org.reactivestreams.Subscriber", Context.class.getName()); - store.put("org.reactivestreams.Publisher", Context.class.getName()); + store.put("org.reactivestreams.Publisher", HandoffContext.class.getName()); return store; } diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java index 177adf279e7..46b0e241e48 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java @@ -8,6 +8,7 @@ import datadog.context.ContextKey; import datadog.context.ContextScope; import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.IdentityHashMap; import java.util.Map; import org.junit.jupiter.api.Test; @@ -23,12 +24,12 @@ class ReactiveStreamsContextPropagationTest { void publisherCapturedContextOverridesActiveContext() { final Publisher publisher = subscriber -> {}; final Subscriber subscriber = new NoopSubscriber(); - final ContextStore publisherContexts = new MapContextStore<>(); + final ContextStore publisherContexts = new MapContextStore<>(); final ContextStore subscriberContexts = new MapContextStore<>(); - // A context was captured on the publisher (e.g. at assembly / cross-thread subscribe). + // A context was handed off on the publisher, confined to this (the producing) thread. final Context captured = Context.root().with(KEY, "captured"); - publisherContexts.put(publisher, captured); + publisherContexts.put(publisher, HandoffContext.threadConfined(captured)); // The current thread already carries a different, non-root active context. final Context active = Context.root().with(KEY, "active"); @@ -52,11 +53,66 @@ void publisherCapturedContextOverridesActiveContext() { assertSame(active, Context.current()); } - // The captured context is remembered for the subscriber, and removed from the publisher store. + // The captured context is remembered for the subscriber, and consumed from the publisher store. assertSame(captured, subscriberContexts.get(subscriber)); assertNull(publisherContexts.get(publisher)); } + @Test + void publisherContextFromAnotherThreadIsIgnored() throws InterruptedException { + // A thread-confined deposit from another thread (concurrent multicast subscribe) must be + // ignored. + final Publisher publisher = subscriber -> {}; + final Subscriber subscriber = new NoopSubscriber(); + final ContextStore publisherContexts = new MapContextStore<>(); + final ContextStore subscriberContexts = new MapContextStore<>(); + + final Context foreign = Context.root().with(KEY, "foreign"); + final Thread producer = + new Thread(() -> publisherContexts.put(publisher, HandoffContext.threadConfined(foreign))); + producer.start(); + producer.join(); + + final Context active = Context.root().with(KEY, "active"); + try (ContextScope activeScope = active.attach()) { + final ContextScope scope = + ReactiveStreamsContextPropagation.captureOnSubscribe( + publisher, subscriber, publisherContexts, subscriberContexts); + if (scope != null) { + scope.close(); + } + } + + // The foreign deposit is ignored; the subscriber keeps this thread's active context. + assertSame(active, subscriberContexts.get(subscriber)); + } + + @Test + void anyThreadPublisherContextIsAdoptedAcrossThreads() throws InterruptedException { + // An any-thread deposit (resilience4j/spring-messaging: attached early, subscribed later) is + // adopted even when the subscribe runs on a different thread. + final Publisher publisher = subscriber -> {}; + final Subscriber subscriber = new NoopSubscriber(); + final ContextStore publisherContexts = new MapContextStore<>(); + final ContextStore subscriberContexts = new MapContextStore<>(); + + final Context captured = Context.root().with(KEY, "captured"); + final Thread producer = + new Thread(() -> publisherContexts.put(publisher, HandoffContext.anyThread(captured))); + producer.start(); + producer.join(); + + final ContextScope scope = + ReactiveStreamsContextPropagation.captureOnSubscribe( + publisher, subscriber, publisherContexts, subscriberContexts); + if (scope != null) { + scope.close(); + } + + // The any-thread deposit is adopted despite the cross-thread subscribe. + assertSame(captured, subscriberContexts.get(subscriber)); + } + @Test void signalActivationIsSkippedWhenAnotherContextIsActive() { final Subscriber subscriber = new NoopSubscriber(); diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy index 260f5af9f9a..09823e83e9e 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy @@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification { "immediate" | Schedulers.immediate() } + def "subscribe-time context propagates across threads with #name"() { + // Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread + // propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still + // be children of the subscribe-time parent. + when: + runUnderTrace("parent") { + pipeline.call().collectList().block() + } + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span { + operationName "parent" + parent() + } + span { + operationName "addOne" + childOf span(0) + } + span { + operationName "addOne" + childOf span(0) + } + } + } + + where: + name | pipeline + "publishOn" | { + Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne) + } + "subscribeOn" | { + Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne) + } + "subscribeOn+publishOn" | { + Flux.just(1, 2) + .subscribeOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) + .map(addOne) + } + } + def "Context propagation through reactor context with span #spanType"() { when: runUnderTrace("parent", { diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java index e68665cc4bd..6e0bb195f9c 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java @@ -5,10 +5,10 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import datadog.context.Context; import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -42,7 +42,7 @@ public static class BlockingAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope before(@Advice.This final Publisher self) { return ReactorContextBridge.activateForBlocking( - self, InstrumentationContext.get(Publisher.class, Context.class)); + self, InstrumentationContext.get(Publisher.class, HandoffContext.class)); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java index f27fc0ebd35..11e025b0978 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java @@ -13,6 +13,7 @@ import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -58,7 +59,7 @@ public static ContextScope before( return ReactorContextBridge.captureOnSubscribe( self, subscriber, - InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Publisher.class, HandoffContext.class), InstrumentationContext.get(Subscriber.class, Context.class)); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java index af7743c6fff..73c963dd898 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java @@ -10,6 +10,7 @@ import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -54,7 +55,7 @@ public static void onSubscribe( self, arg, s, - InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Publisher.class, HandoffContext.class), InstrumentationContext.get(Subscriber.class, Context.class)); } } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java index 7d1e3b523af..df2f9a91a0c 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java @@ -4,6 +4,7 @@ import datadog.context.ContextScope; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.core.CoreSubscriber; @@ -48,38 +49,43 @@ public static ContextScope activateStoredContext( /** * On subscribe, hands the explicit context recorded for {@code subscriber} (a context-writing * subscriber) to the publisher store so the reactive-streams layer can propagate it, and attaches - * it. + * it. The deposit is {@linkplain HandoffContext#threadConfined thread-confined} since the + * subscribed publisher may be a concurrently-subscribed shared sink. */ public static ContextScope captureOnSubscribe( final Publisher publisher, final Subscriber subscriber, - final ContextStore publisherContexts, + final ContextStore publisherContexts, final ContextStore subscriberContexts) { final Context context = subscriberContexts.get(subscriber); if (context == null) { return null; } - publisherContexts.put(publisher, context); + publisherContexts.put(publisher, HandoffContext.threadConfined(context)); return attachIfRequired(context, Context.current()); } public static ContextScope activateForBlocking( - final Publisher publisher, final ContextStore publisherContexts) { - return attachIfRequired(publisherContexts.get(publisher), Context.current()); + final Publisher publisher, + final ContextStore publisherContexts) { + final HandoffContext handoff = publisherContexts.get(publisher); + return attachIfRequired( + handoff == null ? null : handoff.contextForCurrentThread(), Context.current()); } public static void transferToOptimizedSubscriber( final Publisher publisher, final Subscriber source, final Subscriber target, - final ContextStore publisherContexts, + final ContextStore publisherContexts, final ContextStore subscriberContexts) { if (source == null || target == null) { return; } - Context context = publisherContexts.get(publisher); + final HandoffContext handoff = publisherContexts.get(publisher); + Context context = handoff == null ? null : handoff.contextForCurrentThread(); if (context == null) { context = subscriberContexts.get(source); } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java index d729c5a7947..14bd109c458 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java @@ -8,6 +8,7 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -33,7 +34,7 @@ public String[] helperClassNames() { public Map contextStore() { final Map store = new HashMap<>(); store.put("org.reactivestreams.Subscriber", Context.class.getName()); - store.put("org.reactivestreams.Publisher", Context.class.getName()); + store.put("org.reactivestreams.Publisher", HandoffContext.class.getName()); return store; } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy index 6a2dfcd14e4..f2654e4e1a4 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/test/groovy/ReactorCoreTest.groovy @@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification { "immediate" | Schedulers.immediate() } + def "subscribe-time context propagates across threads with #name"() { + // Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread + // propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still + // be children of the subscribe-time parent. + when: + runUnderTrace("parent") { + pipeline.call().collectList().block() + } + + then: + assertTraces(1) { + trace(3) { + sortSpansByStart() + span { + operationName "parent" + parent() + } + span { + operationName "addOne" + childOf span(0) + } + span { + operationName "addOne" + childOf span(0) + } + } + } + + where: + name | pipeline + "publishOn" | { + Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne) + } + "subscribeOn" | { + Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne) + } + "subscribeOn+publishOn" | { + Flux.just(1, 2) + .subscribeOn(Schedulers.single()) + .publishOn(Schedulers.parallel()) + .map(addOne) + } + } + def "Context propagation through reactor context with span #spanType"() { when: runUnderTrace("parent", { diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java index b45d865f34b..873d57f7bce 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/CircuitBreakerOperatorInstrumentation.java @@ -4,9 +4,9 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; @@ -40,7 +40,8 @@ public static void after( result, CircuitBreakerDecorator.DECORATE, circuitBreaker, - InstrumentationContext.get(Publisher.class, Context.class)::put); + ReactorHelper.putInto( + InstrumentationContext.get(Publisher.class, HandoffContext.class))); } } } diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java index 8a87951497a..33e8d636906 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/FallbackOperatorInstrumentation.java @@ -5,9 +5,9 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import io.github.resilience4j.core.functions.CheckedSupplier; import java.util.function.Function; import net.bytebuddy.asm.Advice; @@ -40,7 +40,9 @@ public static void after( result = ReactorHelper.wrapFunction( - result, InstrumentationContext.get(Publisher.class, Context.class)::putIfAbsent); + result, + ReactorHelper.putIfAbsentInto( + InstrumentationContext.get(Publisher.class, HandoffContext.class))); } // 2.0.0+ diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java index 84b8c878b5e..bfb2bb7cdbe 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/ReactorHelper.java @@ -3,7 +3,9 @@ import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; import datadog.context.ContextScope; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -19,6 +21,20 @@ public class ReactorHelper { private static final Logger log = LoggerFactory.getLogger(ReactorHelper.class); + // These build the hand-off BiConsumer here rather than in @Advice code on purpose: a lambda + // defined in advice desugars to an invokedynamic that cannot be linked once the advice is inlined + // into the (third-party) operator, and fails silently. anyThread: the span is attached at + // assembly and the publisher may be subscribed later on another thread. + public static BiConsumer, AgentSpan> putInto( + final ContextStore store) { + return (publisher, span) -> store.put(publisher, HandoffContext.anyThread(span)); + } + + public static BiConsumer, AgentSpan> putIfAbsentInto( + final ContextStore store) { + return (publisher, span) -> store.putIfAbsent(publisher, HandoffContext.anyThread(span)); + } + public static Function, Publisher> wrapFunction( Function, Publisher> operator, BiConsumer, AgentSpan> attachContext) { diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java index f1d3266c1da..53d90f8919b 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/Resilience4jReactorModule.java @@ -1,9 +1,9 @@ package datadog.trace.instrumentation.resilience4j; import com.google.auto.service.AutoService; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -33,7 +33,8 @@ public String[] helperClassNames() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + return Collections.singletonMap( + "org.reactivestreams.Publisher", HandoffContext.class.getName()); } @Override diff --git a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java index d740571b9ae..6369e683971 100644 --- a/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/resilience4j/resilience4j-reactor-2.0/src/main/java/datadog/trace/instrumentation/resilience4j/RetryOperatorInstrumentation.java @@ -4,9 +4,9 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import io.github.resilience4j.retry.Retry; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; @@ -39,7 +39,8 @@ public static void after( result, RetryDecorator.DECORATE, retry, - InstrumentationContext.get(Publisher.class, Context.class)::put); + ReactorHelper.putInto( + InstrumentationContext.get(Publisher.class, HandoffContext.class))); } } } diff --git a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java index bf43161cce4..dc2020327f2 100644 --- a/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-messaging-4.0/src/main/java/datadog/trace/instrumentation/springmessaging/KotlinAwareHandlerInstrumentation.java @@ -8,6 +8,7 @@ import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.Collections; import java.util.Map; import net.bytebuddy.asm.Advice; @@ -38,7 +39,8 @@ public KotlinAwareHandlerInstrumentation() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + return Collections.singletonMap( + "org.reactivestreams.Publisher", HandoffContext.class.getName()); } @Override @@ -58,8 +60,8 @@ public static class DoInvokeAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.Return Object result) { if (result instanceof Publisher) { - InstrumentationContext.get(Publisher.class, Context.class) - .put((Publisher) result, Context.current()); + InstrumentationContext.get(Publisher.class, HandoffContext.class) + .put((Publisher) result, HandoffContext.anyThread(Context.current())); } } } diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java index efcbab555d1..2f5cdcbeae8 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerAdvice.java @@ -6,10 +6,10 @@ import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE; import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DISPATCHER_HANDLE_HANDLER; -import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.function.Consumer; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; @@ -51,7 +51,8 @@ public static void methodExit( final AgentSpan span = scope.span(); final Consumer finisher = new AdviceUtils.MonoSpanFinisher(span); mono = mono.doOnError(finisher).doFinally(finisher); - InstrumentationContext.get(Publisher.class, Context.class).put(mono, span); + InstrumentationContext.get(Publisher.class, HandoffContext.class) + .put(mono, HandoffContext.anyThread(span)); } scope.close(); // span finished in MonoSpanFinisher diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java index 9488b2e6950..94a340dbcd7 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java @@ -7,9 +7,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.Collections; import java.util.Map; @@ -24,7 +24,8 @@ public String instrumentedType() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + return Collections.singletonMap( + "org.reactivestreams.Publisher", HandoffContext.class.getName()); } @Override diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java index 52fe8d67558..6cfd3f903d4 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandleResultAdvice.java @@ -1,8 +1,8 @@ package datadog.trace.instrumentation.springwebflux.server; -import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; import org.springframework.web.server.ServerWebExchange; @@ -15,7 +15,8 @@ public static void methodExit( @Advice.Return(readOnly = false) Mono mono) { final AgentSpan span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE); if (span != null && mono != null) { - InstrumentationContext.get(Publisher.class, Context.class).put(mono, span); + InstrumentationContext.get(Publisher.class, HandoffContext.class) + .put(mono, HandoffContext.anyThread(span)); } } } diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java index 592a594e624..326ddce3ff4 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterAdvice.java @@ -5,10 +5,10 @@ import static datadog.trace.instrumentation.springwebflux.server.AdviceUtils.constructOperationName; import static datadog.trace.instrumentation.springwebflux.server.SpringWebfluxHttpServerDecorator.DECORATE; -import datadog.context.Context; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import net.bytebuddy.asm.Advice; import org.reactivestreams.Publisher; import org.springframework.http.HttpMethod; @@ -72,7 +72,8 @@ public static void methodExit( if (throwable != null) { DECORATE.onError(scope, throwable); } else if (mono != null) { - InstrumentationContext.get(Publisher.class, Context.class).put(mono, scope.span()); + InstrumentationContext.get(Publisher.class, HandoffContext.class) + .put(mono, HandoffContext.anyThread(scope.span())); } scope.close(); // span finished in SpanFinishingSubscriber diff --git a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java index f3196983e0a..7d0c9b48410 100644 --- a/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java +++ b/dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/main/java/datadog/trace/instrumentation/springwebflux/server/HandlerAdapterInstrumentation.java @@ -9,9 +9,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; -import datadog.context.Context; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext; import java.util.Collections; import java.util.Map; import net.bytebuddy.description.type.TypeDescription; @@ -33,7 +33,8 @@ public ElementMatcher hierarchyMatcher() { @Override public Map contextStore() { - return Collections.singletonMap("org.reactivestreams.Publisher", Context.class.getName()); + return Collections.singletonMap( + "org.reactivestreams.Publisher", HandoffContext.class.getName()); } @Override