diff --git a/src/glassflow/etl/dlq.py b/src/glassflow/etl/dlq.py index 803e7a9..a484fe8 100644 --- a/src/glassflow/etl/dlq.py +++ b/src/glassflow/etl/dlq.py @@ -74,3 +74,26 @@ def state(self) -> Dict[str, Any]: ) from e except errors.APIError as e: raise e + + def purge(self) -> None: + """ + Purge all messages from the Dead Letter Queue. + + This operation removes all messages currently in the DLQ and cannot be undone. + + Raises: + PipelineNotFoundError: If the pipeline does not exist + ConnectionError: If there is a network error + InternalServerError: If the API request fails + """ + try: + response = self._request("POST", f"{self.endpoint}/purge") + response.raise_for_status() + except errors.NotFoundError as e: + raise errors.PipelineNotFoundError( + status_code=e.status_code, + message=f"Pipeline with id '{self.pipeline_id}' not found", + response=e.response, + ) from e + except errors.APIError as e: + raise e diff --git a/tests/test_dlq.py b/tests/test_dlq.py index 80d0e76..9ce4243 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -108,6 +108,45 @@ def test_state_server_error(self, dlq): assert "Internal server error" in str(exc_info.value) + def test_purge_success(self, dlq, mock_success): + """Test successful DLQ purge operation.""" + with mock_success() as mock_post: + dlq.purge() + + mock_post.assert_called_once_with("POST", f"{dlq.endpoint}/purge") + + def test_purge_pipeline_not_found(self, dlq): + """Test DLQ purge with pipeline not found error.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=404, + json_data={"message": "Pipeline not found"}, + ) + + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.PipelineNotFoundError) as exc_info: + dlq.purge() + + assert "test-pipeline" in str(exc_info.value) + + def test_purge_server_error(self, dlq): + """Test DLQ purge with server error.""" + mock_response = mock_responses.create_mock_response_factory()( + status_code=500, + json_data={"message": "Internal server error"}, + ) + + with patch( + "httpx.Client.request", + side_effect=mock_response.raise_for_status.side_effect, + ): + with pytest.raises(errors.ServerError) as exc_info: + dlq.purge() + + assert "Internal server error" in str(exc_info.value) + class TestPipelineDLQIntegration: """Test cases for Pipeline-DLQ integration.""" @@ -138,3 +177,9 @@ def test_pipeline_dlq_state_integration(self, pipeline, mock_success): result = pipeline.dlq.state() assert result == {"total_messages": 10} + + def test_pipeline_dlq_purge_integration(self, pipeline, mock_success): + """Test Pipeline DLQ purge functionality.""" + with mock_success(): + pipeline.dlq.purge() + # If no exception is raised, the test passes