Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<img src="https://github.com/glassflow/glassflow-python-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
</a>
<!-- Pytest Coverage Comment:Begin -->
<img src=https://img.shields.io/badge/coverage-94%25-brightgreen>
<img src=https://img.shields.io/badge/coverage-93%25-brightgreen>
<!-- Pytest Coverage Comment:End -->
</p>

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.4.0
3.5.0
6 changes: 1 addition & 5 deletions src/glassflow/etl/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
JoinConfigPatch,
JoinOrientation,
JoinSourceConfig,
JoinSourceConfigPatch,
JoinType,
)
from .pipeline import PipelineConfig, PipelineConfigPatch, PipelineStatus
Expand All @@ -24,7 +23,6 @@
SourceConfigPatch,
SourceType,
TopicConfig,
TopicConfigPatch,
)

__all__ = [
Expand All @@ -42,6 +40,7 @@
"PipelineConfigPatch",
"PipelineStatus",
"SinkConfig",
"SinkConfigPatch",
"SinkType",
"TableMapping",
"Schema",
Expand All @@ -52,10 +51,7 @@
"TopicConfig",
"GlassFlowConfig",
"SourceConfigPatch",
"TopicConfigPatch",
"KafkaConnectionParamsPatch",
"DeduplicationConfigPatch",
"JoinConfigPatch",
"JoinSourceConfigPatch",
"SinkConfigPatch",
]
21 changes: 14 additions & 7 deletions src/glassflow/etl/models/join.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Any, List, Optional

from pydantic import BaseModel, Field, ValidationInfo, field_validator

Expand Down Expand Up @@ -64,16 +64,23 @@ def validate_type(
raise ValueError("type is required when join is enabled")
return v

def update(self, patch: "JoinConfigPatch") -> "JoinConfig":
"""Apply a patch to this join config.

Only the patch fields that are not None (``enabled``, ``type``,
``sources``) are copied over. A provided ``sources`` list replaces the
current list as a whole.

Args:
patch: The join patch to apply.

Returns:
A copy of this config carrying the patched values, or this very
instance when the patch sets nothing.
"""
# Collects only the fields the patch actually sets.
update_dict: dict[str, Any] = {}

# NOTE(review): the five lines below declare JoinSourceConfigPatch in the
# middle of update(); they look like deleted lines that the diff view
# interleaved with the new method body -- confirm against the real file.
class JoinSourceConfigPatch(BaseModel):
source_id: Optional[str] = Field(default=None)
join_key: Optional[str] = Field(default=None)
time_window: Optional[str] = Field(default=None)
orientation: Optional[JoinOrientation] = Field(default=None)
# None on the patch means "leave this field alone".
if patch.enabled is not None:
update_dict["enabled"] = patch.enabled
if patch.type is not None:
update_dict["type"] = patch.type
if patch.sources is not None:
update_dict["sources"] = patch.sources

# NOTE(review): pydantic documents that model_copy(update=...) does not
# validate the new values -- confirm JoinConfig's validators are not needed.
if update_dict:
return self.model_copy(update=update_dict)
return self


class JoinConfigPatch(BaseModel):
    """Partial update for a join config; every field defaults to ``None``."""

    enabled: Optional[bool] = None
    type: Optional[JoinType] = None
    # TODO: How to patch a single element of the list? The field carries
    # complete JoinSourceConfig entries.
    sources: Optional[List[JoinSourceConfig]] = None
33 changes: 33 additions & 0 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,39 @@ def validate_data_type_compatibility(cls, v: SinkConfig, info: Any) -> SinkConfi

return v

def update(
    self, config_patch: "PipelineConfigPatch | dict[str, Any]"
) -> "PipelineConfig":
    """
    Apply a patch configuration to this pipeline configuration.

    The patch is applied to a deep copy, so this instance is left untouched.
    Patch sections that are None are kept as they currently are.

    Args:
        config_patch: The patch configuration, either a PipelineConfigPatch
            or a dict that validates as one

    Returns:
        PipelineConfig: A new PipelineConfig instance with the patch applied
    """
    # The docstring advertises dict input, but the attribute access below only
    # works on a model: turn a dict into a PipelineConfigPatch first.
    if isinstance(config_patch, dict):
        config_patch = PipelineConfigPatch.model_validate(config_patch)

    # Start with a deep copy of the current config
    updated_config = self.model_copy(deep=True)

    # Update name if provided
    if config_patch.name is not None:
        updated_config.name = config_patch.name

    # Update source if provided
    if config_patch.source is not None:
        updated_config.source = updated_config.source.update(config_patch.source)

    # Update join if provided; a config without a join is patched starting
    # from a default JoinConfig()
    if config_patch.join is not None:
        current_join = updated_config.join or JoinConfig()
        updated_config.join = current_join.update(config_patch.join)

    # Update sink if provided
    if config_patch.sink is not None:
        updated_config.sink = updated_config.sink.update(config_patch.sink)

    return updated_config


class PipelineConfigPatch(BaseModel):
name: Optional[str] = Field(default=None)
Expand Down
41 changes: 40 additions & 1 deletion src/glassflow/etl/models/sink.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Any, List, Optional

from pydantic import BaseModel, Field

Expand Down Expand Up @@ -33,6 +33,44 @@ class SinkConfig(BaseModel):
table: str
table_mapping: List[TableMapping]

def update(self, patch: "SinkConfigPatch") -> "SinkConfig":
    """Return this sink config with the non-None fields of ``patch`` applied.

    Values are read straight off the patch attributes, so model instances are
    passed through as they are.

    Args:
        patch: The sink patch; fields left as None are not touched.

    Returns:
        A copy carrying the patched values, or this very instance when the
        patch sets nothing.
    """
    # Same fields, in the same order, as the patch exposes them.
    patchable_fields = (
        "provider",
        "host",
        "port",
        "http_port",
        "database",
        "username",
        "password",
        "secure",
        "skip_certificate_verification",
        "max_batch_size",
        "max_delay_time",
        "table",
        "table_mapping",
    )

    changes: dict[str, Any] = {}
    for field_name in patchable_fields:
        value = getattr(patch, field_name)
        # ``is not None`` (not truthiness) so False / 0 / "" still count as set.
        if value is not None:
            changes[field_name] = value

    if not changes:
        return self
    return self.model_copy(update=changes)


class SinkConfigPatch(BaseModel):
provider: Optional[str] = Field(default=None)
Expand All @@ -43,6 +81,7 @@ class SinkConfigPatch(BaseModel):
username: Optional[str] = Field(default=None)
password: Optional[str] = Field(default=None)
secure: Optional[bool] = Field(default=None)
skip_certificate_verification: Optional[bool] = Field(default=None)
max_batch_size: Optional[int] = Field(default=None)
max_delay_time: Optional[str] = Field(default=None)
table: Optional[str] = Field(default=None)
Expand Down
70 changes: 61 additions & 9 deletions src/glassflow/etl/models/source.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Any, List, Optional

from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator

Expand Down Expand Up @@ -40,6 +40,29 @@ class DeduplicationConfig(BaseModel):
id_field_type: Optional[KafkaDataType] = Field(default=None)
time_window: Optional[str] = Field(default=None)

def update(self, patch: "DeduplicationConfigPatch") -> "DeduplicationConfig":
    """Return this deduplication config with ``patch`` applied.

    ``enabled`` is taken from the patch when it was explicitly set or is not
    None. ``id_field``, ``id_field_type`` and ``time_window`` are taken only
    when explicitly set on the patch, which also allows resetting them to None.

    Args:
        patch: The deduplication patch to apply.

    Returns:
        A copy carrying the patched values, or this very instance when the
        patch changes nothing.
    """
    # model_fields_set separates "not provided" from "provided as None";
    # fall back to an empty set when the patch object does not expose it.
    explicitly_set = getattr(patch, "model_fields_set", set())

    changes: dict[str, Any] = {}
    if patch.enabled is not None or "enabled" in explicitly_set:
        changes["enabled"] = patch.enabled
    for field_name in ("id_field", "id_field_type", "time_window"):
        if field_name in explicitly_set:
            changes[field_name] = getattr(patch, field_name)

    if not changes:
        return self
    return self.model_copy(update=changes)

@model_validator(mode="before")
@classmethod
def validate_deduplication_fields(cls, values):
Expand Down Expand Up @@ -148,6 +171,13 @@ def empty_str_to_none(values):
values["mechanism"] = None
return values

def update(self, patch: "KafkaConnectionParamsPatch") -> "KafkaConnectionParams":
    """Return new connection params with the non-None patch values applied.

    The current values and the patch are dumped to plain dicts, merged with
    the patch taking precedence, and validated into a fresh
    KafkaConnectionParams.
    """
    merged = self.model_dump()
    # exclude_none drops patch fields that are None, so they cannot override.
    merged.update(patch.model_dump(exclude_none=True))
    return KafkaConnectionParams.model_validate(merged)


class SourceType(CaseInsensitiveStrEnum):
KAFKA = "kafka"
Expand All @@ -159,6 +189,29 @@ class SourceConfig(BaseModel):
connection_params: KafkaConnectionParams
topics: List[TopicConfig]

def update(self, patch: "SourceConfigPatch") -> "SourceConfig":
    """Return this source config with the non-None fields of ``patch`` applied.

    ``type`` and ``provider`` are copied as given, ``connection_params`` is
    patched through its own ``update``, and ``topics`` replaces the current
    list as a whole.

    Args:
        patch: The source patch; fields left as None are not touched.

    Returns:
        A copy carrying the patched values, or this very instance when the
        patch sets nothing.
    """
    changes: dict[str, Any] = {}

    for field_name in ("type", "provider"):
        value = getattr(patch, field_name)
        if value is not None:
            changes[field_name] = value

    # Connection params are merged field by field by the nested update().
    params_patch = patch.connection_params
    if params_patch is not None:
        changes["connection_params"] = self.connection_params.update(params_patch)

    # Topics are never merged: a provided list is a full replacement.
    new_topics = patch.topics
    if new_topics is not None:
        changes["topics"] = new_topics

    return self.model_copy(update=changes) if changes else self


class DeduplicationConfigPatch(BaseModel):
enabled: Optional[bool] = Field(default=None)
Expand All @@ -167,24 +220,23 @@ class DeduplicationConfigPatch(BaseModel):
time_window: Optional[str] = Field(default=None)


class TopicConfigPatch(BaseModel):
    """Partial update for a single topic; every field defaults to ``None``."""

    consumer_group_initial_offset: Optional[ConsumerGroupOffset] = None
    name: Optional[str] = None
    event_schema: Optional[Schema] = None
    deduplication: Optional[DeduplicationConfigPatch] = None


class KafkaConnectionParamsPatch(BaseModel):
    """Partial update for Kafka connection params; all fields default to ``None``."""

    # Broker list, protocol and mechanism
    brokers: Optional[List[str]] = None
    protocol: Optional[KafkaProtocol] = None
    mechanism: Optional[KafkaMechanism] = None

    # Credentials and CA
    username: Optional[str] = None
    password: Optional[str] = None
    root_ca: Optional[str] = None

    # Kerberos settings
    kerberos_service_name: Optional[str] = None
    kerberos_keytab: Optional[str] = None
    kerberos_realm: Optional[str] = None
    kerberos_config: Optional[str] = None

    skip_auth: Optional[bool] = None


class SourceConfigPatch(BaseModel):
    """Partial update for a source config; every field defaults to ``None``.

    ``topics`` was declared twice in this class body; only the later
    ``List[TopicConfig]`` declaration took effect, so it is the one kept.
    """

    type: Optional[SourceType] = Field(default=None)
    provider: Optional[str] = Field(default=None)
    connection_params: Optional[KafkaConnectionParamsPatch] = Field(default=None)
    # Full replacement only; users must provide complete TopicConfig entries
    topics: Optional[List[TopicConfig]] = Field(default=None)
29 changes: 27 additions & 2 deletions src/glassflow/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def rename(self, name: str) -> Pipeline:
def update(
    self, config_patch: models.PipelineConfigPatch | dict[str, Any]
) -> Pipeline:
    """Updates the pipeline with the given config patch.
    Pipeline must be stopped or terminated before updating.

    The latest config is fetched first, the patch is applied to it locally,
    and the resulting full config is posted to the pipeline's edit endpoint.

    Args:
        config_patch: Pipeline configuration patch, either a
            PipelineConfigPatch or a dict that validates as one

    Returns:
        Pipeline: This instance, holding the updated config

    Raises:
        PipelineNotFoundError: If pipeline is not found
        PipelineInTransitionError: If pipeline is in transition
        InvalidStatusTransitionError: If pipeline is not in a state that can be
            updated
        APIError: If the API request fails
    """
    self.get()  # Get latest config

    # Accept plain dicts by validating them into a patch model.
    if isinstance(config_patch, dict):
        config_patch = models.PipelineConfigPatch.model_validate(config_patch)
    updated_config = self.config.update(config_patch)

    self._request(
        "POST",
        f"{self.ENDPOINT}/{self.pipeline_id}/edit",
        json=updated_config.model_dump(
            mode="json",
            by_alias=True,
            exclude_none=True,
        ),
        event_name="PipelineUpdated",
    )
    # NOTE(review): status is set locally rather than read back; presumably the
    # backend resumes the pipeline after an edit -- confirm.
    self.status = models.PipelineStatus.RESUMING

    # Update self.config with the updated configuration
    self.config = updated_config
    return self

def delete(self) -> None:
"""
Expand Down
Loading
Loading