diff --git a/README.md b/README.md
index e39b758..a8fc942 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
-
+
diff --git a/VERSION b/VERSION
index 87ce492..40c341b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.5.2
+3.6.0
diff --git a/src/glassflow/etl/models/__init__.py b/src/glassflow/etl/models/__init__.py
index 91c3333..7955940 100644
--- a/src/glassflow/etl/models/__init__.py
+++ b/src/glassflow/etl/models/__init__.py
@@ -1,5 +1,6 @@
from .config import GlassFlowConfig
from .data_types import ClickhouseDataType, KafkaDataType
+from .filter import FilterConfig, FilterConfigPatch
from .join import (
JoinConfig,
JoinConfigPatch,
@@ -7,8 +8,10 @@
JoinSourceConfig,
JoinType,
)
+from .metadata import MetadataConfig
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
-from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping
+from .schema import Schema, SchemaField
+from .sink import SinkConfig, SinkConfigPatch, SinkType
from .source import (
ConsumerGroupOffset,
DeduplicationConfig,
@@ -16,9 +19,7 @@
KafkaConnectionParams,
KafkaConnectionParamsPatch,
KafkaMechanism,
- Schema,
- SchemaField,
- SchemaType,
+ KafkaProtocol,
SourceConfig,
SourceConfigPatch,
SourceType,
@@ -29,23 +30,25 @@
"ClickhouseDataType",
"ConsumerGroupOffset",
"DeduplicationConfig",
+ "FilterConfig",
+ "FilterConfigPatch",
"KafkaConnectionParams",
"KafkaDataType",
"KafkaMechanism",
+ "KafkaProtocol",
"JoinConfig",
"JoinOrientation",
"JoinSourceConfig",
"JoinType",
+ "MetadataConfig",
"PipelineConfig",
"PipelineConfigPatch",
"PipelineStatus",
"SinkConfig",
"SinkConfigPatch",
"SinkType",
- "TableMapping",
"Schema",
"SchemaField",
- "SchemaType",
"SourceConfig",
"SourceType",
"TopicConfig",
diff --git a/src/glassflow/etl/models/filter.py b/src/glassflow/etl/models/filter.py
new file mode 100644
index 0000000..70fa603
--- /dev/null
+++ b/src/glassflow/etl/models/filter.py
@@ -0,0 +1,29 @@
+from typing import Any, Optional
+
+from pydantic import BaseModel, Field
+
+
+class FilterConfig(BaseModel):
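+ """Configuration for the pipeline's optional filter step."""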
+ enabled: bool = Field(default=False)
+ expression: Optional[str] = Field(default=None, description="The filter expression")
+
+ def update(self, patch: "FilterConfigPatch") -> "FilterConfig":
+ """Apply a patch to this filter config."""
+ update_dict: dict[str, Any] = {}
+
+ # Only copy fields that were explicitly set in the patch
+ if patch.enabled is not None:
+ update_dict["enabled"] = patch.enabled
+ if patch.expression is not None:
+ update_dict["expression"] = patch.expression
+
+ if update_dict:
+ return self.model_copy(update=update_dict)
+ return self
+
+
+class FilterConfigPatch(BaseModel):
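+ """Partial update for FilterConfig; fields left as None are not applied."""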
+ enabled: Optional[bool] = Field(default=None)
+ expression: Optional[str] = Field(default=None)
diff --git a/src/glassflow/etl/models/metadata.py b/src/glassflow/etl/models/metadata.py
new file mode 100644
index 0000000..746d9a6
--- /dev/null
+++ b/src/glassflow/etl/models/metadata.py
@@ -0,0 +1,8 @@
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class MetadataConfig(BaseModel):
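+ """Optional pipeline metadata; currently only a list of tags."""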
+ tags: Optional[List[str]] = Field(default=None)
diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py
index e02368c..9f31b0a 100644
--- a/src/glassflow/etl/models/pipeline.py
+++ b/src/glassflow/etl/models/pipeline.py
@@ -1,12 +1,13 @@
import re
-from typing import Any, Optional
+from typing import Optional
from pydantic import BaseModel, Field, field_validator, model_validator
-from ..errors import InvalidDataTypeMappingError
from .base import CaseInsensitiveStrEnum
-from .data_types import kafka_to_clickhouse_data_type_mappings
+from .filter import FilterConfig, FilterConfigPatch
from .join import JoinConfig, JoinConfigPatch
+from .metadata import MetadataConfig
+from .schema import Schema
from .sink import SinkConfig, SinkConfigPatch
from .source import SourceConfig, SourceConfigPatch
@@ -24,11 +25,15 @@ class PipelineStatus(CaseInsensitiveStrEnum):
class PipelineConfig(BaseModel):
+ version: str = Field(default="2.0.0")
pipeline_id: str
name: Optional[str] = Field(default=None)
source: SourceConfig
join: Optional[JoinConfig] = Field(default=JoinConfig())
+ filter: Optional[FilterConfig] = Field(default=FilterConfig())
+ metadata: Optional[MetadataConfig] = Field(default=MetadataConfig())
sink: SinkConfig
+ pipeline_schema: Schema = Field(alias="schema")
@field_validator("pipeline_id")
@classmethod
@@ -48,147 +53,55 @@ def validate_pipeline_id(cls, v: str) -> str:
return v
@model_validator(mode="after")
- def set_pipeline_name(self) -> "PipelineConfig":
+ def validate_config(self) -> "PipelineConfig":
"""
- If name is not provided, use the pipeline_id and replace hyphens
- with spaces.
+ Set pipeline name if not provided and validate configuration.
"""
+ # Set pipeline name if not provided
if self.name is None:
self.name = self.pipeline_id.replace("-", " ").title()
- return self
- @field_validator("join")
- @classmethod
- def validate_join_config(
- cls,
- v: Optional[JoinConfig],
- info: Any,
- ) -> Optional[JoinConfig]:
- if not v or not v.enabled:
- return v
-
- # Get the source topics from the parent model's data
- source = info.data.get("source", {})
- if isinstance(source, dict):
- source_topics = source.get("topics", [])
- else:
- source_topics = source.topics
- if not source_topics:
- return v
-
- # Validate each source in the join config
- for source in v.sources:
- # Check if source_id exists in any topic
- source_exists = any(
- topic.name == source.source_id for topic in source_topics
- )
- if not source_exists:
+ # Validate schema
+ topic_names = [topic.name for topic in self.source.topics]
+ for field in self.pipeline_schema.fields:
+ if field.source_id not in topic_names:
raise ValueError(
- f"Source ID '{source.source_id}' does not exist in any topic"
+ f"Source '{field.source_id}' does not exist in any topic"
)
- # Find the topic and check if join_key exists in its schema
- topic = next((t for t in source_topics if t.name == source.source_id), None)
- if not topic:
+ # Validate deduplication ID fields
+ for topic in self.source.topics:
+ if topic.deduplication is None or not topic.deduplication.enabled:
continue
- field_exists = any(
- field.name == source.join_key for field in topic.event_schema.fields
- )
- if not field_exists:
+ if not self.pipeline_schema.is_field_in_schema(
+ topic.deduplication.id_field, topic.name
+ ):
raise ValueError(
- f"Join key '{source.join_key}' does not exist in source "
- f"'{source.source_id}' schema"
+ f"Deduplication id_field '{topic.deduplication.id_field}' "
+ f"not found in schema from source '{topic.name}'"
)
- return v
-
- @field_validator("sink")
- @classmethod
- def validate_sink_config(cls, v: SinkConfig, info: Any) -> SinkConfig:
- # Get the source topics from the parent model's data
- source = info.data.get("source", {})
- if isinstance(source, dict):
- source_topics = source.get("topics", [])
- else:
- source_topics = source.topics
- if not source_topics:
- return v
-
- # Validate each table mapping
- for mapping in v.table_mapping:
- # Check if source_id exists in any topic
- source_exists = any(
- topic.name == mapping.source_id for topic in source_topics
- )
- if not source_exists:
- raise ValueError(
- f"Source ID '{mapping.source_id}' does not exist in any topic"
- )
-
- # Find the topic and check if field_name exists in its schema
- topic = next(
- (t for t in source_topics if t.name == mapping.source_id), None
- )
- if not topic:
- continue
-
- field_exists = any(
- field.name == mapping.field_name for field in topic.event_schema.fields
- )
- if not field_exists:
- raise ValueError(
- f"Field '{mapping.field_name}' does not exist in source "
- f"'{mapping.source_id}' event schema"
- )
-
- return v
-
- @field_validator("sink")
- @classmethod
- def validate_data_type_compatibility(cls, v: SinkConfig, info: Any) -> SinkConfig:
- # Get the source topics from the parent model's data
- source = info.data.get("source", {})
- if isinstance(source, dict):
- source_topics = source.get("topics", [])
- else:
- source_topics = source.topics
- if not source_topics:
- return v
-
- # Validate each table mapping
- for mapping in v.table_mapping:
- # Find the topic
- topic = next(
- (t for t in source_topics if t.name == mapping.source_id), None
- )
- if not topic:
- continue
-
- # Find the source field
- source_field = next(
- (f for f in topic.event_schema.fields if f.name == mapping.field_name),
- None,
- )
- if not source_field:
- continue
-
- # Get the source and target data types
- source_type = source_field.type
- target_type = mapping.column_type
+ # Validate join configuration
+ if self.join and self.join.enabled:
+ # Validate each source in the join config
+ for join_source in self.join.sources:
+ if join_source.source_id not in topic_names:
+ raise ValueError(
+ f"Join source '{join_source.source_id}' does not exist in any "
+ "topic"
+ )
+
+ if not self.pipeline_schema.is_field_in_schema(
+ join_source.join_key,
+ join_source.source_id,
+ ):
+ raise ValueError(
+ f"Join key '{join_source.join_key}' does not exist in source "
+ f"'{join_source.source_id}' schema"
+ )
- # Check if the target type is compatible with the source type
- compatible_types = kafka_to_clickhouse_data_type_mappings.get(
- source_type, []
- )
- if target_type not in compatible_types:
- raise InvalidDataTypeMappingError(
- f"Data type '{target_type}' is not compatible with source type "
- f"'{source_type}' for field '{mapping.field_name}' in source "
- f"'{mapping.source_id}'"
- )
-
- return v
+ return self
def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
"""
@@ -217,15 +130,31 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
config_patch.join
)
+ # Update filter if provided
+ if config_patch.filter is not None:
+ updated_config.filter = (updated_config.filter or FilterConfig()).update(
+ config_patch.filter
+ )
+
# Update sink if provided
if config_patch.sink is not None:
updated_config.sink = updated_config.sink.update(config_patch.sink)
+ # Update schema if provided
+ if config_patch.pipeline_schema is not None:
+ updated_config.pipeline_schema = config_patch.pipeline_schema
+
+ if config_patch.metadata is not None:
+ updated_config.metadata = config_patch.metadata
+
return updated_config
class PipelineConfigPatch(BaseModel):
name: Optional[str] = Field(default=None)
- source: Optional[SourceConfigPatch] = Field(default=None)
join: Optional[JoinConfigPatch] = Field(default=None)
+ filter: Optional[FilterConfigPatch] = Field(default=None)
+ metadata: Optional[MetadataConfig] = Field(default=None)
+ pipeline_schema: Optional[Schema] = Field(default=None, alias="schema")
sink: Optional[SinkConfigPatch] = Field(default=None)
+ source: Optional[SourceConfigPatch] = Field(default=None)
diff --git a/src/glassflow/etl/models/schema.py b/src/glassflow/etl/models/schema.py
new file mode 100644
index 0000000..ad8a233
--- /dev/null
+++ b/src/glassflow/etl/models/schema.py
@@ -0,0 +1,76 @@
+from typing import List, Optional
+
+from pydantic import BaseModel, Field, model_validator
+
+from ..errors import InvalidDataTypeMappingError
+from .data_types import (
+ ClickhouseDataType,
+ KafkaDataType,
+ kafka_to_clickhouse_data_type_mappings,
+)
+
+
+class SchemaField(BaseModel):
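+ """A single source event field and its optional mapping to a sink table column."""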
+ column_name: Optional[str] = Field(
+ default=None,
+ description="The name of the column in the sink table. If not provided, "
+ "the name of the source field will not be mapped to the sink table.",
+ )
+ column_type: Optional[ClickhouseDataType] = Field(
+ default=None,
+ description="The data type of the column in the sink table. If not provided, "
+ "the data type of the source field will not be mapped to the sink table.",
+ )
+ name: str = Field(description="The name of the source field")
+ source_id: Optional[str] = Field(description="The name of the source topic")
+ type: KafkaDataType = Field(description="The data type of the source field")
+
+ @model_validator(mode="after")
+ def validate_schema_field(self) -> "SchemaField":
+ # Validate not mapped fields
+ if (self.column_name is None and self.column_type is not None) or (
+ self.column_name is not None and self.column_type is None
+ ):
+ raise ValueError(
+ "column_name and column_type must both be provided or both be None"
+ )
+
+ # Validate column type compatibility
+ if self.column_type is not None:
+ compatible_types = kafka_to_clickhouse_data_type_mappings.get(self.type, [])
+ if self.column_type not in compatible_types:
+ raise InvalidDataTypeMappingError(
+ f"Data type '{self.column_type}' is not compatible with source type"
+ f" '{self.type}' for field '{self.name}' in source"
+ f" '{self.source_id}'"
+ )
+ return self
+
+
+class Schema(BaseModel):
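+ """Pipeline-wide event schema as a flat list of fields across source topics."""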
+ fields: List[SchemaField]
+
+ def is_field_in_schema(
+ self,
+ field_name: str,
+ source_id: Optional[str] = None,
+ ) -> bool:
+ """
+ Check if a field exists in the schema. If source_id is provided, check if the
+ field exists in the specified source.
+
+ Args:
+ field_name: The name of the field to check
+ source_id: The ID of the source to check the field in
+
+ Returns:
+ True if the field exists in the schema, False otherwise
+ """
+ for field in self.fields:
+ if field.name == field_name and (
+ source_id is None or field.source_id == source_id
+ ):
+ return True
+ return False
diff --git a/src/glassflow/etl/models/sink.py b/src/glassflow/etl/models/sink.py
index db7cf49..36a3be2 100644
--- a/src/glassflow/etl/models/sink.py
+++ b/src/glassflow/etl/models/sink.py
@@ -1,16 +1,8 @@
-from typing import Any, List, Optional
+from typing import Any, Optional
from pydantic import BaseModel, Field
from .base import CaseInsensitiveStrEnum
-from .data_types import ClickhouseDataType
-
-
-class TableMapping(BaseModel):
- source_id: str
- field_name: str
- column_name: str
- column_type: ClickhouseDataType
class SinkType(CaseInsensitiveStrEnum):
@@ -31,7 +23,6 @@ class SinkConfig(BaseModel):
max_batch_size: int = Field(default=1000)
max_delay_time: str = Field(default="10m")
table: str
- table_mapping: List[TableMapping]
def update(self, patch: "SinkConfigPatch") -> "SinkConfig":
"""Apply a patch to this sink config."""
@@ -64,8 +55,6 @@ def update(self, patch: "SinkConfigPatch") -> "SinkConfig":
update_dict["max_delay_time"] = patch.max_delay_time
if patch.table is not None:
update_dict["table"] = patch.table
- if patch.table_mapping is not None:
- update_dict["table_mapping"] = patch.table_mapping
if update_dict:
return self.model_copy(update=update_dict)
@@ -85,4 +74,3 @@ class SinkConfigPatch(BaseModel):
max_batch_size: Optional[int] = Field(default=None)
max_delay_time: Optional[str] = Field(default=None)
table: Optional[str] = Field(default=None)
- table_mapping: Optional[List[TableMapping]] = Field(default=None)
diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py
index d991d83..dbeeb8d 100644
--- a/src/glassflow/etl/models/source.py
+++ b/src/glassflow/etl/models/source.py
@@ -1,7 +1,6 @@
-import warnings
from typing import Any, List, Optional
-from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator
+from pydantic import BaseModel, Field, field_validator, model_validator
from .base import CaseInsensitiveStrEnum
from .data_types import KafkaDataType
@@ -22,20 +21,6 @@ class KafkaMechanism(CaseInsensitiveStrEnum):
NO_AUTH = "NO_AUTH"
-class SchemaField(BaseModel):
- name: str
- type: KafkaDataType
-
-
-class SchemaType(CaseInsensitiveStrEnum):
- JSON = "json"
-
-
-class Schema(BaseModel):
- type: SchemaType = SchemaType.JSON
- fields: List[SchemaField]
-
-
class DeduplicationConfig(BaseModel):
enabled: bool = False
id_field: Optional[str] = Field(default=None)
@@ -109,43 +94,9 @@ class ConsumerGroupOffset(CaseInsensitiveStrEnum):
class TopicConfig(BaseModel):
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST
name: str
- event_schema: Schema = Field(alias="schema")
deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig())
replicas: Optional[int] = Field(default=1)
- @field_validator("deduplication")
- @classmethod
- def validate_deduplication_id_field(
- cls, v: DeduplicationConfig, info: ValidationInfo
- ) -> DeduplicationConfig:
- """
- Validate that the deduplication ID field exists in the
- schema and has matching type.
- """
- if v is None or not v.enabled:
- return v
-
- # Skip validation if id_field is empty when deduplication is disabled
- if not v.id_field or v.id_field == "":
- return v
-
- # Get the schema from the parent model's data
- schema = info.data.get("event_schema", {})
- if isinstance(schema, dict):
- fields = schema.get("fields", [])
- else:
- fields = schema.fields
-
- # Find the field in the schema
- field = next((f for f in fields if f.name == v.id_field), None)
- if not field:
- raise ValueError(
- f"Deduplication ID field '{v.id_field}' does not exist in "
- "the event schema"
- )
-
- return v
-
@field_validator("replicas")
@classmethod
def validate_replicas(cls, v: int) -> int:
@@ -165,12 +116,6 @@ class KafkaConnectionParams(BaseModel):
kerberos_keytab: Optional[str] = Field(default=None)
kerberos_realm: Optional[str] = Field(default=None)
kerberos_config: Optional[str] = Field(default=None)
- skip_auth: bool = Field(
- default=False,
- exclude=True,
- description='skip_auth is deprecated, use mechanism "NO_AUTH" instead or \
- set skip_tls_verification to True if you want to skip TLS verification',
- )
skip_tls_verification: bool = Field(default=False)
@model_validator(mode="before")
@@ -186,23 +131,6 @@ def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams"
merged_dict = {**current_dict, **patch_dict}
return KafkaConnectionParams.model_validate(merged_dict)
- @model_validator(mode="after")
- def correct_skip_auth(self) -> "KafkaConnectionParams":
- """
- While the skip_auth field is deprecated,
- we need to make sure the mechanism is NO_AUTH if skip_auth is True
- """
- if self.skip_auth:
- if self.mechanism is not None and self.mechanism != KafkaMechanism.NO_AUTH:
- warnings.warn(
- f"Mechanism is set to {self.mechanism}, but skip_auth is True. \
- Setting mechanism to NO_AUTH.",
- category=RuntimeWarning,
- stacklevel=1,
- )
- self.mechanism = KafkaMechanism.NO_AUTH
- return self
-
class SourceType(CaseInsensitiveStrEnum):
KAFKA = "kafka"
@@ -256,7 +184,7 @@ class KafkaConnectionParamsPatch(BaseModel):
kerberos_keytab: Optional[str] = Field(default=None)
kerberos_realm: Optional[str] = Field(default=None)
kerberos_config: Optional[str] = Field(default=None)
- skip_auth: Optional[bool] = Field(default=None)
+ skip_tls_verification: Optional[bool] = Field(default=None)
class SourceConfigPatch(BaseModel):
diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py
index f4eb909..4409afd 100644
--- a/src/glassflow/etl/pipeline.py
+++ b/src/glassflow/etl/pipeline.py
@@ -357,7 +357,7 @@ def _tracking_info(self) -> dict[str, Any]:
# Extract join info
join_enabled = getattr(self.config.join, "enabled", False)
-
+ filter_enabled = getattr(self.config.filter, "enabled", False)
# Extract deduplication info
deduplication_enabled = any(
t.deduplication and t.deduplication.enabled
@@ -368,18 +368,19 @@ def _tracking_info(self) -> dict[str, Any]:
conn_params = self.config.source.connection_params
root_ca_provided = conn_params.root_ca is not None
- skip_auth = conn_params.skip_auth
+ skip_tls_verification = conn_params.skip_tls_verification
protocol = str(conn_params.protocol)
mechanism = str(conn_params.mechanism)
return {
"pipeline_id": self.config.pipeline_id,
"join_enabled": join_enabled,
+ "filter_enabled": filter_enabled,
"deduplication_enabled": deduplication_enabled,
"source_auth_method": mechanism,
"source_security_protocol": protocol,
"source_root_ca_provided": root_ca_provided,
- "source_skip_auth": skip_auth,
+ "source_skip_tls_verification": skip_tls_verification,
}
def _track_event(self, event_name: str, **kwargs: Any) -> None:
diff --git a/src/glassflow/etl/utils.py b/src/glassflow/etl/utils.py
new file mode 100644
index 0000000..93190fc
--- /dev/null
+++ b/src/glassflow/etl/utils.py
@@ -0,0 +1,37 @@
+from typing import Any
+
+from . import models
+
+
+def migrate_pipeline_v1_to_v2(pipeline: dict[str, Any]) -> models.PipelineConfig:
+ """Migrate a pipeline from v1 to v2."""
+ schema_fields = {}
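+ # Collect v2 schema fields from each topic's v1 schema, keyed by topic and field name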
+ for topic in pipeline["source"]["topics"]:
+ for field in topic["schema"]["fields"]:
+ schema_fields[f"{topic['name']}_{field['name']}"] = {
+ "source_id": topic["name"],
+ "name": field["name"],
+ "type": field["type"],
+ }
+
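+ # Fold the v1 sink table_mapping entries into the matching schema fields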
+ for field in pipeline["sink"]["table_mapping"]:
+ schema_fields[f"{field['source_id']}_{field['field_name']}"].update(
+ {"column_name": field["column_name"], "column_type": field["column_type"]}
+ )
+
+ schema = models.Schema(
+ fields=[
+ models.SchemaField.model_validate(field) for field in schema_fields.values()
+ ]
+ )
+
+ return models.PipelineConfig(
+ pipeline_id=pipeline["pipeline_id"],
+ name=pipeline.get("name", None),
+ source=models.SourceConfig.model_validate(pipeline["source"]),
+ join=models.JoinConfig.model_validate(pipeline.get("join", {})),
+ sink=models.SinkConfig.model_validate(pipeline["sink"]),
+ schema=schema,
+ )
diff --git a/tests/data/error_scenarios.py b/tests/data/error_scenarios.py
index 715adca..b5484e9 100644
--- a/tests/data/error_scenarios.py
+++ b/tests/data/error_scenarios.py
@@ -287,44 +287,65 @@ def get_join_with_invalid_type(valid_config):
]
-def get_sink_validation_error_scenarios():
- """Get sink validation error test scenarios."""
+def get_schema_validation_error_scenarios():
+ """Get schema validation error test scenarios."""
- def get_sink_with_source_id_not_found(valid_config):
- sink = valid_config["sink"]
- sink["table_mapping"] = [
- models.TableMapping(
- source_id="non-existent-topic",
- field_name="id",
- column_name="id",
- column_type="String",
- )
+ def get_schema_with_source_id_not_found(valid_config):
+ schema = valid_config["schema"]
+ schema["fields"] = schema["fields"] + [
+ {
+ "source_id": "non-existent-topic",
+ "name": "id",
+ "type": "string",
+ "column_name": "id",
+ "column_type": "String",
+ }
]
- return sink
+ return schema
- def get_sink_with_field_name_not_found(valid_config):
- sink = valid_config["sink"]
- sink["table_mapping"] = [
- models.TableMapping(
- source_id=valid_config["source"]["topics"][0]["name"],
- field_name="non-existent-field",
- column_name="id",
- column_type="String",
- )
+ def get_schema_with_missing_column_name(valid_config):
+ schema = valid_config["schema"]
+ schema["fields"] = schema["fields"] + [
+ {
+ "source_id": valid_config["source"]["topics"][0]["name"],
+ "name": "id",
+ "type": "string",
+ "column_type": "String",
+ }
]
- return sink
+ return schema
+
+ def get_schema_with_missing_column_type(valid_config):
+ schema = valid_config["schema"]
+ schema["fields"] = schema["fields"] + [
+ {
+ "source_id": valid_config["source"]["topics"][0]["name"],
+ "name": "id",
+ "type": "string",
+ "column_name": "id",
+ }
+ ]
+ return schema
return [
{
"name": "source_id_not_found",
- "sink": get_sink_with_source_id_not_found,
+ "schema": get_schema_with_source_id_not_found,
"expected_error": ValueError,
"error_message": "does not exist in any topic",
},
{
- "name": "field_name_not_found",
- "sink": get_sink_with_field_name_not_found,
+ "name": "missing_column_name",
+ "schema": get_schema_with_missing_column_name,
"expected_error": ValueError,
- "error_message": "does not exist in source",
+ "error_message": "column_name and column_type must both be provided or both"
+ " be None",
+ },
+ {
+ "name": "missing_column_type",
+ "schema": get_schema_with_missing_column_type,
+ "expected_error": ValueError,
+ "error_message": "column_name and column_type must both be provided or both"
+ " be None",
},
]
diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py
index d77d6a3..d9c2f87 100644
--- a/tests/data/pipeline_configs.py
+++ b/tests/data/pipeline_configs.py
@@ -26,23 +26,6 @@ def get_valid_pipeline_config() -> dict:
"consumer_group_initial_offset": "earliest",
"name": "user_logins",
"replicas": 3,
- "schema": {
- "type": "json",
- "fields": [
- {
- "name": "session_id",
- "type": "string",
- },
- {
- "name": "user_id",
- "type": "string",
- },
- {
- "name": "timestamp",
- "type": "String",
- },
- ],
- },
"deduplication": {
"enabled": True,
"id_field": "session_id",
@@ -54,23 +37,6 @@ def get_valid_pipeline_config() -> dict:
"consumer_group_initial_offset": "earliest",
"name": "orders",
"replicas": 1,
- "schema": {
- "type": "json",
- "fields": [
- {
- "name": "user_id",
- "type": "string",
- },
- {
- "name": "order_id",
- "type": "string",
- },
- {
- "name": "timestamp",
- "type": "string",
- },
- ],
- },
"deduplication": {
"enabled": True,
"id_field": "order_id",
@@ -98,6 +64,10 @@ def get_valid_pipeline_config() -> dict:
},
],
},
+ "filter": {
+ "enabled": True,
+ "expression": "user_id = '123'",
+ },
"sink": {
"type": "clickhouse",
"provider": "aiven",
@@ -110,37 +80,56 @@ def get_valid_pipeline_config() -> dict:
"secure": True,
"max_batch_size": 1,
"table": "user_orders",
- "table_mapping": [
+ },
+ "schema": {
+ "fields": [
{
"source_id": "user_logins",
- "field_name": "session_id",
+ "name": "session_id",
+ "type": "string",
"column_name": "session_id",
- "column_type": "string",
+ "column_type": "String",
},
{
"source_id": "user_logins",
- "field_name": "user_id",
+ "name": "user_id",
+ "type": "string",
"column_name": "user_id",
- "column_type": "STRING",
+ "column_type": "String",
},
{
"source_id": "orders",
- "field_name": "order_id",
+ "name": "order_id",
+ "type": "string",
"column_name": "order_id",
- "column_type": "string",
+ "column_type": "String",
+ },
+ {
+ "source_id": "orders",
+ "name": "user_id",
+ "type": "string",
+ "column_name": "user_id",
+ "column_type": "String",
},
{
"source_id": "user_logins",
- "field_name": "timestamp",
+ "name": "timestamp",
+ "type": "string",
"column_name": "login_at",
"column_type": "DateTime",
},
{
"source_id": "orders",
- "field_name": "timestamp",
+ "name": "timestamp",
+ "type": "string",
"column_name": "order_placed_at",
"column_type": "DateTime",
},
+ {
+ "source_id": "orders",
+ "name": "skip_sink_field",
+ "type": "string",
+ },
],
},
}
@@ -168,23 +157,6 @@ def get_valid_config_without_joins() -> dict:
{
"consumer_group_initial_offset": "earliest",
"name": "user_logins",
- "schema": {
- "type": "json",
- "fields": [
- {
- "name": "session_id",
- "type": "string",
- },
- {
- "name": "user_id",
- "type": "string",
- },
- {
- "name": "timestamp",
- "type": "String",
- },
- ],
- },
"deduplication": {
"enabled": True,
"id_field": "session_id",
@@ -206,22 +178,27 @@ def get_valid_config_without_joins() -> dict:
"secure": True,
"max_batch_size": 1,
"table": "user_orders",
- "table_mapping": [
+ },
+ "schema": {
+ "fields": [
{
"source_id": "user_logins",
- "field_name": "session_id",
+ "name": "session_id",
+ "type": "string",
"column_name": "session_id",
- "column_type": "string",
+ "column_type": "String",
},
{
"source_id": "user_logins",
- "field_name": "user_id",
+ "name": "user_id",
+ "type": "string",
"column_name": "user_id",
- "column_type": "STRING",
+ "column_type": "String",
},
{
"source_id": "user_logins",
- "field_name": "timestamp",
+ "name": "timestamp",
+ "type": "string",
"column_name": "login_at",
"column_type": "DateTime",
},
@@ -269,7 +246,9 @@ def get_invalid_config() -> dict:
"username": "default",
"password": "pass",
"table": "test_table",
- "table_mapping": [], # Empty table mapping should trigger validation error
+ },
+ "schema": {
+ "fields": [], # Empty schema fields should trigger validation error
},
}
diff --git a/tests/data/valid_pipeline.json b/tests/data/valid_pipeline.json
index ee8d7d7..57559a1 100644
--- a/tests/data/valid_pipeline.json
+++ b/tests/data/valid_pipeline.json
@@ -20,23 +20,6 @@
{
"consumer_group_initial_offset": "earliest",
"name": "user_logins",
- "schema": {
- "type": "json",
- "fields": [
- {
- "name": "session_id",
- "type": "string"
- },
- {
- "name": "user_id",
- "type": "string"
- },
- {
- "name": "timestamp",
- "type": "string"
- }
- ]
- },
"deduplication": {
"enabled": true,
"id_field": "session_id",
@@ -47,23 +30,6 @@
{
"consumer_group_initial_offset": "earliest",
"name": "orders",
- "schema": {
- "type": "json",
- "fields": [
- {
- "name": "user_id",
- "type": "string"
- },
- {
- "name": "order_id",
- "type": "string"
- },
- {
- "name": "timestamp",
- "type": "string"
- }
- ]
- },
"deduplication": {
"enabled": true,
"id_field": "order_id",
@@ -91,6 +57,10 @@
}
]
},
+ "filter": {
+ "enabled": true,
+ "expression": "user_id = '123'"
+ },
"sink": {
"type": "clickhouse",
"provider": "aiven",
@@ -103,37 +73,56 @@
"secure": true,
"max_batch_size": 1,
"max_delay_time": "10m",
- "table": "user_orders",
- "table_mapping": [
+ "table": "user_orders"
+ },
+ "schema": {
+ "fields": [
{
"source_id": "user_logins",
- "field_name": "session_id",
+ "name": "session_id",
+ "type": "string",
"column_name": "session_id",
"column_type": "String"
},
{
"source_id": "user_logins",
- "field_name": "user_id",
+ "name": "user_id",
+ "type": "string",
"column_name": "user_id",
"column_type": "String"
},
{
"source_id": "orders",
- "field_name": "order_id",
+ "name": "order_id",
+ "type": "string",
"column_name": "order_id",
"column_type": "String"
},
+ {
+ "source_id": "orders",
+ "name": "user_id",
+ "type": "string",
+ "column_name": "user_id",
+ "column_type": "String"
+ },
{
"source_id": "user_logins",
- "field_name": "timestamp",
+ "name": "timestamp",
+ "type": "string",
"column_name": "login_at",
"column_type": "DateTime"
},
{
"source_id": "orders",
- "field_name": "timestamp",
+ "name": "timestamp",
+ "type": "string",
"column_name": "order_placed_at",
"column_type": "DateTime"
+ },
+ {
+ "source_id": "orders",
+ "name": "skip_sink_field",
+ "type": "string"
}
]
}
diff --git a/tests/data/valid_pipeline.yaml b/tests/data/valid_pipeline.yaml
index bff8106..b19e662 100644
--- a/tests/data/valid_pipeline.yaml
+++ b/tests/data/valid_pipeline.yaml
@@ -12,6 +12,9 @@ join:
type: temporal
name: Test Pipeline
pipeline_id: test-pipeline
+filter:
+ enabled: true
+ expression: user_id = '123'
sink:
database: default
host:
@@ -23,27 +26,6 @@ sink:
provider: aiven
secure: true
table: user_orders
- table_mapping:
- - column_name: session_id
- column_type: String
- field_name: session_id
- source_id: user_logins
- - column_name: user_id
- column_type: String
- field_name: user_id
- source_id: user_logins
- - column_name: order_id
- column_type: String
- field_name: order_id
- source_id: orders
- - column_name: login_at
- column_type: DateTime
- field_name: timestamp
- source_id: user_logins
- - column_name: order_placed_at
- column_type: DateTime
- field_name: timestamp
- source_id: orders
type: clickhouse
username:
source:
@@ -66,15 +48,6 @@ source:
id_field_type: string
time_window: 12h
name: user_logins
- schema:
- fields:
- - name: session_id
- type: string
- - name: user_id
- type: string
- - name: timestamp
- type: string
- type: json
- consumer_group_initial_offset: earliest
deduplication:
enabled: true
@@ -82,13 +55,39 @@ source:
id_field_type: string
time_window: 12h
name: orders
- schema:
- fields:
- - name: user_id
- type: string
- - name: order_id
- type: string
- - name: timestamp
- type: string
- type: json
type: kafka
+schema:
+ fields:
+ - column_name: session_id
+ column_type: String
+ name: session_id
+ source_id: user_logins
+ type: string
+ - column_name: user_id
+ column_type: String
+ name: user_id
+ source_id: user_logins
+ type: string
+ - column_name: order_id
+ column_type: String
+ name: order_id
+ source_id: orders
+ type: string
+ - column_name: user_id
+ column_type: String
+ name: user_id
+ source_id: orders
+ type: string
+ - column_name: login_at
+ column_type: DateTime
+ name: timestamp
+ source_id: user_logins
+ type: string
+ - column_name: order_placed_at
+ column_type: DateTime
+ name: timestamp
+ source_id: orders
+ type: string
+ - name: skip_sink_field
+ source_id: orders
+ type: string
\ No newline at end of file
diff --git a/tests/test_models/test_config_update.py b/tests/test_models/test_config_update.py
index 951d3a5..0052ead 100644
--- a/tests/test_models/test_config_update.py
+++ b/tests/test_models/test_config_update.py
@@ -177,12 +177,6 @@ def test_update_topics(self, valid_config):
source = models.SourceConfig(**valid_config["source"])
new_topic = models.TopicConfig(
name="new-topic",
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(name="id", type=models.KafkaDataType.STRING)
- ],
- ),
)
patch = models.SourceConfigPatch(topics=[new_topic])
@@ -231,24 +225,6 @@ def test_update_credentials(self, valid_config):
assert updated.password == "new-password"
assert updated.host == sink.host
- def test_update_table_mapping(self, valid_config):
- """Test updating table mapping."""
- sink = models.SinkConfig(**valid_config["sink"])
- new_mapping = [
- models.TableMapping(
- source_id="test",
- field_name="id",
- column_name="id",
- column_type=models.ClickhouseDataType.STRING,
- )
- ]
- patch = models.SinkConfigPatch(table_mapping=new_mapping)
-
- updated = sink.update(patch)
-
- assert len(updated.table_mapping) == 1
- assert updated.table_mapping[0].source_id == "test"
-
def test_update_multiple_sink_fields(self, valid_config):
"""Test updating multiple sink fields at once."""
sink = models.SinkConfig(**valid_config["sink"])
diff --git a/tests/test_models/test_data_types.py b/tests/test_models/test_data_types.py
index a3c86d5..4d8dc56 100644
--- a/tests/test_models/test_data_types.py
+++ b/tests/test_models/test_data_types.py
@@ -8,8 +8,8 @@ class TestDataTypeCompatibility:
def test_validate_data_type_compatibility_invalid_mapping(self, valid_config):
"""Test data type compatibility validation with invalid type mappings."""
- # Modify the sink configuration to have an invalid type mapping
- valid_config["sink"]["table_mapping"][0]["column_type"] = (
+ # Modify the schema to have an invalid type mapping
+ valid_config["schema"]["fields"][0]["column_type"] = (
models.ClickhouseDataType.INT32
)
diff --git a/tests/test_models/test_join.py b/tests/test_models/test_join.py
index 3d4ea99..c6c6bbd 100644
--- a/tests/test_models/test_join.py
+++ b/tests/test_models/test_join.py
@@ -57,5 +57,6 @@ def test_join_validation_error_scenarios(self, valid_config, scenario):
source=valid_config["source"],
join=scenario["join"](valid_config),
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert scenario["error_message"] in str(exc_info.value)
diff --git a/tests/test_models/test_pipeline_config.py b/tests/test_models/test_pipeline_config.py
index 08877df..6a5d221 100644
--- a/tests/test_models/test_pipeline_config.py
+++ b/tests/test_models/test_pipeline_config.py
@@ -26,6 +26,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert config.pipeline_id == "test-pipeline-123a"
@@ -36,6 +37,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert "pipeline_id cannot be empty" in str(exc_info.value)
@@ -45,6 +47,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert (
"pipeline_id can only contain lowercase letters, numbers, and hyphens"
@@ -57,6 +60,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert "pipeline_id cannot be longer than 40 characters" in str(exc_info.value)
@@ -66,6 +70,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert "pipeline_id must start with a lowercase alphanumeric" in str(
exc_info.value
@@ -77,6 +82,7 @@ def test_pipeline_config_pipeline_id_validation(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert "pipeline_id must end with a lowercase alphanumeric" in str(
exc_info.value
@@ -90,6 +96,7 @@ def test_pipeline_config_pipeline_name_provided(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert config.pipeline_id == "test-pipeline"
assert config.name == "My Custom Pipeline Name"
@@ -101,6 +108,7 @@ def test_pipeline_config_pipeline_name_not_provided(self, valid_config):
source=valid_config["source"],
join=valid_config["join"],
sink=valid_config["sink"],
+ schema=valid_config["schema"],
)
assert config.pipeline_id == "test-pipeline"
assert config.name == "Test Pipeline"
diff --git a/tests/test_models/test_schema.py b/tests/test_models/test_schema.py
new file mode 100644
index 0000000..550b024
--- /dev/null
+++ b/tests/test_models/test_schema.py
@@ -0,0 +1,27 @@
+import pytest
+
+from glassflow.etl import models
+from tests.data import error_scenarios
+
+
+class TestSchemaConfig:
+ """Tests for SchemaConfig validation."""
+
+ @pytest.mark.parametrize(
+ "scenario",
+ error_scenarios.get_schema_validation_error_scenarios(),
+ ids=lambda s: s["name"],
+ )
+ def test_schema_validation_error_scenarios(self, valid_config, scenario):
+ """Test schema validation with various error scenarios."""
+
+ with pytest.raises(scenario["expected_error"]) as exc_info:
+ models.PipelineConfig(
+ pipeline_id="test-pipeline",
+ source=valid_config["source"],
+ sink=valid_config["sink"],
+ schema=scenario["schema"](valid_config),
+ filter=valid_config["filter"],
+ join=valid_config["join"],
+ )
+ assert scenario["error_message"] in str(exc_info.value)
diff --git a/tests/test_models/test_sink.py b/tests/test_models/test_sink.py
deleted file mode 100644
index 77d8d06..0000000
--- a/tests/test_models/test_sink.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import pytest
-
-from glassflow.etl import models
-from tests.data import error_scenarios
-
-
-class TestSinkConfig:
- """Tests for SinkConfig validation."""
-
- @pytest.mark.parametrize(
- "scenario",
- error_scenarios.get_sink_validation_error_scenarios(),
- ids=lambda s: s["name"],
- )
- def test_sink_validation_error_scenarios(self, valid_config, scenario):
- """Test sink validation with various error scenarios."""
-
- with pytest.raises(scenario["expected_error"]) as exc_info:
- models.PipelineConfig(
- pipeline_id="test-pipeline",
- source=valid_config["source"],
- sink=scenario["sink"](valid_config),
- )
- assert scenario["error_message"] in str(exc_info.value)
diff --git a/tests/test_models/test_topic.py b/tests/test_models/test_topic.py
index f91f33a..2ed1ed7 100644
--- a/tests/test_models/test_topic.py
+++ b/tests/test_models/test_topic.py
@@ -4,21 +4,13 @@
class TestTopicConfig:
- """Tests for TopicConfig deduplication validation."""
+ """Tests for TopicConfig."""
- def test_topic_config_deduplication_id_field_validation(self):
- """Test TopicConfig validation for deduplication ID field."""
- # Test with valid configuration
+ def test_topic_config_creation(self):
+ """Test TopicConfig creation."""
config = models.TopicConfig(
name="test-topic",
consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST,
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(name="id", type=models.KafkaDataType.STRING),
- models.SchemaField(name="name", type=models.KafkaDataType.STRING),
- ],
- ),
deduplication=models.DeduplicationConfig(
enabled=True,
id_field="id",
@@ -30,44 +22,13 @@ def test_topic_config_deduplication_id_field_validation(self):
assert config.deduplication.id_field == "id"
assert config.deduplication.id_field_type == models.KafkaDataType.STRING
- # Test with non-existent ID field
- with pytest.raises(ValueError) as exc_info:
- models.TopicConfig(
- name="test-topic",
- consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST,
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(
- name="name",
- type=models.KafkaDataType.STRING,
- ),
- ],
- ),
- deduplication=models.DeduplicationConfig(
- enabled=True,
- id_field="non-existent-field",
- id_field_type=models.KafkaDataType.STRING,
- time_window="1h",
- ),
- )
- assert "does not exist in the event schema" in str(exc_info.value)
-
- # Test with disabled deduplication (should not validate)
+ def test_topic_config_with_disabled_deduplication(self):
+ """Test TopicConfig with disabled deduplication."""
config = models.TopicConfig(
name="test-topic",
consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST,
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(name="name", type=models.KafkaDataType.STRING),
- ],
- ),
deduplication=models.DeduplicationConfig(
enabled=False,
- id_field="non-existent-field",
- id_field_type=models.KafkaDataType.STRING,
- time_window="1h",
),
)
assert config.deduplication.enabled is False
@@ -77,12 +38,6 @@ def test_topic_config_replicas_validation(self):
config = models.TopicConfig(
name="test-topic",
consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST,
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(name="name", type=models.KafkaDataType.STRING),
- ],
- ),
replicas=3,
)
assert config.replicas == 3
@@ -91,14 +46,114 @@ def test_topic_config_replicas_validation(self):
models.TopicConfig(
name="test-topic",
consumer_group_initial_offset=models.ConsumerGroupOffset.EARLIEST,
- schema=models.Schema(
- type=models.SchemaType.JSON,
- fields=[
- models.SchemaField(
- name="name", type=models.KafkaDataType.STRING
- ),
- ],
- ),
replicas=0,
)
assert "Replicas must be at least 1" in str(exc_info.value)
+
+
+class TestPipelineConfigDeduplicationValidation:
+ """Tests for PipelineConfig deduplication ID field validation."""
+
+ def test_deduplication_id_field_validation_valid(self):
+ """
+ Test PipelineConfig validation for deduplication ID field with valid field.
+ """
+ schema = models.Schema(
+ fields=[
+ models.SchemaField(
+ source_id="test-topic",
+ name="id",
+ type=models.KafkaDataType.STRING,
+ column_name="id",
+ column_type=models.ClickhouseDataType.STRING,
+ ),
+ ]
+ )
+ source = models.SourceConfig(
+ type=models.SourceType.KAFKA,
+ connection_params=models.KafkaConnectionParams(
+ brokers=["localhost:9092"],
+ protocol=models.KafkaProtocol.PLAINTEXT,
+ ),
+ topics=[
+ models.TopicConfig(
+ name="test-topic",
+ deduplication=models.DeduplicationConfig(
+ enabled=True,
+ id_field="id",
+ id_field_type=models.KafkaDataType.STRING,
+ time_window="1h",
+ ),
+ ),
+ ],
+ )
+ sink = models.SinkConfig(
+ type=models.SinkType.CLICKHOUSE,
+ host="localhost",
+ port="9000",
+ database="test",
+ username="default",
+ password="",
+ table="test_table",
+ )
+
+ # Should not raise an error
+ config = models.PipelineConfig(
+ pipeline_id="test-pipeline",
+ source=source,
+ sink=sink,
+ schema=schema,
+ )
+ assert config.pipeline_id == "test-pipeline"
+
+ def test_deduplication_id_field_validation_invalid(self):
+ """
+ Test PipelineConfig validation for deduplication ID field with invalid field.
+ """
+ schema = models.Schema(
+ fields=[
+ models.SchemaField(
+ source_id="test-topic",
+ name="name",
+ type=models.KafkaDataType.STRING,
+ column_name="name",
+ column_type=models.ClickhouseDataType.STRING,
+ ),
+ ]
+ )
+ source = models.SourceConfig(
+ type=models.SourceType.KAFKA,
+ connection_params=models.KafkaConnectionParams(
+ brokers=["localhost:9092"],
+ protocol=models.KafkaProtocol.PLAINTEXT,
+ ),
+ topics=[
+ models.TopicConfig(
+ name="test-topic",
+ deduplication=models.DeduplicationConfig(
+ enabled=True,
+ id_field="non-existent-field",
+ id_field_type=models.KafkaDataType.STRING,
+ time_window="1h",
+ ),
+ ),
+ ],
+ )
+ sink = models.SinkConfig(
+ type=models.SinkType.CLICKHOUSE,
+ host="localhost",
+ port="9000",
+ database="test",
+ username="default",
+ password="",
+ table="test_table",
+ )
+
+ with pytest.raises(ValueError) as exc_info:
+ models.PipelineConfig(
+ pipeline_id="test-pipeline",
+ source=source,
+ sink=sink,
+ schema=schema,
+ )
+ assert "not found in schema" in str(exc_info.value)
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 256be77..fe602a1 100644
--- a/tests/test_pipeline.py
+++ b/tests/test_pipeline.py
@@ -315,11 +315,12 @@ def test_tracking_info(
assert pipeline._tracking_info() == {
"pipeline_id": valid_config["pipeline_id"],
"join_enabled": True,
+ "filter_enabled": True,
"deduplication_enabled": True,
"source_auth_method": "SCRAM-SHA-256",
"source_security_protocol": "SASL_SSL",
"source_root_ca_provided": True,
- "source_skip_auth": False,
+ "source_skip_tls_verification": False,
}
pipeline = Pipeline(
@@ -329,11 +330,12 @@ def test_tracking_info(
assert pipeline._tracking_info() == {
"pipeline_id": valid_config_with_dedup_disabled["pipeline_id"],
"join_enabled": True,
+ "filter_enabled": True,
"deduplication_enabled": False,
"source_auth_method": "SCRAM-SHA-256",
"source_security_protocol": "SASL_SSL",
"source_root_ca_provided": True,
- "source_skip_auth": False,
+ "source_skip_tls_verification": False,
}
pipeline = Pipeline(
@@ -342,11 +344,12 @@ def test_tracking_info(
assert pipeline._tracking_info() == {
"pipeline_id": valid_config_without_joins["pipeline_id"],
"join_enabled": False,
+ "filter_enabled": False,
"deduplication_enabled": True,
"source_auth_method": "SCRAM-SHA-256",
"source_security_protocol": "SASL_SSL",
"source_root_ca_provided": True,
- "source_skip_auth": False,
+ "source_skip_tls_verification": False,
}
pipeline = Pipeline(
@@ -357,11 +360,12 @@ def test_tracking_info(
assert pipeline._tracking_info() == {
"pipeline_id": pipeline_id,
"join_enabled": False,
+ "filter_enabled": False,
"deduplication_enabled": False,
"source_auth_method": "SCRAM-SHA-256",
"source_security_protocol": "SASL_SSL",
"source_root_ca_provided": True,
- "source_skip_auth": False,
+ "source_skip_tls_verification": False,
}