Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package datadog.trace.bootstrap.instrumentation.reactivestreams;

import datadog.context.Context;

/**
* Value of the {@code (Publisher, HandoffContext)} store used to hand a context from a publisher's
* subscribe to that publisher's own subscriber (or a blocking call).
*
* <p>{@link #threadConfined} deposits are only adopted on the producing thread. The reactor-core
* bridge uses them because a shared publisher — a multicast/replay {@code Sinks.Many} with several
* consumers — is subscribed concurrently, and keying by publisher identity alone would let one
* consumer adopt another's context. This is safe because a subscribe chain runs synchronously on
* one thread. {@link #anyThread} deposits are adopted anywhere, for producers (resilience4j,
* spring-webflux, spring-messaging) that attach to a unique publisher subscribed later, possibly on
* another thread.
*/
public final class HandoffContext {

private static final long ANY_THREAD = 0L;

private final long threadId;
private final Context context;

private HandoffContext(final Context context, final long threadId) {
this.context = context;
this.threadId = threadId;
}

public static HandoffContext anyThread(final Context context) {
return new HandoffContext(context, ANY_THREAD);
}

public static HandoffContext threadConfined(final Context context) {
return new HandoffContext(context, Thread.currentThread().getId());
}

/** The context, or {@code null} if this is a thread-confined deposit read on another thread. */
public Context contextForCurrentThread() {
return threadId == ANY_THREAD || threadId == Thread.currentThread().getId() ? context : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -55,7 +56,7 @@ public static ContextScope onSubscribe(
return ReactiveStreamsContextPropagation.captureOnSubscribe(
self,
s,
InstrumentationContext.get(Publisher.class, Context.class),
InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

Expand All @@ -13,7 +14,7 @@ private ReactiveStreamsContextPropagation() {}
public static ContextScope captureOnSubscribe(
final Publisher<?> publisher,
final Subscriber<?> subscriber,
final ContextStore<Publisher, Context> publisherContexts,
final ContextStore<Publisher, HandoffContext> publisherContexts,
final ContextStore<Subscriber, Context> subscriberContexts) {
// Don't consume the publisher context until we've verified the subscriber is non-null. For
// subscribe(null), Reactive Streams mandates an NPE after this advice returns. Consuming the
Expand All @@ -22,7 +23,8 @@ public static ContextScope captureOnSubscribe(
return null;
}

final Context contextFromPublisher = publisherContexts.remove(publisher);
final HandoffContext handoff = publisherContexts.remove(publisher);
final Context contextFromPublisher = handoff == null ? null : handoff.contextForCurrentThread();
final Context activeContext = Context.current();
final Context context = contextFromPublisher != null ? contextFromPublisher : activeContext;
if (context == Context.root()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +32,7 @@ public String[] helperClassNames() {
public Map<String, String> contextStore() {
final Map<String, String> store = new HashMap<>();
store.put("org.reactivestreams.Subscriber", Context.class.getName());
store.put("org.reactivestreams.Publisher", Context.class.getName());
store.put("org.reactivestreams.Publisher", HandoffContext.class.getName());
return store;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import datadog.context.ContextKey;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.IdentityHashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
Expand All @@ -23,12 +24,12 @@ class ReactiveStreamsContextPropagationTest {
void publisherCapturedContextOverridesActiveContext() {
final Publisher<Object> publisher = subscriber -> {};
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Publisher, Context> publisherContexts = new MapContextStore<>();
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();

// A context was captured on the publisher (e.g. at assembly / cross-thread subscribe).
// A context was handed off on the publisher, confined to this (the producing) thread.
final Context captured = Context.root().with(KEY, "captured");
publisherContexts.put(publisher, captured);
publisherContexts.put(publisher, HandoffContext.threadConfined(captured));

// The current thread already carries a different, non-root active context.
final Context active = Context.root().with(KEY, "active");
Expand All @@ -52,11 +53,66 @@ void publisherCapturedContextOverridesActiveContext() {
assertSame(active, Context.current());
}

// The captured context is remembered for the subscriber, and removed from the publisher store.
// The captured context is remembered for the subscriber, and consumed from the publisher store.
assertSame(captured, subscriberContexts.get(subscriber));
assertNull(publisherContexts.get(publisher));
}

@Test
void publisherContextFromAnotherThreadIsIgnored() throws InterruptedException {
// A thread-confined deposit from another thread (concurrent multicast subscribe) must be
// ignored.
final Publisher<Object> publisher = subscriber -> {};
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();

final Context foreign = Context.root().with(KEY, "foreign");
final Thread producer =
new Thread(() -> publisherContexts.put(publisher, HandoffContext.threadConfined(foreign)));
producer.start();
producer.join();

final Context active = Context.root().with(KEY, "active");
try (ContextScope activeScope = active.attach()) {
final ContextScope scope =
ReactiveStreamsContextPropagation.captureOnSubscribe(
publisher, subscriber, publisherContexts, subscriberContexts);
if (scope != null) {
scope.close();
}
}

// The foreign deposit is ignored; the subscriber keeps this thread's active context.
assertSame(active, subscriberContexts.get(subscriber));
}

@Test
void anyThreadPublisherContextIsAdoptedAcrossThreads() throws InterruptedException {
// An any-thread deposit (resilience4j/spring-messaging: attached early, subscribed later) is
// adopted even when the subscribe runs on a different thread.
final Publisher<Object> publisher = subscriber -> {};
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();

final Context captured = Context.root().with(KEY, "captured");
final Thread producer =
new Thread(() -> publisherContexts.put(publisher, HandoffContext.anyThread(captured)));
producer.start();
producer.join();

final ContextScope scope =
ReactiveStreamsContextPropagation.captureOnSubscribe(
publisher, subscriber, publisherContexts, subscriberContexts);
if (scope != null) {
scope.close();
}

// The any-thread deposit is adopted despite the cross-thread subscribe.
assertSame(captured, subscriberContexts.get(subscriber));
}

@Test
void signalActivationIsSkippedWhenAnotherContextIsActive() {
final Subscriber<Object> subscriber = new NoopSubscriber();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification {
"immediate" | Schedulers.immediate()
}

def "subscribe-time context propagates across threads with #name"() {
Comment thread
amarziali marked this conversation as resolved.
// Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread
// propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still
// be children of the subscribe-time parent.
when:
runUnderTrace("parent") {
pipeline.call().collectList().block()
}

then:
assertTraces(1) {
trace(3) {
sortSpansByStart()
span {
operationName "parent"
parent()
}
span {
operationName "addOne"
childOf span(0)
}
span {
operationName "addOne"
childOf span(0)
}
}
}

where:
name | pipeline
"publishOn" | {
Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne)
}
"subscribeOn" | {
Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne)
}
"subscribeOn+publishOn" | {
Flux.just(1, 2)
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.parallel())
.map(addOne)
}
}

def "Context propagation through reactor context with span #spanType"() {
when:
runUnderTrace("parent", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -42,7 +42,7 @@ public static class BlockingAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope before(@Advice.This final Publisher self) {
return ReactorContextBridge.activateForBlocking(
self, InstrumentationContext.get(Publisher.class, Context.class));
self, InstrumentationContext.get(Publisher.class, HandoffContext.class));
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -58,7 +59,7 @@ public static ContextScope before(
return ReactorContextBridge.captureOnSubscribe(
self,
subscriber,
InstrumentationContext.get(Publisher.class, Context.class),
InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import datadog.context.Context;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -54,7 +55,7 @@ public static void onSubscribe(
self,
arg,
s,
InstrumentationContext.get(Publisher.class, Context.class),
InstrumentationContext.get(Publisher.class, HandoffContext.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -48,38 +49,43 @@ public static ContextScope activateStoredContext(
/**
* On subscribe, hands the explicit context recorded for {@code subscriber} (a context-writing
* subscriber) to the publisher store so the reactive-streams layer can propagate it, and attaches
* it.
* it. The deposit is {@linkplain HandoffContext#threadConfined thread-confined} since the
* subscribed publisher may be a concurrently-subscribed shared sink.
*/
public static ContextScope captureOnSubscribe(
final Publisher<?> publisher,
final Subscriber<?> subscriber,
final ContextStore<Publisher, Context> publisherContexts,
final ContextStore<Publisher, HandoffContext> publisherContexts,
final ContextStore<Subscriber, Context> subscriberContexts) {
final Context context = subscriberContexts.get(subscriber);
if (context == null) {
return null;
}

publisherContexts.put(publisher, context);
publisherContexts.put(publisher, HandoffContext.threadConfined(context));
return attachIfRequired(context, Context.current());
}

public static ContextScope activateForBlocking(
final Publisher<?> publisher, final ContextStore<Publisher, Context> publisherContexts) {
return attachIfRequired(publisherContexts.get(publisher), Context.current());
final Publisher<?> publisher,
final ContextStore<Publisher, HandoffContext> publisherContexts) {
final HandoffContext handoff = publisherContexts.get(publisher);
return attachIfRequired(
handoff == null ? null : handoff.contextForCurrentThread(), Context.current());
}

public static void transferToOptimizedSubscriber(
final Publisher<?> publisher,
final Subscriber<?> source,
final Subscriber<?> target,
final ContextStore<Publisher, Context> publisherContexts,
final ContextStore<Publisher, HandoffContext> publisherContexts,
final ContextStore<Subscriber, Context> subscriberContexts) {
if (source == null || target == null) {
return;
}

Context context = publisherContexts.get(publisher);
final HandoffContext handoff = publisherContexts.get(publisher);
Context context = handoff == null ? null : handoff.contextForCurrentThread();
if (context == null) {
context = subscriberContexts.get(source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -33,7 +34,7 @@ public String[] helperClassNames() {
public Map<String, String> contextStore() {
final Map<String, String> store = new HashMap<>();
store.put("org.reactivestreams.Subscriber", Context.class.getName());
store.put("org.reactivestreams.Publisher", Context.class.getName());
store.put("org.reactivestreams.Publisher", HandoffContext.class.getName());
return store;
}

Expand Down
Loading
Loading