Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/glassflow/etl/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,26 @@ def state(self) -> Dict[str, Any]:
) from e
except errors.APIError as e:
raise e

def purge(self) -> None:
"""
Purge all messages from the Dead Letter Queue.

This operation removes all messages currently in the DLQ and cannot be undone.

Raises:
PipelineNotFoundError: If the pipeline does not exist
ConnectionError: If there is a network error
InternalServerError: If the API request fails
"""
try:
response = self._request("POST", f"{self.endpoint}/purge")
response.raise_for_status()
except errors.NotFoundError as e:
raise errors.PipelineNotFoundError(
status_code=e.status_code,
message=f"Pipeline with id '{self.pipeline_id}' not found",
response=e.response,
) from e
except errors.APIError as e:
raise e
45 changes: 45 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,45 @@ def test_state_server_error(self, dlq):

assert "Internal server error" in str(exc_info.value)

def test_purge_success(self, dlq, mock_success):
"""Test successful DLQ purge operation."""
with mock_success() as mock_post:
dlq.purge()

mock_post.assert_called_once_with("POST", f"{dlq.endpoint}/purge")

def test_purge_pipeline_not_found(self, dlq):
"""Test DLQ purge with pipeline not found error."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=404,
json_data={"message": "Pipeline not found"},
)

with patch(
"httpx.Client.request",
side_effect=mock_response.raise_for_status.side_effect,
):
with pytest.raises(errors.PipelineNotFoundError) as exc_info:
dlq.purge()

assert "test-pipeline" in str(exc_info.value)

def test_purge_server_error(self, dlq):
"""Test DLQ purge with server error."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=500,
json_data={"message": "Internal server error"},
)

with patch(
"httpx.Client.request",
side_effect=mock_response.raise_for_status.side_effect,
):
with pytest.raises(errors.ServerError) as exc_info:
dlq.purge()

assert "Internal server error" in str(exc_info.value)


class TestPipelineDLQIntegration:
"""Test cases for Pipeline-DLQ integration."""
Expand Down Expand Up @@ -138,3 +177,9 @@ def test_pipeline_dlq_state_integration(self, pipeline, mock_success):
result = pipeline.dlq.state()

assert result == {"total_messages": 10}

def test_pipeline_dlq_purge_integration(self, pipeline, mock_success):
"""Test Pipeline DLQ purge functionality."""
with mock_success():
pipeline.dlq.purge()
# If no exception is raised, the test passes