Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')
addTestSuiteForDir('lettuce51Test', 'test')
addTestSuiteForDir('lettuce60Test', 'test')
addTestSuiteForDir('lettuce61Test', 'test')
addTestSuiteForDir('lettuce62Test', 'test')

dependencies {
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '5.0.0.RELEASE'
Expand All @@ -24,6 +28,10 @@ dependencies {


latestDepTestImplementation group: 'io.lettuce', name: 'lettuce-core', version: '+'
lettuce51TestImplementation group: 'io.lettuce', name: 'lettuce-core', version: '5.1.0.RELEASE'
lettuce60TestImplementation group: 'io.lettuce', name: 'lettuce-core', version: '6.0.0.RELEASE'
lettuce61TestImplementation group: 'io.lettuce', name: 'lettuce-core', version: '6.1.0.RELEASE'
lettuce62TestImplementation group: 'io.lettuce', name: 'lettuce-core', version: '6.2.0.RELEASE'

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datadog.trace.instrumentation.lettuce5;

import static datadog.trace.instrumentation.lettuce5.LettuceClientDecorator.DECORATE;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import java.util.function.BiConsumer;

public final class MasterReplicaConnectionHelper {

private MasterReplicaConnectionHelper() {}

public static boolean isRedisClientSpan(final AgentSpan span) {
return span != null && LettuceClientDecorator.REDIS_CLIENT.equals(span.getTag(Tags.COMPONENT));
}

public static void onConnection(
final AgentSpan span,
final StatefulConnection connection,
final ContextStore<StatefulConnection, RedisURI> contextStore) {
if (connection == null) {
return;
}

final RedisURI redisURI = contextStore.get(connection);
if (redisURI != null) {
DECORATE.onConnection(span, redisURI);
}
}

public static BiConsumer<StatefulConnection, Throwable> onConnectionComplete(
final AgentSpan span, final ContextStore<StatefulConnection, RedisURI> contextStore) {
return (connection, _throwable) -> onConnection(span, connection, contextStore);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package datadog.trace.instrumentation.lettuce5;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;

/**
* Master/replica APIs expose a routing connection ({@code
* StatefulRedisMasterReplicaConnectionImpl}; legacy {@code MasterSlave} wraps it in {@code
* MasterSlaveConnectionWrapper}). The real node connection is selected only after a command span
* has started and the command is dispatched, so this decorates the active span with the RedisURI
* that is available on the real connection, not the wrapper.
*/
@AutoService(InstrumenterModule.class)
public class MasterReplicaConnectionProviderInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

public MasterReplicaConnectionProviderInstrumentation() {
super("lettuce", "lettuce-5");
}

@Override
public String[] knownMatchingTypes() {
return new String[] {
// Legacy Lettuce 5.x
"io.lettuce.core.masterslave.MasterSlaveConnectionProvider",
// Transitional Lettuce 6.0 provider
"io.lettuce.core.masterreplica.UpstreamReplicaConnectionProvider",
// Lettuce 6.1+
"io.lettuce.core.masterreplica.MasterReplicaConnectionProvider"
};
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
"io.lettuce.core.api.StatefulConnection", "io.lettuce.core.RedisURI");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".LettuceClientDecorator",
packageName + ".MasterReplicaConnectionHelper",
packageName + ".LettuceInstrumentationUtil"
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
// Intent argument types move across Lettuce versions, but only the returned connection is used.
transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("getConnection"))
.and(takesArguments(1))
.and(returns(named("io.lettuce.core.api.StatefulRedisConnection"))),
MasterReplicaConnectionProviderInstrumentation.class.getName() + "$SyncAdvice");
transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("getConnectionAsync"))
.and(takesArguments(1))
.and(returns(named("java.util.concurrent.CompletableFuture"))),
MasterReplicaConnectionProviderInstrumentation.class.getName() + "$AsyncAdvice");
}

public static class SyncAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return final StatefulRedisConnection<?, ?> connection) {
final AgentSpan span = activeSpan();
if (!MasterReplicaConnectionHelper.isRedisClientSpan(span)) {
return;
}

MasterReplicaConnectionHelper.onConnection(
span, connection, InstrumentationContext.get(StatefulConnection.class, RedisURI.class));
}
}

public static class AsyncAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Return final CompletableFuture<? extends StatefulConnection> connectionFuture) {
final AgentSpan span = activeSpan();
if (!MasterReplicaConnectionHelper.isRedisClientSpan(span) || connectionFuture == null) {
return;
}

connectionFuture.whenComplete(
MasterReplicaConnectionHelper.onConnectionComplete(
span, InstrumentationContext.get(StatefulConnection.class, RedisURI.class)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.redis.testcontainers.RedisContainer;
import datadog.trace.agent.test.AbstractInstrumentationTest;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.core.DDSpan;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

class Lettuce5MasterReplicaTest extends AbstractInstrumentationTest {
private RedisContainer redisServer;
private RedisClient redisClient;
private StatefulRedisConnection<String, String> connection;
private String host;
private int port;

@BeforeEach
void setUpRedis() throws Exception {
redisServer =
new RedisContainer(DockerImageName.parse("redis:6.2.6"))
.waitingFor(Wait.forListeningPort());
redisServer.start();

host = redisServer.getHost();
port = redisServer.getFirstMappedPort();

RedisURI redisURI = RedisURI.Builder.redis(host, port).withDatabase(0).build();
redisClient = RedisClient.create();
redisClient.setOptions(ClientOptions.builder().autoReconnect(false).build());
connection = connectMasterReplica(redisClient, redisURI);
connection.sync().ping();

writer.waitForTraces(2);
tracer.flush();
writer.clear();
}

@AfterEach
void cleanUpRedis() {
if (connection != null) {
connection.close();
}

if (redisClient != null) {
redisClient.shutdown(5, 10, TimeUnit.SECONDS);
}

if (redisServer != null) {
redisServer.stop();
}
}

@Test
void staticMasterReplicaCommandSpanHasPeerHostname() throws Exception {
String result = connection.sync().set("TESTSETKEY", "TESTSETVAL");

assertEquals("OK", result);
writer.waitForTraces(1);

List<DDSpan> setSpans = new ArrayList<>();
for (List<DDSpan> trace : writer) {
for (DDSpan span : trace) {
if ("SET".contentEquals(span.getResourceName())
&& "redis-client".equals(String.valueOf(span.getTag(Tags.COMPONENT)))) {
setSpans.add(span);
}
}
}

assertEquals(1, setSpans.size(), "expected exactly one SET command span");
DDSpan span = setSpans.get(0);
assertEquals("SET", String.valueOf(span.getResourceName()));
assertEquals("redis-client", String.valueOf(span.getTag(Tags.COMPONENT)));
assertEquals("redis", span.getTag(Tags.DB_TYPE));
assertNotNull(span.getTag(Tags.PEER_HOSTNAME), "command span should include peer.hostname");
assertEquals(host, span.getTag(Tags.PEER_HOSTNAME));
}

@SuppressWarnings("unchecked")
private static StatefulRedisConnection<String, String> connectMasterReplica(
RedisClient redisClient, RedisURI redisURI) throws Exception {
// Prefer the newer MasterReplica facade when this source is compiled for latestDepTest, but
// resolve both APIs reflectively so the same test still compiles with the Lettuce 5.0 baseline
// and can keep compiling if the deprecated MasterSlave facade disappears later.
Class<?> facade;
try {
facade = Class.forName("io.lettuce.core.masterreplica.MasterReplica");
} catch (ClassNotFoundException ignored) {
facade = Class.forName("io.lettuce.core.masterslave.MasterSlave");
}
Method connect =
facade.getMethod("connect", RedisClient.class, RedisCodec.class, Iterable.class);
try {
return (StatefulRedisConnection<String, String>)
connect.invoke(null, redisClient, StringCodec.UTF8, singletonList(redisURI));
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw e;
}
}
}