diff --git a/README.md b/README.md index 518958b..517ae4f 100644 --- a/README.md +++ b/README.md @@ -170,35 +170,42 @@ for pipeline in pipelines: print(f"State: {pipeline['state']}") ``` -### Pause / Resume Pipeline +### Stop / Terminate / Resume Pipeline ```python pipeline = client.get_pipeline("my-pipeline-id") -pipeline.pause() +pipeline.stop() print(pipeline.status) ``` +``` +STOPPING +``` + ```python -pipeline = client.get_pipeline("my-pipeline-id") -pipeline.resume() +# Stop a pipeline ungracefully (terminate) +client.stop_pipeline("my-pipeline-id", terminate=True) print(pipeline.status) ``` -### Stop pipeline +``` +TERMINATING +``` ```python -# Stop a pipeline gracefully -client.stop_pipeline("my-pipeline-id") - -# Stop a pipeline ungracefully (terminate) -client.stop_pipeline("my-pipeline-id", terminate=True) +pipeline = client.get_pipeline("my-pipeline-id") +pipeline.resume() +print(pipeline.status) +``` -# Or stop via pipeline instance -pipeline.stop() +``` +RESUMING ``` ### Delete pipeline +Only stopped or terminated pipelines can be deleted. + ```python # Delete a pipeline client.delete_pipeline("my-pipeline-id") diff --git a/VERSION b/VERSION index e4604e3..1809198 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.2.1 +3.4.0 diff --git a/src/glassflow/etl/dlq.py b/src/glassflow/etl/dlq.py index 803e7a9..a484fe8 100644 --- a/src/glassflow/etl/dlq.py +++ b/src/glassflow/etl/dlq.py @@ -74,3 +74,26 @@ def state(self) -> Dict[str, Any]: ) from e except errors.APIError as e: raise e + + def purge(self) -> None: + """ + Purge all messages from the Dead Letter Queue. + + This operation removes all messages currently in the DLQ and cannot be undone. + + Raises: + PipelineNotFoundError: If the pipeline does not exist + ConnectionError: If there is a network error + InternalServerError: If the API request fails + """ + try: + response = self._request("POST", f"{self.endpoint}/purge") + response.raise_for_status() + except errors.NotFoundError as e: + raise errors.PipelineNotFoundError( + status_code=e.status_code, + message=f"Pipeline with id '{self.pipeline_id}' not found", + response=e.response, + ) from e + except errors.APIError as e: + raise e diff --git a/src/glassflow/etl/models/data_types.py b/src/glassflow/etl/models/data_types.py index a915f95..981c049 100644 --- a/src/glassflow/etl/models/data_types.py +++ b/src/glassflow/etl/models/data_types.py @@ -58,6 +58,26 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): ARRAY_INT16 = "Array(Int16)" ARRAY_INT32 = "Array(Int32)" ARRAY_INT64 = "Array(Int64)" + ARRAY_FLOAT32 = "Array(Float32)" + ARRAY_FLOAT64 = "Array(Float64)" + ARRAY_BOOL = "Array(Bool)" + ARRAY_UINT8 = "Array(UInt8)" + ARRAY_UINT16 = "Array(UInt16)" + ARRAY_UINT32 = "Array(UInt32)" + ARRAY_UINT64 = "Array(UInt64)" + ARRAY_LC_STRING = "Array(LowCardinality(String))" + ARRAY_LC_INT8 = "Array(LowCardinality(Int8))" + ARRAY_LC_INT16 = "Array(LowCardinality(Int16))" + ARRAY_LC_INT32 = "Array(LowCardinality(Int32))" + ARRAY_LC_INT64 = "Array(LowCardinality(Int64))" + ARRAY_LC_UINT8 = "Array(LowCardinality(UInt8))" + ARRAY_LC_UINT16 = "Array(LowCardinality(UInt16))" + ARRAY_LC_UINT32 = "Array(LowCardinality(UInt32))" + ARRAY_LC_UINT64 = "Array(LowCardinality(UInt64))" + ARRAY_LC_FLOAT32 = "Array(LowCardinality(Float32))" + ARRAY_LC_FLOAT64 = "Array(LowCardinality(Float64))" + ARRAY_LC_DATETIME = "Array(LowCardinality(DateTime))" + ARRAY_LC_FIXEDSTRING = "Array(LowCardinality(FixedString))" kafka_to_clickhouse_data_type_mappings = { @@ -100,7 +120,7 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): KafkaDataType.UINT32: [ClickhouseDataType.UINT32], KafkaDataType.UINT64: [ClickhouseDataType.UINT64], KafkaDataType.UINT8: [ClickhouseDataType.UINT8], - KafkaDataType.FLOAT: [ClickhouseDataType.FLOAT64], + KafkaDataType.FLOAT: [ClickhouseDataType.FLOAT32, ClickhouseDataType.FLOAT64], KafkaDataType.FLOAT32: [ClickhouseDataType.FLOAT32, ClickhouseDataType.LC_FLOAT32], KafkaDataType.FLOAT64: [ ClickhouseDataType.FLOAT64, @@ -118,5 +138,25 @@ class ClickhouseDataType(CaseInsensitiveStrEnum): ClickhouseDataType.ARRAY_INT16, ClickhouseDataType.ARRAY_INT32, ClickhouseDataType.ARRAY_INT64, + ClickhouseDataType.ARRAY_FLOAT32, + ClickhouseDataType.ARRAY_FLOAT64, + ClickhouseDataType.ARRAY_BOOL, + ClickhouseDataType.ARRAY_UINT8, + ClickhouseDataType.ARRAY_UINT16, + ClickhouseDataType.ARRAY_UINT32, + ClickhouseDataType.ARRAY_UINT64, + ClickhouseDataType.ARRAY_LC_STRING, + ClickhouseDataType.ARRAY_LC_INT8, + ClickhouseDataType.ARRAY_LC_INT16, + ClickhouseDataType.ARRAY_LC_INT32, + ClickhouseDataType.ARRAY_LC_INT64, + ClickhouseDataType.ARRAY_LC_UINT8, + ClickhouseDataType.ARRAY_LC_UINT16, + ClickhouseDataType.ARRAY_LC_UINT32, + ClickhouseDataType.ARRAY_LC_UINT64, + ClickhouseDataType.ARRAY_LC_FLOAT32, + ClickhouseDataType.ARRAY_LC_FLOAT64, + ClickhouseDataType.ARRAY_LC_DATETIME, + ClickhouseDataType.ARRAY_LC_FIXEDSTRING, ], } diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index 82496ed..2f188ee 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -14,8 +14,6 @@ class PipelineStatus(CaseInsensitiveStrEnum): CREATED = "Created" RUNNING = "Running" - PAUSING = "Pausing" - PAUSED = "Paused" RESUMING = "Resuming" STOPPING = "Stopping" STOPPED = "Stopped" diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index 78d5315..dec2ef9 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -17,6 +17,7 @@ class KafkaMechanism(CaseInsensitiveStrEnum): SCRAM_SHA_256 = "SCRAM-SHA-256" SCRAM_SHA_512 = "SCRAM-SHA-512" PLAIN = "PLAIN" + GSSAPI = "GSSAPI" class SchemaField(BaseModel): @@ -135,6 +136,10 @@ class KafkaConnectionParams(BaseModel): username: Optional[str] = Field(default=None) password: Optional[str] = Field(default=None) root_ca: Optional[str] = Field(default=None) + kerberos_service_name: Optional[str] = Field(default=None) + kerberos_keytab: Optional[str] = Field(default=None) + kerberos_realm: Optional[str] = Field(default=None) + kerberos_config: Optional[str] = Field(default=None) skip_auth: bool = Field(default=False) @model_validator(mode="before") diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 8641999..e4c786c 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -148,11 +148,11 @@ def update( def delete(self) -> None: """ Deletes the pipeline from the database. Only pipelines that are stopped or - terminating can be deleted. + terminated can be deleted. Raises: PipelineDeletionStateViolationError: If pipeline is not stopped or - terminating + terminated PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ @@ -162,9 +162,8 @@ def delete(self) -> None: def stop(self, terminate: bool = False) -> Pipeline: """ - Stops the pipeline. Gracefully by default, ungracefully if terminate is True. - Ungracefully means deleting all the pipeline components without waiting for the - events in the pipeline to be processed. + Stops the pipeline, waiting for all the events in the pipeline to be processed. + If terminate is True, the pipeline will be terminated instead. Args: terminate: Whether to terminate the pipeline (i.e. delete all the pipeline @@ -192,26 +191,10 @@ def stop(self, terminate: bool = False) -> Pipeline: self.status = next_status return self - def pause(self) -> Pipeline: - """Pauses the pipeline with the given ID. - - Returns: - Pipeline: A Pipeline instance for the paused pipeline - - Raises: - PipelineInTransitionError: If pipeline is in transition - PipelineNotFoundError: If pipeline is not found - InvalidStatusTransitionError: If pipeline is not in a state that can be - paused - APIError: If the API request fails - """ - endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause" - self._request("POST", endpoint, event_name="PipelinePaused") - self.status = models.PipelineStatus.PAUSING - return self - def resume(self) -> Pipeline: - """Resumes the pipeline with the given ID. + """ + Resumes the pipeline with the given ID. + Only stopped or terminated pipelines can be resumed. Returns: Pipeline: A Pipeline instance for the resumed pipeline diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 80d0e76..9ce4243 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -108,6 +108,45 @@ def test_state_server_error(self, dlq): assert "Internal server error" in str(exc_info.value) + def test_purge_success(self, dlq, mock_success): + """Test successful DLQ purge operation.""" + with mock_success() as mock_post: + dlq.purge() + + mock_post.assert_called_once_with("POST", f"{dlq.endpoint}/purge") + + def test_purge_pipeline_not_found(self, dlq): + """Test DLQ purge with pipeline not found error.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=404, + json_data={"message": "Pipeline not found"}, + ) + + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.PipelineNotFoundError) as exc_info: + dlq.purge() + + assert "test-pipeline" in str(exc_info.value) + + def test_purge_server_error(self, dlq): + """Test DLQ purge with server error.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=500, + json_data={"message": "Internal server error"}, + ) + + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.ServerError) as exc_info: + dlq.purge() + + assert "Internal server error" in str(exc_info.value) + class TestPipelineDLQIntegration: """Test cases for Pipeline-DLQ integration.""" @@ -138,3 +177,9 @@ def test_pipeline_dlq_state_integration(self, pipeline, mock_success): result = pipeline.dlq.state() assert result == {"total_messages": 10} + + def test_pipeline_dlq_purge_integration(self, pipeline, mock_success): + """Test Pipeline DLQ purge functionality.""" + with mock_success(): + pipeline.dlq.purge() + # If no exception is raised, the test passes diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 5d1d7bb..ab7d135 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -21,7 +21,9 @@ def test_create_success(self, pipeline, mock_success): mock_request.assert_called_once_with( "POST", pipeline.ENDPOINT, - json=pipeline.config.model_dump(mode="json", by_alias=True), + json=pipeline.config.model_dump( + mode="json", by_alias=True, exclude_none=True + ), ) assert result == pipeline assert pipeline.status == models.PipelineStatus.CREATED @@ -73,13 +75,12 @@ def test_create_http_error_scenarios(self, pipeline, scenario): class TestPipelineLifecycle: - """Tests for pause, resume, stop, terminate, delete operations.""" + """Tests for resume, stop, terminate, delete operations.""" @pytest.mark.parametrize( "operation,method,endpoint,params,status", [ ("get", "GET", "", {}, models.PipelineStatus.RUNNING), - ("pause", "POST", "/pause", {}, models.PipelineStatus.PAUSING), ("resume", "POST", "/resume", {}, models.PipelineStatus.RESUMING), ("delete", "DELETE", "", {}, models.PipelineStatus.DELETED), ( @@ -137,14 +138,14 @@ def test_lifecycle_operations( assert result == pipeline assert pipeline.status == status - @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"]) + @pytest.mark.parametrize("operation", ["get", "delete", "resume", "stop"]) def test_lifecycle_not_found(self, pipeline, mock_not_found_response, operation): """Test lifecycle operations when pipeline is not found.""" with patch("httpx.Client.request", return_value=mock_not_found_response): with pytest.raises(errors.PipelineNotFoundError): getattr(pipeline, operation)() - @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"]) + @pytest.mark.parametrize("operation", ["get", "delete", "resume", "stop"]) def test_lifecycle_connection_error( self, pipeline, mock_connection_error, operation ): @@ -326,7 +327,7 @@ def test_from_json(self, pipeline): def test_to_dict(self, pipeline): """Test pipeline to dictionary.""" assert pipeline.to_dict() == pipeline.config.model_dump( - mode="json", by_alias=True + mode="json", by_alias=True, exclude_none=True ) pipeline = Pipeline(host="http://localhost:8080", pipeline_id="test-pipeline")