diff --git a/ayon_api/__init__.py b/ayon_api/__init__.py index 0d077afcb..a41fee778 100644 --- a/ayon_api/__init__.py +++ b/ayon_api/__init__.py @@ -80,6 +80,8 @@ get_default_fields_for_type, get_rest_entity_by_id, send_batch_operations, + send_background_batch_operations, + get_background_operations_status, get_installers, create_installer, update_installer, @@ -347,6 +349,8 @@ "get_default_fields_for_type", "get_rest_entity_by_id", "send_batch_operations", + "send_background_batch_operations", + "get_background_operations_status", "get_installers", "create_installer", "update_installer", diff --git a/ayon_api/_api.py b/ayon_api/_api.py index 14d1988b7..b6a101f1c 100644 --- a/ayon_api/_api.py +++ b/ayon_api/_api.py @@ -48,6 +48,7 @@ ActivityReferenceType, EntityListEntityType, EntityListItemMode, + BackgroundOperationTask, LinkDirection, EventFilter, EventStatus, @@ -1253,6 +1254,80 @@ def send_batch_operations( ) +def send_background_batch_operations( + project_name: str, + operations: list[dict[str, Any]], + *, + can_fail: bool = False, + wait: bool = False, + raise_on_fail: bool = True, +) -> BackgroundOperationTask: + """Post multiple CRUD operations to server. + + When multiple changes should be made on server side this is the best + way to go. It is possible to pass multiple operations to process on a + server side and do the changes in a transaction. + + Compared to 'send_batch_operations' this function creates a task on + server which then can be periodically checked for a status and + receive it's result. + + When used with 'wait' set to 'True' this method blocks until task is + finished. Which makes it work as 'send_batch_operations' + but safer for large operations batch as is not bound to + response timeout. + + Args: + project_name (str): On which project should be operations + processed. + operations (list[dict[str, Any]]): Operations to be processed. + can_fail (Optional[bool]): Server will try to process all + operations even if one of them fails. + wait (bool): Wait for operations to end. + raise_on_fail (Optional[bool]): Raise exception if an operation + fails. You can handle failed operations on your own + when set to 'False'. Used when 'wait' is enabled. + + Raises: + ValueError: Operations can't be converted to json string. + FailedOperations: When output does not contain server operations + or 'raise_on_fail' is enabled and any operation fails. + + Returns: + BackgroundOperationTask: Background operation. + + """ + con = get_server_api_connection() + return con.send_background_batch_operations( + project_name=project_name, + operations=operations, + can_fail=can_fail, + wait=wait, + raise_on_fail=raise_on_fail, + ) + + +def get_background_operations_status( + project_name: str, + task_id: str, +) -> BackgroundOperationTask: + """Get status of background operations task. + + Args: + project_name (str): Project name. + task_id (str): Backgorund operation task id. + + Returns: + BackgroundOperationTask: Background operation. + + """ + con = get_server_api_connection() + return con.get_background_operations_status( + project_name=project_name, + task_id=task_id, + ) + + def get_installers( version: Optional[str] = None, platform_name: Optional[str] = None, diff --git a/ayon_api/entity_hub.py b/ayon_api/entity_hub.py index 62e6a4f54..5e52ef8f1 100644 --- a/ayon_api/entity_hub.py +++ b/ayon_api/entity_hub.py @@ -1225,8 +1225,11 @@ def commit_changes(self) -> None: if not entity.created: operations_body.append(self._get_delete_body(entity)) - self._connection.send_batch_operations( - self.project_name, operations_body + self._connection.send_background_batch_operations( + self.project_name, + operations_body, + can_fail=False, + wait=True, ) if post_project_changes: self._connection.update_project( diff --git a/ayon_api/server_api.py b/ayon_api/server_api.py index aaa588973..87bb044d5 100644 --- a/ayon_api/server_api.py +++ b/ayon_api/server_api.py @@ -91,6 +91,7 @@ ServerVersion, AnyEntityDict, StreamType, + BackgroundOperationTask, ) VERSION_REGEX = re.compile( @@ -1870,7 +1871,7 @@ def send_batch_operations( project_name: str, operations: list[dict[str, Any]], can_fail: bool = False, - raise_on_fail: bool = True + raise_on_fail: bool = True, ) -> list[dict[str, Any]]: """Post multiple CRUD operations to server. @@ -1904,17 +1905,98 @@ def send_batch_operations( raise_on_fail, ) - def _send_batch_operations( + def send_background_batch_operations( self, - uri: str, + project_name: str, operations: list[dict[str, Any]], - can_fail: bool, - raise_on_fail: bool - ) -> list[dict[str, Any]]: - if not operations: - return [] + *, + can_fail: bool = False, + wait: bool = False, + raise_on_fail: bool = True, + ) -> BackgroundOperationTask: + """Post multiple CRUD operations to server. + + When multiple changes should be made on server side this is the best + way to go. It is possible to pass multiple operations to process on a + server side and do the changes in a transaction. + + Compared to 'send_batch_operations' this function creates a task on + server which then can be periodically checked for a status and + receive it's result. + + When used with 'wait' set to 'True' this method blocks until task is + finished. Which makes it work as 'send_batch_operations' + but safer for large operations batch as is not bound to + response timeout. + + Args: + project_name (str): On which project should be operations + processed. + operations (list[dict[str, Any]]): Operations to be processed. + can_fail (Optional[bool]): Server will try to process all + operations even if one of them fails. + wait (bool): Wait for operations to end. + raise_on_fail (Optional[bool]): Raise exception if an operation + fails. You can handle failed operations on your own + when set to 'False'. Used when 'wait' is enabled. + + Raises: + ValueError: Operations can't be converted to json string. + FailedOperations: When output does not contain server operations + or 'raise_on_fail' is enabled and any operation fails. + + Returns: + BackgroundOperationTask: Background operation. + + """ + operations_body = self._prepare_operations_body(operations) + response = self.post( + f"projects/{project_name}/operations/background", + operations=operations_body, + canFail=can_fail + ) + response.raise_for_status() + if not wait: + return response.data - body_by_id = {} + task_id = response["id"] + time.sleep(0.1) + while True: + op_status = self.get_background_operations_status( + project_name, task_id + ) + if op_status["status"] == "completed": + break + time.sleep(1) + + if raise_on_fail: + self._validate_operations_result( + op_status["result"], operations_body + ) + return op_status + + def get_background_operations_status( + self, project_name: str, task_id: str + ) -> BackgroundOperationTask: + """Get status of background operations task. + + Args: + project_name (str): Project name. + task_id (str): Backgorund operation task id. + + Returns: + BackgroundOperationTask: Background operation. + + """ + response = self.get( + f"projects/{project_name}/operations/background/{task_id}" + ) + response.raise_for_status() + return response.data + + def _prepare_operations_body( + self, operations: list[dict[str, Any]] + ) -> list[dict[str, Any]]: operations_body = [] for operation in operations: if not operation: @@ -1936,42 +2018,68 @@ def _send_batch_operations( ) )) - body_by_id[op_id] = body operations_body.append(body) + return operations_body + def _send_batch_operations( + self, + uri: str, + operations: list[dict[str, Any]], + can_fail: bool, + raise_on_fail: bool + ) -> list[dict[str, Any]]: + if not operations: + return [] + + operations_body = self._prepare_operations_body(operations) if not operations_body: return [] - result = self.post( + response = self.post( uri, operations=operations_body, canFail=can_fail ) - op_results = result.get("operations") + op_results = response.get("operations") if op_results is None: - detail = result.get("detail") + detail = response.get("detail") if detail: raise FailedOperations(f"Operation failed. Detail: {detail}") raise FailedOperations( - f"Operation failed. Content: {result.text}" + f"Operation failed. Content: {response.text}" ) - if result.get("success") or not raise_on_fail: - return op_results - - for op_result in op_results: - if not op_result["success"]: - operation_id = op_result["id"] - raise FailedOperations(( - "Operation \"{}\" failed with data:\n{}\nDetail: {}." - ).format( - operation_id, - json.dumps(body_by_id[operation_id], indent=4), - op_result["detail"], - )) + if raise_on_fail: + self._validate_operations_result(response.data, operations_body) return op_results + def _validate_operations_result( + self, + result: dict[str, Any], + operations_body: list[dict[str, Any]], + ) -> None: + if result.get("success"): + return None + + print(result) + for op_result in result["operations"]: + if op_result["success"]: + continue + + operation_id = op_result["id"] + operation = next( + op + for op in operations_body + if op["id"] == operation_id + ) + detail = op_result["detail"] + raise FailedOperations( + f"Operation \"{operation_id}\" failed with data:" + f"\n{json.dumps(operation, indent=4)}" + f"\nDetail: {detail}." + ) + def _prepare_fields( self, entity_type: str, fields: set[str], own_attributes: bool = False ): diff --git a/ayon_api/typing.py b/ayon_api/typing.py index cce3f196a..24458cebb 100644 --- a/ayon_api/typing.py +++ b/ayon_api/typing.py @@ -89,6 +89,12 @@ class EventFilter(TypedDict): operator: Literal["and", "or"] +class BackgroundOperationTask(TypedDict): + id: str + status: Literal["pending", "in_progress", "completed"] + result: Optional[dict[str, Any]] + + AttributeScope = Literal[ "project", "folder",