Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,25 @@ public void setupOpenLineage(DDTraceId traceId) {
openLineageSparkConf.set(
"spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
openLineageSparkConf.set(
"spark.openlineage.run.tags",
String runTags =
"_dd.trace_id:"
+ traceId.toString()
+ ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
+ getServiceForOpenLineage(sparkConf, isRunningOnDatabricks)
+ ";_dd.ol_intake.process_tags:"
+ ProcessTags.getTagsForSerialization()
+ ";_dd.ol_app_id:"
+ appId);
+ appId;
// _dd.ol_env carries the run environment so the lineage-processor can use it
// as the Spark application's UGP namespace, letting the OpenLineage-created
// node and the tracer-only node (djm-span-processor) resolve to the same
// entity_id. Omitted when env is unset so the consumer falls back to the
// OpenLineage namespace.
String olEnv = Config.get().getEnv();
if (!olEnv.isEmpty()) {
runTags += ";_dd.ol_env:" + olEnv;
}
openLineageSparkConf.set("spark.openlineage.run.tags", runTags);
setupOpenLineageCircuitBreaker();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,22 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
.contains("_dd.ol_service:databricks.job-cluster.some-run-name")
}

def "test setupOpenLineage sets ol_env from dd.env"() {
setup:
injectSysConfig("dd.env", "my-env")
def listener = getTestDatadogSparkListener()
listener.openLineageSparkListener = Mock(SparkListenerInterface)
listener.openLineageSparkConf = new SparkConf()
listener.setupOpenLineage(Mock(DDTraceId))

expect:
assert listener
.openLineageSparkConf
.get("spark.openlineage.run.tags")
.split(";")
.contains("_dd.ol_env:my-env")
}

def "test setupOpenLineage fills ProcessTags"() {
setup:
def listener = getTestDatadogSparkListener()
Expand Down
Loading