From f92ea91d072f38a1707a206e0d09287649a22bed Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Sep 2025 15:48:51 +0200 Subject: [PATCH 1/3] add topic replicas parameter --- src/glassflow/etl/models/source.py | 8 ++++++++ tests/data/pipeline_configs.py | 2 ++ tests/test_models/test_topic.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 39 insertions(+) diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index a4ec7b5..5165a92 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -85,6 +85,7 @@ class TopicConfig(BaseModel): name: str event_schema: Schema = Field(alias="schema") deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig()) + replicas: Optional[int] = Field(default=1) @field_validator("deduplication") @classmethod @@ -118,6 +119,13 @@ def validate_deduplication_id_field( ) return v + + @field_validator("replicas") + @classmethod + def validate_replicas(cls, v: int) -> int: + if v < 1: + raise ValueError("Replicas must be at least 1") + return v class KafkaConnectionParams(BaseModel): diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index be48ff0..bcda32e 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -25,6 +25,7 @@ def get_valid_pipeline_config() -> dict: { "consumer_group_initial_offset": "earliest", "name": "user_logins", + "replicas": 3, "schema": { "type": "json", "fields": [ @@ -52,6 +53,7 @@ def get_valid_pipeline_config() -> dict: { "consumer_group_initial_offset": "earliest", "name": "orders", + "replicas": 1, "schema": { "type": "json", "fields": [ diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py index 319dc12..2c4e934 100644 --- a/tests/test_models/test_topic.py +++ b/tests/test_models/test_topic.py @@ -71,3 +71,32 @@ def test_topic_config_deduplication_id_field_validation(self): ), ) assert config.deduplication.enabled is False + + def test_topic_config_replicas_validation(self): + """Test TopicConfig validation for replicas.""" + config = models.TopicConfig( + name="test-topic", + consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, + schema=models.Schema( + type=models.SchemaType.JSON, + fields=[ + models.SchemaField(name="name", type=models.KafkaDataType.STRING), + ], + ), + replicas=3, + ) + assert config.replicas == 3 + + with pytest.raises(ValueError) as exc_info: + models.TopicConfig( + name="test-topic", + consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, + schema=models.Schema( + type=models.SchemaType.JSON, + fields=[ + models.SchemaField(name="name", type=models.KafkaDataType.STRING), + ], + ), + replicas=0, + ) + assert "Replicas must be at least 1" in str(exc_info.value) From 936c7562403b619f3b0924341b5f98d046b266ed Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 11 Sep 2025 15:50:48 +0200 Subject: [PATCH 2/3] chore: format files --- src/glassflow/etl/models/source.py | 2 +- tests/test_models/test_topic.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index 5165a92..a86a37c 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -119,7 +119,7 @@ def validate_deduplication_id_field( ) return v - + @field_validator("replicas") @classmethod def validate_replicas(cls, v: int) -> int: diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py index 2c4e934..f91f33a 100644 --- a/tests/test_models/test_topic.py +++ b/tests/test_models/test_topic.py @@ -94,7 +94,9 @@ def test_topic_config_replicas_validation(self): schema=models.Schema( type=models.SchemaType.JSON, fields=[ - models.SchemaField(name="name", type=models.KafkaDataType.STRING), + models.SchemaField( + name="name", type=models.KafkaDataType.STRING + ), ], ), replicas=0, From e63bdd986c9d9d276e1a9e9c2627878bfc0e6670 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 16 Sep 2025 08:11:04 +0000 Subject: [PATCH 3/3] chore: bump version to 3.1.0 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index b502146..fd2a018 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.0.2 +3.1.0