From 7a6529a7d7f430736b09b0fbce41c41797f9f396 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 14 Jan 2026 12:36:07 +0100 Subject: [PATCH] Use kafka-native container, change to use the new kafka container test class --- .../sdktesting/infra/KafkaContainer.kt | 79 +++++++++++-------- .../tests/BackwardCompatibilityTest.kt | 9 ++- .../tests/ForwardCompatibilityTest.kt | 4 +- .../dev/restate/sdktesting/tests/Kafka.kt | 4 +- .../tests/KafkaAndWorkflowAPITest.kt | 6 +- .../sdktesting/tests/KafkaTracingTest.kt | 3 +- 6 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/main/kotlin/dev/restate/sdktesting/infra/KafkaContainer.kt b/src/main/kotlin/dev/restate/sdktesting/infra/KafkaContainer.kt index 1596aca7..ab74afb1 100644 --- a/src/main/kotlin/dev/restate/sdktesting/infra/KafkaContainer.kt +++ b/src/main/kotlin/dev/restate/sdktesting/infra/KafkaContainer.kt @@ -11,58 +11,71 @@ package dev.restate.sdktesting.infra import com.github.dockerjava.api.command.InspectContainerResponse import kotlin.time.Duration.Companion.minutes import kotlin.time.toJavaDuration +import org.testcontainers.images.builder.Transferable +import org.testcontainers.kafka.KafkaContainer import org.testcontainers.utility.DockerImageName /** - * This class overrides the testcontainers [org.testcontainers.containers.KafkaContainer] to - * introduce two additional behaviours: + * This class overrides the testcontainers [org.testcontainers.kafka.KafkaContainer] to introduce + * two additional behaviours: * * Fix the `advertised.listeners` override with the correct hostname (maybe we can upstream this * fix?) * * Create topics after the container is started */ class KafkaContainer(private vararg val topics: String) : - org.testcontainers.containers.KafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka:7.1.0-1-ubi8")) { + KafkaContainer( + DockerImageName.parse("docker.io/apache/kafka-native:4.1.1") + .asCompatibleSubstituteFor("apache/kafka")) { companion object { - const val EXTERNAL_PORT = KAFKA_PORT + const val KAFKA_NETWORK_PORT = 9092 + const val KAFKA_EXTERNAL_PORT = 9094 + const val STARTER_SCRIPT = "/tmp/testcontainers_start.sh" } init { // Make sure we have auto.create.topics.enable as true withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true") withStartupTimeout(2.minutes.toJavaDuration()) + + // Set network alias so it's accessible as kafka:9092 within docker network + withNetworkAliases("kafka") + + // Expose the external port for host access + addExposedPort(KAFKA_EXTERNAL_PORT) } - // This is copied and pasted from KafkaContainer original class to add the listeners from the - // network aliases and remove the brokerAdvertisedListener - override fun containerIsStarted(containerInfo: InspectContainerResponse) { - val currentBrokerAdvertisedAddress = brokerAdvertisedListener(containerInfo) + override fun containerIsStarting(containerInfo: InspectContainerResponse) { + // Don't call super - we're doing our own thing + + // Get the dynamically mapped external port + val externalPort = getMappedPort(KAFKA_EXTERNAL_PORT) + + // Build advertised listeners: + // - INTERNAL: for docker network access (kafka:9092) + // - EXTERNAL: for host access (localhost:random-port) + // - BROKER: for inter-broker communication + val advertisedListeners = + listOf( + "INTERNAL://kafka:9092", + "EXTERNAL://${host}:${externalPort}", + "BROKER://${containerInfo.config.hostName}:9093") + .joinToString(",") - // Create topics first - topics.forEach { topic -> - execInContainer( - "kafka-topics", - "--create", - "--topic", - topic, - "--bootstrap-server", - currentBrokerAdvertisedAddress) - } + // Create startup script that exports the advertised listeners + // INTERNAL binds to 9092, EXTERNAL binds to 9094 (different ports!) + val command = + """ + #!/bin/bash + export KAFKA_ADVERTISED_LISTENERS=$advertisedListeners + export KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9095 + export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + export KAFKA_INTER_BROKER_LISTENER_NAME=BROKER + export KAFKA_CONTROLLER_QUORUM_VOTERS=1@${containerInfo.config.hostName}:9095 + /etc/kafka/docker/run + """ + .trimIndent() - val listenerList = networkAliases.map { "BROKER://$it:9092" } + bootstrapServers - val result = - execInContainer( - "kafka-configs", - "--alter", - "--bootstrap-server", - currentBrokerAdvertisedAddress, - "--entity-type", - "brokers", - "--entity-name", - envMap["KAFKA_BROKER_ID"], - "--add-config", - "advertised.listeners=[${listenerList.joinToString(separator = ",")}]") - check(result.exitCode == 0) { result.toString() } + copyFileToContainer(Transferable.of(command, 0x1ff), STARTER_SCRIPT) } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt index b3d6e151..6df35d9a 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/BackwardCompatibilityTest.kt @@ -54,7 +54,8 @@ import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.api.parallel.Isolated /** - * Tests verifying backward compatibility (newer Restate version can read data written by older version). + * Tests verifying backward compatibility (newer Restate version can read data written by older + * version). */ @Tag("version-compatibility") @Isolated @@ -180,7 +181,8 @@ class BackwardCompatibilityTest { @Test fun startRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = + BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) // Send the request and expect it to fail retryableClient.send().runRetryableOperation { idempotencyKey = idempotencyKeyRunBlockTest } @@ -311,7 +313,8 @@ class BackwardCompatibilityTest { @Test fun completeRetryableOperation(@InjectClient ingressClient: Client) = runTest { - val retryableClient = BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) + val retryableClient = + BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient) val result = retryableClient.send().runRetryableOperation { diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt index 6d850ea0..3b179b0e 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/ForwardCompatibilityTest.kt @@ -46,9 +46,7 @@ import org.junit.jupiter.api.parallel.Execution import org.junit.jupiter.api.parallel.ExecutionMode import org.junit.jupiter.api.parallel.Isolated -/** - * Tests to verify forward compatibility (older version can read data written by newer version). - */ +/** Tests to verify forward compatibility (older version can read data written by newer version). */ @Tag("version-compatibility") @Isolated @Execution(ExecutionMode.SAME_THREAD) diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt b/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt index 0b923e23..3dda243b 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt @@ -11,6 +11,7 @@ package dev.restate.sdktesting.tests import dev.restate.admin.api.SubscriptionApi import dev.restate.admin.client.ApiClient import dev.restate.admin.model.CreateSubscriptionRequest +import dev.restate.sdktesting.infra.KafkaContainer import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions import dev.restate.sdktesting.infra.runtimeconfig.KafkaClusterOptions import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema @@ -56,6 +57,7 @@ object Kafka { listOf( KafkaClusterOptions() .withName("my-cluster") - .withBrokers(listOf("PLAINTEXT://kafka:9092"))))) + .withBrokers( + listOf("PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}"))))) } } diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt index f9114be1..a59d39c8 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaAndWorkflowAPITest.kt @@ -65,7 +65,8 @@ class KafkaAndWorkflowAPITest { @Execution(ExecutionMode.CONCURRENT) fun callWorkflowHandler( @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, @InjectClient ingressClient: Client ) = runTest { // Create subscription @@ -116,7 +117,8 @@ class KafkaAndWorkflowAPITest { @Execution(ExecutionMode.CONCURRENT) fun callSharedWorkflowHandler( @InjectAdminURI adminURI: URI, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, @InjectClient ingressClient: Client ) = runTest { // Create subscription diff --git a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt index 0594675c..85649921 100644 --- a/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt +++ b/src/main/kotlin/dev/restate/sdktesting/tests/KafkaTracingTest.kt @@ -69,7 +69,8 @@ class KafkaTracingTest { @InjectClient ingressClient: Client, @InjectAdminURI adminURI: URI, @InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int, - @InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int, + @InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT) + kafkaPort: Int, ) = runTest { Kafka.createKafkaSubscription( adminURI, TOPIC, KafkaTracingTestCounterHandlers.Metadata.SERVICE_NAME, "set")