Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ayon_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
get_default_fields_for_type,
get_rest_entity_by_id,
send_batch_operations,
send_background_batch_operations,
get_background_operations_status,
get_installers,
create_installer,
update_installer,
Expand Down Expand Up @@ -347,6 +349,8 @@
"get_default_fields_for_type",
"get_rest_entity_by_id",
"send_batch_operations",
"send_background_batch_operations",
"get_background_operations_status",
"get_installers",
"create_installer",
"update_installer",
Expand Down
75 changes: 75 additions & 0 deletions ayon_api/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ActivityReferenceType,
EntityListEntityType,
EntityListItemMode,
BackgroundOperationTask,
LinkDirection,
EventFilter,
EventStatus,
Expand Down Expand Up @@ -1253,6 +1254,80 @@ def send_batch_operations(
)


def send_background_batch_operations(
project_name: str,
operations: list[dict[str, Any]],
*,
can_fail: bool = False,
wait: bool = False,
raise_on_fail: bool = True,
) -> BackgroundOperationTask:
"""Post multiple CRUD operations to server.

When multiple changes should be made on server side this is the best
way to go. It is possible to pass multiple operations to process on a
server side and do the changes in a transaction.

Compared to 'send_batch_operations' this function creates a task on
server which then can be periodically checked for a status and
receive it's result.

When used with 'wait' set to 'True' this method blocks until task is
finished. Which makes it work as 'send_batch_operations'
but safer for large operations batch as is not bound to
response timeout.

Args:
project_name (str): On which project should be operations
processed.
operations (list[dict[str, Any]]): Operations to be processed.
can_fail (Optional[bool]): Server will try to process all
operations even if one of them fails.
wait (bool): Wait for operations to end.
raise_on_fail (Optional[bool]): Raise exception if an operation
fails. You can handle failed operations on your own
when set to 'False'. Used when 'wait' is enabled.

Raises:
ValueError: Operations can't be converted to json string.
FailedOperations: When output does not contain server operations
or 'raise_on_fail' is enabled and any operation fails.

Returns:
BackgroundOperationTask: Background operation.

"""
con = get_server_api_connection()
return con.send_background_batch_operations(
project_name=project_name,
operations=operations,
can_fail=can_fail,
wait=wait,
raise_on_fail=raise_on_fail,
)


def get_background_operations_status(
project_name: str,
task_id: str,
) -> BackgroundOperationTask:
"""Get status of background operations task.

Args:
project_name (str): Project name.
task_id (str): Backgorund operation task id.

Returns:
BackgroundOperationTask: Background operation.

"""
con = get_server_api_connection()
return con.get_background_operations_status(
project_name=project_name,
task_id=task_id,
)


def get_installers(
version: Optional[str] = None,
platform_name: Optional[str] = None,
Expand Down
7 changes: 5 additions & 2 deletions ayon_api/entity_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,8 +1225,11 @@ def commit_changes(self) -> None:
if not entity.created:
operations_body.append(self._get_delete_body(entity))

self._connection.send_batch_operations(
self.project_name, operations_body
self._connection.send_background_batch_operations(
self.project_name,
operations_body,
can_fail=False,
wait=True,
)
if post_project_changes:
self._connection.update_project(
Expand Down
162 changes: 135 additions & 27 deletions ayon_api/server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
ServerVersion,
AnyEntityDict,
StreamType,
BackgroundOperationTask,
)

VERSION_REGEX = re.compile(
Expand Down Expand Up @@ -1870,7 +1871,7 @@ def send_batch_operations(
project_name: str,
operations: list[dict[str, Any]],
can_fail: bool = False,
raise_on_fail: bool = True
raise_on_fail: bool = True,
) -> list[dict[str, Any]]:
"""Post multiple CRUD operations to server.

Expand Down Expand Up @@ -1904,17 +1905,98 @@ def send_batch_operations(
raise_on_fail,
)

def _send_batch_operations(
def send_background_batch_operations(
self,
uri: str,
project_name: str,
operations: list[dict[str, Any]],
can_fail: bool,
raise_on_fail: bool
) -> list[dict[str, Any]]:
if not operations:
return []
*,
can_fail: bool = False,
wait: bool = False,
raise_on_fail: bool = True,
) -> BackgroundOperationTask:
"""Post multiple CRUD operations to server.

When multiple changes should be made on server side this is the best
way to go. It is possible to pass multiple operations to process on a
server side and do the changes in a transaction.

Compared to 'send_batch_operations' this function creates a task on
server which then can be periodically checked for a status and
receive it's result.

When used with 'wait' set to 'True' this method blocks until task is
finished. Which makes it work as 'send_batch_operations'
but safer for large operations batch as is not bound to
response timeout.

Args:
project_name (str): On which project should be operations
processed.
operations (list[dict[str, Any]]): Operations to be processed.
can_fail (Optional[bool]): Server will try to process all
operations even if one of them fails.
wait (bool): Wait for operations to end.
raise_on_fail (Optional[bool]): Raise exception if an operation
fails. You can handle failed operations on your own
when set to 'False'. Used when 'wait' is enabled.

Raises:
ValueError: Operations can't be converted to json string.
FailedOperations: When output does not contain server operations
or 'raise_on_fail' is enabled and any operation fails.

Returns:
BackgroundOperationTask: Background operation.

"""
operations_body = self._prepare_operations_body(operations)
response = self.post(
f"projects/{project_name}/operations/background",
operations=operations_body,
canFail=can_fail
)
response.raise_for_status()
if not wait:
return response.data

body_by_id = {}
task_id = response["id"]
time.sleep(0.1)
while True:
op_status = self.get_background_operations_status(
project_name, task_id
)
if op_status["status"] == "completed":
break
time.sleep(1)

if raise_on_fail:
self._validate_operations_result(
op_status["result"], operations_body
)
return op_status

def get_background_operations_status(
self, project_name: str, task_id: str
) -> BackgroundOperationTask:
"""Get status of background operations task.

Args:
project_name (str): Project name.
task_id (str): Backgorund operation task id.

Returns:
BackgroundOperationTask: Background operation.

"""
response = self.get(
f"projects/{project_name}/operations/background/{task_id}"
)
response.raise_for_status()
return response.data

def _prepare_operations_body(
self, operations: list[dict[str, Any]]
) -> list[dict[str, Any]]:
operations_body = []
for operation in operations:
if not operation:
Expand All @@ -1936,42 +2018,68 @@ def _send_batch_operations(
)
))

body_by_id[op_id] = body
operations_body.append(body)
return operations_body

def _send_batch_operations(
self,
uri: str,
operations: list[dict[str, Any]],
can_fail: bool,
raise_on_fail: bool
) -> list[dict[str, Any]]:
if not operations:
return []

operations_body = self._prepare_operations_body(operations)
if not operations_body:
return []

result = self.post(
response = self.post(
uri,
operations=operations_body,
canFail=can_fail
)

op_results = result.get("operations")
op_results = response.get("operations")
if op_results is None:
detail = result.get("detail")
detail = response.get("detail")
if detail:
raise FailedOperations(f"Operation failed. Detail: {detail}")
raise FailedOperations(
f"Operation failed. Content: {result.text}"
f"Operation failed. Content: {response.text}"
)

if result.get("success") or not raise_on_fail:
return op_results

for op_result in op_results:
if not op_result["success"]:
operation_id = op_result["id"]
raise FailedOperations((
"Operation \"{}\" failed with data:\n{}\nDetail: {}."
).format(
operation_id,
json.dumps(body_by_id[operation_id], indent=4),
op_result["detail"],
))
if raise_on_fail:
self._validate_operations_result(response.data, operations_body)
return op_results

def _validate_operations_result(
self,
result: dict[str, Any],
operations_body: list[dict[str, Any]],
) -> None:
if result.get("success"):
return None

print(result)
for op_result in result["operations"]:
if op_result["success"]:
continue

operation_id = op_result["id"]
operation = next(
op
for op in operations_body
if op["id"] == operation_id
)
detail = op_result["detail"]
raise FailedOperations(
f"Operation \"{operation_id}\" failed with data:"
f"\n{json.dumps(operation, indent=4)}"
f"\nDetail: {detail}."
)

def _prepare_fields(
self, entity_type: str, fields: set[str], own_attributes: bool = False
):
Expand Down
6 changes: 6 additions & 0 deletions ayon_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class EventFilter(TypedDict):
operator: Literal["and", "or"]


class BackgroundOperationTask(TypedDict):
id: str
status: Literal["pending", "in_progress", "completed"]
result: Optional[dict[str, Any]]


AttributeScope = Literal[
"project",
"folder",
Expand Down