From 41f2cfa9c245ab3c19e1289e90f28f79afd7fd0e Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 26 Sep 2025 15:40:56 +0200 Subject: [PATCH 1/4] add stop and update delete methods --- README.md | 13 +++ src/glassflow/etl/api_client.py | 44 +++++++++- src/glassflow/etl/client.py | 24 +++++- src/glassflow/etl/errors.py | 36 +++++++++ src/glassflow/etl/models/pipeline.py | 3 + src/glassflow/etl/pipeline.py | 53 +++++++++--- tests/data/error_scenarios.py | 115 ++++++++++++++++++++++++++- tests/test_client.py | 60 ++++++++++---- tests/test_pipeline.py | 22 +++-- 9 files changed, 329 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index bc02ab7..518958b 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,19 @@ pipeline.resume() print(pipeline.status) ``` +### Stop pipeline + +```python +# Stop a pipeline gracefully +client.stop_pipeline("my-pipeline-id") + +# Stop a pipeline ungracefully (terminate) +client.stop_pipeline("my-pipeline-id", terminate=True) + +# Or stop via pipeline instance +pipeline.stop() +``` + ### Delete pipeline ```python diff --git a/src/glassflow/etl/api_client.py b/src/glassflow/etl/api_client.py index e71038a..221f6ce 100644 --- a/src/glassflow/etl/api_client.py +++ b/src/glassflow/etl/api_client.py @@ -64,11 +64,51 @@ def _raise_api_error(response: httpx.Response) -> None: """Raise an APIError based on the response.""" status_code = response.status_code try: - message = response.json().get("message", None) + error_data = response.json() + message = error_data.get("message", None) + code = error_data.get("code", None) except json.JSONDecodeError: message = f"{status_code} {response.reason_phrase}" + code = None + error_data = {} + if status_code == 400: - raise errors.ValidationError(status_code, message, response=response) + # Handle specific status validation error codes + if code == "TERMINAL_STATE_VIOLATION": + raise errors.TerminalStateViolationError( + status_code, message, response=response + ) + elif code == "INVALID_STATUS_TRANSITION": + raise errors.InvalidStatusTransitionError( + status_code, + message, + response=response, + ) + elif code == "UNKNOWN_STATUS": + raise errors.UnknownStatusError(status_code, message, response=response) + elif code == "PIPELINE_ALREADY_IN_STATE": + raise errors.PipelineAlreadyInStateError( + status_code, message, response=response + ) + elif code == "PIPELINE_IN_TRANSITION": + raise errors.PipelineInTransitionError( + status_code, message, response=response + ) + elif message and message.startswith("invalid json:"): + raise errors.InvalidJsonError(status_code, message, response=response) + elif message and message == "pipeline id cannot be empty": + raise errors.EmptyPipelineIdError( + status_code, message, response=response + ) + elif message and message.startswith( + "pipeline can only be deleted if it's stopped or terminated" + ): + raise errors.PipelineDeletionStateViolationError( + status_code, message, response=response + ) + else: + # Generic 400 error for unknown codes + raise errors.ValidationError(status_code, message, response=response) elif status_code == 403: raise errors.ForbiddenError(status_code, message, response=response) elif status_code == 404: diff --git a/src/glassflow/etl/client.py b/src/glassflow/etl/client.py index e210751..88c0c34 100644 --- a/src/glassflow/etl/client.py +++ b/src/glassflow/etl/client.py @@ -107,18 +107,34 @@ def create_pipeline( return pipeline.create() - def delete_pipeline(self, pipeline_id: str, terminate: bool = True) -> None: - """Deletes the pipeline with the given ID. + def stop_pipeline(self, pipeline_id: str, terminate: bool = False) -> None: + """Stops the pipeline with the given ID. Args: - pipeline_id: The ID of the pipeline to delete + pipeline_id: The ID of the pipeline to stop terminate: Whether to terminate the pipeline (i.e. delete all the pipeline components and potentially all the events in the pipeline) + + Raises: + PipelineInTransitionError: If pipeline is in transition + PipelineNotFoundError: If pipeline is not found + APIError: If the API request fails + """ + Pipeline(host=self.host, pipeline_id=pipeline_id).stop(terminate=terminate) + + def delete_pipeline(self, pipeline_id: str) -> None: + """Deletes the pipeline with the given ID. + + Args: + pipeline_id: The ID of the pipeline to delete + Raises: + PipelineDeletionStateViolationError: If pipeline is not stopped or + terminating PipelineNotFoundError: If pipeline is not found APIError: If the API request fails """ - Pipeline(host=self.host, pipeline_id=pipeline_id).delete(terminate=terminate) + Pipeline(host=self.host, pipeline_id=pipeline_id).delete() def disable_tracking(self) -> None: """Disable tracking of pipeline events.""" diff --git a/src/glassflow/etl/errors.py b/src/glassflow/etl/errors.py index 95b1115..05102f1 100644 --- a/src/glassflow/etl/errors.py +++ b/src/glassflow/etl/errors.py @@ -60,3 +60,39 @@ class InvalidDataTypeMappingError(GlassFlowError): class InvalidBatchSizeError(GlassFlowError): """Exception raised when a batch size is invalid.""" + + +# Status validation error classes for 400 Bad Request responses +class TerminalStateViolationError(ValidationError): + """Raised when attempting to transition from a terminal state to another state.""" + + +class InvalidStatusTransitionError(ValidationError): + """Raised when attempting an invalid status transition.""" + + +class UnknownStatusError(ValidationError): + """Raised when an unknown pipeline status is encountered.""" + + +class PipelineAlreadyInStateError(ValidationError): + """Raised when pipeline is already in the requested state.""" + + +class PipelineInTransitionError(ValidationError): + """ + Raised when pipeline is currently transitioning and cannot perform the + requested operation. + """ + + +class InvalidJsonError(ValidationError): + """Raised when malformed JSON is provided in request body.""" + + +class EmptyPipelineIdError(ValidationError): + """Raised when pipeline ID parameter is empty or whitespace.""" + + +class PipelineDeletionStateViolationError(ValidationError): + """Raised when attempting to delete a pipeline that's not in a deletable state.""" diff --git a/src/glassflow/etl/models/pipeline.py b/src/glassflow/etl/models/pipeline.py index 9555bb8..82496ed 100644 --- a/src/glassflow/etl/models/pipeline.py +++ b/src/glassflow/etl/models/pipeline.py @@ -17,9 +17,12 @@ class PipelineStatus(CaseInsensitiveStrEnum): PAUSING = "Pausing" PAUSED = "Paused" RESUMING = "Resuming" + STOPPING = "Stopping" + STOPPED = "Stopped" TERMINATING = "Terminating" TERMINATED = "Terminated" FAILED = "Failed" + DELETED = "Deleted" class PipelineConfig(BaseModel): diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 773425d..93edd6f 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -145,25 +145,52 @@ def update( """ raise NotImplementedError("Updating is not implemented") - def delete(self, terminate: bool = True) -> None: - """Deletes the pipeline with the given ID. + def delete(self) -> None: + """ + Deletes the pipeline from the database. Only pipelines that are stopped or + terminating can be deleted. + + Raises: + PipelineDeletionStateViolationError: If pipeline is not stopped or + terminating + PipelineNotFoundError: If pipeline is not found + APIError: If the API request fails + """ + endpoint = f"{self.ENDPOINT}/{self.pipeline_id}" + self._request("DELETE", endpoint, event_name="PipelineDeleted") + self.status = models.PipelineStatus.DELETED + + def stop(self, terminate: bool = False) -> Pipeline: + """ + Stops the pipeline. Gracefully by default, ungracefully if terminate is True. + Ungracefully means deleting all the pipeline components without waiting for the + events in the pipeline to be processed. Args: terminate: Whether to terminate the pipeline (i.e. delete all the pipeline components and potentially all the events in the pipeline) + Returns: + Pipeline: A Pipeline instance for the stopped pipeline + Raises: + PipelineInTransitionError: If pipeline is in transition PipelineNotFoundError: If pipeline is not found + InvalidStatusTransitionError: If pipeline is not in a state that can be + stopped APIError: If the API request fails """ - if not terminate: - raise NotImplementedError("Graceful deletion is not implemented") - - if self.config is None: - self.get() - endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate" - self._request("DELETE", endpoint, event_name="PipelineDeleted") - self.status = models.PipelineStatus.TERMINATING + if terminate: + endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate" + next_status = models.PipelineStatus.TERMINATING + event_name = "PipelineTerminated" + else: + endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/stop" + next_status = models.PipelineStatus.STOPPING + event_name = "PipelineStopped" + self._request("POST", endpoint, event_name=event_name) + self.status = next_status + return self def pause(self) -> Pipeline: """Pauses the pipeline with the given ID. @@ -172,7 +199,10 @@ def pause(self) -> Pipeline: Pipeline: A Pipeline instance for the paused pipeline Raises: + PipelineInTransitionError: If pipeline is in transition PipelineNotFoundError: If pipeline is not found + InvalidStatusTransitionError: If pipeline is not in a state that can be + paused APIError: If the API request fails """ endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause" @@ -187,7 +217,10 @@ def resume(self) -> Pipeline: Pipeline: A Pipeline instance for the resumed pipeline Raises: + PipelineInTransitionError: If pipeline is in transition PipelineNotFoundError: If pipeline is not found + InvalidStatusTransitionError: If pipeline is not in a state that can be + resumed APIError: If the API request fails """ endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/resume" diff --git a/tests/data/error_scenarios.py b/tests/data/error_scenarios.py index a73e7b7..5747306 100644 --- a/tests/data/error_scenarios.py +++ b/tests/data/error_scenarios.py @@ -51,31 +51,138 @@ def get_http_error_scenarios(): { "name": "not_found", "status_code": 404, - "text": "Pipeline not found", + "json_data": {"message": "Pipeline not found"}, "expected_error": errors.PipelineNotFoundError, "error_message": "not found", }, { "name": "forbidden", "status_code": 403, - "text": "Pipeline already active", + "json_data": {"message": "Pipeline already active"}, "expected_error": errors.PipelineAlreadyExistsError, "error_message": "already exists", }, { "name": "bad_request", "status_code": 400, - "text": "Bad request", + "json_data": {"message": "Bad request"}, "expected_error": errors.ValidationError, "error_message": "Bad request", }, { "name": "server_error", "status_code": 500, - "text": "Internal server error", + "json_data": {"message": "Internal server error"}, "expected_error": errors.ServerError, "error_message": "Internal server error", }, + # Status validation error scenarios for 400 Bad Request responses + { + "name": "terminal_state_violation", + "status_code": 400, + "json_data": { + "message": ( + "Cannot transition from terminal state Terminated to Running" + ), + "code": "TERMINAL_STATE_VIOLATION", + "current_status": "Terminated", + "requested_status": "Running" + }, + "expected_error": errors.TerminalStateViolationError, + "error_message": ( + "Cannot transition from terminal state Terminated to Running" + ), + }, + { + "name": "invalid_status_transition", + "status_code": 400, + "json_data": { + "message": "Invalid status transition from Running to Paused", + "code": "INVALID_STATUS_TRANSITION", + "current_status": "Running", + "requested_status": "Paused", + "valid_transitions": ["Stopping", "Terminating"] + }, + "expected_error": errors.InvalidStatusTransitionError, + "error_message": "Invalid status transition from Running to Paused", + }, + { + "name": "unknown_status", + "status_code": 400, + "json_data": { + "message": "Unknown pipeline status: InvalidStatus", + "code": "UNKNOWN_STATUS", + "current_status": "InvalidStatus" + }, + "expected_error": errors.UnknownStatusError, + "error_message": "Unknown pipeline status: InvalidStatus", + }, + { + "name": "pipeline_already_in_state", + "status_code": 400, + "json_data": { + "message": "Pipeline is already in Running state", + "code": "PIPELINE_ALREADY_IN_STATE", + "current_status": "Running", + "requested_status": "Running" + }, + "expected_error": errors.PipelineAlreadyInStateError, + "error_message": "Pipeline is already in Running state", + }, + { + "name": "pipeline_in_transition", + "status_code": 400, + "json_data": { + "message": ( + "Pipeline is currently transitioning from Pausing state, " + "cannot perform Stopping operation" + ), + "code": "PIPELINE_IN_TRANSITION", + "current_status": "Pausing", + "requested_status": "Stopping" + }, + "expected_error": errors.PipelineInTransitionError, + "error_message": ( + "Pipeline is currently transitioning from Pausing state, " + "cannot perform Stopping operation" + ), + }, + { + "name": "invalid_json", + "status_code": 400, + "json_data": { + "message": "invalid json: unexpected end of JSON input" + }, + "expected_error": errors.InvalidJsonError, + "error_message": "invalid json: unexpected end of JSON input", + }, + { + "name": "empty_pipeline_id", + "status_code": 400, + "json_data": { + "message": "pipeline id cannot be empty" + }, + "expected_error": errors.EmptyPipelineIdError, + "error_message": "pipeline id cannot be empty", + }, + { + "name": "pipeline_deletion_state_violation", + "status_code": 400, + "json_data": { + "message": ( + "pipeline can only be deleted if it's stopped or terminated, " + "current status: Running" + ), + "field": { + "current_status": "Running" + } + }, + "expected_error": errors.PipelineDeletionStateViolationError, + "error_message": ( + "pipeline can only be deleted if it's stopped or terminated, " + "current status: Running" + ), + }, ] diff --git a/tests/test_client.py b/tests/test_client.py index fe1694c..73fde37 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -188,22 +188,18 @@ def test_client_create_pipeline_value_error(self, valid_config): with pytest.raises(ValueError): client.create_pipeline() - def test_client_delete_pipeline_success( - self, mock_success_response, mock_success_get_pipeline - ): + def test_client_delete_pipeline_success(self, mock_success_response): """Test successful pipeline deletion.""" client = Client() pipeline_id = "test-pipeline-id" - with patch("glassflow.etl.pipeline.Pipeline.get") as pipeline_get: - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_delete_request: - client.delete_pipeline(pipeline_id, terminate=True) - pipeline_get.assert_called_once_with() - mock_delete_request.assert_called_once_with( - "DELETE", f"{client.ENDPOINT}/{pipeline_id}/terminate" - ) + with patch( + "httpx.Client.request", return_value=mock_success_response + ) as mock_delete_request: + client.delete_pipeline(pipeline_id) + mock_delete_request.assert_called_once_with( + "DELETE", f"{client.ENDPOINT}/{pipeline_id}" + ) def test_client_delete_pipeline_not_found(self, mock_not_found_response): """Test pipeline deletion when pipeline is not found.""" @@ -215,6 +211,42 @@ def test_client_delete_pipeline_not_found(self, mock_not_found_response): client.delete_pipeline(pipeline_id) assert "not found" in str(exc_info.value) + def test_client_stop_pipeline_success(self, mock_success_response): + """Test successful pipeline stop.""" + client = Client() + pipeline_id = "test-pipeline-id" + + with patch( + "httpx.Client.request", return_value=mock_success_response + ) as mock_request: + client.stop_pipeline(pipeline_id) + mock_request.assert_called_once_with( + "POST", f"{client.ENDPOINT}/{pipeline_id}/stop" + ) + + def test_client_stop_pipeline_terminate_success(self, mock_success_response): + """Test successful pipeline stop with terminate=True.""" + client = Client() + pipeline_id = "test-pipeline-id" + + with patch( + "httpx.Client.request", return_value=mock_success_response + ) as mock_request: + client.stop_pipeline(pipeline_id, terminate=True) + mock_request.assert_called_once_with( + "POST", f"{client.ENDPOINT}/{pipeline_id}/terminate" + ) + + def test_client_stop_pipeline_not_found(self, mock_not_found_response): + """Test pipeline stop when pipeline is not found.""" + client = Client() + pipeline_id = "non-existent-pipeline" + + with patch("httpx.Client.request", return_value=mock_not_found_response): + with pytest.raises(errors.PipelineNotFoundError) as exc_info: + client.stop_pipeline(pipeline_id) + assert "not found" in str(exc_info.value) + def test_pipeline_to_dict(self, valid_config): """Test Pipeline to_dict method.""" config = PipelineConfig(**valid_config) @@ -229,8 +261,8 @@ def test_pipeline_delete(self, pipeline_from_id, mock_success_response): with patch( "httpx.Client.request", return_value=mock_success_response ) as mock_request: - pipeline_from_id.delete(terminate=True) + pipeline_from_id.delete() mock_request.assert_called_once_with( "DELETE", - f"{pipeline_from_id.ENDPOINT}/{pipeline_from_id.pipeline_id}/terminate", + f"{pipeline_from_id.ENDPOINT}/{pipeline_from_id.pipeline_id}", ) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index f201a77..8240580 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -61,8 +61,8 @@ def test_create_http_error_scenarios(self, pipeline, scenario): """Test pipeline creation with various HTTP error scenarios.""" mock_response = mock_responses.create_mock_response_factory()( status_code=scenario["status_code"], - json_data={"message": scenario["text"]}, - text=scenario["text"], + json_data=scenario["json_data"], + text=scenario["json_data"]["message"], ) with patch( @@ -75,7 +75,7 @@ def test_create_http_error_scenarios(self, pipeline, scenario): class TestPipelineLifecycle: - """Tests for pause, resume, delete operations.""" + """Tests for pause, resume, stop, terminate, delete operations.""" @pytest.mark.parametrize( "operation,method,endpoint,params,status", @@ -83,9 +83,17 @@ class TestPipelineLifecycle: ("get", "GET", "", {}, models.PipelineStatus.RUNNING), ("pause", "POST", "/pause", {}, models.PipelineStatus.PAUSING), ("resume", "POST", "/resume", {}, models.PipelineStatus.RESUMING), + ("delete", "DELETE", "", {}, models.PipelineStatus.DELETED), ( - "delete", - "DELETE", + "stop", + "POST", + "/stop", + {"terminate": False}, + models.PipelineStatus.STOPPING, + ), + ( + "stop", + "POST", "/terminate", {"terminate": True}, models.PipelineStatus.TERMINATING, @@ -118,14 +126,14 @@ def test_lifecycle_operations( assert result == pipeline assert pipeline.status == status - @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume"]) + @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"]) def test_lifecycle_not_found(self, pipeline, mock_not_found_response, operation): """Test lifecycle operations when pipeline is not found.""" with patch("httpx.Client.request", return_value=mock_not_found_response): with pytest.raises(errors.PipelineNotFoundError): getattr(pipeline, operation)() - @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume"]) + @pytest.mark.parametrize("operation", ["get", "delete", "pause", "resume", "stop"]) def test_lifecycle_connection_error( self, pipeline, mock_connection_error, operation ): From 30590c89b6cc70279d4a01426abe4ebfde71aa75 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 26 Sep 2025 15:41:22 +0200 Subject: [PATCH 2/4] change default value to same we use in the backend --- src/glassflow/etl/models/source.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/glassflow/etl/models/source.py b/src/glassflow/etl/models/source.py index a86a37c..78d5315 100644 --- a/src/glassflow/etl/models/source.py +++ b/src/glassflow/etl/models/source.py @@ -81,7 +81,7 @@ class ConsumerGroupOffset(CaseInsensitiveStrEnum): class TopicConfig(BaseModel): - consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.EARLIEST + consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST name: str event_schema: Schema = Field(alias="schema") deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig()) From 9b7ba266f0df42821c68f81a87c244939fc53b40 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 26 Sep 2025 13:45:23 +0000 Subject: [PATCH 3/4] chore: bump version to 3.2.0 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index fd2a018..944880f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.1.0 +3.2.0 From 7896dedba2103a5afa6813d16aadf74dc0a3cd62 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Fri, 26 Sep 2025 15:46:20 +0200 Subject: [PATCH 4/4] fix format --- tests/data/error_scenarios.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/tests/data/error_scenarios.py b/tests/data/error_scenarios.py index 5747306..715adca 100644 --- a/tests/data/error_scenarios.py +++ b/tests/data/error_scenarios.py @@ -86,7 +86,7 @@ def get_http_error_scenarios(): ), "code": "TERMINAL_STATE_VIOLATION", "current_status": "Terminated", - "requested_status": "Running" + "requested_status": "Running", }, "expected_error": errors.TerminalStateViolationError, "error_message": ( @@ -101,7 +101,7 @@ def get_http_error_scenarios(): "code": "INVALID_STATUS_TRANSITION", "current_status": "Running", "requested_status": "Paused", - "valid_transitions": ["Stopping", "Terminating"] + "valid_transitions": ["Stopping", "Terminating"], }, "expected_error": errors.InvalidStatusTransitionError, "error_message": "Invalid status transition from Running to Paused", @@ -112,7 +112,7 @@ def get_http_error_scenarios(): "json_data": { "message": "Unknown pipeline status: InvalidStatus", "code": "UNKNOWN_STATUS", - "current_status": "InvalidStatus" + "current_status": "InvalidStatus", }, "expected_error": errors.UnknownStatusError, "error_message": "Unknown pipeline status: InvalidStatus", @@ -124,7 +124,7 @@ def get_http_error_scenarios(): "message": "Pipeline is already in Running state", "code": "PIPELINE_ALREADY_IN_STATE", "current_status": "Running", - "requested_status": "Running" + "requested_status": "Running", }, "expected_error": errors.PipelineAlreadyInStateError, "error_message": "Pipeline is already in Running state", @@ -139,7 +139,7 @@ def get_http_error_scenarios(): ), "code": "PIPELINE_IN_TRANSITION", "current_status": "Pausing", - "requested_status": "Stopping" + "requested_status": "Stopping", }, "expected_error": errors.PipelineInTransitionError, "error_message": ( @@ -150,18 +150,14 @@ def get_http_error_scenarios(): { "name": "invalid_json", "status_code": 400, - "json_data": { - "message": "invalid json: unexpected end of JSON input" - }, + "json_data": {"message": "invalid json: unexpected end of JSON input"}, "expected_error": errors.InvalidJsonError, "error_message": "invalid json: unexpected end of JSON input", }, { "name": "empty_pipeline_id", "status_code": 400, - "json_data": { - "message": "pipeline id cannot be empty" - }, + "json_data": {"message": "pipeline id cannot be empty"}, "expected_error": errors.EmptyPipelineIdError, "error_message": "pipeline id cannot be empty", }, @@ -173,9 +169,7 @@ def get_http_error_scenarios(): "pipeline can only be deleted if it's stopped or terminated, " "current status: Running" ), - "field": { - "current_status": "Running" - } + "field": {"current_status": "Running"}, }, "expected_error": errors.PipelineDeletionStateViolationError, "error_message": (