From a71744473ca58c73e8082ec2dc9fe18ded5ca0d1 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Thu, 4 Dec 2025 10:22:43 +0100
Subject: [PATCH 1/8] update to use schema version 2

---
 src/glassflow/etl/models/__init__.py      |  15 +-
 src/glassflow/etl/models/filter.py        |  27 +++
 src/glassflow/etl/models/metadata.py      |   7 +
 src/glassflow/etl/models/pipeline.py      | 191 +++++++---------------
 src/glassflow/etl/models/schema.py        |  74 +++++++++
 src/glassflow/etl/models/sink.py          |  14 +-
 src/glassflow/etl/models/source.py        |  76 +--------
 src/glassflow/etl/pipeline.py             |   7 +-
 tests/data/error_scenarios.py             |  73 ++++++---
 tests/data/pipeline_configs.py            | 113 ++++++-------
 tests/data/valid_pipeline.json            |  71 ++++----
 tests/data/valid_pipeline.yaml            |  77 +++++----
 tests/test_models/test_config_update.py   |  24 ---
 tests/test_models/test_data_types.py      |   4 +-
 tests/test_models/test_join.py            |   1 +
 tests/test_models/test_pipeline_config.py |   8 +
 tests/test_models/test_schema.py          |  27 +++
 tests/test_models/test_sink.py            |  24 ---
 tests/test_models/test_topic.py           | 171 ++++++++++++-------
 tests/test_pipeline.py                    |  12 +-
 20 files changed, 504 insertions(+), 512 deletions(-)
 create mode 100644 src/glassflow/etl/models/filter.py
 create mode 100644 src/glassflow/etl/models/metadata.py
 create mode 100644 src/glassflow/etl/models/schema.py
 create mode 100644 tests/test_models/test_schema.py
 delete mode 100644 tests/test_models/test_sink.py

diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py
index 91c3333..7955940 100644
--- a/src/glassflow/etl/models/__init__.py
+++ b/src/glassflow/etl/models/__init__.py
@@ -1,5 +1,6 @@
 from .config import GlassFlowConfig
 from .data_types import ClickhouseDataType, KafkaDataType
+from .filter import FilterConfig, FilterConfigPatch
 from .join import (
     JoinConfig,
     JoinConfigPatch,
@@ -7,8 +8,10 @@
     JoinSourceConfig,
     JoinType,
 )
+from .metadata import MetadataConfig
 from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
-from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping
+from .schema import Schema, SchemaField
+from .sink import SinkConfig, SinkConfigPatch, SinkType
 from .source import (
     ConsumerGroupOffset,
     DeduplicationConfig,
@@ -16,9 +19,7 @@
     KafkaConnectionParams,
     KafkaConnectionParamsPatch,
     KafkaMechanism,
-    Schema,
-    SchemaField,
-    SchemaType,
+    KafkaProtocol,
     SourceConfig,
     SourceConfigPatch,
     SourceType,
@@ -29,23 +30,25 @@
     "ClickhouseDataType",
     "ConsumerGroupOffset",
     "DeduplicationConfig",
+    "FilterConfig",
+    "FilterConfigPatch",
     "KafkaConnectionParams",
     "KafkaDataType",
     "KafkaMechanism",
+    "KafkaProtocol",
     "JoinConfig",
     "JoinOrientation",
     "JoinSourceConfig",
     "JoinType",
+    "MetadataConfig",
     "PipelineConfig",
     "PipelineConfigPatch",
     "PipelineStatus",
     "SinkConfig",
     "SinkConfigPatch",
     "SinkType",
-    "TableMapping",
     "Schema",
     "SchemaField",
-    "SchemaType",
     "SourceConfig",
     "SourceType",
     "TopicConfig",
diff --git a/src/glassflow/etl/models/filter.py b/src/glassflow/etl/models/filter.py
new file mode 100644
index 0000000..70fa603
--- /dev/null
+++ b/src/glassflow/etl/models/filter.py
@@ -0,0 +1,27 @@
+from typing import Any, Optional
+
+from pydantic import BaseModel, Field
+
+
+class FilterConfig(BaseModel):
+    enabled: bool = Field(default=False)
+    expression: Optional[str] = Field(default=None, description="The filter expression")
+
+    def update(self, patch: "FilterConfigPatch") -> "FilterConfig":
+        """Apply a patch to this filter config."""
+        update_dict: dict[str, Any] = {}
+
+        # Check each field explicitly to handle model 
instances properly + if patch.enabled is not None: + update_dict["enabled"] = patch.enabled + if patch.expression is not None: + update_dict["expression"] = patch.expression + + if update_dict: + return self.model_copy(update=update_dict) + return self + + +class FilterConfigPatch(BaseModel): + enabled: Optional[bool] = Field(default=None) + expression: Optional[str] = Field(default=None) diff --git a/src/glassflow/etl/models/metadata.py b/src/glassflow/etl/models/metadata.py new file mode 100644 index 0000000..3feb431 --- /dev/null +++ b/src/glassflow/etl/models/metadata.py @@ -0,0 +1,7 @@ +from typing import List + +from pydantic import BaseModel, Field + + +class MetadataConfig(BaseModel): + tags: List[str] = Field(default=[]) diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index e02368c..cc94d88 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -1,12 +1,13 @@ import re -from typing import Any, Optional +from typing import Optional from pydantic import BaseModel, Field, field_validator, model_validator -from ..errors import InvalidDataTypeMappingError from .base import CaseInsensitiveStrEnum -from .data_types import kafka_to_clickhouse_data_type_mappings +from .filter import FilterConfig, FilterConfigPatch from .join import JoinConfig, JoinConfigPatch +from .metadata import MetadataConfig +from .schema import Schema from .sink import SinkConfig, SinkConfigPatch from .source import SourceConfig, SourceConfigPatch @@ -24,11 +25,15 @@ class PipelineStatus(CaseInsensitiveStrEnum): class PipelineConfig(BaseModel): + version: str = Field(default="2.0.0") pipeline_id: str name: Optional[str] = Field(default=None) source: SourceConfig join: Optional[JoinConfig] = Field(default=JoinConfig()) + filter: Optional[FilterConfig] = Field(default=FilterConfig()) + metadata: Optional[MetadataConfig] = Field(default=MetadataConfig()) sink: SinkConfig + schema: Schema @field_validator("pipeline_id") @classmethod @@ -48,147 +53,55 @@ def validate_pipeline_id(cls, v: str) -> str: return v @model_validator(mode="after") - def set_pipeline_name(self) -> "PipelineConfig": + def validate_config(self) -> "PipelineConfig": """ - If name is not provided, use the pipeline_id and replace hyphens - with spaces. + Set pipeline name if not provided and validate configuration. 
""" + # Set pipeline name if not provided if self.name is None: self.name = self.pipeline_id.replace("-", " ").title() - return self - @field_validator("join") - @classmethod - def validate_join_config( - cls, - v: Optional[JoinConfig], - info: Any, - ) -> Optional[JoinConfig]: - if not v or not v.enabled: - return v - - # Get the source topics from the parent model's data - source = info.data.get("source", {}) - if isinstance(source, dict): - source_topics = source.get("topics", []) - else: - source_topics = source.topics - if not source_topics: - return v - - # Validate each source in the join config - for source in v.sources: - # Check if source_id exists in any topic - source_exists = any( - topic.name == source.source_id for topic in source_topics - ) - if not source_exists: + # Validate schema + topic_names = [topic.name for topic in self.source.topics] + for field in self.schema.fields: + if field.source_id not in topic_names: raise ValueError( - f"Source ID '{source.source_id}' does not exist in any topic" + f"Source '{field.source_id}' does not exist in any topic" ) - # Find the topic and check if join_key exists in its schema - topic = next((t for t in source_topics if t.name == source.source_id), None) - if not topic: + # Validate deduplication ID fields + for topic in self.source.topics: + if topic.deduplication is None or not topic.deduplication.enabled: continue - field_exists = any( - field.name == source.join_key for field in topic.event_schema.fields - ) - if not field_exists: + if not self.schema.is_field_in_schema( + topic.deduplication.id_field, topic.name + ): raise ValueError( - f"Join key '{source.join_key}' does not exist in source " - f"'{source.source_id}' schema" + f"Deduplication id_field '{topic.deduplication.id_field}' " + f"not found in schema from source '{topic.name}'" ) - return v - - @field_validator("sink") - @classmethod - def validate_sink_config(cls, v: SinkConfig, info: Any) -> SinkConfig: - # Get the source topics from the parent model's data - source = info.data.get("source", {}) - if isinstance(source, dict): - source_topics = source.get("topics", []) - else: - source_topics = source.topics - if not source_topics: - return v - - # Validate each table mapping - for mapping in v.table_mapping: - # Check if source_id exists in any topic - source_exists = any( - topic.name == mapping.source_id for topic in source_topics - ) - if not source_exists: - raise ValueError( - f"Source ID '{mapping.source_id}' does not exist in any topic" - ) - - # Find the topic and check if field_name exists in its schema - topic = next( - (t for t in source_topics if t.name == mapping.source_id), None - ) - if not topic: - continue - - field_exists = any( - field.name == mapping.field_name for field in topic.event_schema.fields - ) - if not field_exists: - raise ValueError( - f"Field '{mapping.field_name}' does not exist in source " - f"'{mapping.source_id}' event schema" - ) - - return v - - @field_validator("sink") - @classmethod - def validate_data_type_compatibility(cls, v: SinkConfig, info: Any) -> SinkConfig: - # Get the source topics from the parent model's data - source = info.data.get("source", {}) - if isinstance(source, dict): - source_topics = source.get("topics", []) - else: - source_topics = source.topics - if not source_topics: - return v - - # Validate each table mapping - for mapping in v.table_mapping: - # Find the topic - topic = next( - (t for t in source_topics if t.name == mapping.source_id), None - ) - if not topic: - continue - - # Find the source 
field - source_field = next( - (f for f in topic.event_schema.fields if f.name == mapping.field_name), - None, - ) - if not source_field: - continue - - # Get the source and target data types - source_type = source_field.type - target_type = mapping.column_type + # Validate join configuration + if self.join and self.join.enabled: + # Validate each source in the join config + for join_source in self.join.sources: + if join_source.source_id not in topic_names: + raise ValueError( + f"Join source '{join_source.source_id}' does not exist in any " + "topic" + ) + + if not self.schema.is_field_in_schema( + join_source.join_key, + join_source.source_id, + ): + raise ValueError( + f"Join key '{join_source.join_key}' does not exist in source " + f"'{join_source.source_id}' schema" + ) - # Check if the target type is compatible with the source type - compatible_types = kafka_to_clickhouse_data_type_mappings.get( - source_type, [] - ) - if target_type not in compatible_types: - raise InvalidDataTypeMappingError( - f"Data type '{target_type}' is not compatible with source type " - f"'{source_type}' for field '{mapping.field_name}' in source " - f"'{mapping.source_id}'" - ) - - return v + return self def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig": """ @@ -217,15 +130,31 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig": config_patch.join ) + # Update filter if provided + if config_patch.filter is not None: + updated_config.filter = (updated_config.filter or FilterConfig()).update( + config_patch.filter + ) + # Update sink if provided if config_patch.sink is not None: updated_config.sink = updated_config.sink.update(config_patch.sink) + # Update schema if provided + if config_patch.schema is not None: + updated_config.schema = config_patch.schema + + if config_patch.metadata is not None: + updated_config.metadata = config_patch.metadata + return updated_config class PipelineConfigPatch(BaseModel): name: Optional[str] = Field(default=None) - source: Optional[SourceConfigPatch] = Field(default=None) join: Optional[JoinConfigPatch] = Field(default=None) + filter: Optional[FilterConfigPatch] = Field(default=None) + metadata: Optional[MetadataConfig] = Field(default=None) + schema: Optional[Schema] = Field(default=None) sink: Optional[SinkConfigPatch] = Field(default=None) + source: Optional[SourceConfigPatch] = Field(default=None) diff --git a/src/glassflow/etl/models/schema.py b/src/glassflow/etl/models/schema.py new file mode 100644 index 0000000..ad8a233 --- /dev/null +++ b/src/glassflow/etl/models/schema.py @@ -0,0 +1,74 @@ +from typing import List, Optional + +from pydantic import BaseModel, Field, model_validator + +from ..errors import InvalidDataTypeMappingError +from .data_types import ( + ClickhouseDataType, + KafkaDataType, + kafka_to_clickhouse_data_type_mappings, +) + + +class SchemaField(BaseModel): + column_name: Optional[str] = Field( + default=None, + description="The name of the column in the sink table. If not provided, " + "the name of the source field will not be mapped to the sink table.", + ) + column_type: Optional[ClickhouseDataType] = Field( + default=None, + description="The data type of the column in the sink table. 
If not provided, " + "the data type of the source field will not be mapped to the sink table.", + ) + name: str = Field(description="The name of the source field") + source_id: Optional[str] = Field(description="The name of the source topic") + type: KafkaDataType = Field(description="The data type of the source field") + + @model_validator(mode="after") + def validate_schema_field(self) -> "SchemaField": + # Validate not mapped fields + if (self.column_name is None and self.column_type is not None) or ( + self.column_name is not None and self.column_type is None + ): + raise ValueError( + "column_name and column_type must both be provided or both be None" + ) + + # Validate column type compatibility + if self.column_type is not None: + compatible_types = kafka_to_clickhouse_data_type_mappings.get(self.type, []) + if self.column_type not in compatible_types: + raise InvalidDataTypeMappingError( + f"Data type '{self.column_type}' is not compatible with source type" + f" '{self.type}' for field '{self.name}' in source" + f" '{self.source_id}'" + ) + return self + + +class Schema(BaseModel): + fields: List[SchemaField] + + def is_field_in_schema( + self, + field_name: str, + source_id: Optional[str] = None, + ) -> bool: + """ + Check if a field exists in the schema. If source_id is provided, check if the + field exists in the specified source. + + Args: + field_name: The name of the field to check + source_id: The ID of the source to check the field in + + Returns: + True if the field exists in the schema, False otherwise + """ + for field in self.fields: + if field.name == field_name and ( + source_id is None or field.source_id == source_id + ): + return True + return False diff --git a/src/glassflow/etl/models/sink.py b/src/glassflow/etl/models/sink.py index db7cf49..36a3be2 100644 --- a/src/glassflow/etl/models/sink.py +++ b/src/glassflow/etl/models/sink.py @@ -1,16 +1,8 @@ -from typing import Any, List, Optional +from typing import Any, Optional from pydantic import BaseModel, Field from .base import CaseInsensitiveStrEnum -from .data_types import ClickhouseDataType - - -class TableMapping(BaseModel): - source_id: str - field_name: str - column_name: str - column_type: ClickhouseDataType class SinkType(CaseInsensitiveStrEnum): @@ -31,7 +23,6 @@ class SinkConfig(BaseModel): max_batch_size: int = Field(default=1000) max_delay_time: str = Field(default="10m") table: str - table_mapping: List[TableMapping] def update(self, patch: "SinkConfigPatch") -> "SinkConfig": """Apply a patch to this sink config.""" @@ -64,8 +55,6 @@ def update(self, patch: "SinkConfigPatch") -> "SinkConfig": update_dict["max_delay_time"] = patch.max_delay_time if patch.table is not None: update_dict["table"] = patch.table - if patch.table_mapping is not None: - update_dict["table_mapping"] = patch.table_mapping if update_dict: return self.model_copy(update=update_dict) @@ -85,4 +74,3 @@ class SinkConfigPatch(BaseModel): max_batch_size: Optional[int] = Field(default=None) max_delay_time: Optional[str] = Field(default=None) table: Optional[str] = Field(default=None) - table_mapping: Optional[List[TableMapping]] = Field(default=None) diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index d991d83..dbeeb8d 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -1,7 +1,6 @@ -import warnings from typing import Any, List, Optional -from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator +from pydantic import BaseModel, 
Field, field_validator, model_validator from .base import CaseInsensitiveStrEnum from .data_types import KafkaDataType @@ -22,20 +21,6 @@ class KafkaMechanism(CaseInsensitiveStrEnum): NO_AUTH = "NO_AUTH" -class SchemaField(BaseModel): - name: str - type: KafkaDataType - - -class SchemaType(CaseInsensitiveStrEnum): - JSON = "json" - - -class Schema(BaseModel): - type: SchemaType = SchemaType.JSON - fields: List[SchemaField] - - class DeduplicationConfig(BaseModel): enabled: bool = False id_field: Optional[str] = Field(default=None) @@ -109,43 +94,9 @@ class ConsumerGroupOffset(CaseInsensitiveStrEnum): class TopicConfig(BaseModel): consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST name: str - event_schema: Schema = Field(alias="schema") deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig()) replicas: Optional[int] = Field(default=1) - @field_validator("deduplication") - @classmethod - def validate_deduplication_id_field( - cls, v: DeduplicationConfig, info: ValidationInfo - ) -> DeduplicationConfig: - """ - Validate that the deduplication ID field exists in the - schema and has matching type. - """ - if v is None or not v.enabled: - return v - - # Skip validation if id_field is empty when deduplication is disabled - if not v.id_field or v.id_field == "": - return v - - # Get the schema from the parent model's data - schema = info.data.get("event_schema", {}) - if isinstance(schema, dict): - fields = schema.get("fields", []) - else: - fields = schema.fields - - # Find the field in the schema - field = next((f for f in fields if f.name == v.id_field), None) - if not field: - raise ValueError( - f"Deduplication ID field '{v.id_field}' does not exist in " - "the event schema" - ) - - return v - @field_validator("replicas") @classmethod def validate_replicas(cls, v: int) -> int: @@ -165,12 +116,6 @@ class KafkaConnectionParams(BaseModel): kerberos_keytab: Optional[str] = Field(default=None) kerberos_realm: Optional[str] = Field(default=None) kerberos_config: Optional[str] = Field(default=None) - skip_auth: bool = Field( - default=False, - exclude=True, - description='skip_auth is deprecated, use mechanism "NO_AUTH" instead or \ - set skip_tls_verification to True if you want to skip TLS verification', - ) skip_tls_verification: bool = Field(default=False) @model_validator(mode="before") @@ -186,23 +131,6 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams" merged_dict = {**current_dict, **patch_dict} return KafkaConnectionParams.model_validate(merged_dict) - @model_validator(mode="after") - def correct_skip_auth(self) -> "KafkaConnectionParams": - """ - While the skip_auth field is deprecated, - we need to make sure the mechanism is NO_AUTH if skip_auth is True - """ - if self.skip_auth: - if self.mechanism is not None and self.mechanism != KafkaMechanism.NO_AUTH: - warnings.warn( - f"Mechanism is set to {self.mechanism}, but skip_auth is True. 
\ - Setting mechanism to NO_AUTH.", - category=RuntimeWarning, - stacklevel=1, - ) - self.mechanism = KafkaMechanism.NO_AUTH - return self - class SourceType(CaseInsensitiveStrEnum): KAFKA = "kafka" @@ -256,7 +184,7 @@ class KafkaConnectionParamsPatch(BaseModel): kerberos_keytab: Optional[str] = Field(default=None) kerberos_realm: Optional[str] = Field(default=None) kerberos_config: Optional[str] = Field(default=None) - skip_auth: Optional[bool] = Field(default=None) + skip_tls_verification: Optional[bool] = Field(default=None) class SourceConfigPatch(BaseModel): diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index f4eb909..4409afd 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -357,7 +357,7 @@ def _tracking_info(self) -> dict[str, Any]: # Extract join info join_enabled = getattr(self.config.join, "enabled", False) - + filter_enabled = getattr(self.config.filter, "enabled", False) # Extract deduplication info deduplication_enabled = any( t.deduplication and t.deduplication.enabled @@ -368,18 +368,19 @@ def _tracking_info(self) -> dict[str, Any]: conn_params = self.config.source.connection_params root_ca_provided = conn_params.root_ca is not None - skip_auth = conn_params.skip_auth + skip_tls_verification = conn_params.skip_tls_verification protocol = str(conn_params.protocol) mechanism = str(conn_params.mechanism) return { "pipeline_id": self.config.pipeline_id, "join_enabled": join_enabled, + "filter_enabled": filter_enabled, "deduplication_enabled": deduplication_enabled, "source_auth_method": mechanism, "source_security_protocol": protocol, "source_root_ca_provided": root_ca_provided, - "source_skip_auth": skip_auth, + "source_skip_tls_verification": skip_tls_verification, } def _track_event(self, event_name: str, **kwargs: Any) -> None: diff --git a/tests/data/error_scenarios.py b/tests/data/error_scenarios.py index 715adca..b5484e9 100644 --- a/tests/data/error_scenarios.py +++ b/tests/data/error_scenarios.py @@ -287,44 +287,65 @@ def get_join_with_invalid_type(valid_config): ] -def get_sink_validation_error_scenarios(): - """Get sink validation error test scenarios.""" +def get_schema_validation_error_scenarios(): + """Get schema validation error test scenarios.""" - def get_sink_with_source_id_not_found(valid_config): - sink = valid_config["sink"] - sink["table_mapping"] = [ - models.TableMapping( - source_id="non-existent-topic", - field_name="id", - column_name="id", - column_type="String", - ) + def get_schema_with_source_id_not_found(valid_config): + schema = valid_config["schema"] + schema["fields"] = schema["fields"] + [ + { + "source_id": "non-existent-topic", + "name": "id", + "type": "string", + "column_name": "id", + "column_type": "String", + } ] - return sink + return schema - def get_sink_with_field_name_not_found(valid_config): - sink = valid_config["sink"] - sink["table_mapping"] = [ - models.TableMapping( - source_id=valid_config["source"]["topics"][0]["name"], - field_name="non-existent-field", - column_name="id", - column_type="String", - ) + def get_schema_with_missing_column_name(valid_config): + schema = valid_config["schema"] + schema["fields"] = schema["fields"] + [ + { + "source_id": valid_config["source"]["topics"][0]["name"], + "name": "id", + "type": "string", + "column_type": "String", + } ] - return sink + return schema + + def get_schema_with_missing_column_type(valid_config): + schema = valid_config["schema"] + schema["fields"] = schema["fields"] + [ + { + "source_id": 
valid_config["source"]["topics"][0]["name"], + "name": "id", + "type": "string", + "column_name": "id", + } + ] + return schema return [ { "name": "source_id_not_found", - "sink": get_sink_with_source_id_not_found, + "schema": get_schema_with_source_id_not_found, "expected_error": ValueError, "error_message": "does not exist in any topic", }, { - "name": "field_name_not_found", - "sink": get_sink_with_field_name_not_found, + "name": "missing_column_name", + "schema": get_schema_with_missing_column_name, "expected_error": ValueError, - "error_message": "does not exist in source", + "error_message": "column_name and column_type must both be provided or both" + " be None", + }, + { + "name": "missing_column_type", + "schema": get_schema_with_missing_column_type, + "expected_error": ValueError, + "error_message": "column_name and column_type must both be provided or both" + " be None", }, ] diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index d77d6a3..d9c2f87 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -26,23 +26,6 @@ def get_valid_pipeline_config() -> dict: "consumer_group_initial_offset": "earliest", "name": "user_logins", "replicas": 3, - "schema": { - "type": "json", - "fields": [ - { - "name": "session_id", - "type": "string", - }, - { - "name": "user_id", - "type": "string", - }, - { - "name": "timestamp", - "type": "String", - }, - ], - }, "deduplication": { "enabled": True, "id_field": "session_id", @@ -54,23 +37,6 @@ def get_valid_pipeline_config() -> dict: "consumer_group_initial_offset": "earliest", "name": "orders", "replicas": 1, - "schema": { - "type": "json", - "fields": [ - { - "name": "user_id", - "type": "string", - }, - { - "name": "order_id", - "type": "string", - }, - { - "name": "timestamp", - "type": "string", - }, - ], - }, "deduplication": { "enabled": True, "id_field": "order_id", @@ -98,6 +64,10 @@ def get_valid_pipeline_config() -> dict: }, ], }, + "filter": { + "enabled": True, + "expression": "user_id = '123'", + }, "sink": { "type": "clickhouse", "provider": "aiven", @@ -110,37 +80,56 @@ def get_valid_pipeline_config() -> dict: "secure": True, "max_batch_size": 1, "table": "user_orders", - "table_mapping": [ + }, + "schema": { + "fields": [ { "source_id": "user_logins", - "field_name": "session_id", + "name": "session_id", + "type": "string", "column_name": "session_id", - "column_type": "string", + "column_type": "String", }, { "source_id": "user_logins", - "field_name": "user_id", + "name": "user_id", + "type": "string", "column_name": "user_id", - "column_type": "STRING", + "column_type": "String", }, { "source_id": "orders", - "field_name": "order_id", + "name": "order_id", + "type": "string", "column_name": "order_id", - "column_type": "string", + "column_type": "String", + }, + { + "source_id": "orders", + "name": "user_id", + "type": "string", + "column_name": "user_id", + "column_type": "String", }, { "source_id": "user_logins", - "field_name": "timestamp", + "name": "timestamp", + "type": "string", "column_name": "login_at", "column_type": "DateTime", }, { "source_id": "orders", - "field_name": "timestamp", + "name": "timestamp", + "type": "string", "column_name": "order_placed_at", "column_type": "DateTime", }, + { + "source_id": "orders", + "name": "skip_sink_field", + "type": "string", + }, ], }, } @@ -168,23 +157,6 @@ def get_valid_config_without_joins() -> dict: { "consumer_group_initial_offset": "earliest", "name": "user_logins", - "schema": { - "type": "json", - "fields": [ - { 
- "name": "session_id", - "type": "string", - }, - { - "name": "user_id", - "type": "string", - }, - { - "name": "timestamp", - "type": "String", - }, - ], - }, "deduplication": { "enabled": True, "id_field": "session_id", @@ -206,22 +178,27 @@ def get_valid_config_without_joins() -> dict: "secure": True, "max_batch_size": 1, "table": "user_orders", - "table_mapping": [ + }, + "schema": { + "fields": [ { "source_id": "user_logins", - "field_name": "session_id", + "name": "session_id", + "type": "string", "column_name": "session_id", - "column_type": "string", + "column_type": "String", }, { "source_id": "user_logins", - "field_name": "user_id", + "name": "user_id", + "type": "string", "column_name": "user_id", - "column_type": "STRING", + "column_type": "String", }, { "source_id": "user_logins", - "field_name": "timestamp", + "name": "timestamp", + "type": "string", "column_name": "login_at", "column_type": "DateTime", }, @@ -269,7 +246,9 @@ def get_invalid_config() -> dict: "username": "default", "password": "pass", "table": "test_table", - "table_mapping": [], # Empty table mapping should trigger validation error + }, + "schema": { + "fields": [], # Empty schema fields should trigger validation error }, } diff --git a/tests/data/valid_pipeline.json b/tests/data/valid_pipeline.json index ee8d7d7..57559a1 100644 --- a/tests/data/valid_pipeline.json +++ b/tests/data/valid_pipeline.json @@ -20,23 +20,6 @@ { "consumer_group_initial_offset": "earliest", "name": "user_logins", - "schema": { - "type": "json", - "fields": [ - { - "name": "session_id", - "type": "string" - }, - { - "name": "user_id", - "type": "string" - }, - { - "name": "timestamp", - "type": "string" - } - ] - }, "deduplication": { "enabled": true, "id_field": "session_id", @@ -47,23 +30,6 @@ { "consumer_group_initial_offset": "earliest", "name": "orders", - "schema": { - "type": "json", - "fields": [ - { - "name": "user_id", - "type": "string" - }, - { - "name": "order_id", - "type": "string" - }, - { - "name": "timestamp", - "type": "string" - } - ] - }, "deduplication": { "enabled": true, "id_field": "order_id", @@ -91,6 +57,10 @@ } ] }, + "filter": { + "enabled": true, + "expression": "user_id = '123'" + }, "sink": { "type": "clickhouse", "provider": "aiven", @@ -103,37 +73,56 @@ "secure": true, "max_batch_size": 1, "max_delay_time": "10m", - "table": "user_orders", - "table_mapping": [ + "table": "user_orders" + }, + "schema": { + "fields": [ { "source_id": "user_logins", - "field_name": "session_id", + "name": "session_id", + "type": "string", "column_name": "session_id", "column_type": "String" }, { "source_id": "user_logins", - "field_name": "user_id", + "name": "user_id", + "type": "string", "column_name": "user_id", "column_type": "String" }, { "source_id": "orders", - "field_name": "order_id", + "name": "order_id", + "type": "string", "column_name": "order_id", "column_type": "String" }, + { + "source_id": "orders", + "name": "user_id", + "type": "string", + "column_name": "user_id", + "column_type": "String" + }, { "source_id": "user_logins", - "field_name": "timestamp", + "name": "timestamp", + "type": "string", "column_name": "login_at", "column_type": "DateTime" }, { "source_id": "orders", - "field_name": "timestamp", + "name": "timestamp", + "type": "string", "column_name": "order_placed_at", "column_type": "DateTime" + }, + { + "source_id": "orders", + "name": "skip_sink_field", + "type": "string" } ] } diff --git a/tests/data/valid_pipeline.yaml b/tests/data/valid_pipeline.yaml index bff8106..b19e662 100644 --- 
a/tests/data/valid_pipeline.yaml +++ b/tests/data/valid_pipeline.yaml @@ -12,6 +12,9 @@ join: type: temporal name: Test Pipeline pipeline_id: test-pipeline +filter: + enabled: true + expression: user_id = '123' sink: database: default host: @@ -23,27 +26,6 @@ sink: provider: aiven secure: true table: user_orders - table_mapping: - - column_name: session_id - column_type: String - field_name: session_id - source_id: user_logins - - column_name: user_id - column_type: String - field_name: user_id - source_id: user_logins - - column_name: order_id - column_type: String - field_name: order_id - source_id: orders - - column_name: login_at - column_type: DateTime - field_name: timestamp - source_id: user_logins - - column_name: order_placed_at - column_type: DateTime - field_name: timestamp - source_id: orders type: clickhouse username: source: @@ -66,15 +48,6 @@ source: id_field_type: string time_window: 12h name: user_logins - schema: - fields: - - name: session_id - type: string - - name: user_id - type: string - - name: timestamp - type: string - type: json - consumer_group_initial_offset: earliest deduplication: enabled: true @@ -82,13 +55,39 @@ source: id_field_type: string time_window: 12h name: orders - schema: - fields: - - name: user_id - type: string - - name: order_id - type: string - - name: timestamp - type: string - type: json type: kafka +schema: + fields: + - column_name: session_id + column_type: String + name: session_id + source_id: user_logins + type: string + - column_name: user_id + column_type: String + name: user_id + source_id: user_logins + type: string + - column_name: order_id + column_type: String + name: order_id + source_id: orders + type: string + - column_name: user_id + column_type: String + name: user_id + source_id: orders + type: string + - column_name: login_at + column_type: DateTime + name: timestamp + source_id: user_logins + type: string + - column_name: order_placed_at + column_type: DateTime + name: timestamp + source_id: orders + type: string + - name: skip_sink_field + source_id: orders + type: string \ No newline at end of file diff --git a/tests/test_models/test_config_update.py b/tests/test_models/test_config_update.py index 951d3a5..0052ead 100644 --- a/tests/test_models/test_config_update.py +++ b/tests/test_models/test_config_update.py @@ -177,12 +177,6 @@ def test_update_topics(self, valid_config): source = models.SourceConfig(**valid_config["source"]) new_topic = models.TopicConfig( name="new-topic", - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField(name="id", type=models.KafkaDataType.STRING) - ], - ), ) patch = models.SourceConfigPatch(topics=[new_topic]) @@ -231,24 +225,6 @@ def test_update_credentials(self, valid_config): assert updated.password == "new-password" assert updated.host == sink.host - def test_update_table_mapping(self, valid_config): - """Test updating table mapping.""" - sink = models.SinkConfig(**valid_config["sink"]) - new_mapping = [ - models.TableMapping( - source_id="test", - field_name="id", - column_name="id", - column_type=models.ClickhouseDataType.STRING, - ) - ] - patch = models.SinkConfigPatch(table_mapping=new_mapping) - - updated = sink.update(patch) - - assert len(updated.table_mapping) == 1 - assert updated.table_mapping[0].source_id == "test" - def test_update_multiple_sink_fields(self, valid_config): """Test updating multiple sink fields at once.""" sink = models.SinkConfig(**valid_config["sink"]) diff --git a/tests/test_models/test_data_types.py 
b/tests/test_models/test_data_types.py index a3c86d5..4d8dc56 100644 --- a/tests/test_models/test_data_types.py +++ b/tests/test_models/test_data_types.py @@ -8,8 +8,8 @@ class TestDataTypeCompatibility: def test_validate_data_type_compatibility_invalid_mapping(self, valid_config): """Test data type compatibility validation with invalid type mappings.""" - # Modify the sink configuration to have an invalid type mapping - valid_config["sink"]["table_mapping"][0]["column_type"] = ( + # Modify the schema to have an invalid type mapping + valid_config["schema"]["fields"][0]["column_type"] = ( models.ClickhouseDataType.INT32 ) diff --git a/tests/test_models/test_join.py b/tests/test_models/test_join.py index 3d4ea99..c6c6bbd 100644 --- a/tests/test_models/test_join.py +++ b/tests/test_models/test_join.py @@ -57,5 +57,6 @@ def test_join_validation_error_scenarios(self, valid_config, scenario): source=valid_config["source"], join=scenario["join"](valid_config), sink=valid_config["sink"], + schema=valid_config["schema"], ) assert scenario["error_message"] in str(exc_info.value) diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py index 08877df..6a5d221 100644 --- a/tests/test_models/test_pipeline_config.py +++ b/tests/test_models/test_pipeline_config.py @@ -26,6 +26,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert config.pipeline_id == "test-pipeline-123a" @@ -36,6 +37,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert "pipeline_id cannot be empty" in str(exc_info.value) @@ -45,6 +47,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert ( "pipeline_id can only contain lowercase letters, numbers, and hyphens" @@ -57,6 +60,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert "pipeline_id cannot be longer than 40 characters" in str(exc_info.value) @@ -66,6 +70,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert "pipeline_id must start with a lowercase alphanumeric" in str( exc_info.value @@ -77,6 +82,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert "pipeline_id must end with a lowercase alphanumeric" in str( exc_info.value @@ -90,6 +96,7 @@ def test_pipeline_config_pipeline_name_provided(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert config.pipeline_id == "test-pipeline" assert config.name == "My Custom Pipeline Name" @@ -101,6 +108,7 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config): source=valid_config["source"], join=valid_config["join"], sink=valid_config["sink"], + schema=valid_config["schema"], ) assert config.pipeline_id == "test-pipeline" 
assert config.name == "Test Pipeline" diff --git a/tests/test_models/test_schema.py b/tests/test_models/test_schema.py new file mode 100644 index 0000000..550b024 --- /dev/null +++ b/tests/test_models/test_schema.py @@ -0,0 +1,27 @@ +import pytest + +from glassflow.etl import models +from tests.data import error_scenarios + + +class TestSchemaConfig: + """Tests for SchemaConfig validation.""" + + @pytest.mark.parametrize( + "scenario", + error_scenarios.get_schema_validation_error_scenarios(), + ids=lambda s: s["name"], + ) + def test_schema_validation_error_scenarios(self, valid_config, scenario): + """Test schema validation with various error scenarios.""" + + with pytest.raises(scenario["expected_error"]) as exc_info: + models.PipelineConfig( + pipeline_id="test-pipeline", + source=valid_config["source"], + sink=valid_config["sink"], + schema=scenario["schema"](valid_config), + filter=valid_config["filter"], + join=valid_config["join"], + ) + assert scenario["error_message"] in str(exc_info.value) diff --git a/tests/test_models/test_sink.py b/tests/test_models/test_sink.py deleted file mode 100644 index 77d8d06..0000000 --- a/tests/test_models/test_sink.py +++ /dev/null @@ -1,24 +0,0 @@ -import pytest - -from glassflow.etl import models -from tests.data import error_scenarios - - -class TestSinkConfig: - """Tests for SinkConfig validation.""" - - @pytest.mark.parametrize( - "scenario", - error_scenarios.get_sink_validation_error_scenarios(), - ids=lambda s: s["name"], - ) - def test_sink_validation_error_scenarios(self, valid_config, scenario): - """Test sink validation with various error scenarios.""" - - with pytest.raises(scenario["expected_error"]) as exc_info: - models.PipelineConfig( - pipeline_id="test-pipeline", - source=valid_config["source"], - sink=scenario["sink"](valid_config), - ) - assert scenario["error_message"] in str(exc_info.value) diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py index f91f33a..2ed1ed7 100644 --- a/tests/test_models/test_topic.py +++ b/tests/test_models/test_topic.py @@ -4,21 +4,13 @@ class TestTopicConfig: - """Tests for TopicConfig deduplication validation.""" + """Tests for TopicConfig.""" - def test_topic_config_deduplication_id_field_validation(self): - """Test TopicConfig validation for deduplication ID field.""" - # Test with valid configuration + def test_topic_config_creation(self): + """Test TopicConfig creation.""" config = models.TopicConfig( name="test-topic", consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField(name="id", type=models.KafkaDataType.STRING), - models.SchemaField(name="name", type=models.KafkaDataType.STRING), - ], - ), deduplication=models.DeduplicationConfig( enabled=True, id_field="id", @@ -30,44 +22,13 @@ def test_topic_config_deduplication_id_field_validation(self): assert config.deduplication.id_field == "id" assert config.deduplication.id_field_type == models.KafkaDataType.STRING - # Test with non-existent ID field - with pytest.raises(ValueError) as exc_info: - models.TopicConfig( - name="test-topic", - consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField( - name="name", - type=models.KafkaDataType.STRING, - ), - ], - ), - deduplication=models.DeduplicationConfig( - enabled=True, - id_field="non-existent-field", - id_field_type=models.KafkaDataType.STRING, - time_window="1h", - ), - ) - 
assert "does not exist in the event schema" in str(exc_info.value) - - # Test with disabled deduplication (should not validate) + def test_topic_config_with_disabled_deduplication(self): + """Test TopicConfig with disabled deduplication.""" config = models.TopicConfig( name="test-topic", consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField(name="name", type=models.KafkaDataType.STRING), - ], - ), deduplication=models.DeduplicationConfig( enabled=False, - id_field="non-existent-field", - id_field_type=models.KafkaDataType.STRING, - time_window="1h", ), ) assert config.deduplication.enabled is False @@ -77,12 +38,6 @@ def test_topic_config_replicas_validation(self): config = models.TopicConfig( name="test-topic", consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField(name="name", type=models.KafkaDataType.STRING), - ], - ), replicas=3, ) assert config.replicas == 3 @@ -91,14 +46,114 @@ def test_topic_config_replicas_validation(self): models.TopicConfig( name="test-topic", consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST, - schema=models.Schema( - type=models.SchemaType.JSON, - fields=[ - models.SchemaField( - name="name", type=models.KafkaDataType.STRING - ), - ], - ), replicas=0, ) assert "Replicas must be at least 1" in str(exc_info.value) + + +class TestPipelineConfigDeduplicationValidation: + """Tests for PipelineConfig deduplication ID field validation.""" + + def test_deduplication_id_field_validation_valid(self): + """ + Test PipelineConfig validation for deduplication ID field with valid field. + """ + schema = models.Schema( + fields=[ + models.SchemaField( + source_id="test-topic", + name="id", + type=models.KafkaDataType.STRING, + column_name="id", + column_type=models.ClickhouseDataType.STRING, + ), + ] + ) + source = models.SourceConfig( + type=models.SourceType.KAFKA, + connection_params=models.KafkaConnectionParams( + brokers=["localhost:9092"], + protocol=models.KafkaProtocol.PLAINTEXT, + ), + topics=[ + models.TopicConfig( + name="test-topic", + deduplication=models.DeduplicationConfig( + enabled=True, + id_field="id", + id_field_type=models.KafkaDataType.STRING, + time_window="1h", + ), + ), + ], + ) + sink = models.SinkConfig( + type=models.SinkType.CLICKHOUSE, + host="localhost", + port="9000", + database="test", + username="default", + password="", + table="test_table", + ) + + # Should not raise an error + config = models.PipelineConfig( + pipeline_id="test-pipeline", + source=source, + sink=sink, + schema=schema, + ) + assert config.pipeline_id == "test-pipeline" + + def test_deduplication_id_field_validation_invalid(self): + """ + Test PipelineConfig validation for deduplication ID field with invalid field. 
+ """ + schema = models.Schema( + fields=[ + models.SchemaField( + source_id="test-topic", + name="name", + type=models.KafkaDataType.STRING, + column_name="name", + column_type=models.ClickhouseDataType.STRING, + ), + ] + ) + source = models.SourceConfig( + type=models.SourceType.KAFKA, + connection_params=models.KafkaConnectionParams( + brokers=["localhost:9092"], + protocol=models.KafkaProtocol.PLAINTEXT, + ), + topics=[ + models.TopicConfig( + name="test-topic", + deduplication=models.DeduplicationConfig( + enabled=True, + id_field="non-existent-field", + id_field_type=models.KafkaDataType.STRING, + time_window="1h", + ), + ), + ], + ) + sink = models.SinkConfig( + type=models.SinkType.CLICKHOUSE, + host="localhost", + port="9000", + database="test", + username="default", + password="", + table="test_table", + ) + + with pytest.raises(ValueError) as exc_info: + models.PipelineConfig( + pipeline_id="test-pipeline", + source=source, + sink=sink, + schema=schema, + ) + assert "not found in schema" in str(exc_info.value) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 256be77..fe602a1 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -315,11 +315,12 @@ def test_tracking_info( assert pipeline._tracking_info() == { "pipeline_id": valid_config["pipeline_id"], "join_enabled": True, + "filter_enabled": True, "deduplication_enabled": True, "source_auth_method": "SCRAM-SHA-256", "source_security_protocol": "SASL_SSL", "source_root_ca_provided": True, - "source_skip_auth": False, + "source_skip_tls_verification": False, } pipeline = Pipeline( @@ -329,11 +330,12 @@ def test_tracking_info( assert pipeline._tracking_info() == { "pipeline_id": valid_config_with_dedup_disabled["pipeline_id"], "join_enabled": True, + "filter_enabled": True, "deduplication_enabled": False, "source_auth_method": "SCRAM-SHA-256", "source_security_protocol": "SASL_SSL", "source_root_ca_provided": True, - "source_skip_auth": False, + "source_skip_tls_verification": False, } pipeline = Pipeline( @@ -342,11 +344,12 @@ def test_tracking_info( assert pipeline._tracking_info() == { "pipeline_id": valid_config_without_joins["pipeline_id"], "join_enabled": False, + "filter_enabled": False, "deduplication_enabled": True, "source_auth_method": "SCRAM-SHA-256", "source_security_protocol": "SASL_SSL", "source_root_ca_provided": True, - "source_skip_auth": False, + "source_skip_tls_verification": False, } pipeline = Pipeline( @@ -357,11 +360,12 @@ def test_tracking_info( assert pipeline._tracking_info() == { "pipeline_id": pipeline_id, "join_enabled": False, + "filter_enabled": False, "deduplication_enabled": False, "source_auth_method": "SCRAM-SHA-256", "source_security_protocol": "SASL_SSL", "source_root_ca_provided": True, - "source_skip_auth": False, + "source_skip_tls_verification": False, } From c2794b21d673614ea69e289ca8adcba0a103ba8f Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 4 Dec 2025 11:56:35 +0100 Subject: [PATCH 2/8] rename schema to avoid collisions --- src/glassflow/etl/models/pipeline.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index cc94d88..9f31b0a 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -33,7 +33,7 @@ class PipelineConfig(BaseModel): filter: Optional[FilterConfig] = Field(default=FilterConfig()) metadata: Optional[MetadataConfig] = Field(default=MetadataConfig()) sink: SinkConfig - schema: 
Schema
+    pipeline_schema: Schema = Field(alias="schema")
 
     @field_validator("pipeline_id")
     @classmethod
@@ -63,7 +63,7 @@ def validate_config(self) -> "PipelineConfig":
 
         # Validate schema
         topic_names = [topic.name for topic in self.source.topics]
-        for field in self.schema.fields:
+        for field in self.pipeline_schema.fields:
             if field.source_id not in topic_names:
                 raise ValueError(
                     f"Source '{field.source_id}' does not exist in any topic"
                 )
@@ -74,7 +74,7 @@ def validate_config(self) -> "PipelineConfig":
             if topic.deduplication is None or not topic.deduplication.enabled:
                 continue
 
-            if not self.schema.is_field_in_schema(
+            if not self.pipeline_schema.is_field_in_schema(
                 topic.deduplication.id_field, topic.name
             ):
                 raise ValueError(
@@ -92,7 +92,7 @@ def validate_config(self) -> "PipelineConfig":
                         "topic"
                     )
 
-                if not self.schema.is_field_in_schema(
+                if not self.pipeline_schema.is_field_in_schema(
                     join_source.join_key,
                     join_source.source_id,
                 ):
@@ -141,8 +141,8 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
             updated_config.sink = updated_config.sink.update(config_patch.sink)
 
         # Update schema if provided
-        if config_patch.schema is not None:
-            updated_config.schema = config_patch.schema
+        if config_patch.pipeline_schema is not None:
+            updated_config.pipeline_schema = config_patch.pipeline_schema
 
         if config_patch.metadata is not None:
             updated_config.metadata = config_patch.metadata
@@ -155,6 +155,6 @@ class PipelineConfigPatch(BaseModel):
     join: Optional[JoinConfigPatch] = Field(default=None)
     filter: Optional[FilterConfigPatch] = Field(default=None)
     metadata: Optional[MetadataConfig] = Field(default=None)
-    schema: Optional[Schema] = Field(default=None)
+    pipeline_schema: Optional[Schema] = Field(default=None, alias="schema")
     sink: Optional[SinkConfigPatch] = Field(default=None)
     source: Optional[SourceConfigPatch] = Field(default=None)

From dee863877217902c1abb7bee970c573c2ff10ea8 Mon Sep 17 00:00:00 2001
From: Pablo Pardo Garcia
Date: Thu, 4 Dec 2025 11:56:56 +0100
Subject: [PATCH 3/8] add utils tool to migrate v1 to v2 pipeline schemas

---
 src/glassflow/etl/utils.py | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)
 create mode 100644 src/glassflow/etl/utils.py

diff --git a/src/glassflow/etl/utils.py b/src/glassflow/etl/utils.py
new file mode 100644
index 0000000..8b2a5d3
--- /dev/null
+++ b/src/glassflow/etl/utils.py
@@ -0,0 +1,38 @@
+from typing import Any
+
+from . 
import models + + +def migrate_pipeline_v1_to_v2(pipeline: dict[str, Any]) -> models.PipelineConfig: + """Migrate a pipeline from v1 to v2.""" + schema_fields = {} + for topic in pipeline["source"]["topics"]: + for field in topic["schema"]["fields"]: + schema_fields[f"{topic["name"]}_{field["name"]}"] = { + "source_id": topic["name"], + "name": field["name"], + "type": field["type"] + } + + for field in pipeline["sink"]["table_mapping"]: + schema_fields[f"{field["source_id"]}_{field["field_name"]}"].update( + { + "column_name": field["column_name"], + "column_type": field["column_type"] + } + ) + + schema = models.Schema( + fields=[ + models.SchemaField.model_validate(field) for field in schema_fields.values() + ] + ) + + return models.PipelineConfig( + pipeline_id=pipeline["pipeline_id"], + name=pipeline.get("name", None), + source=models.SourceConfig.model_validate(pipeline["source"]), + join=models.JoinConfig.model_validate(pipeline.get("join", {})), + sink=models.SinkConfig.model_validate(pipeline["sink"]), + schema=schema, + ) From 76658d39610a614981983ca05522a16a8ace7f79 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 4 Dec 2025 12:27:26 +0100 Subject: [PATCH 4/8] make metadata tags optional --- src/glassflow/etl/models/metadata.py | 4 ++-- src/glassflow/etl/utils.py | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/glassflow/etl/models/metadata.py b/src/glassflow/etl/models/metadata.py index 3feb431..746d9a6 100644 --- a/src/glassflow/etl/models/metadata.py +++ b/src/glassflow/etl/models/metadata.py @@ -1,7 +1,7 @@ -from typing import List +from typing import List, Optional from pydantic import BaseModel, Field class MetadataConfig(BaseModel): - tags: List[str] = Field(default=[]) + tags: Optional[List[str]] = Field(default=None) diff --git a/src/glassflow/etl/utils.py b/src/glassflow/etl/utils.py index 8b2a5d3..93190fc 100644 --- a/src/glassflow/etl/utils.py +++ b/src/glassflow/etl/utils.py @@ -8,18 +8,15 @@ def migrate_pipeline_v1_to_v2(pipeline: dict[str, Any]) -> models.PipelineConfig schema_fields = {} for topic in pipeline["source"]["topics"]: for field in topic["schema"]["fields"]: - schema_fields[f"{topic["name"]}_{field["name"]}"] = { + schema_fields[f"{topic['name']}_{field['name']}"] = { "source_id": topic["name"], "name": field["name"], - "type": field["type"] + "type": field["type"], } for field in pipeline["sink"]["table_mapping"]: - schema_fields[f"{field["source_id"]}_{field["field_name"]}"].update( - { - "column_name": field["column_name"], - "column_type": field["column_type"] - } + schema_fields[f"{field['source_id']}_{field['field_name']}"].update( + {"column_name": field["column_name"], "column_type": field["column_type"]} ) schema = models.Schema( From 826eb4f8cf84b08978f870edbe9503b7c71b8de2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 4 Dec 2025 11:28:51 +0000 Subject: [PATCH 5/8] chore: bump version to 3.6.0 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 87ce492..40c341b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.5.2 +3.6.0 From 55706fe2f17488733b46c5c1db8e1257effb3615 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 4 Dec 2025 12:30:47 +0100 Subject: [PATCH 6/8] fix format --- tests/data/pipeline_configs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index d9c2f87..57503b8 100644 --- a/tests/data/pipeline_configs.py +++ 
b/tests/data/pipeline_configs.py @@ -129,7 +129,7 @@ def get_valid_pipeline_config() -> dict: "source_id": "orders", "name": "skip_sink_field", "type": "string", - }, + } ], }, } From 5abd366606cd501da5e6699b5d3996132b275b90 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Thu, 4 Dec 2025 12:31:47 +0100 Subject: [PATCH 7/8] fix format --- tests/data/pipeline_configs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index 57503b8..d9c2f87 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -129,7 +129,7 @@ def get_valid_pipeline_config() -> dict: "source_id": "orders", "name": "skip_sink_field", "type": "string", - } + }, ], }, } From 65693bfe794b54063a31bdbc385427bdccd450d3 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 4 Dec 2025 11:32:35 +0000 Subject: [PATCH 8/8] docs: update coverage badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e39b758..a8fc942 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ - +
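
Usage sketches for the series (not part of the patches above). First, the new
migration helper from PATCH 3/8: it folds the per-topic v1 "schema" blocks and
the sink "table_mapping" into the single top-level v2 schema. This is a minimal
sketch assuming a hypothetical v1 payload; real configs carry full connection
and auth settings.

    from glassflow.etl.utils import migrate_pipeline_v1_to_v2

    # Hypothetical v1 config: schema lives per topic, mapping lives in the sink.
    v1 = {
        "pipeline_id": "demo-pipeline",
        "source": {
            "type": "kafka",
            "connection_params": {
                "brokers": ["localhost:9092"],
                "protocol": "PLAINTEXT",
            },
            "topics": [
                {
                    "name": "events",
                    "schema": {
                        "type": "json",
                        "fields": [{"name": "id", "type": "string"}],
                    },
                }
            ],
        },
        "sink": {
            "type": "clickhouse",
            "host": "localhost",
            "port": "8443",
            "database": "default",
            "username": "default",
            "password": "",
            "table": "events",
            "table_mapping": [
                {
                    "source_id": "events",
                    "field_name": "id",
                    "column_name": "id",
                    "column_type": "String",
                }
            ],
        },
    }

    v2_config = migrate_pipeline_v1_to_v2(v1)
    # The merged field now carries source_id/name/type plus the optional
    # column_name/column_type pair from the old table_mapping.
    assert v2_config.pipeline_schema.is_field_in_schema("id", source_id="events")

Second, a sketch of applying the new filter block to an existing config via
PipelineConfigPatch (PATCH 1/8); the filter expression below is illustrative
only, not a documented expression syntax.

    from glassflow.etl import models

    patch = models.PipelineConfigPatch(
        filter=models.FilterConfigPatch(enabled=True, expression="id != ''"),
    )
    filtered = v2_config.update(patch)
    assert filtered.filter is not None and filtered.filter.enabled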