Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.0
3.2.1
2 changes: 1 addition & 1 deletion src/glassflow/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def get(self) -> Pipeline:
"GET", f"{self.ENDPOINT}/{self.pipeline_id}", event_name="PipelineGet"
)
self.config = models.PipelineConfig.model_validate(response.json())
self.status = models.PipelineStatus(response.json()["status"])
self.health()
self._dlq = DLQ(pipeline_id=self.pipeline_id, host=self.host)
return self

Expand Down
63 changes: 55 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ def get_pipeline_response(valid_config) -> dict:
return config


@pytest.fixture
def get_health_payload():
"""Factory to create a health endpoint payload for a pipeline id."""

def factory(
pipeline_id: str,
name: str = "Test Pipeline",
status: str = "Running",
) -> dict:
return {
"pipeline_id": pipeline_id,
"pipeline_name": name,
"overall_status": status,
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
}

return factory


@pytest.fixture
def valid_config_without_joins() -> dict:
"""Fixture for a valid pipeline configuration without joins."""
Expand Down Expand Up @@ -96,18 +116,45 @@ def mock_connection_error():


@pytest.fixture
def mock_success_get_pipeline(get_pipeline_response):
"""Fixture for a successful GET pipeline response."""
return mock_responses.create_mock_response_factory()(
status_code=200,
json_data=get_pipeline_response,
)
def mock_success():
"""Factory-context fixture that patches httpx and returns 200 with JSON.

- Accepts either a single dict payload or a list of dict payloads via the
optional argument to the returned context manager. If a list is provided,
they are returned sequentially from response.json() to simulate multiple
HTTP calls within the same test flow.
- If no payload is provided, it defaults to {"message": "Success"}.

Usage:
with mock_success(payload_or_list) as mock_request:
# invoke code under test
assert mock_request.call_args_list == [...]
"""
from contextlib import contextmanager

@contextmanager
def factory(json_payloads=None):
if json_payloads is None:
json_payloads = [{"message": "Success"}]
payload_list = (
list(json_payloads) if isinstance(json_payloads, list) else [json_payloads]
)
response = mock_responses.create_mock_response_factory()(
status_code=200,
json_data=payload_list[0] if payload_list else {},
)
with patch("httpx.Client.request", return_value=response) as mock:
if payload_list:
response.json.side_effect = payload_list
yield mock

return factory


@pytest.fixture
def pipeline_from_id(mock_success_get_pipeline):
def pipeline_from_id(mock_success, get_pipeline_response, get_health_payload):
"""Fixture for a successful GET request."""
with patch("httpx.Client.request", return_value=mock_success_get_pipeline):
with mock_success([get_pipeline_response, get_health_payload("test-pipeline-id")]):
return Pipeline(pipeline_id="test-pipeline-id").get()


Expand Down
11 changes: 11 additions & 0 deletions tests/data/pipeline_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,14 @@ def get_invalid_config() -> dict:
"table_mapping": [], # Empty table mapping should trigger validation error
},
}


def get_health_payload(pipeline_id: str) -> dict:
"""Get a health payload for a pipeline."""
return {
"pipeline_id": pipeline_id,
"pipeline_name": "Test Pipeline",
"overall_status": "Running",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
}
62 changes: 25 additions & 37 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import patch
from unittest.mock import call, patch

import pytest

Expand All @@ -19,21 +19,23 @@ def test_client_init(self):
assert client.http_client.base_url == "https://example.com"

def test_client_get_pipeline_success(
self, get_pipeline_response, mock_success_response
self, mock_success, get_pipeline_response, get_health_payload
):
"""Test successful pipeline retrieval by ID."""
client = Client()
pipeline_id = "test-pipeline-id"

mock_success_response.json.return_value = get_pipeline_response

with patch(
"httpx.Client.request", return_value=mock_success_response
with mock_success(
[
get_pipeline_response,
get_health_payload(pipeline_id),
]
) as mock_request:
pipeline = client.get_pipeline(pipeline_id)
mock_request.assert_called_once_with(
"GET", f"{client.ENDPOINT}/{pipeline_id}"
)
assert mock_request.call_args_list == [
call("GET", f"{client.ENDPOINT}/{pipeline_id}"),
call("GET", f"{client.ENDPOINT}/{pipeline_id}/health"),
]
assert isinstance(pipeline, Pipeline)
assert pipeline.pipeline_id == pipeline_id

Expand Down Expand Up @@ -126,13 +128,11 @@ def test_client_list_pipelines_empty(self):
mock_request.assert_called_once_with("GET", client.ENDPOINT)
assert pipelines == []

def test_client_create_pipeline_success(self, valid_config, mock_success_response):
def test_client_create_pipeline_success(self, valid_config, mock_success):
"""Test successful pipeline creation."""
client = Client()

with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
pipeline = client.create_pipeline(valid_config)
mock_request.assert_called_once_with(
"POST", client.ENDPOINT, json=mock_request.call_args[1]["json"]
Expand All @@ -150,25 +150,21 @@ def test_client_create_pipeline_already_exists(
with pytest.raises(errors.PipelineAlreadyExistsError):
client.create_pipeline(valid_config)

def test_client_create_pipeline_from_yaml_success(self, mock_success_response):
def test_client_create_pipeline_from_yaml_success(self, mock_success):
"""Test pipeline creation from YAML file."""
client = Client()
with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
client.create_pipeline(
pipeline_config_yaml_path="tests/data/valid_pipeline.yaml"
)
mock_request.assert_called_once_with(
"POST", client.ENDPOINT, json=mock_request.call_args[1]["json"]
)

def test_client_create_pipeline_from_json_success(self, mock_success_response):
def test_client_create_pipeline_from_json_success(self, mock_success):
"""Test pipeline creation from JSON file."""
client = Client()
with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
client.create_pipeline(
pipeline_config_json_path="tests/data/valid_pipeline.json"
)
Expand All @@ -188,14 +184,12 @@ def test_client_create_pipeline_value_error(self, valid_config):
with pytest.raises(ValueError):
client.create_pipeline()

def test_client_delete_pipeline_success(self, mock_success_response):
def test_client_delete_pipeline_success(self, mock_success):
"""Test successful pipeline deletion."""
client = Client()
pipeline_id = "test-pipeline-id"

with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_delete_request:
with mock_success() as mock_delete_request:
client.delete_pipeline(pipeline_id)
mock_delete_request.assert_called_once_with(
"DELETE", f"{client.ENDPOINT}/{pipeline_id}"
Expand All @@ -211,27 +205,23 @@ def test_client_delete_pipeline_not_found(self, mock_not_found_response):
client.delete_pipeline(pipeline_id)
assert "not found" in str(exc_info.value)

def test_client_stop_pipeline_success(self, mock_success_response):
def test_client_stop_pipeline_success(self, mock_success):
"""Test successful pipeline stop."""
client = Client()
pipeline_id = "test-pipeline-id"

with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
client.stop_pipeline(pipeline_id)
mock_request.assert_called_once_with(
"POST", f"{client.ENDPOINT}/{pipeline_id}/stop"
)

def test_client_stop_pipeline_terminate_success(self, mock_success_response):
def test_client_stop_pipeline_terminate_success(self, mock_success):
"""Test successful pipeline stop with terminate=True."""
client = Client()
pipeline_id = "test-pipeline-id"

with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
client.stop_pipeline(pipeline_id, terminate=True)
mock_request.assert_called_once_with(
"POST", f"{client.ENDPOINT}/{pipeline_id}/terminate"
Expand All @@ -256,11 +246,9 @@ def test_pipeline_to_dict(self, valid_config):
assert isinstance(pipeline_dict, dict)
assert pipeline_dict["pipeline_id"] == valid_config["pipeline_id"]

def test_pipeline_delete(self, pipeline_from_id, mock_success_response):
def test_pipeline_delete(self, pipeline_from_id, mock_success):
"""Test Pipeline delete with explicit pipeline_id."""
with patch(
"httpx.Client.request", return_value=mock_success_response
) as mock_request:
with mock_success() as mock_request:
pipeline_from_id.delete()
mock_request.assert_called_once_with(
"DELETE",
Expand Down
52 changes: 17 additions & 35 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,13 @@ def test_dlq_initialization(self, dlq):
assert dlq.http_client.base_url == "http://localhost:8080"
assert dlq.endpoint == "/api/v1/pipeline/test-pipeline/dlq"

def test_consume_success(self, dlq):
def test_consume_success(self, dlq, mock_success):
"""Test successful DLQ consume operation."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=200,
json_data=[
{"id": "msg1", "content": "test message 1"},
{"id": "msg2", "content": "test message 2"},
],
)

with patch("httpx.Client.request", return_value=mock_response) as mock_get:
payload = [
{"id": "msg1", "content": "test message 1"},
{"id": "msg2", "content": "test message 2"},
]
with mock_success(json_payloads=[payload]) as mock_get:
result = dlq.consume(batch_size=50)

mock_get.assert_called_once_with(
Expand Down Expand Up @@ -79,18 +75,14 @@ def test_consume_http_error_scenarios(self, dlq, scenario):

assert scenario["error_message"] in str(exc_info.value)

def test_state_success(self, dlq):
def test_state_success(self, dlq, mock_success):
"""Test successful DLQ state operation."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=200,
json_data={
"total_messages": 42,
"pending_messages": 5,
"last_updated": "2023-01-01T00:00:00Z",
},
)

with patch("httpx.Client.request", return_value=mock_response) as mock_get:
state_payload = {
"total_messages": 42,
"pending_messages": 5,
"last_updated": "2023-01-01T00:00:00Z",
}
with mock_success(state_payload) as mock_get:
result = dlq.state()

mock_get.assert_called_once_with("GET", f"{dlq.endpoint}/state")
Expand Down Expand Up @@ -133,26 +125,16 @@ def test_pipeline_dlq_property_same_url(self):
assert pipeline.http_client.base_url == custom_url
assert pipeline.dlq.http_client.base_url == custom_url

def test_pipeline_dlq_consume_integration(self, pipeline):
def test_pipeline_dlq_consume_integration(self, pipeline, mock_success):
"""Test Pipeline DLQ consume functionality."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=200,
json_data=[{"id": "msg1", "content": "test"}],
)

with patch("httpx.Client.request", return_value=mock_response):
with mock_success(json_payloads=[[{"id": "msg1", "content": "test"}]]):
result = pipeline.dlq.consume(batch_size=10)

assert result == [{"id": "msg1", "content": "test"}]

def test_pipeline_dlq_state_integration(self, pipeline):
def test_pipeline_dlq_state_integration(self, pipeline, mock_success):
"""Test Pipeline DLQ state functionality."""
mock_response = mock_responses.create_mock_response_factory()(
status_code=200,
json_data={"total_messages": 10},
)

with patch("httpx.Client.request", return_value=mock_response):
with mock_success({"total_messages": 10}):
result = pipeline.dlq.state()

assert result == {"total_messages": 10}
Loading