Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,9 @@ class SpringBootBasedTest extends HttpServerTest<ConfigurableApplicationContext>
"$Tags.HTTP_ROUTE" "/success"
"servlet.context" "/boot-context"
"servlet.path" "/success"
if ({ isDataStreamsEnabled() }) {
"$DDTags.PATHWAY_HASH" { String }
}
defaultTags()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.core.datastreams;

import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V01_DATASTREAMS_ENDPOINT;
import static datadog.trace.api.DDTags.PATHWAY_HASH;
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.INBOUND;
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
Expand Down Expand Up @@ -309,6 +310,10 @@ public void setCheckpoint(AgentSpan span, DataStreamsContext context) {
PathwayContext pathwayContext = span.spanContext().getPathwayContext();
if (pathwayContext != null) {
pathwayContext.setCheckpoint(context, this::add);
long pathwayHash = pathwayContext.getHash();
if (pathwayHash != 0) {
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayHash));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the PR description does this mean there are integrations where we are doing this manually on a consume already? In which case, should we remove them so it only happens once here?

}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,29 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_SMART_NULLS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.metrics.api.Histograms;
import datadog.metrics.impl.DDSketchHistograms;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.TraceConfig;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.KafkaConfigReport;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.api.datastreams.SchemaRegistryUsage;
import datadog.trace.api.datastreams.StatsPoint;
import datadog.trace.api.experimental.DataStreamsContextCarrier;
import datadog.trace.api.time.ControllableTimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.common.metrics.EventListener;
import datadog.trace.common.metrics.Sink;
import datadog.trace.core.DDCoreJavaSpecification;
Expand Down Expand Up @@ -1559,6 +1566,67 @@ void close() {
}
}

private DefaultDataStreamsMonitoring newDataStreamsMonitoring() {
return new DefaultDataStreamsMonitoring(
mock(Sink.class),
stubFeatures(true),
new ControllableTimeSource(),
() -> stubTraceConfig(true),
mock(DatastreamsPayloadWriter.class),
DEFAULT_BUCKET_DURATION_NANOS);
}

@Test
void setCheckpointTagsTheSpanWithThePathwayHash() {
DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring();

// Negative so the signed and unsigned string representations differ, proving the tag uses
// Long.toUnsignedString (pathway hashes are unsigned 64-bit values).
long hash = -1234567890123456789L;
PathwayContext pathwayContext = mock(PathwayContext.class);
when(pathwayContext.getHash()).thenReturn(hash);
AgentSpanContext spanContext = mock(AgentSpanContext.class);
when(spanContext.getPathwayContext()).thenReturn(pathwayContext);
AgentSpan span = mock(AgentSpan.class);
when(span.spanContext()).thenReturn(spanContext);
DataStreamsContext context = mock(DataStreamsContext.class);

dataStreams.setCheckpoint(span, context);

verify(pathwayContext).setCheckpoint(eq(context), any());
verify(span).setTag(DDTags.PATHWAY_HASH, Long.toUnsignedString(hash));
}

@Test
void setCheckpointDoesNotTagTheSpanWhenThePathwayHashIsZero() {
DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring();

PathwayContext pathwayContext = mock(PathwayContext.class);
when(pathwayContext.getHash()).thenReturn(0L);
AgentSpanContext spanContext = mock(AgentSpanContext.class);
when(spanContext.getPathwayContext()).thenReturn(pathwayContext);
AgentSpan span = mock(AgentSpan.class);
when(span.spanContext()).thenReturn(spanContext);

dataStreams.setCheckpoint(span, mock(DataStreamsContext.class));

verify(span, never()).setTag(eq(DDTags.PATHWAY_HASH), any(String.class));
}

@Test
void setCheckpointDoesNotTagTheSpanWhenThereIsNoPathwayContext() {
DefaultDataStreamsMonitoring dataStreams = newDataStreamsMonitoring();

AgentSpanContext spanContext = mock(AgentSpanContext.class);
when(spanContext.getPathwayContext()).thenReturn(null);
AgentSpan span = mock(AgentSpan.class);
when(span.spanContext()).thenReturn(spanContext);

dataStreams.setCheckpoint(span, mock(DataStreamsContext.class));

verify(span, never()).setTag(eq(DDTags.PATHWAY_HASH), any(String.class));
}

static class CustomContextCarrier implements DataStreamsContextCarrier {
private final Map<String, Object> data = new HashMap<>();

Expand Down