2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,7 @@
   <img src="https://github.com/glassflow/glassflow-python-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
   </a>
   <!-- Pytest Coverage Comment:Begin -->
-  <img src=https://img.shields.io/badge/coverage-93%25-brightgreen>
+  <img src=https://img.shields.io/badge/coverage-92%25-brightgreen>
   <!-- Pytest Coverage Comment:End -->
 </p>
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-3.5.2
+3.6.0
15 changes: 9 additions & 6 deletions src/glassflow/etl/models/__init__.py
@@ -1,24 +1,25 @@
 from .config import GlassFlowConfig
 from .data_types import ClickhouseDataType, KafkaDataType
+from .filter import FilterConfig, FilterConfigPatch
 from .join import (
     JoinConfig,
     JoinConfigPatch,
     JoinOrientation,
     JoinSourceConfig,
     JoinType,
 )
+from .metadata import MetadataConfig
 from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
-from .sink import SinkConfig, SinkConfigPatch, SinkType, TableMapping
+from .schema import Schema, SchemaField
+from .sink import SinkConfig, SinkConfigPatch, SinkType
 from .source import (
     ConsumerGroupOffset,
     DeduplicationConfig,
     DeduplicationConfigPatch,
     KafkaConnectionParams,
     KafkaConnectionParamsPatch,
     KafkaMechanism,
-    Schema,
-    SchemaField,
-    SchemaType,
+    KafkaProtocol,
     SourceConfig,
     SourceConfigPatch,
     SourceType,
@@ -29,23 +30,25 @@
     "ClickhouseDataType",
     "ConsumerGroupOffset",
     "DeduplicationConfig",
+    "FilterConfig",
+    "FilterConfigPatch",
     "KafkaConnectionParams",
     "KafkaDataType",
     "KafkaMechanism",
+    "KafkaProtocol",
     "JoinConfig",
     "JoinOrientation",
     "JoinSourceConfig",
     "JoinType",
+    "MetadataConfig",
     "PipelineConfig",
     "PipelineConfigPatch",
     "PipelineStatus",
     "SinkConfig",
     "SinkConfigPatch",
     "SinkType",
-    "TableMapping",
     "Schema",
     "SchemaField",
-    "SchemaType",
     "SourceConfig",
     "SourceType",
     "TopicConfig",
27 changes: 27 additions & 0 deletions src/glassflow/etl/models/filter.py
@@ -0,0 +1,27 @@
+from typing import Any, Optional
+
+from pydantic import BaseModel, Field
+
+
+class FilterConfig(BaseModel):
+    enabled: bool = Field(default=False)
+    expression: Optional[str] = Field(default=None, description="The filter expression")
+
+    def update(self, patch: "FilterConfigPatch") -> "FilterConfig":
+        """Apply a patch to this filter config."""
+        update_dict: dict[str, Any] = {}
+
+        # Check each field explicitly to handle model instances properly
+        if patch.enabled is not None:
+            update_dict["enabled"] = patch.enabled
+        if patch.expression is not None:
+            update_dict["expression"] = patch.expression
+
+        if update_dict:
+            return self.model_copy(update=update_dict)
+        return self
+
+
+class FilterConfigPatch(BaseModel):
+    enabled: Optional[bool] = Field(default=None)
+    expression: Optional[str] = Field(default=None)
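
For context, a minimal sketch of the patch flow this new file introduces (illustrative, not part of the diff; the expression string is a made-up value, as the filter expression syntax is not defined in this PR):

```python
from glassflow.etl.models import FilterConfig, FilterConfigPatch

# Filtering is disabled by default.
config = FilterConfig()

# A patch carries only the fields being changed; None means "leave as-is".
patch = FilterConfigPatch(enabled=True, expression="value > 10")
updated = config.update(patch)

assert updated.enabled is True
assert updated.expression == "value > 10"
# update() returns a model_copy, so the original config is untouched.
assert config.enabled is False
```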
7 changes: 7 additions & 0 deletions src/glassflow/etl/models/metadata.py
@@ -0,0 +1,7 @@
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class MetadataConfig(BaseModel):
+    tags: Optional[List[str]] = Field(default=None)
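
Worth noting (illustrative sketch, not part of the diff): unlike `FilterConfig`, `MetadataConfig` has no patch counterpart, and the `pipeline.py` changes below assign a patch's metadata wholesale rather than merging it:

```python
from glassflow.etl.models import MetadataConfig

# Tags are currently the only metadata field; the values here are made up.
metadata = MetadataConfig(tags=["orders", "production"])

# Patching replaces the whole object, so previous tags are not merged in.
replacement = MetadataConfig(tags=["orders-v2"])
```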
191 changes: 60 additions & 131 deletions src/glassflow/etl/models/pipeline.py
@@ -1,12 +1,13 @@
 import re
-from typing import Any, Optional
+from typing import Optional
 
 from pydantic import BaseModel, Field, field_validator, model_validator
 
-from ..errors import InvalidDataTypeMappingError
 from .base import CaseInsensitiveStrEnum
-from .data_types import kafka_to_clickhouse_data_type_mappings
+from .filter import FilterConfig, FilterConfigPatch
 from .join import JoinConfig, JoinConfigPatch
+from .metadata import MetadataConfig
+from .schema import Schema
 from .sink import SinkConfig, SinkConfigPatch
 from .source import SourceConfig, SourceConfigPatch
 
@@ -24,11 +25,15 @@ class PipelineStatus(CaseInsensitiveStrEnum):
 
 
 class PipelineConfig(BaseModel):
+    version: str = Field(default="2.0.0")
     pipeline_id: str
     name: Optional[str] = Field(default=None)
     source: SourceConfig
     join: Optional[JoinConfig] = Field(default=JoinConfig())
+    filter: Optional[FilterConfig] = Field(default=FilterConfig())
+    metadata: Optional[MetadataConfig] = Field(default=MetadataConfig())
     sink: SinkConfig
+    pipeline_schema: Schema = Field(alias="schema")
 
     @field_validator("pipeline_id")
     @classmethod
@@ -48,147 +53,55 @@ def validate_pipeline_id(cls, v: str) -> str:
         return v
 
     @model_validator(mode="after")
-    def set_pipeline_name(self) -> "PipelineConfig":
+    def validate_config(self) -> "PipelineConfig":
         """
-        If name is not provided, use the pipeline_id and replace hyphens
-        with spaces.
+        Set pipeline name if not provided and validate configuration.
         """
+        # Set pipeline name if not provided
         if self.name is None:
             self.name = self.pipeline_id.replace("-", " ").title()
-        return self
 
-    @field_validator("join")
-    @classmethod
-    def validate_join_config(
-        cls,
-        v: Optional[JoinConfig],
-        info: Any,
-    ) -> Optional[JoinConfig]:
-        if not v or not v.enabled:
-            return v
-
-        # Get the source topics from the parent model's data
-        source = info.data.get("source", {})
-        if isinstance(source, dict):
-            source_topics = source.get("topics", [])
-        else:
-            source_topics = source.topics
-        if not source_topics:
-            return v
-
-        # Validate each source in the join config
-        for source in v.sources:
-            # Check if source_id exists in any topic
-            source_exists = any(
-                topic.name == source.source_id for topic in source_topics
-            )
-            if not source_exists:
+        # Validate schema
+        topic_names = [topic.name for topic in self.source.topics]
+        for field in self.pipeline_schema.fields:
+            if field.source_id not in topic_names:
                 raise ValueError(
-                    f"Source ID '{source.source_id}' does not exist in any topic"
+                    f"Source '{field.source_id}' does not exist in any topic"
                 )
 
-            # Find the topic and check if join_key exists in its schema
-            topic = next((t for t in source_topics if t.name == source.source_id), None)
-            if not topic:
+        # Validate deduplication ID fields
+        for topic in self.source.topics:
+            if topic.deduplication is None or not topic.deduplication.enabled:
                 continue
 
-            field_exists = any(
-                field.name == source.join_key for field in topic.event_schema.fields
-            )
-            if not field_exists:
+            if not self.pipeline_schema.is_field_in_schema(
+                topic.deduplication.id_field, topic.name
+            ):
                 raise ValueError(
-                    f"Join key '{source.join_key}' does not exist in source "
-                    f"'{source.source_id}' schema"
+                    f"Deduplication id_field '{topic.deduplication.id_field}' "
+                    f"not found in schema from source '{topic.name}'"
                 )
 
-        return v
-
-    @field_validator("sink")
-    @classmethod
-    def validate_sink_config(cls, v: SinkConfig, info: Any) -> SinkConfig:
-        # Get the source topics from the parent model's data
-        source = info.data.get("source", {})
-        if isinstance(source, dict):
-            source_topics = source.get("topics", [])
-        else:
-            source_topics = source.topics
-        if not source_topics:
-            return v
-
-        # Validate each table mapping
-        for mapping in v.table_mapping:
-            # Check if source_id exists in any topic
-            source_exists = any(
-                topic.name == mapping.source_id for topic in source_topics
-            )
-            if not source_exists:
-                raise ValueError(
-                    f"Source ID '{mapping.source_id}' does not exist in any topic"
-                )
-
-            # Find the topic and check if field_name exists in its schema
-            topic = next(
-                (t for t in source_topics if t.name == mapping.source_id), None
-            )
-            if not topic:
-                continue
-
-            field_exists = any(
-                field.name == mapping.field_name for field in topic.event_schema.fields
-            )
-            if not field_exists:
-                raise ValueError(
-                    f"Field '{mapping.field_name}' does not exist in source "
-                    f"'{mapping.source_id}' event schema"
-                )
-
-        return v
-
-    @field_validator("sink")
-    @classmethod
-    def validate_data_type_compatibility(cls, v: SinkConfig, info: Any) -> SinkConfig:
-        # Get the source topics from the parent model's data
-        source = info.data.get("source", {})
-        if isinstance(source, dict):
-            source_topics = source.get("topics", [])
-        else:
-            source_topics = source.topics
-        if not source_topics:
-            return v
-
-        # Validate each table mapping
-        for mapping in v.table_mapping:
-            # Find the topic
-            topic = next(
-                (t for t in source_topics if t.name == mapping.source_id), None
-            )
-            if not topic:
-                continue
-
-            # Find the source field
-            source_field = next(
-                (f for f in topic.event_schema.fields if f.name == mapping.field_name),
-                None,
-            )
-            if not source_field:
-                continue
-
-            # Get the source and target data types
-            source_type = source_field.type
-            target_type = mapping.column_type
+        # Validate join configuration
+        if self.join and self.join.enabled:
+            # Validate each source in the join config
+            for join_source in self.join.sources:
+                if join_source.source_id not in topic_names:
+                    raise ValueError(
+                        f"Join source '{join_source.source_id}' does not exist in any "
+                        "topic"
+                    )
+
+                if not self.pipeline_schema.is_field_in_schema(
+                    join_source.join_key,
+                    join_source.source_id,
+                ):
+                    raise ValueError(
+                        f"Join key '{join_source.join_key}' does not exist in source "
+                        f"'{join_source.source_id}' schema"
+                    )
 
-            # Check if the target type is compatible with the source type
-            compatible_types = kafka_to_clickhouse_data_type_mappings.get(
-                source_type, []
-            )
-            if target_type not in compatible_types:
-                raise InvalidDataTypeMappingError(
-                    f"Data type '{target_type}' is not compatible with source type "
-                    f"'{source_type}' for field '{mapping.field_name}' in source "
-                    f"'{mapping.source_id}'"
-                )
-
-        return v
+        return self
 
     def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
         """
@@ -217,15 +130,31 @@ def update(self, config_patch: "PipelineConfigPatch") -> "PipelineConfig":
             config_patch.join
         )
 
+        # Update filter if provided
+        if config_patch.filter is not None:
+            updated_config.filter = (updated_config.filter or FilterConfig()).update(
+                config_patch.filter
+            )
+
         # Update sink if provided
         if config_patch.sink is not None:
             updated_config.sink = updated_config.sink.update(config_patch.sink)
 
+        # Update schema if provided
+        if config_patch.pipeline_schema is not None:
+            updated_config.pipeline_schema = config_patch.pipeline_schema
+
+        if config_patch.metadata is not None:
+            updated_config.metadata = config_patch.metadata
+
         return updated_config
 
 
 class PipelineConfigPatch(BaseModel):
     name: Optional[str] = Field(default=None)
-    source: Optional[SourceConfigPatch] = Field(default=None)
     join: Optional[JoinConfigPatch] = Field(default=None)
+    filter: Optional[FilterConfigPatch] = Field(default=None)
+    metadata: Optional[MetadataConfig] = Field(default=None)
+    pipeline_schema: Optional[Schema] = Field(default=None, alias="schema")
     sink: Optional[SinkConfigPatch] = Field(default=None)
+    source: Optional[SourceConfigPatch] = Field(default=None)
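
To round off, a sketch of the patch surface these changes introduce, assuming an existing valid `config: PipelineConfig` (constructing one requires the source, sink, and schema definitions not shown in this diff); all field values are illustrative:

```python
from glassflow.etl.models import (
    FilterConfigPatch,
    MetadataConfig,
    PipelineConfigPatch,
)

# Only the fields set on the patch are applied; everything else is kept.
patch = PipelineConfigPatch(
    name="Orders Pipeline v2",
    filter=FilterConfigPatch(enabled=True, expression="amount > 0"),
    metadata=MetadataConfig(tags=["orders"]),
)

# Assuming `config` is an existing, valid PipelineConfig:
# updated = config.update(patch)
# `updated.filter` is merged field by field; `updated.metadata` is replaced.
```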