diff --git a/VERSION b/VERSION index b502146..fd2a018 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.0.2 +3.1.0 diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index a4ec7b5..a86a37c 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -85,6 +85,7 @@ class TopicConfig(BaseModel): name: str event_schema: Schema = Field(alias="schema") deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig()) + replicas: Optional[int] = Field(default=1) @field_validator("deduplication") @classmethod @@ -119,6 +120,13 @@ def validate_deduplication_id_field( return v + @field_validator("replicas") + @classmethod + def validate_replicas(cls, v: int) -> int: + if v < 1: + raise ValueError("Replicas must be at least 1") + return v + class KafkaConnectionParams(BaseModel): brokers: List[str] diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index be48ff0..bcda32e 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -25,6 +25,7 @@ def get_valid_pipeline_config() -> dict: { "consumer_group_initial_offset": "earliest", "name": "user_logins", + "replicas": 3, "schema": { "type": "json", "fields": [ @@ -52,6 +53,7 @@ def get_valid_pipeline_config() -> dict: { "consumer_group_initial_offset": "earliest", "name": "orders", + "replicas": 1, "schema": { "type": "json", "fields": [ diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py index 319dc12..f91f33a 100644 --- a/tests/test_models/test_topic.py +++ b/tests/test_models/test_topic.py @@ -71,3 +71,34 @@ def test_topic_config_deduplication_id_field_validation(self): ), ) assert config.deduplication.enabled is False + + def test_topic_config_replicas_validation(self): + """Test TopicConfig validation for replicas.""" + config = models.TopicConfig( + name="test-topic", + consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, + schema=models.Schema( + type=models.SchemaType.JSON, + fields=[ + models.SchemaField(name="name", type=models.KafkaDataType.STRING), + ], + ), + replicas=3, + ) + assert config.replicas == 3 + + with pytest.raises(ValueError) as exc_info: + models.TopicConfig( + name="test-topic", + consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, + schema=models.Schema( + type=models.SchemaType.JSON, + fields=[ + models.SchemaField( + name="name", type=models.KafkaDataType.STRING + ), + ], + ), + replicas=0, + ) + assert "Replicas must be at least 1" in str(exc_info.value)