Tag consumer spans with the pathway hash on every data streams checkpoint#11808
Open
ericfirth wants to merge 4 commits into
Open
Tag consumer spans with the pathway hash on every data streams checkpoint#11808ericfirth wants to merge 4 commits into
ericfirth wants to merge 4 commits into
Conversation
…oint The pathway.hash span tag was only set on the produce/inject path (DataStreamsPropagator.inject). Consumers that checkpoint without injecting (e.g. RabbitMQ) had no pathway.hash, unlike the JS and Python tracers which tag the span on every checkpoint. Set it centrally in DefaultDataStreamsMonitoring.setCheckpoint so all consume-side integrations get it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Per review on #11808; the rationale lives in the PR description. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR updates the Java Data Streams Monitoring (DSM) core checkpointing path so spans are tagged with pathway.hash on every DSM checkpoint that has an associated span (including consumer-side checkpoints), aligning behavior with other tracers and improving pathway/span correlation.
Changes:
- Tag spans with
pathway.hashinsideDefaultDataStreamsMonitoring.setCheckpoint(span, context)when a non-zero pathway hash is available. - Add unit tests to validate tagging behavior (non-zero hash, zero hash, and missing pathway context).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java | Tags spans with pathway.hash after PathwayContext.setCheckpoint(...) when hash is non-zero. |
| dd-trace-core/src/test/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.java | Adds unit tests covering new span-tagging behavior for setCheckpoint. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- cache pathwayContext.getHash() in a local instead of calling it twice - use a negative hash in the test so it actually exercises Long.toUnsignedString Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…bled Now that DefaultDataStreamsMonitoring.setCheckpoint tags the span on every checkpoint, the inferred-proxy (API Gateway) HTTP server span gets a pathway.hash like any other DSM-enabled server span. The shared HttpServerTest already asserts this conditionally; SpringBootBasedTest's hand-rolled inferred-proxy assertions were missing it. Mirror the same guard. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| pathwayContext.setCheckpoint(context, this::add); | ||
| long pathwayHash = pathwayContext.getHash(); | ||
| if (pathwayHash != 0) { | ||
| span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayHash)); |
Contributor
There was a problem hiding this comment.
Based on the PR description does this mean there are integrations where we are doing this manually on a consume already? In which case, should we remove them so it only happens once here?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Set the
pathway.hashspan tag insideDefaultDataStreamsMonitoring.setCheckpoint(span, context)so it is applied on every checkpoint that has a span — consume side included — not only on produce/inject.Why
Today
pathway.hashis written in exactly one place:DataStreamsPropagator.inject(...), which only runs on the produce/inject path. Consumers that checkpoint without injecting downstream context — e.g. RabbitMQ'sbasicGet/basicConsume, which callgetDataStreamsMonitoring().setCheckpoint(span, ...)— compute and emit the pathway hash but never tag their span with it. So consumer spans are missingpathway.hashand can't be correlated back to the pathway.Some consumers (e.g. SQS) happen to get the tag only because their integration also calls
dsmPropagator.inject(...)during receive — an integration-specific side effect, not a guarantee.The JS (
processor.js→recordCheckpoint) and Python (processor.py→set_checkpoint) tracers already set the tag in their central checkpoint method, so it lands on both produce and consume. This change brings Java to parity by doing the same in the coresetCheckpoint.Change
After
pathwayContext.setCheckpoint(...), tag the span withpathway.hashwhen the hash is non-zero. The produce path is unchanged (the propagator still tags on inject; the value is identical, so this is idempotent for producers that also flow throughsetCheckpoint).Tests
Added three unit tests in
DefaultDataStreamsMonitoringTest:./gradlew :dd-trace-core:test --tests "datadog.trace.core.datastreams.DefaultDataStreamsMonitoringTest"→ 36 tests, 0 failures.Context
Found while working on RabbitMQ DSM (the consumer span had no
pathway.hashwhile the producer did). Independent of, and complementary to, the RabbitMQ default-exchange topic fix (#11805).