From 416653b26323ee0a1299fb38244ca709d84252c9 Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 6 Oct 2025 16:09:34 +0200 Subject: [PATCH 1/3] remove status from get pipeline response --- src/glassflow/etl/pipeline.py | 2 +- tests/conftest.py | 61 ++++++++++++++++++++++++++++----- tests/data/pipeline_configs.py | 10 ++++++ tests/test_client.py | 62 +++++++++++++--------------------- tests/test_dlq.py | 52 ++++++++++------------------ tests/test_pipeline.py | 55 +++++++++++++++++------------- 6 files changed, 136 insertions(+), 106 deletions(-) diff --git a/src/glassflow/etl/pipeline.py b/src/glassflow/etl/pipeline.py index 93edd6f..8641999 100644 --- a/src/glassflow/etl/pipeline.py +++ b/src/glassflow/etl/pipeline.py @@ -68,7 +68,7 @@ def get(self) -> Pipeline: "GET", f"{self.ENDPOINT}/{self.pipeline_id}", event_name="PipelineGet" ) self.config = models.PipelineConfig.model_validate(response.json()) - self.status = models.PipelineStatus(response.json()["status"]) + self.health() self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host) return self diff --git a/tests/conftest.py b/tests/conftest.py index 322338f..9f44c0b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,6 +29,25 @@ def get_pipeline_response(valid_config) -> dict: return config +@pytest.fixture +def get_health_payload(): + """Factory to create a health endpoint payload for a pipeline id.""" + def factory( + pipeline_id: str, + name: str = "Test Pipeline", + status: str = "Running", + ) -> dict: + return { + "pipeline_id": pipeline_id, + "pipeline_name": name, + "overall_status": status, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + } + + return factory + + @pytest.fixture def valid_config_without_joins() -> dict: """Fixture for a valid pipeline configuration without joins.""" @@ -96,18 +115,44 @@ def mock_connection_error(): @pytest.fixture -def mock_success_get_pipeline(get_pipeline_response): - """Fixture for a successful GET pipeline response.""" - return mock_responses.create_mock_response_factory()( - status_code=200, - json_data=get_pipeline_response, - ) +def mock_success(): + """Factory-context fixture that patches httpx and returns 200 with JSON. + + - Accepts either a single dict payload or a list of dict payloads via the + optional argument to the returned context manager. If a list is provided, + they are returned sequentially from response.json() to simulate multiple + HTTP calls within the same test flow. + - If no payload is provided, it defaults to {"message": "Success"}. + + Usage: + with mock_success(payload_or_list) as mock_request: + # invoke code under test + assert mock_request.call_args_list == [...] + """ + from contextlib import contextmanager + + @contextmanager + def factory(json_payloads=None): + if json_payloads is None: + json_payloads = [{"message": "Success"}] + payload_list = ( + list(json_payloads) if isinstance(json_payloads, list) else [json_payloads] + ) + response = mock_responses.create_mock_response_factory()( + status_code=200, + json_data=payload_list[0] if payload_list else {}, + ) + with patch("httpx.Client.request", return_value=response) as mock: + if payload_list: + response.json.side_effect = payload_list + yield mock + return factory @pytest.fixture -def pipeline_from_id(mock_success_get_pipeline): +def pipeline_from_id(mock_success, get_pipeline_response, get_health_payload): """Fixture for a successful GET request.""" - with patch("httpx.Client.request", return_value=mock_success_get_pipeline): + with mock_success([get_pipeline_response, get_health_payload("test-pipeline-id")]): return Pipeline(pipeline_id="test-pipeline-id").get() diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index bcda32e..4de80e0 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -272,3 +272,13 @@ def get_invalid_config() -> dict: "table_mapping": [], # Empty table mapping should trigger validation error }, } + +def get_health_payload(pipeline_id: str) -> dict: + """Get a health payload for a pipeline.""" + return { + "pipeline_id": pipeline_id, + "pipeline_name": "Test Pipeline", + "overall_status": "Running", + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + } diff --git a/tests/test_client.py b/tests/test_client.py index 73fde37..4bc71e7 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,4 +1,4 @@ -from unittest.mock import patch +from unittest.mock import call, patch import pytest @@ -19,21 +19,21 @@ def test_client_init(self): assert client.http_client.base_url == "https://example.com" def test_client_get_pipeline_success( - self, get_pipeline_response, mock_success_response + self, mock_success, get_pipeline_response, get_health_payload ): """Test successful pipeline retrieval by ID.""" client = Client() pipeline_id = "test-pipeline-id" - mock_success_response.json.return_value = get_pipeline_response - - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success([ + get_pipeline_response, + get_health_payload(pipeline_id), + ]) as mock_request: pipeline = client.get_pipeline(pipeline_id) - mock_request.assert_called_once_with( - "GET", f"{client.ENDPOINT}/{pipeline_id}" - ) + assert mock_request.call_args_list == [ + call("GET", f"{client.ENDPOINT}/{pipeline_id}"), + call("GET", f"{client.ENDPOINT}/{pipeline_id}/health"), + ] assert isinstance(pipeline, Pipeline) assert pipeline.pipeline_id == pipeline_id @@ -126,13 +126,11 @@ def test_client_list_pipelines_empty(self): mock_request.assert_called_once_with("GET", client.ENDPOINT) assert pipelines == [] - def test_client_create_pipeline_success(self, valid_config, mock_success_response): + def test_client_create_pipeline_success(self, valid_config, mock_success): """Test successful pipeline creation.""" client = Client() - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: pipeline = client.create_pipeline(valid_config) mock_request.assert_called_once_with( "POST", client.ENDPOINT, json=mock_request.call_args[1]["json"] @@ -150,12 +148,10 @@ def test_client_create_pipeline_already_exists( with pytest.raises(errors.PipelineAlreadyExistsError): client.create_pipeline(valid_config) - def test_client_create_pipeline_from_yaml_success(self, mock_success_response): + def test_client_create_pipeline_from_yaml_success(self, mock_success): """Test pipeline creation from YAML file.""" client = Client() - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: client.create_pipeline( pipeline_config_yaml_path="tests/data/valid_pipeline.yaml" ) @@ -163,12 +159,10 @@ def test_client_create_pipeline_from_yaml_success(self, mock_success_response): "POST", client.ENDPOINT, json=mock_request.call_args[1]["json"] ) - def test_client_create_pipeline_from_json_success(self, mock_success_response): + def test_client_create_pipeline_from_json_success(self, mock_success): """Test pipeline creation from JSON file.""" client = Client() - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: client.create_pipeline( pipeline_config_json_path="tests/data/valid_pipeline.json" ) @@ -188,14 +182,12 @@ def test_client_create_pipeline_value_error(self, valid_config): with pytest.raises(ValueError): client.create_pipeline() - def test_client_delete_pipeline_success(self, mock_success_response): + def test_client_delete_pipeline_success(self, mock_success): """Test successful pipeline deletion.""" client = Client() pipeline_id = "test-pipeline-id" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_delete_request: + with mock_success() as mock_delete_request: client.delete_pipeline(pipeline_id) mock_delete_request.assert_called_once_with( "DELETE", f"{client.ENDPOINT}/{pipeline_id}" @@ -211,27 +203,23 @@ def test_client_delete_pipeline_not_found(self, mock_not_found_response): client.delete_pipeline(pipeline_id) assert "not found" in str(exc_info.value) - def test_client_stop_pipeline_success(self, mock_success_response): + def test_client_stop_pipeline_success(self, mock_success): """Test successful pipeline stop.""" client = Client() pipeline_id = "test-pipeline-id" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: client.stop_pipeline(pipeline_id) mock_request.assert_called_once_with( "POST", f"{client.ENDPOINT}/{pipeline_id}/stop" ) - def test_client_stop_pipeline_terminate_success(self, mock_success_response): + def test_client_stop_pipeline_terminate_success(self, mock_success): """Test successful pipeline stop with terminate=True.""" client = Client() pipeline_id = "test-pipeline-id" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: client.stop_pipeline(pipeline_id, terminate=True) mock_request.assert_called_once_with( "POST", f"{client.ENDPOINT}/{pipeline_id}/terminate" @@ -256,11 +244,9 @@ def test_pipeline_to_dict(self, valid_config): assert isinstance(pipeline_dict, dict) assert pipeline_dict["pipeline_id"] == valid_config["pipeline_id"] - def test_pipeline_delete(self, pipeline_from_id, mock_success_response): + def test_pipeline_delete(self, pipeline_from_id, mock_success): """Test Pipeline delete with explicit pipeline_id.""" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: pipeline_from_id.delete() mock_request.assert_called_once_with( "DELETE", diff --git a/tests/test_dlq.py b/tests/test_dlq.py index d82a5f3..80d0e76 100644 --- a/tests/test_dlq.py +++ b/tests/test_dlq.py @@ -16,17 +16,13 @@ def test_dlq_initialization(self, dlq): assert dlq.http_client.base_url == "http://localhost:8080" assert dlq.endpoint == "/api/v1/pipeline/test-pipeline/dlq" - def test_consume_success(self, dlq): + def test_consume_success(self, dlq, mock_success): """Test successful DLQ consume operation.""" - mock_response = mock_responses.create_mock_response_factory()( - status_code=200, - json_data=[ - {"id": "msg1", "content": "test message 1"}, - {"id": "msg2", "content": "test message 2"}, - ], - ) - - with patch("httpx.Client.request", return_value=mock_response) as mock_get: + payload = [ + {"id": "msg1", "content": "test message 1"}, + {"id": "msg2", "content": "test message 2"}, + ] + with mock_success(json_payloads=[payload]) as mock_get: result = dlq.consume(batch_size=50) mock_get.assert_called_once_with( @@ -79,18 +75,14 @@ def test_consume_http_error_scenarios(self, dlq, scenario): assert scenario["error_message"] in str(exc_info.value) - def test_state_success(self, dlq): + def test_state_success(self, dlq, mock_success): """Test successful DLQ state operation.""" - mock_response = mock_responses.create_mock_response_factory()( - status_code=200, - json_data={ - "total_messages": 42, - "pending_messages": 5, - "last_updated": "2023-01-01T00:00:00Z", - }, - ) - - with patch("httpx.Client.request", return_value=mock_response) as mock_get: + state_payload = { + "total_messages": 42, + "pending_messages": 5, + "last_updated": "2023-01-01T00:00:00Z", + } + with mock_success(state_payload) as mock_get: result = dlq.state() mock_get.assert_called_once_with("GET", f"{dlq.endpoint}/state") @@ -133,26 +125,16 @@ def test_pipeline_dlq_property_same_url(self): assert pipeline.http_client.base_url == custom_url assert pipeline.dlq.http_client.base_url == custom_url - def test_pipeline_dlq_consume_integration(self, pipeline): + def test_pipeline_dlq_consume_integration(self, pipeline, mock_success): """Test Pipeline DLQ consume functionality.""" - mock_response = mock_responses.create_mock_response_factory()( - status_code=200, - json_data=[{"id": "msg1", "content": "test"}], - ) - - with patch("httpx.Client.request", return_value=mock_response): + with mock_success(json_payloads=[[{"id": "msg1", "content": "test"}]]): result = pipeline.dlq.consume(batch_size=10) assert result == [{"id": "msg1", "content": "test"}] - def test_pipeline_dlq_state_integration(self, pipeline): + def test_pipeline_dlq_state_integration(self, pipeline, mock_success): """Test Pipeline DLQ state functionality.""" - mock_response = mock_responses.create_mock_response_factory()( - status_code=200, - json_data={"total_messages": 10}, - ) - - with patch("httpx.Client.request", return_value=mock_response): + with mock_success({"total_messages": 10}): result = pipeline.dlq.state() assert result == {"total_messages": 10} diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 8240580..185c846 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,6 +1,6 @@ import os import tempfile -from unittest.mock import patch +from unittest.mock import call, patch import pytest from pydantic import ValidationError @@ -14,11 +14,9 @@ class TestPipelineCreation: """Tests for pipeline creation operations.""" - def test_create_success(self, pipeline, mock_success_response): + def test_create_success(self, pipeline, mock_success): """Test successful pipeline creation.""" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: result = pipeline.create() mock_request.assert_called_once_with( "POST", @@ -103,23 +101,38 @@ class TestPipelineLifecycle: def test_lifecycle_operations( self, pipeline, - mock_success_response, + mock_success, + get_pipeline_response, + get_health_payload, operation, method, endpoint, params, - get_pipeline_response, status, ): """Test common pipeline lifecycle operations.""" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: - if method == "GET": - mock_request.return_value.json.return_value = get_pipeline_response + if operation == "get": + mocked = mock_success( + [get_pipeline_response, get_health_payload(pipeline.pipeline_id)] + ) + else: + mocked = mock_success() + with mocked as mock_request: result = getattr(pipeline, operation)(**params) - expected_endpoint = f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}" - mock_request.assert_called_once_with(method, expected_endpoint) + expected_endpoint = ( + f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}" + ) + if operation == "get": + assert mock_request.call_args_list == [ + call("GET", expected_endpoint), + call( + "GET", + f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}/health", + ), + ] + else: + mock_request.assert_called_once_with(method, expected_endpoint) + if operation == "delete": assert result is None else: @@ -147,12 +160,10 @@ def test_lifecycle_connection_error( class TestPipelineModification: """Tests for update, rename operations.""" - def test_rename_success(self, pipeline, mock_success_response): + def test_rename_success(self, pipeline, mock_success): """Test successful pipeline rename.""" new_name = "renamed-pipeline" - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success() as mock_request: result = pipeline.rename(new_name) mock_request.assert_called_once_with( "PATCH", @@ -327,7 +338,7 @@ def test_to_dict(self, pipeline): class TestPipelineHealth: """Tests for pipeline health endpoint.""" - def test_health_success(self, pipeline, mock_success_response): + def test_health_success(self, pipeline, mock_success): """Test successful health fetch returns expected payload.""" expected = { "pipeline_id": "test-pipeline", @@ -336,11 +347,7 @@ def test_health_success(self, pipeline, mock_success_response): "created_at": "2025-08-31T16:05:09.163872763Z", "updated_at": "2025-08-31T16:05:10.638243216Z", } - mock_success_response.json.return_value = expected - - with patch( - "httpx.Client.request", return_value=mock_success_response - ) as mock_request: + with mock_success(expected) as mock_request: result = pipeline.health() mock_request.assert_called_once_with( "GET", From 5ed7add355deda47d7ce037427f41a13b7ca1aef Mon Sep 17 00:00:00 2001 From: Pablo Pardo Garcia Date: Mon, 6 Oct 2025 16:13:39 +0200 Subject: [PATCH 2/3] format code --- tests/conftest.py | 2 ++ tests/data/pipeline_configs.py | 1 + tests/test_client.py | 10 ++++++---- tests/test_pipeline.py | 4 +--- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9f44c0b..c37af4b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -32,6 +32,7 @@ def get_pipeline_response(valid_config) -> dict: @pytest.fixture def get_health_payload(): """Factory to create a health endpoint payload for a pipeline id.""" + def factory( pipeline_id: str, name: str = "Test Pipeline", @@ -149,6 +150,7 @@ def factory(json_payloads=None): return factory + @pytest.fixture def pipeline_from_id(mock_success, get_pipeline_response, get_health_payload): """Fixture for a successful GET request.""" diff --git a/tests/data/pipeline_configs.py b/tests/data/pipeline_configs.py index 4de80e0..d77d6a3 100644 --- a/tests/data/pipeline_configs.py +++ b/tests/data/pipeline_configs.py @@ -273,6 +273,7 @@ def get_invalid_config() -> dict: }, } + def get_health_payload(pipeline_id: str) -> dict: """Get a health payload for a pipeline.""" return { diff --git a/tests/test_client.py b/tests/test_client.py index 4bc71e7..645fc8c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -25,10 +25,12 @@ def test_client_get_pipeline_success( client = Client() pipeline_id = "test-pipeline-id" - with mock_success([ - get_pipeline_response, - get_health_payload(pipeline_id), - ]) as mock_request: + with mock_success( + [ + get_pipeline_response, + get_health_payload(pipeline_id), + ] + ) as mock_request: pipeline = client.get_pipeline(pipeline_id) assert mock_request.call_args_list == [ call("GET", f"{client.ENDPOINT}/{pipeline_id}"), diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 185c846..5d1d7bb 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -119,9 +119,7 @@ def test_lifecycle_operations( mocked = mock_success() with mocked as mock_request: result = getattr(pipeline, operation)(**params) - expected_endpoint = ( - f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}" - ) + expected_endpoint = f"{pipeline.ENDPOINT}/{pipeline.pipeline_id}{endpoint}" if operation == "get": assert mock_request.call_args_list == [ call("GET", expected_endpoint), From 48c11a4ee769e96d0eb3627a9d8c3ab1c065d26a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 6 Oct 2025 14:18:35 +0000 Subject: [PATCH 3/3] chore: bump version to 3.2.1 --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 944880f..e4604e3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -3.2.0 +3.2.1