Skip to content

Commit

Permalink
fix(kubernetes): Handle empty trace IDs in KubernetesMessageSender
Browse files Browse the repository at this point in the history
Kubernetes has some restrictions for the names of jobs or pods; they
must be compliant to RFC 1123. Since the names are derived from the
trace ID, add a special treatment for empty trace IDs that lead to
invalid names and can sometimes occur if no trace ID is explicitly
provided.

Signed-off-by: Oliver Heger <[email protected]>
  • Loading branch information
oheger-bosch committed Dec 13, 2024
1 parent bfd6062 commit 419a4df
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 13 deletions.
19 changes: 15 additions & 4 deletions transport/kubernetes/src/main/kotlin/KubernetesMessageSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import io.kubernetes.client.openapi.models.V1SecretVolumeSource
import io.kubernetes.client.openapi.models.V1Volume
import io.kubernetes.client.openapi.models.V1VolumeMount

import java.util.UUID

import org.eclipse.apoapsis.ortserver.transport.Endpoint
import org.eclipse.apoapsis.ortserver.transport.Message
import org.eclipse.apoapsis.ortserver.transport.MessageSender
Expand Down Expand Up @@ -88,22 +90,23 @@ internal class KubernetesMessageSender<T : Any>(
private val serializer = JsonSerializer.forClass(endpoint.messageClass)

override fun send(message: Message<T>) {
val traceId = validTraceId(message)
val msgMap = mapOf(
TRACE_PROPERTY to message.header.traceId,
TRACE_PROPERTY to traceId,
RUN_ID_PROPERTY to message.header.ortRunId.toString(),
"payload" to serializer.toJson(message.payload)
)

val msgConfig = config.forMessage(message)
val envVars = createEnvironment()
val labels = createTraceIdLabels(message.header.traceId) + mapOf(
val labels = createTraceIdLabels(traceId) + mapOf(
RUN_ID_LABEL to message.header.ortRunId.toString(),
WORKER_LABEL to endpoint.configPrefix
)

val jobBody = V1JobBuilder()
.withNewMetadata()
.withName("${endpoint.configPrefix}-${message.header.traceId}".take(64))
.withName("${endpoint.configPrefix}-$traceId".take(64))
.withLabels<String, String>(labels)
.endMetadata()
.withNewSpec()
Expand All @@ -121,7 +124,7 @@ internal class KubernetesMessageSender<T : Any>(
)
.withServiceAccountName(msgConfig.serviceAccountName)
.addNewContainer()
.withName("${endpoint.configPrefix}-${message.header.traceId}".take(64))
.withName("${endpoint.configPrefix}-$traceId".take(64))
.withImage(msgConfig.imageName)
.withCommand(msgConfig.commands)
.withArgs(msgConfig.args)
Expand All @@ -141,6 +144,14 @@ internal class KubernetesMessageSender<T : Any>(
api.createNamespacedJob(msgConfig.namespace, jobBody, null, null, null, null)
}

/**
* Generate a valid trace ID from the given [message]. Check the ID contained in the message. If it is empty, a
* unique ID is generated. This is needed because the job name is derived from the trace ID, and Kubernetes
* as some restrictions for such names.
*/
private fun validTraceId(message: Message<T>): String =
(message.header.traceId.takeUnless { it.isBlank() } ?: UUID.randomUUID()).toString()

/**
* Prepare the environment for the job to create. This environment contains all the variables from the current
* environment, but if a variable starts with a prefix named like the target [Endpoint], this prefix is removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ import com.typesafe.config.ConfigFactory
import io.kotest.core.spec.style.StringSpec
import io.kotest.extensions.system.OverrideMode
import io.kotest.extensions.system.withEnvironment
import io.kotest.inspectors.forAll
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.collections.shouldContainOnly
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.collections.shouldNotContainAll
import io.kotest.matchers.ints.shouldBeGreaterThan
import io.kotest.matchers.maps.shouldContainAll
import io.kotest.matchers.nulls.beNull
import io.kotest.matchers.nulls.shouldNotBeNull
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.string.shouldStartWith

import io.kubernetes.client.custom.Quantity
import io.kubernetes.client.openapi.apis.BatchV1Api
Expand Down Expand Up @@ -168,6 +171,35 @@ class KubernetesMessageSenderTest : StringSpec({
requests shouldBe expectedRequests
}
}

"Valid job names are generated even if no trace ID is provided" {
val config = createConfig()
val msg = message.copy(header = header.copy(traceId = " "))

val (client, sender) = createClientAndSender(config)

val jobs = mutableListOf<V1Job>()
sender.send(msg)
sender.send(msg)

verify(exactly = 2) {
client.createNamespacedJob(
"test-namespace",
capture(jobs),
null,
null,
null,
null
)
}

val jobNames = jobs.mapNotNull { it.metadata?.name }.toSet()
jobNames shouldHaveSize 2
jobNames.forAll { jobName ->
jobName.shouldStartWith("analyzer-")
jobName.length shouldBeGreaterThan 20
}
}
})

private val annotations = mapOf(
Expand Down Expand Up @@ -201,15 +233,7 @@ private fun createJob(
config: KubernetesSenderConfig,
msg: Message<AnalyzerRequest> = message
): V1Job {
val client = mockk<BatchV1Api> {
every { createNamespacedJob(any(), any(), null, null, null, null) } returns mockk()
}

val sender = KubernetesMessageSender(
api = client,
config = config,
endpoint = AnalyzerEndpoint
)
val (client, sender) = createClientAndSender(config)

withEnvironment(envVars, OverrideMode.SetOrOverride) {
sender.send(msg)
Expand All @@ -230,6 +254,24 @@ private fun createJob(
return job.captured
}

/**
* Create a sender to be tested based on the given [config] together with a mocked Kubernetes client.
*/
private fun createClientAndSender(
config: KubernetesSenderConfig
): Pair<BatchV1Api, KubernetesMessageSender<AnalyzerRequest>> {
val client = mockk<BatchV1Api> {
every { createNamespacedJob(any(), any(), null, null, null, null) } returns mockk()
}

val sender = KubernetesMessageSender(
api = client,
config = config,
endpoint = AnalyzerEndpoint
)
return Pair(client, sender)
}

/**
* Create a [KubernetesSenderConfig] with default properties that can be overridden with the given [overrides].
*/
Expand Down

0 comments on commit 419a4df

Please sign in to comment.