Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 46 additions & 33 deletions src/main/kotlin/dev/restate/sdktesting/infra/KafkaContainer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,58 +11,71 @@ package dev.restate.sdktesting.infra
import com.github.dockerjava.api.command.InspectContainerResponse
import kotlin.time.Duration.Companion.minutes
import kotlin.time.toJavaDuration
import org.testcontainers.images.builder.Transferable
import org.testcontainers.kafka.KafkaContainer
import org.testcontainers.utility.DockerImageName

/**
* This class overrides the testcontainers [org.testcontainers.containers.KafkaContainer] to
* introduce two additional behaviours:
* This class overrides the testcontainers [org.testcontainers.kafka.KafkaContainer] to introduce
* two additional behaviours:
* * Fix the `advertised.listeners` override with the correct hostname (maybe we can upstream this
* fix?)
* * Create topics after the container is started
*/
class KafkaContainer(private vararg val topics: String) :
org.testcontainers.containers.KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.1.0-1-ubi8")) {
KafkaContainer(
DockerImageName.parse("docker.io/apache/kafka-native:4.1.1")
.asCompatibleSubstituteFor("apache/kafka")) {

companion object {
const val EXTERNAL_PORT = KAFKA_PORT
const val KAFKA_NETWORK_PORT = 9092
const val KAFKA_EXTERNAL_PORT = 9094
const val STARTER_SCRIPT = "/tmp/testcontainers_start.sh"
}

init {
// Make sure we have auto.create.topics.enable as true
withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
withStartupTimeout(2.minutes.toJavaDuration())

// Set network alias so it's accessible as kafka:9092 within docker network
withNetworkAliases("kafka")

// Expose the external port for host access
addExposedPort(KAFKA_EXTERNAL_PORT)
}

// This is copied and pasted from KafkaContainer original class to add the listeners from the
// network aliases and remove the brokerAdvertisedListener
override fun containerIsStarted(containerInfo: InspectContainerResponse) {
val currentBrokerAdvertisedAddress = brokerAdvertisedListener(containerInfo)
override fun containerIsStarting(containerInfo: InspectContainerResponse) {
// Don't call super - we're doing our own thing

// Get the dynamically mapped external port
val externalPort = getMappedPort(KAFKA_EXTERNAL_PORT)

// Build advertised listeners:
// - INTERNAL: for docker network access (kafka:9092)
// - EXTERNAL: for host access (localhost:random-port)
// - BROKER: for inter-broker communication
val advertisedListeners =
listOf(
"INTERNAL://kafka:9092",
"EXTERNAL://${host}:${externalPort}",
"BROKER://${containerInfo.config.hostName}:9093")
.joinToString(",")

// Create topics first
topics.forEach { topic ->
execInContainer(
"kafka-topics",
"--create",
"--topic",
topic,
"--bootstrap-server",
currentBrokerAdvertisedAddress)
}
// Create startup script that exports the advertised listeners
// INTERNAL binds to 9092, EXTERNAL binds to 9094 (different ports!)
val command =
"""
#!/bin/bash
export KAFKA_ADVERTISED_LISTENERS=$advertisedListeners
export KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9095
export KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
export KAFKA_INTER_BROKER_LISTENER_NAME=BROKER
export KAFKA_CONTROLLER_QUORUM_VOTERS=1@${containerInfo.config.hostName}:9095
/etc/kafka/docker/run
"""
.trimIndent()

val listenerList = networkAliases.map { "BROKER://$it:9092" } + bootstrapServers
val result =
execInContainer(
"kafka-configs",
"--alter",
"--bootstrap-server",
currentBrokerAdvertisedAddress,
"--entity-type",
"brokers",
"--entity-name",
envMap["KAFKA_BROKER_ID"],
"--add-config",
"advertised.listeners=[${listenerList.joinToString(separator = ",")}]")
check(result.exitCode == 0) { result.toString() }
copyFileToContainer(Transferable.of(command, 0x1ff), STARTER_SCRIPT)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ import org.junit.jupiter.api.parallel.ExecutionMode
import org.junit.jupiter.api.parallel.Isolated

/**
* Tests verifying backward compatibility (newer Restate version can read data written by older version).
* Tests verifying backward compatibility (newer Restate version can read data written by older
* version).
*/
@Tag("version-compatibility")
@Isolated
Expand Down Expand Up @@ -180,7 +181,8 @@ class BackwardCompatibilityTest {

@Test
fun startRetryableOperation(@InjectClient ingressClient: Client) = runTest {
val retryableClient = BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient)
val retryableClient =
BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient)

// Send the request and expect it to fail
retryableClient.send().runRetryableOperation { idempotencyKey = idempotencyKeyRunBlockTest }
Expand Down Expand Up @@ -311,7 +313,8 @@ class BackwardCompatibilityTest {

@Test
fun completeRetryableOperation(@InjectClient ingressClient: Client) = runTest {
val retryableClient = BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient)
val retryableClient =
BackwardCompatibilityTestRetryableServiceClient.fromClient(ingressClient)

val result =
retryableClient.send().runRetryableOperation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
import org.junit.jupiter.api.parallel.Isolated

/**
* Tests to verify forward compatibility (older version can read data written by newer version).
*/
/** Tests to verify forward compatibility (older version can read data written by newer version). */
@Tag("version-compatibility")
@Isolated
@Execution(ExecutionMode.SAME_THREAD)
Expand Down
4 changes: 3 additions & 1 deletion src/main/kotlin/dev/restate/sdktesting/tests/Kafka.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package dev.restate.sdktesting.tests
import dev.restate.admin.api.SubscriptionApi
import dev.restate.admin.client.ApiClient
import dev.restate.admin.model.CreateSubscriptionRequest
import dev.restate.sdktesting.infra.KafkaContainer
import dev.restate.sdktesting.infra.runtimeconfig.IngressOptions
import dev.restate.sdktesting.infra.runtimeconfig.KafkaClusterOptions
import dev.restate.sdktesting.infra.runtimeconfig.RestateConfigSchema
Expand Down Expand Up @@ -56,6 +57,7 @@ object Kafka {
listOf(
KafkaClusterOptions()
.withName("my-cluster")
.withBrokers(listOf("PLAINTEXT://kafka:9092")))))
.withBrokers(
listOf("PLAINTEXT://kafka:${KafkaContainer.KAFKA_NETWORK_PORT}")))))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class KafkaAndWorkflowAPITest {
@Execution(ExecutionMode.CONCURRENT)
fun callWorkflowHandler(
@InjectAdminURI adminURI: URI,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT)
kafkaPort: Int,
@InjectClient ingressClient: Client
) = runTest {
// Create subscription
Expand Down Expand Up @@ -116,7 +117,8 @@ class KafkaAndWorkflowAPITest {
@Execution(ExecutionMode.CONCURRENT)
fun callSharedWorkflowHandler(
@InjectAdminURI adminURI: URI,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT)
kafkaPort: Int,
@InjectClient ingressClient: Client
) = runTest {
// Create subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class KafkaTracingTest {
@InjectClient ingressClient: Client,
@InjectAdminURI adminURI: URI,
@InjectContainerPort(hostName = JAEGER_HOSTNAME, port = JAEGER_QUERY_PORT) jaegerPort: Int,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.EXTERNAL_PORT) kafkaPort: Int,
@InjectContainerPort(hostName = "kafka", port = KafkaContainer.KAFKA_EXTERNAL_PORT)
kafkaPort: Int,
) = runTest {
Kafka.createKafkaSubscription(
adminURI, TOPIC, KafkaTracingTestCounterHandlers.Metadata.SERVICE_NAME, "set")
Expand Down
Loading