Skip to content

Commit

Permalink
Introduce toxiproxy to proxy incoming requests to runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 8, 2023
1 parent 6e26efb commit 6991707
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 16 deletions.
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ dependencyResolutionManagement {
library("testcontainers-core", "org.testcontainers", "testcontainers")
.versionRef("testcontainers")
library("testcontainers-kafka", "org.testcontainers", "kafka").versionRef("testcontainers")
library("testcontainers-toxiproxy", "org.testcontainers", "toxiproxy")
.versionRef("testcontainers")
library("awaitility", "org.awaitility", "awaitility-kotlin").versionRef("awaitility")

library("errorprone", "com.google.errorprone", "error_prone_core").versionRef("errorprone")
Expand Down
2 changes: 2 additions & 0 deletions test-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ dependencies {
implementation(libs.grpc.netty.shaded)
implementation(libs.grpc.protobuf)

implementation(libs.testcontainers.toxiproxy)

implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.0")

testImplementation(libs.junit.all)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import kotlin.time.Duration.Companion.seconds
import org.apache.logging.log4j.LogManager
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.LogUtils

/** Handle to interact with deployed containers */
class ContainerHandle internal constructor(private val container: GenericContainer<*>) {
class ContainerHandle
internal constructor(
private val container: GenericContainer<*>,
private val getMappedPort: (GenericContainer<*>, Int) -> Int? = { c, p -> c.getMappedPort(p) },
private val waitStrategy: () -> Unit = {},
) {

private val logger = LogManager.getLogger(ContainerHandle::class.java)

Expand Down Expand Up @@ -87,7 +91,7 @@ class ContainerHandle internal constructor(private val container: GenericContain
}

fun getMappedPort(port: Int): Int? {
return container.getMappedPort(port)
return this.getMappedPort.invoke(this.container, port)
}

private fun postStart() {
Expand All @@ -99,7 +103,7 @@ class ContainerHandle internal constructor(private val container: GenericContain
// Wait for running start, and wait on ports available
IsRunningStartupCheckStrategy()
.waitUntilStartupSuccessful(container.dockerClient, container.containerId)
Wait.forListeningPort().waitUntilReady(container)
waitStrategy()
}

private fun <T> retryDockerClientCommand(fn: (DockerClient, String) -> T): T {
Expand Down
81 changes: 81 additions & 0 deletions test-utils/src/main/kotlin/dev/restate/e2e/utils/ProxyContainer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package dev.restate.e2e.utils

import com.github.dockerjava.api.command.InspectContainerResponse
import eu.rekawek.toxiproxy.ToxiproxyClient
import org.testcontainers.containers.Network
import org.testcontainers.containers.ToxiproxyContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.wait.strategy.WaitStrategyTarget

internal class ProxyContainer(private val container: ToxiproxyContainer) {

companion object {
// From https://www.testcontainers.org/modules/toxiproxy/
private const val PORT_OFFSET = 8666
private const val MAX_PORTS = 31
}

private val client by lazy { ToxiproxyClient(container.host, container.controlPort) }

private var assignedPorts = 0
private val portMappings = mutableMapOf<String, Int>()

// -- Lifecycle

internal fun start(network: Network, testReportDir: String) {
container
.withNetwork(network)
.withLogConsumer(ContainerLogger(testReportDir, "toxiproxy"))
.start()
}

internal fun stop() {
container.stop()
}

// -- Port mapping

internal fun mapPort(hostName: String, port: Int): Int {
assert(assignedPorts != MAX_PORTS) { "Cannot assign more ports" }

val portName = "${hostName}-${port}"
val upstreamAddress = address(hostName, port)
val chosenPort = PORT_OFFSET + assignedPorts
assignedPorts++

client.createProxy(portName, address("0.0.0.0", chosenPort), upstreamAddress)
portMappings[upstreamAddress] = chosenPort

return chosenPort
}

internal fun getMappedPort(hostName: String, port: Int): Int? {
return portMappings[address(hostName, port)]?.let { container.getMappedPort(it) }
}

internal fun waitPorts(hostName: String, vararg ports: Int) {
val ports =
ports.map {
portMappings[address(hostName, it)]
?: throw IllegalArgumentException("Cannot wait on non existing port")
}
Wait.forListeningPort().waitUntilReady(WaitOnSpecificPortsTarget(ports, container))
}

private fun address(hostName: String, port: Int): String {
return "${hostName}:${port}"
}

private class WaitOnSpecificPortsTarget(
val ports: List<Int>,
val proxyContainer: ToxiproxyContainer
) : WaitStrategyTarget {
override fun getExposedPorts(): MutableList<Int> {
return ports.toMutableList()
}

override fun getContainerInfo(): InspectContainerResponse {
return proxyContainer.containerInfo
}
}
}
54 changes: 42 additions & 12 deletions test-utils/src/main/kotlin/dev/restate/e2e/utils/RestateDeployer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ import kotlinx.serialization.json.*
import org.apache.logging.log4j.LogManager
import org.junit.jupiter.api.extension.ExtensionContext
import org.junit.jupiter.api.fail
import org.testcontainers.containers.BindMode
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.Network
import org.testcontainers.containers.SelinuxContext
import org.testcontainers.containers.*
import org.testcontainers.images.PullPolicy
import org.testcontainers.utility.DockerImageName

Expand Down Expand Up @@ -83,9 +80,21 @@ private constructor(
functionSpecs.associate { spec -> spec.hostName to (spec to spec.toContainer()) }
private val network = Network.newNetwork()
private val tmpDir = Files.createTempDirectory("restate-e2e").toFile()

private val proxyContainer = ProxyContainer(ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0"))
private val runtimeContainer = GenericContainer(DockerImageName.parse(runtimeContainerName))
private val deployedContainers: Map<String, ContainerHandle> =
mapOf(RESTATE_RUNTIME to ContainerHandle(runtimeContainer)) +
mapOf(
RESTATE_RUNTIME to
ContainerHandle(
runtimeContainer,
{ _, port -> proxyContainer.getMappedPort(RESTATE_RUNTIME, port) },
{
proxyContainer.waitPorts(
RESTATE_RUNTIME,
RUNTIME_META_ENDPOINT_PORT,
RUNTIME_GRPC_INGRESS_ENDPOINT_PORT)
})) +
functionContainers.map { it.key to ContainerHandle(it.value.second) } +
additionalContainers.map { it.key to ContainerHandle(it.value) }

Expand Down Expand Up @@ -153,6 +162,10 @@ private constructor(
deployFunctions(testReportDir)
deployAdditionalContainers(testReportDir)
deployRuntime(testReportDir)
deployProxy(testReportDir)

// Let's execute service discovery to register the services
functionContainers.values.forEach { (spec, _) -> discoverServiceEndpoint(spec) }
}

private fun deployFunctions(testReportDir: String) {
Expand Down Expand Up @@ -209,7 +222,6 @@ private constructor(
runtimeContainer
.dependsOn(functionContainers.values.map { it.second })
.dependsOn(additionalContainers.values)
.withExposedPorts(RUNTIME_META_ENDPOINT_PORT, RUNTIME_GRPC_INGRESS_ENDPOINT_PORT)
.withEnv(additionalEnv)
// These envs should not be overriden by additionalEnv
.withEnv("RESTATE_META__REST_ADDRESS", "0.0.0.0:${RUNTIME_META_ENDPOINT_PORT}")
Expand Down Expand Up @@ -241,15 +253,25 @@ private constructor(
}

runtimeContainer.start()
logger.debug("Restate runtime started. Container id {}", runtimeContainer.containerId)
}

private fun deployProxy(testReportDir: String) {
// We use an external proxy to access from the test code to the restate container in order to
// retain
// the tcp port binding across restarts

// Configure toxiproxy and start
proxyContainer.start(network, testReportDir)

// Proxy runtime ports
proxyContainer.mapPort(RESTATE_RUNTIME, RUNTIME_META_ENDPOINT_PORT)
proxyContainer.mapPort(RESTATE_RUNTIME, RUNTIME_GRPC_INGRESS_ENDPOINT_PORT)

logger.debug(
"Restate runtime started. gRPC ingress port: {}. Meta REST API port: {}",
"Toxiproxy started. gRPC ingress port: {}. Meta REST API port: {}",
getContainerPort(RESTATE_RUNTIME, RUNTIME_GRPC_INGRESS_ENDPOINT_PORT),
getContainerPort(RESTATE_RUNTIME, RUNTIME_META_ENDPOINT_PORT))
logger.debug("Runtime container id {}", runtimeContainer.containerId)

// Let's execute service discovery to register the services
functionContainers.values.forEach { (spec, _) -> discoverServiceEndpoint(spec) }
}

@Serializable
Expand Down Expand Up @@ -313,16 +335,24 @@ private constructor(
runtimeContainer.stop()
}

private fun teardownProxy() {
proxyContainer.stop()
}

private fun teardownAll() {
teardownRuntime()
teardownAdditionalContainers()
teardownFunctions()
teardownProxy()
network!!.close()
}

internal fun createRuntimeChannel(): ManagedChannel {
return getContainerPort(RESTATE_RUNTIME, RUNTIME_GRPC_INGRESS_ENDPOINT_PORT).let { port ->
NettyChannelBuilder.forAddress("127.0.0.1", port).disableRetry().usePlaintext().build()
NettyChannelBuilder.forAddress("127.0.0.1", port)
.disableServiceConfigLookUp()
.usePlaintext()
.build()
}
}

Expand Down

0 comments on commit 6991707

Please sign in to comment.