diff --git a/transport/kubernetes/src/main/kotlin/KubernetesMessageSender.kt b/transport/kubernetes/src/main/kotlin/KubernetesMessageSender.kt index 94108ae62..de870c091 100644 --- a/transport/kubernetes/src/main/kotlin/KubernetesMessageSender.kt +++ b/transport/kubernetes/src/main/kotlin/KubernetesMessageSender.kt @@ -31,6 +31,8 @@ import io.kubernetes.client.openapi.models.V1SecretVolumeSource import io.kubernetes.client.openapi.models.V1Volume import io.kubernetes.client.openapi.models.V1VolumeMount +import java.util.UUID + import org.eclipse.apoapsis.ortserver.transport.Endpoint import org.eclipse.apoapsis.ortserver.transport.Message import org.eclipse.apoapsis.ortserver.transport.MessageSender @@ -88,22 +90,23 @@ internal class KubernetesMessageSender( private val serializer = JsonSerializer.forClass(endpoint.messageClass) override fun send(message: Message) { + val traceId = validTraceId(message) val msgMap = mapOf( - TRACE_PROPERTY to message.header.traceId, + TRACE_PROPERTY to traceId, RUN_ID_PROPERTY to message.header.ortRunId.toString(), "payload" to serializer.toJson(message.payload) ) val msgConfig = config.forMessage(message) val envVars = createEnvironment() - val labels = createTraceIdLabels(message.header.traceId) + mapOf( + val labels = createTraceIdLabels(traceId) + mapOf( RUN_ID_LABEL to message.header.ortRunId.toString(), WORKER_LABEL to endpoint.configPrefix ) val jobBody = V1JobBuilder() .withNewMetadata() - .withName("${endpoint.configPrefix}-${message.header.traceId}".take(64)) + .withName("${endpoint.configPrefix}-$traceId".take(64)) .withLabels(labels) .endMetadata() .withNewSpec() @@ -121,7 +124,7 @@ internal class KubernetesMessageSender( ) .withServiceAccountName(msgConfig.serviceAccountName) .addNewContainer() - .withName("${endpoint.configPrefix}-${message.header.traceId}".take(64)) + .withName("${endpoint.configPrefix}-$traceId".take(64)) .withImage(msgConfig.imageName) .withCommand(msgConfig.commands) .withArgs(msgConfig.args) @@ -141,6 +144,14 @@ internal class KubernetesMessageSender( api.createNamespacedJob(msgConfig.namespace, jobBody, null, null, null, null) } + /** + * Generate a valid trace ID from the given [message]. Check the ID contained in the message. If it is empty, a + * unique ID is generated. This is needed because the job name is derived from the trace ID, and Kubernetes + * as some restrictions for such names. + */ + private fun validTraceId(message: Message): String = + (message.header.traceId.takeUnless { it.isBlank() } ?: UUID.randomUUID()).toString() + /** * Prepare the environment for the job to create. This environment contains all the variables from the current * environment, but if a variable starts with a prefix named like the target [Endpoint], this prefix is removed. diff --git a/transport/kubernetes/src/test/kotlin/KubernetesMessageSenderTest.kt b/transport/kubernetes/src/test/kotlin/KubernetesMessageSenderTest.kt index b5f740a85..1ca1fb895 100644 --- a/transport/kubernetes/src/test/kotlin/KubernetesMessageSenderTest.kt +++ b/transport/kubernetes/src/test/kotlin/KubernetesMessageSenderTest.kt @@ -24,15 +24,18 @@ import com.typesafe.config.ConfigFactory import io.kotest.core.spec.style.StringSpec import io.kotest.extensions.system.OverrideMode import io.kotest.extensions.system.withEnvironment +import io.kotest.inspectors.forAll import io.kotest.matchers.collections.shouldContainExactly import io.kotest.matchers.collections.shouldContainOnly import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.collections.shouldNotContainAll +import io.kotest.matchers.ints.shouldBeGreaterThan import io.kotest.matchers.maps.shouldContainAll import io.kotest.matchers.nulls.beNull import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.should import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldStartWith import io.kubernetes.client.custom.Quantity import io.kubernetes.client.openapi.apis.BatchV1Api @@ -168,6 +171,35 @@ class KubernetesMessageSenderTest : StringSpec({ requests shouldBe expectedRequests } } + + "Valid job names are generated even if no trace ID is provided" { + val config = createConfig() + val msg = message.copy(header = header.copy(traceId = " ")) + + val (client, sender) = createClientAndSender(config) + + val jobs = mutableListOf() + sender.send(msg) + sender.send(msg) + + verify(exactly = 2) { + client.createNamespacedJob( + "test-namespace", + capture(jobs), + null, + null, + null, + null + ) + } + + val jobNames = jobs.mapNotNull { it.metadata?.name }.toSet() + jobNames shouldHaveSize 2 + jobNames.forAll { jobName -> + jobName.shouldStartWith("analyzer-") + jobName.length shouldBeGreaterThan 20 + } + } }) private val annotations = mapOf( @@ -201,15 +233,7 @@ private fun createJob( config: KubernetesSenderConfig, msg: Message = message ): V1Job { - val client = mockk { - every { createNamespacedJob(any(), any(), null, null, null, null) } returns mockk() - } - - val sender = KubernetesMessageSender( - api = client, - config = config, - endpoint = AnalyzerEndpoint - ) + val (client, sender) = createClientAndSender(config) withEnvironment(envVars, OverrideMode.SetOrOverride) { sender.send(msg) @@ -230,6 +254,24 @@ private fun createJob( return job.captured } +/** + * Create a sender to be tested based on the given [config] together with a mocked Kubernetes client. + */ +private fun createClientAndSender( + config: KubernetesSenderConfig +): Pair> { + val client = mockk { + every { createNamespacedJob(any(), any(), null, null, null, null) } returns mockk() + } + + val sender = KubernetesMessageSender( + api = client, + config = config, + endpoint = AnalyzerEndpoint + ) + return Pair(client, sender) +} + /** * Create a [KubernetesSenderConfig] with default properties that can be overridden with the given [overrides]. */