Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ pipeline.resume()
print(pipeline.status)
```

### Stop pipeline

```python
# Stop a pipeline gracefully
client.stop_pipeline("my-pipeline-id")

# Stop a pipeline ungracefully (terminate)
client.stop_pipeline("my-pipeline-id", terminate=True)

# Or stop via pipeline instance
pipeline.stop()
```

### Delete pipeline

```python
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.1.0
3.2.0
44 changes: 42 additions & 2 deletions src/glassflow/etl/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,51 @@ def _raise_api_error(response: httpx.Response) -> None:
"""Raise an APIError based on the response."""
status_code = response.status_code
try:
message = response.json().get("message", None)
error_data = response.json()
message = error_data.get("message", None)
code = error_data.get("code", None)
except json.JSONDecodeError:
message = f"{status_code} {response.reason_phrase}"
code = None
error_data = {}

if status_code == 400:
raise errors.ValidationError(status_code, message, response=response)
# Handle specific status validation error codes
if code == "TERMINAL_STATE_VIOLATION":
raise errors.TerminalStateViolationError(
status_code, message, response=response
)
elif code == "INVALID_STATUS_TRANSITION":
raise errors.InvalidStatusTransitionError(
status_code,
message,
response=response,
)
elif code == "UNKNOWN_STATUS":
raise errors.UnknownStatusError(status_code, message, response=response)
elif code == "PIPELINE_ALREADY_IN_STATE":
raise errors.PipelineAlreadyInStateError(
status_code, message, response=response
)
elif code == "PIPELINE_IN_TRANSITION":
raise errors.PipelineInTransitionError(
status_code, message, response=response
)
elif message and message.startswith("invalid json:"):
raise errors.InvalidJsonError(status_code, message, response=response)
elif message and message == "pipeline id cannot be empty":
raise errors.EmptyPipelineIdError(
status_code, message, response=response
)
elif message and message.startswith(
"pipeline can only be deleted if it's stopped or terminated"
):
raise errors.PipelineDeletionStateViolationError(
status_code, message, response=response
)
else:
# Generic 400 error for unknown codes
raise errors.ValidationError(status_code, message, response=response)
elif status_code == 403:
raise errors.ForbiddenError(status_code, message, response=response)
elif status_code == 404:
Expand Down
24 changes: 20 additions & 4 deletions src/glassflow/etl/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,34 @@ def create_pipeline(

return pipeline.create()

def delete_pipeline(self, pipeline_id: str, terminate: bool = True) -> None:
"""Deletes the pipeline with the given ID.
def stop_pipeline(self, pipeline_id: str, terminate: bool = False) -> None:
"""Stops the pipeline with the given ID.

Args:
pipeline_id: The ID of the pipeline to delete
pipeline_id: The ID of the pipeline to stop
terminate: Whether to terminate the pipeline (i.e. delete all the pipeline
components and potentially all the events in the pipeline)

Raises:
PipelineInTransitionError: If pipeline is in transition
PipelineNotFoundError: If pipeline is not found
APIError: If the API request fails
"""
Pipeline(host=self.host, pipeline_id=pipeline_id).stop(terminate=terminate)

def delete_pipeline(self, pipeline_id: str) -> None:
"""Deletes the pipeline with the given ID.

Args:
pipeline_id: The ID of the pipeline to delete

Raises:
PipelineDeletionStateViolationError: If pipeline is not stopped or
terminating
PipelineNotFoundError: If pipeline is not found
APIError: If the API request fails
"""
Pipeline(host=self.host, pipeline_id=pipeline_id).delete(terminate=terminate)
Pipeline(host=self.host, pipeline_id=pipeline_id).delete()

def disable_tracking(self) -> None:
"""Disable tracking of pipeline events."""
Expand Down
36 changes: 36 additions & 0 deletions src/glassflow/etl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,39 @@ class InvalidDataTypeMappingError(GlassFlowError):

class InvalidBatchSizeError(GlassFlowError):
"""Exception raised when a batch size is invalid."""


# Status validation error classes for 400 Bad Request responses
class TerminalStateViolationError(ValidationError):
"""Raised when attempting to transition from a terminal state to another state."""


class InvalidStatusTransitionError(ValidationError):
"""Raised when attempting an invalid status transition."""


class UnknownStatusError(ValidationError):
"""Raised when an unknown pipeline status is encountered."""


class PipelineAlreadyInStateError(ValidationError):
"""Raised when pipeline is already in the requested state."""


class PipelineInTransitionError(ValidationError):
"""
Raised when pipeline is currently transitioning and cannot perform the
requested operation.
"""


class InvalidJsonError(ValidationError):
"""Raised when malformed JSON is provided in request body."""


class EmptyPipelineIdError(ValidationError):
"""Raised when pipeline ID parameter is empty or whitespace."""


class PipelineDeletionStateViolationError(ValidationError):
"""Raised when attempting to delete a pipeline that's not in a deletable state."""
3 changes: 3 additions & 0 deletions src/glassflow/etl/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ class PipelineStatus(CaseInsensitiveStrEnum):
PAUSING = "Pausing"
PAUSED = "Paused"
RESUMING = "Resuming"
STOPPING = "Stopping"
STOPPED = "Stopped"
TERMINATING = "Terminating"
TERMINATED = "Terminated"
FAILED = "Failed"
DELETED = "Deleted"


class PipelineConfig(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion src/glassflow/etl/models/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ConsumerGroupOffset(CaseInsensitiveStrEnum):


class TopicConfig(BaseModel):
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.EARLIEST
consumer_group_initial_offset: ConsumerGroupOffset = ConsumerGroupOffset.LATEST
name: str
event_schema: Schema = Field(alias="schema")
deduplication: Optional[DeduplicationConfig] = Field(default=DeduplicationConfig())
Expand Down
53 changes: 43 additions & 10 deletions src/glassflow/etl/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,52 @@ def update(
"""
raise NotImplementedError("Updating is not implemented")

def delete(self, terminate: bool = True) -> None:
"""Deletes the pipeline with the given ID.
def delete(self) -> None:
"""
Deletes the pipeline from the database. Only pipelines that are stopped or
terminating can be deleted.

Raises:
PipelineDeletionStateViolationError: If pipeline is not stopped or
terminating
PipelineNotFoundError: If pipeline is not found
APIError: If the API request fails
"""
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}"
self._request("DELETE", endpoint, event_name="PipelineDeleted")
self.status = models.PipelineStatus.DELETED

def stop(self, terminate: bool = False) -> Pipeline:
"""
Stops the pipeline. Gracefully by default, ungracefully if terminate is True.
Ungracefully means deleting all the pipeline components without waiting for the
events in the pipeline to be processed.

Args:
terminate: Whether to terminate the pipeline (i.e. delete all the pipeline
components and potentially all the events in the pipeline)

Returns:
Pipeline: A Pipeline instance for the stopped pipeline

Raises:
PipelineInTransitionError: If pipeline is in transition
PipelineNotFoundError: If pipeline is not found
InvalidStatusTransitionError: If pipeline is not in a state that can be
stopped
APIError: If the API request fails
"""
if not terminate:
raise NotImplementedError("Graceful deletion is not implemented")

if self.config is None:
self.get()
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
self._request("DELETE", endpoint, event_name="PipelineDeleted")
self.status = models.PipelineStatus.TERMINATING
if terminate:
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/terminate"
next_status = models.PipelineStatus.TERMINATING
event_name = "PipelineTerminated"
else:
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/stop"
next_status = models.PipelineStatus.STOPPING
event_name = "PipelineStopped"
self._request("POST", endpoint, event_name=event_name)
self.status = next_status
return self

def pause(self) -> Pipeline:
"""Pauses the pipeline with the given ID.
Expand All @@ -172,7 +199,10 @@ def pause(self) -> Pipeline:
Pipeline: A Pipeline instance for the paused pipeline

Raises:
PipelineInTransitionError: If pipeline is in transition
PipelineNotFoundError: If pipeline is not found
InvalidStatusTransitionError: If pipeline is not in a state that can be
paused
APIError: If the API request fails
"""
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/pause"
Expand All @@ -187,7 +217,10 @@ def resume(self) -> Pipeline:
Pipeline: A Pipeline instance for the resumed pipeline

Raises:
PipelineInTransitionError: If pipeline is in transition
PipelineNotFoundError: If pipeline is not found
InvalidStatusTransitionError: If pipeline is not in a state that can be
resumed
APIError: If the API request fails
"""
endpoint = f"{self.ENDPOINT}/{self.pipeline_id}/resume"
Expand Down
109 changes: 105 additions & 4 deletions tests/data/error_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,132 @@ def get_http_error_scenarios():
{
"name": "not_found",
"status_code": 404,
"text": "Pipeline not found",
"json_data": {"message": "Pipeline not found"},
"expected_error": errors.PipelineNotFoundError,
"error_message": "not found",
},
{
"name": "forbidden",
"status_code": 403,
"text": "Pipeline already active",
"json_data": {"message": "Pipeline already active"},
"expected_error": errors.PipelineAlreadyExistsError,
"error_message": "already exists",
},
{
"name": "bad_request",
"status_code": 400,
"text": "Bad request",
"json_data": {"message": "Bad request"},
"expected_error": errors.ValidationError,
"error_message": "Bad request",
},
{
"name": "server_error",
"status_code": 500,
"text": "Internal server error",
"json_data": {"message": "Internal server error"},
"expected_error": errors.ServerError,
"error_message": "Internal server error",
},
# Status validation error scenarios for 400 Bad Request responses
{
"name": "terminal_state_violation",
"status_code": 400,
"json_data": {
"message": (
"Cannot transition from terminal state Terminated to Running"
),
"code": "TERMINAL_STATE_VIOLATION",
"current_status": "Terminated",
"requested_status": "Running",
},
"expected_error": errors.TerminalStateViolationError,
"error_message": (
"Cannot transition from terminal state Terminated to Running"
),
},
{
"name": "invalid_status_transition",
"status_code": 400,
"json_data": {
"message": "Invalid status transition from Running to Paused",
"code": "INVALID_STATUS_TRANSITION",
"current_status": "Running",
"requested_status": "Paused",
"valid_transitions": ["Stopping", "Terminating"],
},
"expected_error": errors.InvalidStatusTransitionError,
"error_message": "Invalid status transition from Running to Paused",
},
{
"name": "unknown_status",
"status_code": 400,
"json_data": {
"message": "Unknown pipeline status: InvalidStatus",
"code": "UNKNOWN_STATUS",
"current_status": "InvalidStatus",
},
"expected_error": errors.UnknownStatusError,
"error_message": "Unknown pipeline status: InvalidStatus",
},
{
"name": "pipeline_already_in_state",
"status_code": 400,
"json_data": {
"message": "Pipeline is already in Running state",
"code": "PIPELINE_ALREADY_IN_STATE",
"current_status": "Running",
"requested_status": "Running",
},
"expected_error": errors.PipelineAlreadyInStateError,
"error_message": "Pipeline is already in Running state",
},
{
"name": "pipeline_in_transition",
"status_code": 400,
"json_data": {
"message": (
"Pipeline is currently transitioning from Pausing state, "
"cannot perform Stopping operation"
),
"code": "PIPELINE_IN_TRANSITION",
"current_status": "Pausing",
"requested_status": "Stopping",
},
"expected_error": errors.PipelineInTransitionError,
"error_message": (
"Pipeline is currently transitioning from Pausing state, "
"cannot perform Stopping operation"
),
},
{
"name": "invalid_json",
"status_code": 400,
"json_data": {"message": "invalid json: unexpected end of JSON input"},
"expected_error": errors.InvalidJsonError,
"error_message": "invalid json: unexpected end of JSON input",
},
{
"name": "empty_pipeline_id",
"status_code": 400,
"json_data": {"message": "pipeline id cannot be empty"},
"expected_error": errors.EmptyPipelineIdError,
"error_message": "pipeline id cannot be empty",
},
{
"name": "pipeline_deletion_state_violation",
"status_code": 400,
"json_data": {
"message": (
"pipeline can only be deleted if it's stopped or terminated, "
"current status: Running"
),
"field": {"current_status": "Running"},
},
"expected_error": errors.PipelineDeletionStateViolationError,
"error_message": (
"pipeline can only be deleted if it's stopped or terminated, "
"current status: Running"
),
},
]


Expand Down
Loading