Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 105 additions & 50 deletions buildSrc/src/main/kotlin/datadog/gradle/plugin/ci/CIJobsExtensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,59 +3,95 @@ package datadog.gradle.plugin.ci
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.provider.Provider
import org.gradle.api.tasks.testing.Test
import org.gradle.kotlin.dsl.extra
import kotlin.math.abs

private sealed class SlotSelection {
object All : SlotSelection()
data class Active(val index: Int, val total: Int) : SlotSelection()
}

private const val SLOT_CACHE_KEY = "datadog.ci.slotSelection"

/**
* Determines if the current project is in the selected slot.
*
* The "slot" property should be provided in the format "X/Y", where X is the selected slot (1-based)
* and Y is the total number of slots.
*
* If the "slot" property is not provided, all projects are considered to be in the selected slot.
* Parsed -Pslot=X/Y, cached on the root project so we parse + warn at most once per build.
*/
val Project.isInSelectedSlot: Provider<Boolean>
get() = rootProject.providers.gradleProperty("slot").map { slot ->
val parts = slot.split("/")
if (parts.size != 2) {
project.logger.warn("Invalid slot format '{}', expected 'X/Y'. Treating all projects as selected.", slot)
return@map true
}
private fun Project.slotSelection(): SlotSelection {
val root = rootProject
if (root.extra.has(SLOT_CACHE_KEY)) {
return root.extra.get(SLOT_CACHE_KEY) as SlotSelection
}
val raw = root.providers.gradleProperty("slot").orNull
val parsed = parseSlot(raw, root)
root.extra.set(SLOT_CACHE_KEY, parsed)
if (parsed is SlotSelection.Active) {
root.logger.lifecycle("CI slot sharding active: {}/{}", parsed.index, parsed.total)
}
return parsed
}

// When CI_NODE_INDEX or CI_NODE_TOTAL is unset in non-parallel jobs, one part may be empty (e.g. slot="/1") — treat as no filtering
if (parts[0].isBlank() || parts[1].isBlank()) {
project.logger.info("Incomplete slot value '{}', CI_NODE_INDEX or CI_NODE_TOTAL not set. Treating all projects as selected.", slot)
return@map true
}
private fun parseSlot(raw: String?, root: Project): SlotSelection {
if (raw.isNullOrBlank()) return SlotSelection.All
val parts = raw.split("/")
if (parts.size != 2 || parts[0].isBlank() || parts[1].isBlank()) {
root.logger.warn("Invalid -Pslot='{}', expected 'X/Y'. Disabling slot sharding.", raw)
return SlotSelection.All
}
val index = parts[0].toIntOrNull()
val total = parts[1].toIntOrNull()
if (index == null || total == null || total <= 0 || index < 1 || index > total) {
root.logger.warn("Invalid -Pslot='{}', expected 'X/Y' with 1 <= X <= Y. Disabling slot sharding.", raw)
return SlotSelection.All
}
return SlotSelection.Active(index, total)
}

/**
* Murmur3 32-bit finalizer. Avalanches bits so similar inputs (paths sharing a long common
* prefix like `:dd-java-agent:instrumentation:...`) land in different slots.
*/
private fun avalanche(hash: Int): Int {
var h = hash
h = h xor (h ushr 16)
h *= 0x85ebca6b.toInt()
h = h xor (h ushr 13)
h *= 0xc2b2ae35.toInt()
h = h xor (h ushr 16)
return h
}

val selectedSlot = parts[0].toIntOrNull()
val totalSlots = parts[1].toIntOrNull()
/** 1-based slot index this identityPath hashes to, given a total of `total` slots. */
private fun slotOf(identityPath: String, total: Int): Int =
Math.floorMod(avalanche(identityPath.hashCode()), total) + 1

if (selectedSlot == null || totalSlots == null || totalSlots <= 0) {
project.logger.warn("Invalid slot values '{}', expected numeric 'X/Y' with Y > 0. Treating all projects as selected.", slot)
return@map true
}
private fun selectedSlotFor(project: Project, identityPath: String): Boolean =
when (val s = project.slotSelection()) {
SlotSelection.All -> true
is SlotSelection.Active -> slotOf(identityPath, s.total) == s.index
}

// Distribution numbers when running on rootProject.allprojects indicates
// bucket sizes are reasonably balanced:
//
// * size 4 distribution: {2=146, 0=143, 1=157, 3=145}
// * size 6 distribution: {4=100, 0=92, 3=97, 2=97, 1=108, 5=97}
// * size 8 distribution: {2=62, 4=72, 0=71, 5=70, 7=78, 6=84, 1=87, 3=67}
// * size 10 distribution: {8=62, 0=65, 5=70, 9=59, 3=54, 1=56, 6=63, 4=47, 2=52, 7=63}
// * size 12 distribution: {10=55, 0=47, 4=45, 9=46, 8=51, 3=51, 2=46, 1=59, 5=52, 7=49, 11=45, 6=45}
val projectSlot = abs(project.path.hashCode() % totalSlots) + 1 // Convert to 1-based

project.logger.info(
"Project {} assigned to slot {}/{}, active slot is {}",
project.path,
projectSlot,
totalSlots,
selectedSlot,
)

projectSlot == selectedSlot
}.orElse(true)
/**
* Whether this project (or task) belongs in the currently selected slot.
*
* The "slot" property should be provided as "X/Y" where X is the 1-based selected slot and Y is
* the total number of slots. If unset, everything is in-slot.
*/
val Project.isInSelectedSlot: Provider<Boolean>
get() = providers.provider { selectedSlotFor(this, path) }

val Task.isInSelectedSlot: Provider<Boolean>
get() = project.providers.provider { selectedSlotFor(project, path) }

private fun Project.aggregateTestTasksFor(subproject: Project, aggregateTaskName: String): List<Test> =
when (aggregateTaskName) {
"allTests" -> subproject.tasks.withType(Test::class.java).matching { testTask ->
!testTask.name.contains("latest", ignoreCase = true) && testTask.name != "traceAgentTest"
}.toList()
"allLatestDepTests" -> subproject.tasks.withType(Test::class.java).matching { testTask ->
testTask.name.contains("latest", ignoreCase = true)
}.toList()
else -> emptyList()
}

/**
* Returns the task's path, given affected projects, if this task or its dependencies are affected by git changes.
Expand Down Expand Up @@ -97,22 +133,28 @@ private fun Project.createRootTask(
forceCoverage: Boolean
) {
val coverage = forceCoverage || rootProject.providers.gradleProperty("checkCoverage").isPresent
val taskLevelSlotting = !coverage && (subProjTaskName == "allTests" || subProjTaskName == "allLatestDepTests")
tasks.register(rootTaskName) {
var consideredTestTasks = 0
var selectedTestTasks = 0
subprojects.forEach { subproject ->
if (
subproject.isInSelectedSlot.get() &&
includePrefixes.any { subproject.path.startsWith(it) } &&
!excludePrefixes.any { subproject.path.startsWith(it) }
) {
val testTask = subproject.tasks.findByName(subProjTaskName)
if (!taskLevelSlotting && !subproject.isInSelectedSlot.get()) {
return@forEach
}

val aggregateTask = subproject.tasks.findByName(subProjTaskName)
var isAffected = true

if (testTask != null) {
if (aggregateTask != null) {
val useGitChanges = rootProject.extra.get("useGitChanges") as Boolean
if (useGitChanges) {
@Suppress("UNCHECKED_CAST")
val affectedProjects = rootProject.extra.get("affectedProjects") as Map<Project, Set<String>>
val affectedTaskPath = findAffectedTaskPath(testTask, affectedProjects)
val affectedTaskPath = findAffectedTaskPath(aggregateTask, affectedProjects)
if (affectedTaskPath != null) {
logger.warn("Selecting ${subproject.path}:$subProjTaskName (affected by $affectedTaskPath)")
} else {
Expand All @@ -121,7 +163,15 @@ private fun Project.createRootTask(
}
}
if (isAffected) {
dependsOn(testTask)
if (taskLevelSlotting) {
val candidates = aggregateTestTasksFor(subproject, subProjTaskName)
val selected = candidates.filter { it.isInSelectedSlot.get() }
consideredTestTasks += candidates.size
selectedTestTasks += selected.size
dependsOn(selected)
} else {
dependsOn(aggregateTask)
}
}
}

Expand All @@ -137,6 +187,11 @@ private fun Project.createRootTask(
}
}
}
if (taskLevelSlotting && consideredTestTasks > 0) {
logger.lifecycle(
"$rootTaskName: slot selected $selectedTestTasks of $consideredTestTasks Test tasks ($subProjTaskName)"
)
}
}
}

Expand Down
10 changes: 0 additions & 10 deletions buildSrc/src/main/kotlin/dd-trace-java.ci-jobs.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datadog.gradle.plugin.ci.isInSelectedSlot
import org.gradle.api.tasks.testing.Test
import java.io.File

/*
Expand All @@ -14,15 +13,6 @@ if (project != rootProject) {
logger.error("This plugin has been applied on a non-root project: ${project.path}")
}

allprojects {
// Enable tests only on the selected slot (if -Pslot=n/t is provided)
tasks.withType<Test>().configureEach {
onlyIf("Project is in selected slot") {
project.isInSelectedSlot.get()
}
}
}

fun relativeToGitRoot(f: File): File {
return rootProject.projectDir.toPath().relativize(f.absoluteFile.toPath()).toFile()
}
Expand Down
4 changes: 2 additions & 2 deletions gradle/java_no_deps.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ def tracerJavaExtension = extensions.create(TracerJavaExtension.NAME, TracerJava



// Only run one testcontainers test at a time
// Number of testcontainers test at a time
ext.testcontainersLimit = gradle.sharedServices.registerIfAbsent("testcontainersLimit", BuildService) {
maxParallelUsages = 1
maxParallelUsages = 2
}

// Task for tests that want to run forked in their own separate JVM
Expand Down
Loading