diff --git a/tests/resources/addon/.gitignore b/tests/resources/addon/.gitignore new file mode 100644 index 000000000..4ac096f1e --- /dev/null +++ b/tests/resources/addon/.gitignore @@ -0,0 +1,2 @@ +/package/ +/__pycache__/ \ No newline at end of file diff --git a/tests/resources/addon/create_package.py b/tests/resources/addon/create_package.py new file mode 100644 index 000000000..5c5ba8590 --- /dev/null +++ b/tests/resources/addon/create_package.py @@ -0,0 +1,489 @@ +#!/usr/bin/env python + +"""Prepares server package from addon repo to upload to server. + +Requires Python 3.9. (Or at least 3.8+). + +This script should be called from cloned addon repo. + +It will produce 'package' subdirectory which could be pasted into server +addon directory directly (eg. into `ayon-backend/addons`). + +Format of package folder: +ADDON_REPO/package/{addon name}/{addon version} + +You can specify `--output_dir` in arguments to change output directory where +package will be created. Existing package directory will always be purged if +already present! This could be used to create package directly in server folder +if available. + +Package contains server side files directly, +client side code zipped in `private` subfolder. +""" + +import os +import sys +import re +import io +import shutil +import platform +import argparse +import logging +import collections +import zipfile +import subprocess +from typing import Optional, Iterable, Pattern, Union, List, Tuple + +import package + +FileMapping = Tuple[Union[str, io.BytesIO], str] +ADDON_NAME: str = package.name +ADDON_VERSION: str = package.version +ADDON_CLIENT_DIR: Union[str, None] = getattr(package, "client_dir", None) + +CURRENT_ROOT: str = os.path.dirname(os.path.abspath(__file__)) +SERVER_ROOT: str = os.path.join(CURRENT_ROOT, "server") +FRONTEND_ROOT: str = os.path.join(CURRENT_ROOT, "frontend") +FRONTEND_DIST_ROOT: str = os.path.join(FRONTEND_ROOT, "dist") +DST_DIST_DIR: str = os.path.join("frontend", "dist") +PRIVATE_ROOT: str = os.path.join(CURRENT_ROOT, "private") +PUBLIC_ROOT: str = os.path.join(CURRENT_ROOT, "public") +CLIENT_ROOT: str = os.path.join(CURRENT_ROOT, "client") + +VERSION_PY_CONTENT = f'''# -*- coding: utf-8 -*- +"""Package declaring AYON addon '{ADDON_NAME}' version.""" +__version__ = "{ADDON_VERSION}" +''' + +# Patterns of directories to be skipped for server part of addon +IGNORE_DIR_PATTERNS: List[Pattern] = [ + re.compile(pattern) + for pattern in { + # Skip directories starting with '.' + r"^\.", + # Skip any pycache folders + "^__pycache__$" + } +] + +# Patterns of files to be skipped for server part of addon +IGNORE_FILE_PATTERNS: List[Pattern] = [ + re.compile(pattern) + for pattern in { + # Skip files starting with '.' + # NOTE this could be an issue in some cases + r"^\.", + # Skip '.pyc' files + r"\.pyc$" + } +] + + +class ZipFileLongPaths(zipfile.ZipFile): + """Allows longer paths in zip files. + + Regular DOS paths are limited to MAX_PATH (260) characters, including + the string's terminating NUL character. + That limit can be exceeded by using an extended-length path that + starts with the '\\?\' prefix. + """ + _is_windows = platform.system().lower() == "windows" + + def _extract_member(self, member, tpath, pwd): + if self._is_windows: + tpath = os.path.abspath(tpath) + if tpath.startswith("\\\\"): + tpath = "\\\\?\\UNC\\" + tpath[2:] + else: + tpath = "\\\\?\\" + tpath + + return super()._extract_member(member, tpath, pwd) + + +def _get_yarn_executable() -> Union[str, None]: + cmd = "which" + if platform.system().lower() == "windows": + cmd = "where" + + for line in subprocess.check_output( + [cmd, "yarn"], encoding="utf-8" + ).splitlines(): + if not line or not os.path.exists(line): + continue + try: + subprocess.call([line, "--version"]) + return line + except OSError: + continue + return None + + +def safe_copy_file(src_path: str, dst_path: str): + """Copy file and make sure destination directory exists. + + Ignore if destination already contains directories from source. + + Args: + src_path (str): File path that will be copied. + dst_path (str): Path to destination file. + """ + + if src_path == dst_path: + return + + dst_dir: str = os.path.dirname(dst_path) + os.makedirs(dst_dir, exist_ok=True) + + shutil.copy2(src_path, dst_path) + + +def _value_match_regexes(value: str, regexes: Iterable[Pattern]) -> bool: + return any( + regex.search(value) + for regex in regexes + ) + + +def find_files_in_subdir( + src_path: str, + ignore_file_patterns: Optional[List[Pattern]] = None, + ignore_dir_patterns: Optional[List[Pattern]] = None +) -> List[Tuple[str, str]]: + """Find all files to copy in subdirectories of given path. + + All files that match any of the patterns in 'ignore_file_patterns' will + be skipped and any directories that match any of the patterns in + 'ignore_dir_patterns' will be skipped with all subfiles. + + Args: + src_path (str): Path to directory to search in. + ignore_file_patterns (Optional[list[Pattern]]): List of regexes + to match files to ignore. + ignore_dir_patterns (Optional[list[Pattern]]): List of regexes + to match directories to ignore. + + Returns: + list[tuple[str, str]]: List of tuples with path to file and parent + directories relative to 'src_path'. + """ + + if ignore_file_patterns is None: + ignore_file_patterns = IGNORE_FILE_PATTERNS + + if ignore_dir_patterns is None: + ignore_dir_patterns = IGNORE_DIR_PATTERNS + output: List[Tuple[str, str]] = [] + if not os.path.exists(src_path): + return output + + hierarchy_queue: collections.deque = collections.deque() + hierarchy_queue.append((src_path, [])) + while hierarchy_queue: + item: Tuple[str, str] = hierarchy_queue.popleft() + dirpath, parents = item + for name in os.listdir(dirpath): + path: str = os.path.join(dirpath, name) + if os.path.isfile(path): + if not _value_match_regexes(name, ignore_file_patterns): + items: List[str] = list(parents) + items.append(name) + output.append((path, os.path.sep.join(items))) + continue + + if not _value_match_regexes(name, ignore_dir_patterns): + items: List[str] = list(parents) + items.append(name) + hierarchy_queue.append((path, items)) + + return output + + +def update_client_version(logger): + """Update version in client code if version.py is present.""" + if not ADDON_CLIENT_DIR: + return + + version_path: str = os.path.join( + CLIENT_ROOT, ADDON_CLIENT_DIR, "version.py" + ) + if not os.path.exists(version_path): + logger.debug("Did not find version.py in client directory") + return + + logger.info("Updating client version") + with open(version_path, "w") as stream: + stream.write(VERSION_PY_CONTENT) + + +def build_frontend(): + yarn_executable = _get_yarn_executable() + if yarn_executable is None: + raise RuntimeError("Yarn executable was not found.") + + subprocess.run([yarn_executable, "install"], cwd=FRONTEND_ROOT) + subprocess.run([yarn_executable, "build"], cwd=FRONTEND_ROOT) + if not os.path.exists(FRONTEND_DIST_ROOT): + raise RuntimeError( + "Frontend build failed. Did not find 'dist' folder." + ) + + +def get_client_files_mapping() -> List[Tuple[str, str]]: + """Mapping of source client code files to destination paths. + + Example output: + [ + ( + "C:/addons/MyAddon/version.py", + "my_addon/version.py" + ), + ( + "C:/addons/MyAddon/client/my_addon/__init__.py", + "my_addon/__init__.py" + ) + ] + + Returns: + list[tuple[str, str]]: List of path mappings to copy. The destination + path is relative to expected output directory. + + """ + # Add client code content to zip + client_code_dir: str = os.path.join(CLIENT_ROOT, ADDON_CLIENT_DIR) + mapping = [ + (path, os.path.join(ADDON_CLIENT_DIR, sub_path)) + for path, sub_path in find_files_in_subdir(client_code_dir) + ] + + license_path = os.path.join(CURRENT_ROOT, "LICENSE") + if os.path.exists(license_path): + mapping.append((license_path, f"{ADDON_CLIENT_DIR}/LICENSE")) + return mapping + + +def get_client_zip_content(log) -> io.BytesIO: + log.info("Preparing client code zip") + files_mapping: List[Tuple[str, str]] = get_client_files_mapping() + stream = io.BytesIO() + with ZipFileLongPaths(stream, "w", zipfile.ZIP_DEFLATED) as zipf: + for src_path, subpath in files_mapping: + zipf.write(src_path, subpath) + stream.seek(0) + return stream + + +def get_base_files_mapping() -> List[FileMapping]: + filepaths_to_copy: List[FileMapping] = [ + ( + os.path.join(CURRENT_ROOT, "package.py"), + "package.py" + ) + ] + # Add license file to package if exists + license_path = os.path.join(CURRENT_ROOT, "LICENSE") + if os.path.exists(license_path): + filepaths_to_copy.append((license_path, "LICENSE")) + + # Go through server, private and public directories and find all files + for dirpath in (SERVER_ROOT, PRIVATE_ROOT, PUBLIC_ROOT): + if not os.path.exists(dirpath): + continue + + dirname = os.path.basename(dirpath) + for src_file, subpath in find_files_in_subdir(dirpath): + dst_subpath = os.path.join(dirname, subpath) + filepaths_to_copy.append((src_file, dst_subpath)) + + if os.path.exists(FRONTEND_DIST_ROOT): + for src_file, subpath in find_files_in_subdir(FRONTEND_DIST_ROOT): + dst_subpath = os.path.join(DST_DIST_DIR, subpath) + filepaths_to_copy.append((src_file, dst_subpath)) + + pyproject_toml = os.path.join(CLIENT_ROOT, "pyproject.toml") + if os.path.exists(pyproject_toml): + filepaths_to_copy.append( + (pyproject_toml, "private/pyproject.toml") + ) + + return filepaths_to_copy + + +def copy_client_code(output_dir: str, log: logging.Logger): + """Copies server side folders to 'addon_package_dir' + + Args: + output_dir (str): Output directory path. + log (logging.Logger) + + """ + log.info(f"Copying client for {ADDON_NAME}-{ADDON_VERSION}") + + full_output_path = os.path.join( + output_dir, f"{ADDON_NAME}_{ADDON_VERSION}" + ) + if os.path.exists(full_output_path): + shutil.rmtree(full_output_path) + os.makedirs(full_output_path, exist_ok=True) + + for src_path, dst_subpath in get_client_files_mapping(): + dst_path = os.path.join(full_output_path, dst_subpath) + safe_copy_file(src_path, dst_path) + + log.info("Client copy finished") + + +def copy_addon_package( + output_dir: str, + files_mapping: List[FileMapping], + log: logging.Logger +): + """Copy client code to output directory. + + Args: + output_dir (str): Directory path to output client code. + files_mapping (List[FileMapping]): List of tuples with source file + and destination subpath. + log (logging.Logger): Logger object. + + """ + log.info(f"Copying package for {ADDON_NAME}-{ADDON_VERSION}") + + # Add addon name and version to output directory + addon_output_dir: str = os.path.join( + output_dir, ADDON_NAME, ADDON_VERSION + ) + if os.path.isdir(addon_output_dir): + log.info(f"Purging {addon_output_dir}") + shutil.rmtree(addon_output_dir) + + os.makedirs(addon_output_dir, exist_ok=True) + + # Copy server content + for src_file, dst_subpath in files_mapping: + dst_path: str = os.path.join(addon_output_dir, dst_subpath) + dst_dir: str = os.path.dirname(dst_path) + os.makedirs(dst_dir, exist_ok=True) + if isinstance(src_file, io.BytesIO): + with open(dst_path, "wb") as stream: + stream.write(src_file.getvalue()) + else: + safe_copy_file(src_file, dst_path) + + log.info("Package copy finished") + + +def create_addon_package( + output_dir: str, + files_mapping: List[FileMapping], + log: logging.Logger +): + log.info(f"Creating package for {ADDON_NAME}-{ADDON_VERSION}") + + os.makedirs(output_dir, exist_ok=True) + output_path = os.path.join( + output_dir, f"{ADDON_NAME}-{ADDON_VERSION}.zip" + ) + + with ZipFileLongPaths(output_path, "w", zipfile.ZIP_DEFLATED) as zipf: + # Copy server content + for src_file, dst_subpath in files_mapping: + if isinstance(src_file, io.BytesIO): + zipf.writestr(dst_subpath, src_file.getvalue()) + else: + zipf.write(src_file, dst_subpath) + + log.info("Package created") + + +def main( + output_dir: Optional[str] = None, + skip_zip: Optional[bool] = False, + only_client: Optional[bool] = False +): + log: logging.Logger = logging.getLogger("create_package") + log.info("Package creation started") + + if not output_dir: + output_dir = os.path.join(CURRENT_ROOT, "package") + + has_client_code = bool(ADDON_CLIENT_DIR) + if has_client_code: + client_dir: str = os.path.join(CLIENT_ROOT, ADDON_CLIENT_DIR) + if not os.path.exists(client_dir): + raise RuntimeError( + f"Client directory was not found '{client_dir}'." + " Please check 'client_dir' in 'package.py'." + ) + update_client_version(log) + + if only_client: + if not has_client_code: + raise RuntimeError("Client code is not available. Skipping") + + copy_client_code(output_dir, log) + return + + log.info(f"Preparing package for {ADDON_NAME}-{ADDON_VERSION}") + + if os.path.exists(FRONTEND_ROOT): + build_frontend() + + files_mapping: List[FileMapping] = [] + files_mapping.extend(get_base_files_mapping()) + + if has_client_code: + files_mapping.append( + (get_client_zip_content(log), "private/client.zip") + ) + + # Skip server zipping + if skip_zip: + copy_addon_package(output_dir, files_mapping, log) + else: + create_addon_package(output_dir, files_mapping, log) + + log.info("Package creation finished") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--skip-zip", + dest="skip_zip", + action="store_true", + help=( + "Skip zipping server package and create only" + " server folder structure." + ) + ) + parser.add_argument( + "-o", "--output", + dest="output_dir", + default=None, + help=( + "Directory path where package will be created" + " (Will be purged if already exists!)" + ) + ) + parser.add_argument( + "--only-client", + dest="only_client", + action="store_true", + help=( + "Extract only client code. This is useful for development." + " Requires '-o', '--output' argument to be filled." + ) + ) + parser.add_argument( + "--debug", + dest="debug", + action="store_true", + help="Debug log messages." + ) + + args = parser.parse_args(sys.argv[1:]) + level = logging.INFO + if args.debug: + level = logging.DEBUG + logging.basicConfig(level=level) + main(args.output_dir, args.skip_zip, args.only_client) diff --git a/tests/resources/addon/package.py b/tests/resources/addon/package.py new file mode 100644 index 000000000..2143cebcd --- /dev/null +++ b/tests/resources/addon/package.py @@ -0,0 +1,9 @@ +name = "tests" +title = "Tests" +version = "1.0.0" + +client_dir = None +# ayon_launcher_version = ">=1.0.2" + +ayon_required_addons = {} +ayon_compatible_addons = {} diff --git a/tests/resources/addon/private/ayon-symbol.png b/tests/resources/addon/private/ayon-symbol.png new file mode 100644 index 000000000..30afee0e8 Binary files /dev/null and b/tests/resources/addon/private/ayon-symbol.png differ diff --git a/tests/resources/addon/server/__init__.py b/tests/resources/addon/server/__init__.py new file mode 100644 index 000000000..e330d93a2 --- /dev/null +++ b/tests/resources/addon/server/__init__.py @@ -0,0 +1,19 @@ +from ayon_server.addons import BaseServerAddon +from ayon_server.api.dependencies import CurrentUser + + +class TestsAddon(BaseServerAddon): + def initialize(self): + self.add_endpoint( + "test-get", + self.get_test, + method="GET", + ) + + async def get_test( + self, user: CurrentUser, + ): + """Return a random folder from the database""" + return { + "success": True, + } diff --git a/tests/resources/ayon-symbol.png b/tests/resources/ayon-symbol.png new file mode 100644 index 000000000..30afee0e8 Binary files /dev/null and b/tests/resources/ayon-symbol.png differ diff --git a/tests/test_server.py b/tests/test_server.py index ce5a98d07..64d1187f8 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -2,18 +2,44 @@ To run use: pytest --envfile {environment path}. Make sure you have set AYON_TOKEN in your environment. + """ +from datetime import datetime, timedelta, timezone import os import pytest +import time from ayon_api import ( - is_connection_created, close_connection, + create_folder, + create_project, + create_thumbnail, + delete, + delete_project, + dispatch_event, + download_addon_private_file, + enroll_event_job, get, + get_addons_info, + get_default_fields_for_type, + get_event, + get_events, + get_folder_thumbnail, + get_project, + get_project_names, + get_user_by_name, get_server_api_connection, get_base_url, get_rest_url, + get_timeout, + is_connection_created, + set_timeout, + trigger_server_restart, + update_event, + upload_addon_zip, + ServerAPI, + exceptions ) AYON_BASE_URL = os.getenv("AYON_SERVER_URL") @@ -21,25 +47,1172 @@ def test_close_connection(): - _con = get_server_api_connection() + """Tests the functionality of opening and closing the server API + connection. + + Verifies: + - Confirms that the connection is successfully created when + `get_server_api_connection()` is called. + - Ensures that the connection is closed correctly when + `close_connection()` is invoked, and that the connection state + is appropriately updated. + + """ + _ = get_server_api_connection() assert is_connection_created() is True close_connection() assert is_connection_created() is False def test_get_base_url(): + """Tests the retrieval of the base URL for the API. + + Verifies: + - Confirms that `get_base_url()` returns a string. + - Ensures that the returned URL matches the expected `AYON_BASE_URL`. + + """ res = get_base_url() assert isinstance(res, str) assert res == AYON_BASE_URL def test_get_rest_url(): + """Tests the retrieval of the REST API URL. + + Verifies: + - Confirms that `get_rest_url()` returns a string. + - Ensures that the returned URL matches the expected `AYON_REST_URL`. + + """ res = get_rest_url() assert isinstance(res, str) assert res == AYON_REST_URL def test_get(): + """Tests the `get` method for making API requests. + + Verifies: + - Ensures that a successful GET request to the endpoint 'info' + returns a status code of 200. + - Confirms that the response data is in the form of a dictionary. + + """ res = get("info") assert res.status_code == 200 assert isinstance(res.data, dict) + + +test_project_names = [ + (None), + ([]), + (["demo_Big_Episodic"]), + (["demo_Big_Feature"]), + (["demo_Commercial"]), + (["AY_Tests"]), + (["demo_Big_Episodic", "demo_Big_Feature", "demo_Commercial", "AY_Tests"]) +] + +test_topics = [ + (None), + ([]), + (["entity.folder.attrib_changed"]), + (["entity.task.created", "entity.project.created"]), + (["settings.changed", "entity.version.status_changed"]), + (["entity.task.status_changed", "entity.folder.deleted"]), + ([ + "entity.project.changed", + "entity.task.tags_changed", + "entity.product.created" + ]) +] + +test_users = [ + (None), + ([]), + (["admin"]), + (["mkolar", "tadeas.8964"]), + (["roy", "luke.inderwick", "ynbot"]), + ([ + "entity.folder.attrib_changed", + "entity.project.created", + "entity.task.created", + "settings.changed" + ]), +] + +# states is incorrect name for statuses +test_states = [ + (None), + ([]), + (["pending", "in_progress", "finished", "failed", "aborted", "restarted"]), + (["failed", "aborted"]), + (["pending", "in_progress"]), + (["finished", "failed", "restarted"]), + (["finished"]), +] + +test_include_logs = [ + (None), + (True), + (False), +] + +test_has_children = [ + (None), + (True), + (False), +] + +now = datetime.now(timezone.utc) + +test_newer_than = [ + (None), + ((now - timedelta(days=2)).isoformat()), + ((now - timedelta(days=5)).isoformat()), + ((now - timedelta(days=10)).isoformat()), + ((now - timedelta(days=20)).isoformat()), + ((now - timedelta(days=30)).isoformat()), +] + +test_older_than = [ + (None), + ((now - timedelta(days=0)).isoformat()), + ((now - timedelta(days=5)).isoformat()), + ((now - timedelta(days=10)).isoformat()), + ((now - timedelta(days=20)).isoformat()), + ((now - timedelta(days=30)).isoformat()), +] + +test_fields = [ + (None), + ([]), + ([]) +] + +@pytest.fixture(params=[3, 4, 5]) +def event_ids(request): + length = request.param + if length == 0: + return None + + recent_events = list(get_events( + newer_than=(datetime.now(timezone.utc) - timedelta(days=5)).isoformat() + )) + + return [recent_event["id"] for recent_event in recent_events[:length]] + + +# takes max 3 items in a list to reduce the number of combinations +@pytest.mark.parametrize("topics", test_topics[-3:]) +@pytest.mark.parametrize( + "event_ids", + [None] + [pytest.param(None, marks=pytest.mark.usefixtures("event_ids"))] +) +@pytest.mark.parametrize("project_names", test_project_names[-3:]) +@pytest.mark.parametrize("states", test_states[-3:]) +@pytest.mark.parametrize("users", test_users[-3:]) +@pytest.mark.parametrize("include_logs", test_include_logs[-3:]) +@pytest.mark.parametrize("has_children", test_has_children[2:3]) +@pytest.mark.parametrize("newer_than", test_newer_than[-3:]) +@pytest.mark.parametrize("older_than", test_older_than[-3:]) +@pytest.mark.parametrize("fields", test_fields[-3:]) +def test_get_events_all_filter_combinations( + topics, + event_ids, + project_names, + states, + users, + include_logs, + has_children, + newer_than, + older_than, + fields +): + """Tests all combinations of possible filters for `get_events`. + + Verifies: + - Calls `get_events` with the provided filter parameters. + - Ensures each event in the result set matches the specified filters. + - Checks that the number of returned events matches the expected count + based on the filters applied. + - Confirms that each event contains only the specified fields, with + no extra keys. + + Note: + - Adjusts the timeout setting if necessary to handle a large number + of tests and avoid timeout errors. + - Some combinations of filter parameters may lead to a server timeout + error. When this occurs, the test will skip instead of failing. + - Currently, a ServerError due to timeout may occur when `has_children` + is set to False. + + """ + if get_timeout() < 5: + set_timeout(None) # default timeout + + try: + res = list(get_events( + topics=topics, + event_ids=event_ids, + project_names=project_names, + states=states, + users=users, + include_logs=include_logs, + has_children=has_children, + newer_than=newer_than, + older_than=older_than, + fields=fields + )) + except exceptions.ServerError as exc: + assert has_children is False, ( + f"{exc} even if has_children is {has_children}." + ) + print("Warning: ServerError encountered, test skipped due to timeout.") + pytest.skip("Skipping test due to server timeout.") + + for item in res: + assert item.get("topic") in topics + assert item.get("project") in project_names + assert item.get("user") in users + assert item.get("status") in states + + assert (newer_than is None) or ( + datetime.fromisoformat(item.get("createdAt")) + > datetime.fromisoformat(newer_than) + ) + assert (older_than is None) or ( + datetime.fromisoformat(item.get("createdAt")) + < datetime.fromisoformat(older_than) + ) + + assert topics is None or len(res) == sum(len(list( + get_events( + topics=[topic], + project_names=project_names, + states=states, + users=users, + include_logs=include_logs, + has_children=has_children, + newer_than=newer_than, + older_than=older_than, + fields=fields + ) + )) for topic in topics) + + assert project_names is None or len(res) == sum(len(list( + get_events( + topics=topics, + project_names=[project_name], + states=states, + users=users, + include_logs=include_logs, + has_children=has_children, + newer_than=newer_than, + older_than=older_than, + fields=fields + ) + )) for project_name in project_names) + + assert states is None or len(res) == sum(len(list( + get_events( + topics=topics, + project_names=project_names, + states=[state], + users=users, + include_logs=include_logs, + has_children=has_children, + newer_than=newer_than, + older_than=older_than, + fields=fields + ) + )) for state in states) + + assert users is None or len(res) == sum(len(list( + get_events( + topics=topics, + project_names=project_names, + states=states, + users=[user], + include_logs=include_logs, + has_children=has_children, + newer_than=newer_than, + older_than=older_than, + fields=fields + ) + )) for user in users) + + if fields == []: + fields = get_default_fields_for_type("event") + + assert fields is None \ + or all( + set(event.keys()) == set(fields) + for event in res + ) + + +@pytest.mark.parametrize("has_children", test_has_children) +def test_get_events_timeout_has_children(has_children): + """Test `get_events` function with the `has_children` filter. + + Verifies: + - The `get_events` function handles requests correctly and does not + time out when using the `has_children` filter with events created + within the last 5 days. + - If a `ServerError` (likely due to a timeout) is raised: + - Logs a warning message and skips the test to avoid failure. + - Asserts that the `ServerError` should occur only when + `has_children` is set to False. + + """ + try: + _ = list(get_events( + has_children=has_children, + newer_than=( + datetime.now(timezone.utc) - timedelta(days=5) + ).isoformat() + )) + except exceptions.ServerError as exc: + assert has_children is False, ( + f"{exc} even if has_children is {has_children}." + ) + print("Warning: ServerError encountered, test skipped due to timeout.") + pytest.skip("Skipping test due to server timeout.") + + +def test_get_events_event_ids(event_ids): + """Test `get_events` function using specified event IDs. + + Verifies: + - Each item returned has an ID in the `event_ids` list. + - The number of items returned matches the expected count when filtered + by each individual event ID. + + """ + res = list(get_events(event_ids=event_ids)) + + for item in res: + assert item.get("id") in event_ids + + assert len(res) == sum(len(list( + get_events( + event_ids=[event_id] + ) + )) for event_id in event_ids) + + +@pytest.mark.parametrize("project_names", test_project_names) +def test_get_events_project_name(project_names): + """Test `get_events` function using specified project names. + + Verifies: + - Each item returned has a project in the `project_names` list. + - The count of items matches the expected number when filtered + by each individual project name. + + """ + res = list(get_events(project_names=project_names)) + + for item in res: + assert item.get("project") in project_names + + # test if the legths are equal + assert len(res) == sum(len(list( + get_events( + project_names=[project_name] + ) + )) for project_name in project_names) + + +@pytest.mark.parametrize("project_names", test_project_names) +@pytest.mark.parametrize("topics", test_topics) +def test_get_events_project_name_topic(project_names, topics): + """Test `get_events` function using both project names and topics. + + Verifies: + - Each item returned has a project in `project_names` and a topic + in `topics`. + - The item count matches the expected number when filtered by each + project name and topic combination. + + """ + res = list(get_events( + topics=topics, + project_names=project_names + )) + + for item in res: + assert item.get("topic") in topics + assert item.get("project") in project_names + + # test if the legths are equal + assert len(res) == sum(len(list( + get_events( + project_names=[project_name], + topics=topics + ) + )) for project_name in project_names) + + assert len(res) == sum(len(list( + get_events( + project_names=project_names, + topics=[topic] + ) + )) for topic in topics) + + +@pytest.mark.parametrize("project_names", test_project_names) +@pytest.mark.parametrize("topics", test_topics) +@pytest.mark.parametrize("users", test_users) +def test_get_events_project_name_topic_user(project_names, topics, users): + """Test `get_events` function using project names, topics, and users. + + Verifies: + - Each item has a project in `project_names`, a topic in `topics`, + and a user in `users`. + - The item count matches the expected number when filtered by + combinations of project names, topics, and users. + + """ + res = list(get_events( + topics=topics, + project_names=project_names, + users=users + )) + + for item in res: + assert item.get("topic") in topics + assert item.get("project") in project_names + assert item.get("user") in project_names + + # test if the legths are equal + assert len(res) == sum(len(list( + get_events( + project_names=[project_name], + topics=topics + ) + )) for project_name in project_names) + + assert len(res) == sum(len(list( + get_events( + project_names=project_names, + topics=[topic] + ) + )) for topic in topics) + + assert len(res) == sum(len(list( + get_events( + project_names=project_names, + topics=[topic] + ) + )) for topic in topics) + + +@pytest.mark.parametrize("newer_than", test_newer_than) +@pytest.mark.parametrize("older_than", test_older_than) +def test_get_events_timestamps(newer_than, older_than): + """Test `get_events` function using date filters `newer_than` and + `older_than`. + + Verifies: + - Each item's creation date falls within the specified date + range between `newer_than` and `older_than`. + + """ + res = list(get_events( + newer_than=newer_than, + older_than=older_than + )) + + for item in res: + assert (newer_than is None) or ( + datetime.fromisoformat(item.get("createdAt") + > datetime.fromisoformat(newer_than)) + ) + assert (older_than is None) or ( + datetime.fromisoformat(item.get("createdAt") + < datetime.fromisoformat(older_than)) + ) + + +test_invalid_topics = [ + (None), + (["invalid_topic_name_1", "invalid_topic_name_2"]), + (["invalid_topic_name_1"]), +] + +test_invalid_project_names = [ + (None), + (["invalid_project"]), + (["invalid_project", "demo_Big_Episodic", "demo_Big_Feature"]), + (["invalid_name_2", "demo_Commercial"]), + (["demo_Commercial"]), +] + +test_invalid_states = [ + (None), + (["pending_invalid"]), + (["in_progress_invalid"]), + (["finished_invalid", "failed_invalid"]), +] + +test_invalid_users = [ + (None), + (["ayon_invalid_user"]), + (["ayon_invalid_user1", "ayon_invalid_user2"]), + (["ayon_invalid_user1", "ayon_invalid_user2", "admin"]), +] + +test_invalid_newer_than = [ + (None), + ((datetime.now(timezone.utc) + timedelta(days=2)).isoformat()), + ((datetime.now(timezone.utc) + timedelta(days=5)).isoformat()), + ((datetime.now(timezone.utc) - timedelta(days=5)).isoformat()), +] + + +@pytest.mark.parametrize("topics", test_invalid_topics) +@pytest.mark.parametrize("project_names", test_invalid_project_names) +@pytest.mark.parametrize("states", test_invalid_states) +@pytest.mark.parametrize("users", test_invalid_users) +@pytest.mark.parametrize("newer_than", test_invalid_newer_than) +def test_get_events_invalid_data( + topics, + project_names, + states, + users, + newer_than +): + """Tests `get_events` with invalid filter data to ensure correct handling + of invalid input and prevent errors or unexpected results. + + Verifies: + - Confirms that the result is either empty or aligns with expected + valid entries: + - `topics`: Result is empty or topics is set to `None`. + - `project_names`: Result is empty or project names exist in the + list of valid project names. + - `states`: Result is empty or states is set to `None`. + - `users`: Result is empty or each user exists as a valid user. + - `newer_than`: Result is empty or `newer_than` date is in the + past. + + Note: + - Adjusts the timeout setting if necessary to handle a large number + of tests and avoid timeout errors. + + """ + if get_timeout() < 5: + set_timeout(None) # default timeout value + + res = list(get_events( + topics=topics, + project_names=project_names, + states=states, + users=users, + newer_than=newer_than + )) + + valid_project_names = get_project_names() + + assert res == [] \ + or topics is None + assert res == [] \ + or project_names is None \ + or any( + project_name in valid_project_names + for project_name in project_names + ) + assert res == [] \ + or states is None + assert res == [] \ + or users is None \ + or any(get_user_by_name(user) is not None for user in users) + assert res == [] \ + or newer_than is None \ + or datetime.fromisoformat(newer_than) < datetime.now(timezone.utc) + + +@pytest.fixture +def event_id(): + """Fixture that retrieves the ID of a recent event created within + the last 5 days. + + Returns: + - The event ID of the most recent event within the last 5 days + if available. + - `None` if no recent events are found within this time frame. + + """ + recent_events = list(get_events( + newer_than=(datetime.now(timezone.utc) - timedelta(days=5)).isoformat() + )) + return recent_events[0]["id"] if recent_events else None + +test_update_sender = [ + ("test.server.api"), +] + +test_update_username = [ + ("testing_user"), +] + +test_update_status = [ + ("pending"), + ("in_progress"), + ("finished"), + ("failed"), + ("aborted"), + ("restarted") +] + +test_update_description = [ + ("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce vivera."), + ("Updated description test...") +] + +test_update_retries = [ + (1), + (0), + (10), +] + +@pytest.mark.parametrize("sender", test_update_sender) +@pytest.mark.parametrize("username", test_update_username) +@pytest.mark.parametrize("status", test_update_status) +@pytest.mark.parametrize("description", test_update_description) +@pytest.mark.parametrize("retries", test_update_retries) +def test_update_event( + event_id, + sender, + username, + status, + description, + retries, + project_name=None, + summary=None, + payload=None, + progress=None, +): + """Verifies that the `update_event` function correctly updates event + fields. + + Verifies: + - The function updates the specified event fields based on the provided + parameters (`sender`, `username`, `status`, `description`, + `retries`, etc.). + - Only the fields specified in `kwargs` are updated, and other fields + remain unchanged. + - The `updatedAt` field is updated and the change occurs within + a reasonable time frame (within one minute). + - The event's state before and after the update matches the expected + values for the updated fields. + + Notes: + - Parameters like `event_id`, `sender`, `username`, `status`, + `description`, `retries`, etc., are passed dynamically to + the function. + - If any parameter is `None`, it is excluded from the update request. + + """ + kwargs = { + key: value + for key, value in ( + ("event_id", event_id), + ("sender", sender), + ("project", project_name), + ("username", username), + ("status", status), + ("description", description), + ("summary", summary), + ("payload", payload), + ("progress", progress), + ("retries", retries), + ) + if value is not None + } + + prev = get_event(event_id=event_id) + update_event(**kwargs) + res = get_event(event_id=event_id) + + for key, value in res.items(): + assert value == prev.get(key) \ + or key in kwargs.keys() and value == kwargs.get(key) \ + or ( + key == "updatedAt" and ( + (datetime.fromisoformat(value) - datetime.now(timezone.utc)) + < + timedelta(minutes=1) + ) + ) + + +test_update_invalid_status = [ + ("finisheddd"), + ("pending_pending"), + (42), + (False), + ("_in_progress") +] + +@pytest.mark.parametrize("status", test_update_invalid_status) +def test_update_event_invalid_status(status): + """Tests `update_event` with invalid status values to ensure correct + error handling for unsupported status inputs. + + Verifies: + - Confirms that an `HTTPRequestError` is raised for invalid status + values when attempting to update an event with an unsupported + status. + + """ + with pytest.raises(exceptions.HTTPRequestError): + update_event(event_id, status=status) + + +test_update_invalid_progress = [ + ("good"), + ("bad"), + (-1), + ([0, 1, 2]), + (101) +] + +@pytest.mark.parametrize("progress", test_update_invalid_progress) +def test_update_event_invalid_progress(event_id, progress): + """Tests `update_event` with invalid progress values to ensure correct + error handling for unsupported progress inputs. + + Verifies: + - Confirms that an `HTTPRequestError` is raised for invalid progress + values when attempting to update an event with unsupported + progress. + + """ + with pytest.raises(exceptions.HTTPRequestError): + update_event(event_id, progress=progress) + + +TEST_SOURCE_TOPIC = "test.source.topic" +TEST_TARGET_TOPIC = "test.target.topic" +DEFAULT_NUMBER_OF_EVENTS = 3 + +test_sequential = [ + (True), + (False), + (None) +] + +@pytest.fixture +def clean_up_events(topics=[TEST_SOURCE_TOPIC, TEST_TARGET_TOPIC]): + """Used before running marked testt to close any pending events that may + interfere with the test setup or outcomes by marking them as 'finished'. + + """ + events = list(get_events(topics=topics)) + for event in events: + if event["status"] not in ["finished", "failed"]: + update_event(event["id"], status="finished") + + +@pytest.fixture +def create_test_events(num_of_events=DEFAULT_NUMBER_OF_EVENTS): + """This fixture dispatches events to the `TEST_SOURCE_TOPIC` and returns + the list of event IDs for the created events. + + """ + return [ + dispatch_event( + topic=TEST_SOURCE_TOPIC, + sender="tester", + description=f"New test event n. {num}" + )["id"] + for num in range(num_of_events) + ] + + +# clean_up should be below create_test to ensure it is called first +# pytest probably does not guarantee the order of execution +@pytest.mark.usefixtures("create_test_events") +@pytest.mark.usefixtures("clean_up_events") +@pytest.mark.parametrize("sequential", test_sequential) +def test_enroll_event_job(sequential): + """Tests the `enroll_event_job` function for proper event job enrollment + based on sequential argument. + + Verifies: + - When `sequential` is set to `True`, only one job can be enrolled at + a time, preventing new enrollments until the first job is closed or + updated. + - When `sequential` is `False` or `None`, multiple jobs can be + enrolled concurrently without conflicts. + - The `update_event` function updates the `status` of a job to allowing + next sequential job processing. + + Notes: + - `update_event` is used to set `job_1`'s status to "failed" to test + re-enrollment behavior. + - TODO - delete events after test if possible + + """ + job_1 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender_1", + sequential=sequential + ) + + job_2 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender_2", + sequential=sequential + ) + + assert sequential is False \ + or sequential is None \ + or job_2 is None + + update_event(job_1["id"], status="finished") + + job_2 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender_2", + sequential=sequential + ) + + assert job_2 is not None \ + and job_1 != job_2 + + +@pytest.mark.usefixtures("clean_up_events") +@pytest.mark.parametrize("sequential", test_sequential) +def test_enroll_event_job_failed(sequential): + """Tests `enroll_event_job` behavior when the initial job fails and + sequential processing is enabled. + + Verifies: + - `enroll_event_job` creates a job (`job_1`) with specified parameters + `(`source_topic`, `target_topic`, `sender`, and `sequential`). + - After `job_1` fails (status set to "failed"), a new job (`job_2`) can + be enrolled with the same parameters. + - When `sequential` is `True`, the test verifies that `job_1` and + `job_2` are identical, as a failed sequential job should not allow + a new job to be enrolled separately. + - When `sequential` is `False`, `job_1` and `job_2` are allowed to + differ, as concurrent processing is permitted. + + Notes: + - `update_event` is used to set `job_1`'s status to "failed" to test + re-enrollment behavior. + - TODO - delete events after test if possible + + """ + job_1 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender_1", + sequential=sequential + ) + + update_event(job_1["id"], status="failed") + + job_2 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender_2", + sequential=sequential + ) + + assert sequential is not True or job_1 == job_2 + + +@pytest.mark.usefixtures("clean_up_events") +@pytest.mark.parametrize("sequential", test_sequential) +def test_enroll_event_job_same_sender(sequential): + """Tests `enroll_event_job` behavior when multiple jobs are enrolled + by the same sender. + + Verifies: + - `enroll_event_job` creates a `job_1` and `job_2` with the same + parameters (`source_topic`, `target_topic`, `sender`, and + `sequential`). + - The test checks that `job_1` and `job_2` are identical, ensuring that + no duplicate jobs are created for the same sender. + + Notes: + - TODO - delete events after test if possible + + """ + job_1 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender", + sequential=sequential + ) + + job_2 = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender", + sequential=sequential + ) + + assert job_1 == job_2 + + +test_invalid_topic = [ + ("invalid_source_topic"), + ("nonexisting_source_topic"), +] + +@pytest.mark.usefixtures("clean_up_events") +@pytest.mark.parametrize("topic", test_invalid_topics) +@pytest.mark.parametrize("sequential", test_sequential) +def test_enroll_event_job_invalid_topic(topic, sequential): + """Tests `enroll_event_job` behavior when provided with invalid topics. + + Verifies: + - `enroll_event_job` returns `None` when given invalid `source_topic` + or `target_topic`, indicating that the function properly rejects + invalid topic values. + - The function correctly handles both sequential and non-sequential + job processing modes when invalid topics are used. + + Notes: + - `clean_up_events()` is called at the beginning to close any pending + jobs that may interfere with the test setup or outcomes. + + """ + job = enroll_event_job( + source_topic=topic, + target_topic=TEST_TARGET_TOPIC, + sender="test_sender", + sequential=sequential + ) + + assert job is None + + +# clean_up should be below create_test to ensure it is called first +# pytest probably does not guarantee the order of execution +@pytest.mark.usefixtures("create_test_events") +@pytest.mark.usefixtures("clean_up_events") +def test_enroll_event_job_sequential_false(): + """Tests `enroll_event_job` behavior when `sequential` is set to `False`. + + Verifies: + - `enroll_event_job` creates a unique job for each sender even when + `sequential` is set to `False`, allowing concurrent job processing. + - Each job has a unique `dependsOn` identifier + + Notes: + - The `depends_on_ids` set is used to track `dependsOn` identifiers and + verify that each job has a unique dependency state, as required for + concurrent processing. + - TODO - delete events after test if possible + + """ + depends_on_ids = set() + + for sender in ["tester_1", "tester_2", "tester_3"]: + job = enroll_event_job( + source_topic=TEST_SOURCE_TOPIC, + target_topic=TEST_TARGET_TOPIC, + sender=sender, + sequential=False + ) + + assert job is not None \ + and job["dependsOn"] not in depends_on_ids + + depends_on_ids.add(job["dependsOn"]) + + +TEST_PROJECT_NAME = "test_API_project" +TEST_PROJECT_CODE = "apitest" +AYON_THUMBNAIL_PATH = "tests/resources/ayon-symbol.png" + + +def test_thumbnail_operations( + project_name=TEST_PROJECT_NAME, + project_code=TEST_PROJECT_CODE, + thumbnail_path=AYON_THUMBNAIL_PATH +): + """Tests thumbnail operations for a project. + + Verifies: + - A thumbnail is created for the project and associated with a folder. + - The thumbnail associated with the folder is correctly retrieved, with + attributes matching the project name and thumbnail ID. + - The content of the retrieved thumbnail matches the expected image + bytes read from the specified `thumbnail_path`. + + Notes: + - `delete_project` is called initially to remove any pre-existing + project with the same name, ensuring no conflicts during testing. + - At the end of the test, the project is deleted to clean up resources. + + """ + if get_project(project_name): + delete_project(TEST_PROJECT_NAME) + + project = create_project(project_name, project_code) + + thumbnail_id = create_thumbnail(project_name, thumbnail_path) + + folder_id = create_folder( + project_name, + "my_test_folder", + thumbnail_id=thumbnail_id + ) + thumbnail = get_folder_thumbnail(project_name, folder_id, thumbnail_id) + + assert thumbnail.project_name == project_name + assert thumbnail.thumbnail_id == thumbnail_id + + with open(thumbnail_path, "rb") as file: + image_bytes = file.read() + + assert image_bytes == thumbnail.content + + delete_project(project["name"]) + + +def test_addon_methods(): + """Tests addon methods, including upload and download of private file. + + Verifies: + - An addon with the specified name and version does not exist at the + start. + - Uploads an addon package `.zip` file and triggers a server restart. + - Ensures the server restart completes, and verifies the uploaded addon + is available in the list of addons after the restart. + - Downloads a private file associated with the addon, verifying its + existence and correct download location. + - Cleans up downloaded files and directories after the test to maintain + a clean state. + + Notes: + - `time.sleep()` is used to allow for a brief pause for the server + restart. + - The `finally` block removes downloaded files and the directory to + prevent residual test artifacts. + + """ + addon_name = "tests" + addon_version = "1.0.0" + download_path = "tests/resources/tmp_downloads" + private_file_path = os.path.join(download_path, "ayon-symbol.png") + + delete(f"/addons/{addon_name}/{addon_version}") + assert all( + addon_name != addon["name"] for addon in get_addons_info()["addons"] + ) + + try: + _ = upload_addon_zip("tests/resources/addon/package/tests-1.0.0.zip") + + trigger_server_restart() + + # need to wait at least 0.1 sec. to restart server + time.sleep(0.5) + while True: + try: + addons = get_addons_info()["addons"] + break + except exceptions.ServerError as exc: + assert "Connection timed out" in str(exc) + + assert any(addon_name == addon["name"] for addon in addons) + + downloaded_file = download_addon_private_file( + addon_name, + addon_version, + "ayon-symbol.png", + download_path + ) + + assert downloaded_file == private_file_path + assert os.path.isfile(private_file_path) + + finally: + if os.path.isfile(private_file_path): + os.remove(private_file_path) + + if os.path.isdir(download_path): + os.rmdir(download_path) + + +@pytest.fixture +def api_artist_user(): + """Fixture that sets up an API connection for a non-admin artist user. + + Workflow: + - Checks if the project exists; if not, it creates one with specified + `TEST_PROJECT_NAME` and `TEST_PROJECT_CODE`. + - Establishes a server API connection and retrieves the list + of available access groups. + - Configures a new user with limited permissions (`isAdmin` and + `isManager` set to `False`) and assigns all available access groups + as default and project-specific groups. + - Creates a new API connection using the artist user's credentials + (`username` and `password`) and logs in with it. + + Returns: + new_api: A `ServerAPI` instance authenticated with the artist user's + credentials, ready to use in tests. + + """ + project = get_project(TEST_PROJECT_NAME) + if project is None: + project = create_project(TEST_PROJECT_NAME, TEST_PROJECT_CODE) + + api = get_server_api_connection() + + username = "testUser" + password = "testUserPassword" + response = api.get("accessGroups/_") + access_groups = [ + item["name"] + for item in response.data + ] + api.put( + f"users/{username}", + password=password, + data={ + "isAdmin": False, + "isManager": False, + "defaultAccessGroups": access_groups, + "accessGroups": { + project["name"]: access_groups + }, + } + ) + new_api = ServerAPI(api.base_url) + new_api.login(username, password) + + return new_api + + +def test_server_restart_as_user(api_artist_user): + """Tests that a non-admin artist user is not permitted to trigger a server + restart. + + Verifies: + - An attempt to call `trigger_server_restart` as a non-admin artist + user raises an exception, ensuring that only users with the + appropriate permissions (e.g., admins) can perform server restart + operations. + + Notes: + - The exception is not specified as there is a todo to raise more + specific exception. + + """ + with pytest.raises(Exception): + api_artist_user.trigger_server_restart()