From 7b750ca635a0c01237589c592341843d94d02fed Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Wed, 31 Dec 2025 16:16:49 +0100 Subject: [PATCH 01/10] Convert test upload and get file --- tests/integration/test_object_store.py | 44 +++++++++++++++----------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/tests/integration/test_object_store.py b/tests/integration/test_object_store.py index a1b62b32..26ba2687 100644 --- a/tests/integration/test_object_store.py +++ b/tests/integration/test_object_store.py @@ -1,19 +1,25 @@ -# from infrahub_sdk import InfrahubClient -# from tests.helpers.test_app import TestInfrahubApp -# -# FILE_CONTENT_01 = """ -# any content -# another content -# """ -# -# -# class TestObjectStore(TestInfrahubApp): -# async def test_upload_and_get(self, client: InfrahubClient): -# response = await client.object_store.upload(content=FILE_CONTENT_01) -# -# assert sorted(response.keys()) == ["checksum", "identifier"] -# assert response["checksum"] == "aa19b96860ec59a73906dd8660bb3bad" -# assert response["identifier"] -# -# content = await client.object_store.get(identifier=response["identifier"]) -# assert content == FILE_CONTENT_01 +from __future__ import annotations + +from typing import TYPE_CHECKING + +from infrahub_sdk.testing.docker import TestInfrahubDockerClient + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient + +FILE_CONTENT_01 = """ + any content + another content + """ + + +class TestObjectStore(TestInfrahubDockerClient): + async def test_upload_and_get(self, client: InfrahubClient) -> None: + response = await client.object_store.upload(content=FILE_CONTENT_01) + + assert sorted(response.keys()) == ["checksum", "identifier"] + assert response["checksum"] == "aa19b96860ec59a73906dd8660bb3bad" + assert response["identifier"] + + content = await client.object_store.get(identifier=response["identifier"]) + assert content == FILE_CONTENT_01 From 47eab3284a4ec6a733a8abbd105ab3a11a282c09 Mon Sep 17 00:00:00 2001 
From: Guillaume Mazoyer Date: Wed, 31 Dec 2025 16:34:26 +0100 Subject: [PATCH 02/10] Enable tests that should be supported --- tests/integration/test_infrahub_client.py | 25 ++++++++++------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/integration/test_infrahub_client.py b/tests/integration/test_infrahub_client.py index db86d759..8fd96e5a 100644 --- a/tests/integration/test_infrahub_client.py +++ b/tests/integration/test_infrahub_client.py @@ -66,20 +66,17 @@ async def test_get_all(self, client: InfrahubClient, base_dataset) -> None: assert isinstance(nodes[0], InfrahubNode) assert [node.name.value for node in nodes] == ["Bella", "Luna"] - # TODO enable these tests for Infrahub version containing this commit - # https://github.com/opsmill/infrahub/commit/5a4d6860196b5bfb51fb8a124f33125f4a0b6753 - # when we support testing against multiple Infrahub versions. - # async def test_get_all_no_order(self, client: InfrahubClient, base_dataset): - # nodes = await client.all(kind=TESTING_CAT, order=Order(disable=True)) - # assert len(nodes) == 2 - # assert isinstance(nodes[0], InfrahubNode) - # assert {node.name.value for node in nodes} == {"Bella", "Luna"} - # - # async def test_get_filters_no_order(self, client: InfrahubClient, base_dataset): - # nodes = await client.filters(kind=TESTING_CAT, order=Order(disable=True)) - # assert len(nodes) == 2 - # assert isinstance(nodes[0], InfrahubNode) - # assert {node.name.value for node in nodes} == {"Bella", "Luna"} + async def test_get_all_no_order(self, client: InfrahubClient, base_dataset) -> None: + nodes = await client.all(kind=TESTING_CAT, order=Order(disable=True)) + assert len(nodes) == 2 + assert isinstance(nodes[0], InfrahubNode) + assert {node.name.value for node in nodes} == {"Bella", "Luna"} + + async def test_get_filters_no_order(self, client: InfrahubClient, base_dataset) -> None: + nodes = await client.filters(kind=TESTING_CAT, order=Order(disable=True)) + assert len(nodes) == 2 + 
assert isinstance(nodes[0], InfrahubNode) + assert {node.name.value for node in nodes} == {"Bella", "Luna"} async def test_get_one(self, client: InfrahubClient, base_dataset, cat_luna, person_sophia) -> None: node1 = await client.get(kind=TESTING_CAT, id=cat_luna.id) From ce4e94b9bfd62a8da1697178337ecc21c295f59e Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Wed, 31 Dec 2025 17:50:14 +0100 Subject: [PATCH 03/10] Convert test for schema and schema load --- tests/integration/conftest.py | 701 +++++++------------------------ tests/integration/test_schema.py | 84 ++-- 2 files changed, 177 insertions(+), 608 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a999d84e..97640cb1 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,562 +1,155 @@ -# import asyncio +from __future__ import annotations + import os +from typing import Any + +import pytest -# import httpx -# import pytest -# import ujson -# from fastapi.testclient import TestClient -# from infrahub import config -# from infrahub.components import ComponentType -# from infrahub.core.initialization import first_time_initialization, initialization -# from infrahub.core.node import Node -# from infrahub.core.utils import delete_all_nodes -# from infrahub.database import InfrahubDatabase, get_db -# from infrahub.lock import initialize_lock -# from infrahub.message_bus import InfrahubMessage -# from infrahub.message_bus.types import MessageTTL -# from infrahub.services.adapters.message_bus import InfrahubMessageBus -# from infrahub_sdk.schema import NodeSchema, SchemaRoot -# from infrahub_sdk.types import HTTPMethod from infrahub_sdk.utils import str_to_bool BUILD_NAME = os.environ.get("INFRAHUB_BUILD_NAME", "infrahub") TEST_IN_DOCKER = str_to_bool(os.environ.get("INFRAHUB_TEST_IN_DOCKER", "false")) -# @pytest.fixture(scope="session", autouse=True) -# def add_tracker(): -# os.environ["PYTEST_RUNNING"] = "true" - - -# class 
InfrahubTestClient(TestClient): -# def _request( -# self, url: str, method: HTTPMethod, headers: dict[str, Any], timeout: int, payload: Optional[dict] = None -# ) -> httpx.Response: -# content = None -# if payload: -# content = str(ujson.dumps(payload)).encode("UTF-8") -# with self as client: -# return client.request( -# method=method.value, -# url=url, -# headers=headers, -# timeout=timeout, -# content=content, -# ) - -# async def async_request( -# self, url: str, method: HTTPMethod, headers: dict[str, Any], timeout: int, payload: Optional[dict] = None -# ) -> httpx.Response: -# return self._request(url=url, method=method, headers=headers, timeout=timeout, payload=payload) - -# def sync_request( -# self, url: str, method: HTTPMethod, headers: dict[str, Any], timeout: int, payload: Optional[dict] = None -# ) -> httpx.Response: -# return self._request(url=url, method=method, headers=headers, timeout=timeout, payload=payload) - - -# @pytest.fixture(scope="session") -# def event_loop(): -# """Overrides pytest default function scoped event loop""" -# policy = asyncio.get_event_loop_policy() -# loop = policy.new_event_loop() -# yield loop -# loop.close() - - -# @pytest.fixture(scope="module", autouse=True) -# def execute_before_any_test(worker_id, tmpdir_factory): -# config.load_and_exit() - -# config.SETTINGS.storage.driver = config.StorageDriver.FileSystemStorage - -# if TEST_IN_DOCKER: -# try: -# db_id = int(worker_id[2]) + 1 -# except (ValueError, IndexError): -# db_id = 1 -# config.SETTINGS.cache.address = f"{BUILD_NAME}-cache-1" -# config.SETTINGS.database.address = f"{BUILD_NAME}-database-{db_id}" -# config.SETTINGS.storage.local = config.FileSystemStorageSettings(path="/opt/infrahub/storage") -# else: -# storage_dir = tmpdir_factory.mktemp("storage") -# config.SETTINGS.storage.local.path_ = str(storage_dir) - -# config.SETTINGS.broker.enable = False -# config.SETTINGS.cache.enable = True -# config.SETTINGS.miscellaneous.start_background_runner = False -# 
config.SETTINGS.security.secret_key = "4e26b3d9-b84f-42c9-a03f-fee3ada3b2fa" -# config.SETTINGS.main.internal_address = "http://mock" -# config.OVERRIDE.message_bus = BusRecorder() - -# initialize_lock() - - -# @pytest.fixture(scope="module") -# async def db() -> InfrahubDatabase: -# driver = InfrahubDatabase(driver=await get_db(retry=1)) - -# yield driver - -# await driver.close() - - -# @pytest.fixture(scope="module") -# async def init_db_base(db: InfrahubDatabase): -# await delete_all_nodes(db=db) -# await first_time_initialization(db=db) -# await initialization(db=db) - - -# @pytest.fixture(scope="module") -# async def builtin_org_schema() -> SchemaRoot: -# SCHEMA = { -# "version": "1.0", -# "nodes": [ -# { -# "name": "Organization", -# "namespace": "Test", -# "description": "An organization represent a legal entity, a company.", -# "include_in_menu": True, -# "label": "Organization", -# "icon": "mdi:domain", -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["label__value"], -# "branch": "aware", -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True}, -# {"name": "label", "kind": "Text", "optional": True}, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# "relationships": [ -# { -# "name": "tags", -# "peer": "BuiltinTag", -# "kind": "Attribute", -# "optional": True, -# "cardinality": "many", -# }, -# ], -# }, -# { -# "name": "Status", -# "namespace": "Builtin", -# "description": "Represent the status of an object: active, maintenance", -# "include_in_menu": True, -# "icon": "mdi:list-status", -# "label": "Status", -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["label__value"], -# "branch": "aware", -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True}, -# {"name": "label", "kind": "Text", "optional": True}, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# }, -# { -# "name": "Role", -# "namespace": 
"Builtin", -# "description": "Represent the role of an object", -# "include_in_menu": True, -# "icon": "mdi:ballot", -# "label": "Role", -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["label__value"], -# "branch": "aware", -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True}, -# {"name": "label", "kind": "Text", "optional": True}, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# }, -# { -# "name": "Location", -# "namespace": "Builtin", -# "description": "A location represent a physical element: a building, a site, a city", -# "include_in_menu": True, -# "icon": "mdi:map-marker-radius-outline", -# "label": "Location", -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["name__value"], -# "branch": "aware", -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True}, -# {"name": "description", "kind": "Text", "optional": True}, -# {"name": "type", "kind": "Text"}, -# ], -# "relationships": [ -# { -# "name": "tags", -# "peer": "BuiltinTag", -# "kind": "Attribute", -# "optional": True, -# "cardinality": "many", -# }, -# ], -# }, -# { -# "name": "Criticality", -# "namespace": "Builtin", -# "description": "Level of criticality expressed from 1 to 10.", -# "include_in_menu": True, -# "icon": "mdi:alert-octagon-outline", -# "label": "Criticality", -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["name__value"], -# "branch": "aware", -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True}, -# {"name": "level", "kind": "Number", "enum": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# }, -# ], -# } - -# return SCHEMA - - -# @pytest.fixture -# async def location_schema() -> NodeSchema: -# data = { -# "name": "Location", -# "namespace": "Builtin", -# "default_filter": "name__value", -# "attributes": [ -# {"name": "name", "kind": 
"String", "unique": True}, -# {"name": "description", "kind": "String", "optional": True}, -# {"name": "type", "kind": "String"}, -# ], -# "relationships": [ -# { -# "name": "tags", -# "peer": "BuiltinTag", -# "optional": True, -# "cardinality": "many", -# }, -# { -# "name": "primary_tag", -# "peer": "BultinTag", -# "optional": True, -# "cardinality": "one", -# }, -# ], -# } -# return NodeSchema(**data) - - -# @pytest.fixture -# async def location_cdg(db: InfrahubDatabase, tag_blue: Node, tag_red: Node) -> Node: -# obj = await Node.init(schema="BuiltinLocation", db=db) -# await obj.new(db=db, name="cdg01", type="SITE", tags=[tag_blue, tag_red]) -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def tag_blue(db: InfrahubDatabase) -> Node: -# obj = await Node.init(schema="BuiltinTag", db=db) -# await obj.new(db=db, name="Blue") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def tag_red(db: InfrahubDatabase) -> Node: -# obj = await Node.init(schema="BuiltinTag", db=db) -# await obj.new(db=db, name="Red") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def tag_green(db: InfrahubDatabase) -> Node: -# obj = await Node.init(schema="BuiltinTag", db=db) -# await obj.new(db=db, name="Green") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def first_account(db: InfrahubDatabase) -> Node: -# obj = await Node.init(db=db, schema="CoreAccount") -# await obj.new(db=db, name="First Account", account_type="Git", password="TestPassword123") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def second_account(db: InfrahubDatabase) -> Node: -# obj = await Node.init(db=db, schema="CoreAccount") -# await obj.new(db=db, name="Second Account", account_type="Git", password="TestPassword123") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def repo01(db: InfrahubDatabase) -> Node: -# obj = await Node.init(db=db, schema="CoreRepository") -# await obj.new(db=db, 
name="repo01", location="https://github.com/my/repo.git") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def repo99(db: InfrahubDatabase) -> Node: -# obj = await Node.init(db=db, schema="CoreRepository") -# await obj.new(db=db, name="repo99", location="https://github.com/my/repo99.git") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def gqlquery01(db: InfrahubDatabase) -> Node: -# obj = await Node.init(db=db, schema="CoreGraphQLQuery") -# await obj.new(db=db, name="query01", query="query { device { name { value }}}") -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def gqlquery02(db: InfrahubDatabase, repo01: Node, tag_blue: Node, tag_red: Node) -> Node: -# obj = await Node.init(db=db, schema="CoreGraphQLQuery") -# await obj.new( -# db=db, -# name="query02", -# query="query { CoreRepository { edges { node { name { value }}}}}", -# repository=repo01, -# tags=[tag_blue, tag_red], -# ) -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def gqlquery03(db: InfrahubDatabase, repo01: Node, tag_blue: Node, tag_red: Node) -> Node: -# obj = await Node.init(db=db, schema="CoreGraphQLQuery") -# await obj.new( -# db=db, -# name="query03", -# query="query { CoreRepository { edges { node { name { value }}}}}", -# repository=repo01, -# tags=[tag_blue, tag_red], -# ) -# await obj.save(db=db) -# return obj - - -# @pytest.fixture -# async def schema_extension_01() -> dict[str, Any]: -# return { -# "version": "1.0", -# "nodes": [ -# { -# "name": "Rack", -# "namespace": "Infra", -# "description": "A Rack represents a physical two- or four-post equipment rack in which devices can be installed.", -# "label": "Rack", -# "default_filter": "name__value", -# "display_labels": ["name__value"], -# "attributes": [ -# {"name": "name", "kind": "Text"}, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# "relationships": [ -# { -# "name": "tags", -# "peer": "BuiltinTag", -# "optional": 
True, -# "cardinality": "many", -# "kind": "Attribute", -# }, -# ], -# } -# ], -# "extensions": { -# "nodes": [ -# { -# "kind": "BuiltinTag", -# "relationships": [ -# { -# "name": "racks", -# "peer": "InfraRack", -# "optional": True, -# "cardinality": "many", -# "kind": "Generic", -# } -# ], -# } -# ] -# }, -# } - - -# @pytest.fixture -# async def schema_extension_02() -> dict[str, Any]: -# return { -# "version": "1.0", -# "nodes": [ -# { -# "name": "Contract", -# "namespace": "Procurement", -# "description": "Generic Contract", -# "label": "Contract", -# "display_labels": ["contract_ref__value"], -# "order_by": ["contract_ref__value"], -# "attributes": [ -# { -# "name": "contract_ref", -# "label": "Contract Reference", -# "kind": "Text", -# "unique": True, -# }, -# {"name": "description", "kind": "Text", "optional": True}, -# ], -# "relationships": [ -# { -# "name": "tags", -# "peer": "BuiltinTag", -# "optional": True, -# "cardinality": "many", -# "kind": "Attribute", -# }, -# ], -# } -# ], -# "extensions": { -# "nodes": [ -# { -# "kind": "BuiltinTag", -# "relationships": [ -# { -# "name": "contracts", -# "peer": "ProcurementContract", -# "optional": True, -# "cardinality": "many", -# "kind": "Generic", -# } -# ], -# } -# ] -# }, -# } - - -# @pytest.fixture(scope="module") -# async def ipam_schema() -> SchemaRoot: -# SCHEMA = { -# "version": "1.0", -# "nodes": [ -# { -# "name": "IPPrefix", -# "namespace": "Ipam", -# "include_in_menu": False, -# "inherit_from": ["BuiltinIPPrefix"], -# "description": "IPv4 or IPv6 network", -# "icon": "mdi:ip-network", -# "label": "IP Prefix", -# }, -# { -# "name": "IPAddress", -# "namespace": "Ipam", -# "include_in_menu": False, -# "inherit_from": ["BuiltinIPAddress"], -# "description": "IP Address", -# "icon": "mdi:ip-outline", -# "label": "IP Address", -# }, -# { -# "name": "Device", -# "namespace": "Infra", -# "label": "Device", -# "human_friendly_id": ["name__value"], -# "order_by": ["name__value"], -# "display_labels": 
["name__value"], -# "attributes": [{"name": "name", "kind": "Text", "unique": True}], -# "relationships": [ -# { -# "name": "primary_address", -# "peer": "IpamIPAddress", -# "label": "Primary IP Address", -# "optional": True, -# "cardinality": "one", -# "kind": "Attribute", -# } -# ], -# }, -# ], -# } - -# return SCHEMA - - -# @pytest.fixture(scope="module") -# async def hierarchical_schema() -> dict: -# schema = { -# "version": "1.0", -# "generics": [ -# { -# "name": "Generic", -# "namespace": "Location", -# "description": "Generic hierarchical location", -# "label": "Location", -# "hierarchical": True, -# "human_friendly_id": ["name__value"], -# "include_in_menu": True, -# "attributes": [ -# {"name": "name", "kind": "Text", "unique": True, "order_weight": 900}, -# ], -# } -# ], -# "nodes": [ -# { -# "name": "Country", -# "namespace": "Location", -# "description": "A country within a continent.", -# "inherit_from": ["LocationGeneric"], -# "generate_profile": False, -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["name__value"], -# "children": "LocationSite", -# "attributes": [{"name": "shortname", "kind": "Text"}], -# }, -# { -# "name": "Site", -# "namespace": "Location", -# "description": "A site within a country.", -# "inherit_from": ["LocationGeneric"], -# "default_filter": "name__value", -# "order_by": ["name__value"], -# "display_labels": ["name__value"], -# "children": "", -# "parent": "LocationCountry", -# "attributes": [{"name": "shortname", "kind": "Text"}], -# }, -# ], -# } -# return schema - - -# class BusRecorder(InfrahubMessageBus): -# def __init__(self, component_type: Optional[ComponentType] = None): -# self.messages: list[InfrahubMessage] = [] -# self.messages_per_routing_key: dict[str, list[InfrahubMessage]] = {} - -# async def publish( -# self, message: InfrahubMessage, routing_key: str, delay: Optional[MessageTTL] = None, is_retry: bool = False -# ) -> None: -# self.messages.append(message) -# if 
routing_key not in self.messages_per_routing_key: -# self.messages_per_routing_key[routing_key] = [] -# self.messages_per_routing_key[routing_key].append(message) - -# @property -# def seen_routing_keys(self) -> list[str]: -# return list(self.messages_per_routing_key.keys()) +@pytest.fixture(scope="class") +def schema_extension_01() -> dict[str, Any]: + return { + "version": "1.0", + "nodes": [ + { + "name": "Rack", + "namespace": "Infra", + "description": "A Rack represents a physical two- or four-post equipment rack.", + "label": "Rack", + "default_filter": "name__value", + "display_labels": ["name__value"], + "attributes": [ + {"name": "name", "kind": "Text"}, + {"name": "description", "kind": "Text", "optional": True}, + ], + "relationships": [ + { + "name": "tags", + "peer": "BuiltinTag", + "optional": True, + "cardinality": "many", + "kind": "Attribute", + }, + ], + } + ], + "extensions": { + "nodes": [ + { + "kind": "BuiltinTag", + "relationships": [ + { + "name": "racks", + "peer": "InfraRack", + "optional": True, + "cardinality": "many", + "kind": "Generic", + } + ], + } + ] + }, + } + + +@pytest.fixture(scope="class") +def schema_extension_02() -> dict[str, Any]: + return { + "version": "1.0", + "nodes": [ + { + "name": "Contract", + "namespace": "Procurement", + "description": "Generic Contract", + "label": "Contract", + "display_labels": ["contract_ref__value"], + "order_by": ["contract_ref__value"], + "attributes": [ + { + "name": "contract_ref", + "label": "Contract Reference", + "kind": "Text", + "unique": True, + }, + {"name": "description", "kind": "Text", "optional": True}, + ], + "relationships": [ + { + "name": "tags", + "peer": "BuiltinTag", + "optional": True, + "cardinality": "many", + "kind": "Attribute", + }, + ], + } + ], + "extensions": { + "nodes": [ + { + "kind": "BuiltinTag", + "relationships": [ + { + "name": "contracts", + "peer": "ProcurementContract", + "optional": True, + "cardinality": "many", + "kind": "Generic", + } + ], + } + 
] + }, + } + + +@pytest.fixture(scope="class") +def hierarchical_schema() -> dict[str, Any]: + return { + "version": "1.0", + "generics": [ + { + "name": "Generic", + "namespace": "Location", + "description": "Generic hierarchical location", + "label": "Location", + "hierarchical": True, + "human_friendly_id": ["name__value"], + "include_in_menu": True, + "attributes": [ + {"name": "name", "kind": "Text", "unique": True, "order_weight": 900}, + ], + } + ], + "nodes": [ + { + "name": "Country", + "namespace": "Location", + "description": "A country within a continent.", + "inherit_from": ["LocationGeneric"], + "generate_profile": False, + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "children": "LocationSite", + "attributes": [{"name": "shortname", "kind": "Text"}], + }, + { + "name": "Site", + "namespace": "Location", + "description": "A site within a country.", + "inherit_from": ["LocationGeneric"], + "default_filter": "name__value", + "order_by": ["name__value"], + "display_labels": ["name__value"], + "children": "", + "parent": "LocationCountry", + "attributes": [{"name": "shortname", "kind": "Text"}], + }, + ], + } diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 1d081ae2..314a3ffa 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -1,19 +1,13 @@ +from typing import Any + import pytest from infrahub_sdk import InfrahubClient from infrahub_sdk.exceptions import BranchNotFoundError +from infrahub_sdk.schema import NodeSchemaAPI from infrahub_sdk.testing.docker import TestInfrahubDockerClient -# from infrahub.core.schema import core_models -# from infrahub.server import app -# -# from infrahub_sdk.schema import NodeSchemaAPI -# -# from .conftest import InfrahubTestClient -# -# -# class TestInfrahubSchema(TestInfrahubDockerClient): async def test_query_schema_for_branch_not_found(self, client: InfrahubClient) -> None: with 
pytest.raises(BranchNotFoundError) as exc: @@ -21,49 +15,31 @@ async def test_query_schema_for_branch_not_found(self, client: InfrahubClient) - assert str(exc.value) == "The requested branch was not found on the server [I-do-not-exist]" + async def test_schema_all(self, client: InfrahubClient) -> None: + schema_nodes = await client.schema.all() + + assert [node for node in schema_nodes.values() if node.namespace == "Profile"] + + assert "BuiltinTag" in schema_nodes + assert isinstance(schema_nodes["BuiltinTag"], NodeSchemaAPI) + + async def test_schema_get(self, client: InfrahubClient) -> None: + schema_node = await client.schema.get(kind="BuiltinTag") + + assert isinstance(schema_node, NodeSchemaAPI) + assert client.default_branch in client.schema.cache + + +class TestInfrahubSchemaLoad(TestInfrahubDockerClient): + async def test_schema_load_many( + self, client: InfrahubClient, schema_extension_01: dict[str, Any], schema_extension_02: dict[str, Any] + ) -> None: + response = await client.schema.load( + schemas=[schema_extension_01, schema_extension_02], wait_until_converged=True + ) + + assert response.schema_updated -# class TestInfrahubSchema: -# @pytest.fixture(scope="class") -# async def client(self): -# return InfrahubTestClient(app) -# -# async def test_schema_all(self, client, init_db_base): -# config = Config(requester=client.async_request) -# ifc = InfrahubClient(config=config) -# schema_nodes = await ifc.schema.all() -# -# nodes = [node for node in core_models["nodes"] if node["namespace"] != "Internal"] -# generics = [node for node in core_models["generics"] if node["namespace"] != "Internal"] -# -# profiles = [node for node in schema_nodes.values() if node.namespace == "Profile"] -# assert profiles -# -# assert len(schema_nodes) == len(nodes) + len(generics) + len(profiles) -# assert "BuiltinTag" in schema_nodes -# assert isinstance(schema_nodes["BuiltinTag"], NodeSchemaAPI) -# -# async def test_schema_get(self, client, init_db_base): -# config = 
Config(username="admin", password="infrahub", requester=client.async_request) -# ifc = InfrahubClient(config=config) -# schema_node = await ifc.schema.get(kind="BuiltinTag") -# -# assert isinstance(schema_node, NodeSchemaAPI) -# assert ifc.default_branch in ifc.schema.cache -# nodes = [node for node in core_models["nodes"] if node["namespace"] != "Internal"] -# generics = [node for node in core_models["generics"] if node["namespace"] != "Internal"] -# -# schema_without_profiles = [ -# node for node in ifc.schema.cache[ifc.default_branch].values() if node.namespace != "Profile" -# ] -# assert len(schema_without_profiles) == len(nodes) + len(generics) -# -# async def test_schema_load_many(self, client, init_db_base, schema_extension_01, schema_extension_02): -# config = Config(username="admin", password="infrahub", requester=client.async_request) -# ifc = InfrahubClient(config=config) -# response = await ifc.schema.load(schemas=[schema_extension_01, schema_extension_02]) -# -# assert response.schema_updated -# -# schema_nodes = await ifc.schema.all(refresh=True) -# assert "InfraRack" in schema_nodes.keys() -# assert "ProcurementContract" in schema_nodes.keys() + schema_nodes = await client.schema.all(refresh=True) + assert "InfraRack" in schema_nodes + assert "ProcurementContract" in schema_nodes From 6d15c6989dd9cdaee17aa5ffe9ca6490c94b17ca Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 10:28:15 +0100 Subject: [PATCH 04/10] Convert node integration tests --- tests/integration/conftest.py | 46 ++++ tests/integration/test_node.py | 473 +++++++++++++++++---------------- 2 files changed, 290 insertions(+), 229 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 97640cb1..0d13ac8d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -153,3 +153,49 @@ def hierarchical_schema() -> dict[str, Any]: }, ], } + + +@pytest.fixture(scope="class") +def ipam_schema() -> dict[str, Any]: + 
return { + "version": "1.0", + "nodes": [ + { + "name": "IPPrefix", + "namespace": "Ipam", + "include_in_menu": False, + "inherit_from": ["BuiltinIPPrefix"], + "description": "IPv4 or IPv6 network", + "icon": "mdi:ip-network", + "label": "IP Prefix", + }, + { + "name": "IPAddress", + "namespace": "Ipam", + "include_in_menu": False, + "inherit_from": ["BuiltinIPAddress"], + "description": "IP Address", + "icon": "mdi:ip-outline", + "label": "IP Address", + }, + { + "name": "Device", + "namespace": "Infra", + "label": "Device", + "human_friendly_id": ["name__value"], + "order_by": ["name__value"], + "display_labels": ["name__value"], + "attributes": [{"name": "name", "kind": "Text", "unique": True}], + "relationships": [ + { + "name": "primary_address", + "peer": "IpamIPAddress", + "label": "Primary IP Address", + "optional": True, + "cardinality": "one", + "kind": "Attribute", + } + ], + }, + ], + } diff --git a/tests/integration/test_node.py b/tests/integration/test_node.py index 652e7203..ded99ac1 100644 --- a/tests/integration/test_node.py +++ b/tests/integration/test_node.py @@ -1,12 +1,20 @@ +from __future__ import annotations + +import ipaddress +from typing import TYPE_CHECKING, Any + import pytest -from infrahub_sdk import InfrahubClient -from infrahub_sdk.exceptions import NodeNotFoundError +from infrahub_sdk.exceptions import NodeNotFoundError, UninitializedError from infrahub_sdk.node import InfrahubNode +from infrahub_sdk.protocols import IpamNamespace from infrahub_sdk.schema import NodeSchema, NodeSchemaAPI, SchemaRoot from infrahub_sdk.testing.docker import TestInfrahubDockerClient from infrahub_sdk.testing.schemas.car_person import TESTING_CAR, TESTING_MANUFACTURER, SchemaCarPerson +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient + class TestInfrahubNode(TestInfrahubDockerClient, SchemaCarPerson): @pytest.fixture(scope="class") @@ -30,11 +38,7 @@ async def test_node_create( await node.save() assert node.id is not None - async def 
test_node_delete( - self, - client: InfrahubClient, - initial_schema: None, - ) -> None: + async def test_node_delete(self, client: InfrahubClient, initial_schema: None) -> None: obj = await client.create(kind=TESTING_MANUFACTURER, name="Dacia") await obj.save() @@ -46,12 +50,7 @@ async def test_node_delete( await client.get(kind=TESTING_MANUFACTURER, id=obj.id) async def test_node_create_with_relationships( - self, - default_branch: str, - client: InfrahubClient, - initial_schema: None, - manufacturer_mercedes, - person_joe, + self, default_branch: str, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe ) -> None: node = await client.create( kind=TESTING_CAR, name="Tiguan", color="Black", manufacturer=manufacturer_mercedes.id, owner=person_joe.id @@ -120,10 +119,7 @@ async def test_node_filters_include( assert node_after.owner.peer.id == person_joe.id, f"{person_joe.id=}" async def test_node_update_with_original_data( - self, - default_branch: str, - client: InfrahubClient, - initial_schema: None, + self, default_branch: str, client: InfrahubClient, initial_schema: None ) -> None: person_marina = await client.create(kind="TestingPerson", name="marina", age=20) await person_marina.save() @@ -138,72 +134,57 @@ async def test_node_update_with_original_data( node = await client.get(kind="TestingPerson", id=person_marina.id) assert node.age.value == 20, node.age.value - # async def test_node_update_payload_with_relationships( - # self, - # db: InfrahubDatabase, - # client: InfrahubClient, - # init_db_base, - # load_builtin_schema, - # tag_blue: Node, - # tag_red: Node, - # repo01: Node, - # gqlquery01: Node, - # ): - # data = { - # "name": "rfile10", - # "template_path": "mytemplate.j2", - # "query": gqlquery01.id, - # "repository": repo01.id, - # "tags": [tag_blue.id, tag_red.id], - # } - # schema = await client.schema.get(kind="CoreTransformJinja2", branch="main") - # create_payload = client.schema.generate_payload_create( - # 
schema=schema, data=data, source=repo01.id, is_protected=True - # ) - # obj = await client.create(kind="CoreTransformJinja2", branch="main", **create_payload) - # await obj.save() - - # assert obj.id is not None - # nodedb = await client.get(kind="CoreTransformJinja2", id=str(obj.id)) - - # input_data = nodedb._generate_input_data()["data"]["data"] - # assert input_data["name"]["value"] == "rfile10" - # # Validate that the source isn't a dictionary bit a reference to the repo - # assert input_data["name"]["source"] == repo01.id - - # async def test_node_create_with_properties( - # self, - # db: InfrahubDatabase, - # client: InfrahubClient, - # init_db_base, - # load_builtin_schema, - # tag_blue: Node, - # tag_red: Node, - # repo01: Node, - # gqlquery01: Node, - # first_account: Node, - # ): - # data = { - # "name": { - # "value": "rfile02", - # "is_protected": True, - # "source": first_account.id, - # "owner": first_account.id, - # }, - # "template_path": {"value": "mytemplate.j2"}, - # "query": {"id": gqlquery01.id}, # "source": first_account.id, "owner": first_account.id}, - # "repository": {"id": repo01.id}, # "source": first_account.id, "owner": first_account.id}, - # "tags": [tag_blue.id, tag_red.id], - # } - - # node = await client.create(kind="CoreTransformJinja2", data=data) - # await node.save() - - # assert node.id is not None - - # nodedb = await NodeManager.get_one(id=node.id, db=db, include_owner=True, include_source=True) - # assert nodedb.name.value == node.name.value - # assert nodedb.name.is_protected is True + async def test_node_generate_input_data_with_relationships( + self, + client: InfrahubClient, + initial_schema: None, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + tag_blue: InfrahubNode, + tag_red: InfrahubNode, + ) -> None: + car = await client.create( + kind=TESTING_CAR, + name="InputDataCar", + color="Silver", + manufacturer=manufacturer_mercedes.id, + owner=person_joe.id, + tags=[tag_blue.id, tag_red.id], + ) + 
await car.save() + assert car.id is not None + + input_data = car._generate_input_data()["data"]["data"] + + assert input_data["name"]["value"] == "InputDataCar" + assert input_data["color"]["value"] == "Silver" + assert "manufacturer" in input_data + assert input_data["manufacturer"]["id"] == manufacturer_mercedes.id + + async def test_node_create_with_properties( + self, + client: InfrahubClient, + initial_schema: None, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + ) -> None: + data = { + "name": {"value": "ProtectedCar", "is_protected": True}, + "color": {"value": "Gold"}, + "manufacturer": {"id": manufacturer_mercedes.id}, + "owner": {"id": person_joe.id}, + } + + node = await client.create(kind=TESTING_CAR, data=data) + await node.save() + + assert node.id is not None + assert node.name.value == "ProtectedCar" + assert node.name.is_protected + + node_fetched = await client.get(kind=TESTING_CAR, id=node.id, property=True) + assert node_fetched.name.value == "ProtectedCar" + assert node_fetched.name.is_protected async def test_node_update( self, @@ -238,149 +219,183 @@ async def test_node_update( await car3.tags.fetch() assert sorted([tag.id for tag in car3.tags.peers]) == sorted([tag_green.id, tag_blue.id]) - # async def test_node_update_3_idempotency( - # self, - # db: InfrahubDatabase, - # client: InfrahubClient, - # init_db_base, - # load_builtin_schema, - # tag_green: Node, - # tag_red: Node, - # tag_blue: Node, - # gqlquery03: Node, - # repo99: Node, - # ): - # node = await client.get(kind="CoreGraphQLQuery", name__value="query03") - # assert node.id is not None - - # updated_query = f"\n\n{node.query.value}" - # node.name.value = "query031" - # node.query.value = updated_query - # first_update = node._generate_input_data(exclude_unmodified=True) - # await node.save() - # nodedb = await NodeManager.get_one(id=node.id, db=db, include_owner=True, include_source=True) - - # node = await client.get(kind="CoreGraphQLQuery", 
name__value="query031") - - # node.name.value = "query031" - # node.query.value = updated_query - - # second_update = node._generate_input_data(exclude_unmodified=True) - - # assert nodedb.query.value == updated_query - # assert "query" in first_update["data"]["data"] - # assert "value" in first_update["data"]["data"]["query"] - # assert first_update["variables"] - # assert "query" not in second_update["data"]["data"] - # assert not second_update["variables"] - - # async def test_relationship_manager_errors_without_fetch(self, client: InfrahubClient, load_builtin_schema): - # organization = await client.create("TestOrganization", name="organization-1") - # await organization.save() - # tag = await client.create("BuiltinTag", name="blurple") - # await tag.save() - - # with pytest.raises(UninitializedError, match=r"Must call fetch"): - # organization.tags.add(tag) - - # await organization.tags.fetch() - # organization.tags.add(tag) - # await organization.save() - - # organization = await client.get("TestOrganization", name__value="organization-1") - # assert [t.id for t in organization.tags.peers] == [tag.id] - - # async def test_relationships_not_overwritten( - # self, client: InfrahubClient, load_builtin_schema, schema_extension_01 - # ): - # await client.schema.load(schemas=[schema_extension_01]) - # rack = await client.create("InfraRack", name="rack-1") - # await rack.save() - # tag = await client.create("BuiltinTag", name="blizzow") - # # TODO: is it a bug that we need to save the object and fetch the tags before adding to a RelationshipManager now? 
- # await tag.save() - # await tag.racks.fetch() - # tag.racks.add(rack) - # await tag.save() - # tag_2 = await client.create("BuiltinTag", name="blizzow2") - # await tag_2.save() - - # # the "rack" object has no link to the "tag" object here - # # rack.tags.peers is empty - # rack.name.value = "New Rack Name" - # await rack.save() - - # # assert that the above rack.save() did not overwrite the existing Rack-Tag relationship - # refreshed_rack = await client.get("InfraRack", id=rack.id) - # await refreshed_rack.tags.fetch() - # assert [t.id for t in refreshed_rack.tags.peers] == [tag.id] - - # # check that we can purposefully remove a tag - # refreshed_rack.tags.remove(tag.id) - # await refreshed_rack.save() - # rack_without_tag = await client.get("InfraRack", id=rack.id) - # await rack_without_tag.tags.fetch() - # assert rack_without_tag.tags.peers == [] - - # # check that we can purposefully add a tag - # rack_without_tag.tags.add(tag_2) - # await rack_without_tag.save() - # refreshed_rack_with_tag = await client.get("InfraRack", id=rack.id) - # await refreshed_rack_with_tag.tags.fetch() - # assert [t.id for t in refreshed_rack_with_tag.tags.peers] == [tag_2.id] - - # async def test_node_create_from_pool( - # self, db: InfrahubDatabase, client: InfrahubClient, init_db_base, default_ipam_namespace, load_ipam_schema - # ): - # ip_prefix = await client.create(kind="IpamIPPrefix", prefix="192.0.2.0/24") - # await ip_prefix.save() - - # ip_pool = await client.create( - # kind="CoreIPAddressPool", - # name="Core loopbacks 1", - # default_address_type="IpamIPAddress", - # default_prefix_length=32, - # ip_namespace=default_ipam_namespace, - # resources=[ip_prefix], - # ) - # await ip_pool.save() - - # devices = [] - # for i in range(1, 5): - # d = await client.create(kind="InfraDevice", name=f"core0{i}", primary_address=ip_pool) - # await d.save() - # devices.append(d) - - # assert [str(device.primary_address.peer.address.value) for device in devices] == [ - # 
"192.0.2.1/32", - # "192.0.2.2/32", - # "192.0.2.3/32", - # "192.0.2.4/32", - # ] - - # async def test_node_update_from_pool( - # self, db: InfrahubDatabase, client: InfrahubClient, init_db_base, default_ipam_namespace, load_ipam_schema - # ): - # starter_ip_address = await client.create(kind="IpamIPAddress", address="10.0.0.1/32") - # await starter_ip_address.save() - - # ip_prefix = await client.create(kind="IpamIPPrefix", prefix="192.168.0.0/24") - # await ip_prefix.save() - - # ip_pool = await client.create( - # kind="CoreIPAddressPool", - # name="Core loopbacks 2", - # default_address_type="IpamIPAddress", - # default_prefix_length=32, - # ip_namespace=default_ipam_namespace, - # resources=[ip_prefix], - # ) - # await ip_pool.save() - - # device = await client.create(kind="InfraDevice", name="core05", primary_address=starter_ip_address) - # await device.save() - - # device.primary_address = ip_pool - # await device.save() - - # assert str(device.primary_address.peer.address.value) == "192.168.0.1/32" + async def test_relationship_manager_errors_without_fetch( + self, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe, tag_blue + ) -> None: + car = await client.create( + kind=TESTING_CAR, name="UnfetchedCar", color="Blue", manufacturer=manufacturer_mercedes, owner=person_joe + ) + await car.save() + + with pytest.raises(UninitializedError, match=r"Must call fetch"): + car.tags.add(tag_blue) + + await car.tags.fetch() + car.tags.add(tag_blue) + await car.save() + + car = await client.get(kind=TESTING_CAR, id=car.id) + await car.tags.fetch() + assert {t.id for t in car.tags.peers} == {tag_blue.id} + + async def test_relationships_not_overwritten( + self, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe, tag_blue, tag_red + ) -> None: + car = await client.create( + kind=TESTING_CAR, + name="RelationshipTestCar", + color="Green", + manufacturer=manufacturer_mercedes, + owner=person_joe, + ) + await 
car.save() + + await car.tags.fetch() + car.tags.add(tag_blue) + await car.save() + + car_refetch = await client.get(kind=TESTING_CAR, id=car.id) + car_refetch.color.value = "Red" + await car_refetch.save() + + # Verify the tag relationship was not overwritten + refreshed_car = await client.get(kind=TESTING_CAR, id=car.id) + await refreshed_car.tags.fetch() + assert [t.id for t in refreshed_car.tags.peers] == [tag_blue.id] + + # Check that we can purposefully remove a tag + refreshed_car.tags.remove(tag_blue.id) + await refreshed_car.save() + car_without_tag = await client.get(kind=TESTING_CAR, id=car.id) + await car_without_tag.tags.fetch() + assert car_without_tag.tags.peers == [] + + # Check that we can purposefully add a tag + car_without_tag.tags.add(tag_red) + await car_without_tag.save() + car_with_new_tag = await client.get(kind=TESTING_CAR, id=car.id) + await car_with_new_tag.tags.fetch() + assert [t.id for t in car_with_new_tag.tags.peers] == [tag_red.id] + + async def test_node_update_idempotency(self, client: InfrahubClient, initial_schema: None) -> None: + original_query = "query { CoreRepository { edges { node { name { value }}}}}" + node = await client.create(kind="CoreGraphQLQuery", name="idempotency-query", query=original_query) + await node.save() + + node = await client.get(kind="CoreGraphQLQuery", name__value="idempotency-query") + assert node.id is not None + + updated_query = f"\n\n{node.query.value}" + node.name.value = "idempotency-query-updated" + node.query.value = updated_query + first_update = node._generate_input_data(exclude_unmodified=True) + await node.save() + + # Verify the first update contains the changes + assert "query" in first_update["data"]["data"] + assert "value" in first_update["data"]["data"]["query"] + assert first_update["variables"] + + # Fetch the node again and set the same values + node = await client.get(kind="CoreGraphQLQuery", name__value="idempotency-query-updated") + node.name.value = 
"idempotency-query-updated" + node.query.value = updated_query + second_update = node._generate_input_data(exclude_unmodified=True) + + # Verify the second update doesn't contain any data (idempotent) + assert "query" not in second_update["data"]["data"] + assert not second_update["variables"] + + +class TestNodeWithPools(TestInfrahubDockerClient): + @pytest.fixture(scope="class") + async def load_ipam_schema(self, default_branch: str, client: InfrahubClient, ipam_schema: dict[str, Any]) -> None: + await client.schema.wait_until_converged(branch=default_branch) + resp = await client.schema.load(schemas=[ipam_schema], branch=default_branch, wait_until_converged=True) + assert resp.errors == {} + + @pytest.fixture(scope="class") + async def default_ipam_namespace(self, client: InfrahubClient, load_ipam_schema: None) -> IpamNamespace: + return await client.get(kind=IpamNamespace, name__value="default") + + @pytest.fixture(scope="class") + async def ip_prefix(self, client: InfrahubClient, load_ipam_schema: None) -> InfrahubNode: + prefix = await client.create(kind="IpamIPPrefix", prefix="192.0.2.0/24", member_type="address") + await prefix.save() + return prefix + + @pytest.fixture(scope="class") + async def ip_pool( + self, client: InfrahubClient, ip_prefix: InfrahubNode, default_ipam_namespace: IpamNamespace + ) -> InfrahubNode: + pool = await client.create( + kind="CoreIPAddressPool", + name="Test IP Pool", + default_address_type="IpamIPAddress", + default_prefix_length=32, + resources=[ip_prefix], + ip_namespace=default_ipam_namespace, + ) + await pool.save() + return pool + + async def test_node_create_from_pool( + self, client: InfrahubClient, ip_pool: InfrahubNode, load_ipam_schema: None + ) -> None: + devices = [] + for i in range(1, 4): + device = await client.create(kind="InfraDevice", name=f"device-{i:02d}", primary_address=ip_pool) + await device.save() + devices.append(device) + + ip_addresses = [] + devices = await client.all(kind="InfraDevice", 
prefetch_relationships=True) + for device in devices: + assert device.primary_address.peer is not None + ip_addresses.append(device.primary_address.peer.address.value) + + assert len(set(ip_addresses)) == len(devices) + + for ip in ip_addresses: + assert ip in ipaddress.ip_network("192.0.2.0/24") + + async def test_allocate_next_ip_address_idempotent( + self, client: InfrahubClient, ip_pool: InfrahubNode, load_ipam_schema: None + ) -> None: + identifier = "idempotent-allocation-test" + + # Allocate twice with the same identifier + ip1 = await client.allocate_next_ip_address(resource_pool=ip_pool, identifier=identifier) + ip2 = await client.allocate_next_ip_address(resource_pool=ip_pool, identifier=identifier) + + assert ip1.id == ip2.id + assert ip1.address.value == ip2.address.value + + async def test_node_update_from_pool( + self, client: InfrahubClient, load_ipam_schema: None, default_ipam_namespace: IpamNamespace + ) -> None: + starter_ip_address = await client.create(kind="IpamIPAddress", address="10.0.0.1/32") + await starter_ip_address.save() + + ip_prefix = await client.create(kind="IpamIPPrefix", prefix="192.168.0.0/24", member_type="address") + await ip_prefix.save() + + ip_pool = await client.create( + kind="CoreIPAddressPool", + name="Update Test Pool", + default_address_type="IpamIPAddress", + default_prefix_length=32, + resources=[ip_prefix], + ip_namespace=default_ipam_namespace, + ) + await ip_pool.save() + + device = await client.create(kind="InfraDevice", name="update-device", primary_address=starter_ip_address) + await device.save() + + device.primary_address = ip_pool + await device.save() + + fetched_device = await client.get(kind="InfraDevice", id=device.id, prefetch_relationships=True) + assert fetched_device.primary_address.peer is not None + assert fetched_device.primary_address.peer.address.value == ipaddress.ip_interface("192.168.0.1/32") From 3960ad8a661b1859aaa096118d3964c532a6ff45 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: 
Fri, 2 Jan 2026 10:29:05 +0100 Subject: [PATCH 05/10] Fix import --- tests/integration/test_infrahub_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_infrahub_client.py b/tests/integration/test_infrahub_client.py index 8fd96e5a..79c5e380 100644 --- a/tests/integration/test_infrahub_client.py +++ b/tests/integration/test_infrahub_client.py @@ -12,6 +12,7 @@ from infrahub_sdk.task.models import Task, TaskFilter, TaskLog, TaskState from infrahub_sdk.testing.docker import TestInfrahubDockerClient from infrahub_sdk.testing.schemas.animal import TESTING_ANIMAL, TESTING_CAT, TESTING_DOG, TESTING_PERSON, SchemaAnimal +from infrahub_sdk.types import Order if TYPE_CHECKING: from infrahub_sdk import InfrahubClient From e5ec9f02a0aa077299ad48ef81a09bebbb205579 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 12:37:45 +0100 Subject: [PATCH 06/10] Convert infrahub client and client sync tests --- pyproject.toml | 3 +- tests/integration/conftest.py | 8 +- tests/integration/test_infrahub_client.py | 311 ++++---- .../integration/test_infrahub_client_sync.py | 700 ++++++++++-------- 4 files changed, 554 insertions(+), 468 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 874f0033..59fa2ad5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -348,7 +348,8 @@ max-complexity = 17 "tests/unit/sdk/graphql/test_query.py" = ["ANN001"] # 7 errors # tests/integration/ - 60 errors total -"tests/integration/test_infrahub_client.py" = ["ANN001"] # 32 errors +"tests/integration/test_infrahub_client.py" = ["ANN001", "PLR0904"] # 32 errors +"tests/integration/test_infrahub_client_sync.py" = ["ANN001", "PLR0904"] # 32 errors "tests/integration/test_node.py" = ["ANN001"] # 15 errors "tests/integration/test_infrahubctl.py" = ["ANN001"] # 9 errors "tests/integration/test_convert_object_type.py" = ["ANN001"] # 3 errors diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0d13ac8d..25347480 100644 --- 
a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -11,7 +11,7 @@ TEST_IN_DOCKER = str_to_bool(os.environ.get("INFRAHUB_TEST_IN_DOCKER", "false")) -@pytest.fixture(scope="class") +@pytest.fixture(scope="module") def schema_extension_01() -> dict[str, Any]: return { "version": "1.0", @@ -57,7 +57,7 @@ def schema_extension_01() -> dict[str, Any]: } -@pytest.fixture(scope="class") +@pytest.fixture(scope="module") def schema_extension_02() -> dict[str, Any]: return { "version": "1.0", @@ -108,7 +108,7 @@ def schema_extension_02() -> dict[str, Any]: } -@pytest.fixture(scope="class") +@pytest.fixture(scope="module") def hierarchical_schema() -> dict[str, Any]: return { "version": "1.0", @@ -155,7 +155,7 @@ def hierarchical_schema() -> dict[str, Any]: } -@pytest.fixture(scope="class") +@pytest.fixture(scope="module") def ipam_schema() -> dict[str, Any]: return { "version": "1.0", diff --git a/tests/integration/test_infrahub_client.py b/tests/integration/test_infrahub_client.py index 79c5e380..b67b022a 100644 --- a/tests/integration/test_infrahub_client.py +++ b/tests/integration/test_infrahub_client.py @@ -1,22 +1,23 @@ from __future__ import annotations from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING +from pathlib import Path import pytest +from infrahub_sdk import Config, InfrahubClient from infrahub_sdk.branch import BranchData +from infrahub_sdk.constants import InfrahubClientMode from infrahub_sdk.exceptions import BranchNotFoundError, URLNotFoundError from infrahub_sdk.node import InfrahubNode -from infrahub_sdk.schema import ProfileSchemaAPI +from infrahub_sdk.playback import JSONPlayback +from infrahub_sdk.recorder import JSONRecorder +from infrahub_sdk.schema import GenericSchema, NodeSchema, ProfileSchemaAPI from infrahub_sdk.task.models import Task, TaskFilter, TaskLog, TaskState from infrahub_sdk.testing.docker import TestInfrahubDockerClient from infrahub_sdk.testing.schemas.animal import TESTING_ANIMAL, 
TESTING_CAT, TESTING_DOG, TESTING_PERSON, SchemaAnimal from infrahub_sdk.types import Order -if TYPE_CHECKING: - from infrahub_sdk import InfrahubClient - class TestInfrahubNode(TestInfrahubDockerClient, SchemaAnimal): @pytest.fixture(scope="class") @@ -107,6 +108,19 @@ async def test_get_generic_fragment(self, client: InfrahubClient, base_dataset) assert nodes[0].typename in {TESTING_DOG, TESTING_CAT} assert nodes[0].breed.value is not None + async def test_get_generic_filter_source(self, client: InfrahubClient, base_dataset, person_liam) -> None: + admin = await client.get(kind="CoreAccount", name__value="admin") + + obj = await client.create( + kind=TESTING_CAT, name={"value": "SourceFilterCat", "source": admin.id}, breed="Siamese", owner=person_liam + ) + await obj.save() + + nodes = await client.filters(kind="CoreNode", any__source__id=admin.id) + assert len(nodes) == 1 + assert nodes[0].typename == TESTING_CAT + assert nodes[0].id == obj.id + async def test_get_related_nodes(self, client: InfrahubClient, base_dataset, person_ethan) -> None: ethan = await client.get(kind=TESTING_PERSON, id=person_ethan.id) assert ethan @@ -115,16 +129,21 @@ async def test_get_related_nodes(self, client: InfrahubClient, base_dataset, per await ethan.animals.fetch() assert len(ethan.animals.peers) == 3 + async def test_count(self, client: InfrahubClient, base_dataset) -> None: + count = await client.count(kind=TESTING_PERSON) + assert count == 3 + + async def test_count_with_filter(self, client: InfrahubClient, base_dataset) -> None: + count = await client.count(kind=TESTING_PERSON, name__values=["Liam Walker", "Ethan Carter"]) + assert count == 2 + async def test_profile(self, client: InfrahubClient, base_dataset, person_liam) -> None: profile_schema_kind = f"Profile{TESTING_DOG}" profile_schema = await client.schema.get(kind=profile_schema_kind) assert isinstance(profile_schema, ProfileSchemaAPI) profile1 = await client.create( - kind=profile_schema_kind, - 
profile_name="profile1", - profile_priority=1000, - color="#111111", + kind=profile_schema_kind, profile_name="profile1", profile_priority=1000, color="#111111" ) await profile1.save() @@ -136,6 +155,29 @@ async def test_profile(self, client: InfrahubClient, base_dataset, person_liam) obj1 = await client.get(kind=TESTING_DOG, id=obj.id) assert obj1.color.value == "#111111" + @pytest.mark.xfail(reason="Require Infrahub v1.7") + async def test_profile_relationship_is_from_profile( + self, client: InfrahubClient, base_dataset, person_liam + ) -> None: + tag = await client.create(kind="BuiltinTag", name="profile-tag-test") + await tag.save() + + profile_schema_kind = f"Profile{TESTING_PERSON}" + profile = await client.create( + kind=profile_schema_kind, profile_name="person-profile-with-tag", profile_priority=1000, tags=[tag] + ) + await profile.save() + + person = await client.create(kind=TESTING_PERSON, name="Profile Relationship Test Person", profiles=[profile]) + await person.save() + + fetched_person = await client.get(kind=TESTING_PERSON, id=person.id, property=True, include=["tags"]) + assert fetched_person.tags.initialized + assert len(fetched_person.tags.peers) == 1 + assert fetched_person.tags.peers[0].id == tag.id + assert fetched_person.tags.peers[0].is_from_profile + assert fetched_person.tags.is_from_profile + async def test_create_branch(self, client: InfrahubClient, base_dataset) -> None: branch = await client.branch.create(branch_name="new-branch-1") assert isinstance(branch, BranchData) @@ -145,20 +187,18 @@ async def test_create_branch_async(self, client: InfrahubClient, base_dataset) - task_id = await client.branch.create(branch_name="new-branch-2", wait_until_completion=False) assert isinstance(task_id, str) - async def test_count(self, client: InfrahubClient, base_dataset) -> None: - count = await client.count(kind=TESTING_PERSON) - assert count == 3 - - async def test_count_with_filter(self, client: InfrahubClient, base_dataset) -> None: - count 
= await client.count(kind=TESTING_PERSON, name__values=["Liam Walker", "Ethan Carter"]) - assert count == 2 - async def test_query_unexisting_branch(self, client: InfrahubClient) -> None: with pytest.raises(URLNotFoundError, match=r"/graphql/unexisting` not found."): await client.execute_graphql(query="unused", branch_name="unexisting") async def test_create_generic_rel_with_hfid( - self, client: InfrahubClient, base_dataset, cat_luna, person_sophia, schema_animal, schema_cat + self, + client: InfrahubClient, + base_dataset: None, + cat_luna: InfrahubNode, + person_sophia: InfrahubNode, + schema_animal: GenericSchema, + schema_cat: NodeSchema, ) -> None: # See https://github.com/opsmill/infrahub-sdk-python/issues/277 assert schema_animal.human_friendly_id != schema_cat.human_friendly_id, ( @@ -219,128 +259,113 @@ async def test_task_query(self, client: InfrahubClient, base_dataset, set_pagina assert all_logs[0].timestamp assert all_logs[0].severity - # async def test_get_generic_filter_source(self, client: InfrahubClient, base_dataset): - # admin = await client.get(kind="CoreAccount", name__value="admin") - - # obj1 = await client.create( - # kind="BuiltinLocation", name={"value": "jfk3", "source": admin.id}, description="new york", type="site" - # ) - # await obj1.save() - - # nodes = await client.filters(kind="CoreNode", any__source__id=admin.id) - # assert len(nodes) == 1 - # assert nodes[0].typename == "BuiltinLocation" - # assert nodes[0].id == obj1.id - - -# async def test_tracking_mode(self, client: InfrahubClient, db: InfrahubDatabase, init_db_base, base_dataset): -# tag_names = ["BLUE", "RED", "YELLOW"] -# orgname = "Acme" -# -# async def create_org_with_tag(clt: InfrahubClient, nbr_tags: int): -# tags = [] -# for idx in range(nbr_tags): -# obj = await clt.create(kind="BuiltinTag", name=f"tracking-{tag_names[idx]}") -# await obj.save(allow_upsert=True) -# tags.append(obj) -# -# org = await clt.create(kind="TestOrganization", name=orgname, tags=tags) -# 
await org.save(allow_upsert=True) -# -# # First execution, we create one org with 3 tags -# nbr_tags = 3 -# async with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# await create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = await client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 4 -# tags = await client.all(kind="BuiltinTag") -# assert len(tags) == 3 -# -# # Second execution, we create one org with 2 tags but we don't delete the third one -# nbr_tags = 2 -# async with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=False) as clt: -# await create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = await client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 3 -# tags = await client.all(kind="BuiltinTag") -# assert len(tags) == 3 -# -# # Third execution, we create one org with 1 tag and we delete the second one -# nbr_tags = 1 -# async with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# await create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = await client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 2 -# -# tags = await client.all(kind="BuiltinTag") -# assert len(tags) == 2 -# -# # Forth one, validate that the group will not be updated if there is an exception -# nbr_tags = 3 -# with pytest.raises(ValueError): -# async with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# await create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# 
raise ValueError("something happened") -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = await client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 2 -# -# async def test_recorder_with_playback_rewrite_host( -# self, client: InfrahubClient, db: InfrahubDatabase, init_db_base, base_dataset, tmp_path: Path -# ): -# client.config.custom_recorder = JSONRecorder(host="recorder-test", directory=str(tmp_path)) -# nodes = await client.all(kind="CoreRepository") -# -# playback_config = JSONPlayback(directory=str(tmp_path)) -# config = Config( -# address="http://recorder-test:8000", -# requester=playback_config.async_request, -# ) -# playback = InfrahubClient(config=config) -# recorded_nodes = await playback.all(kind="CoreRepository") -# -# assert len(nodes) == 1 -# assert nodes == recorded_nodes -# assert recorded_nodes[0].name.value == "repository1" -# - - -# # See issue #148. 
-# async def test_hierarchical( -# self, client: InfrahubClient, db: InfrahubDatabase, init_db_base, base_dataset, hierarchical_schema -# ): -# await client.schema.load(schemas=[hierarchical_schema]) -# -# location_country = await client.create( -# kind="LocationCountry", name="country_name", shortname="country_shortname" -# ) -# await location_country.save() -# -# location_site = await client.create( -# kind="LocationSite", name="site_name", shortname="site_shortname", parent=location_country -# ) -# await location_site.save() -# -# nodes = await client.all(kind="LocationSite", prefetch_relationships=True, populate_store=True) -# assert len(nodes) == 1 -# site_node = nodes[0] -# assert site_node.name.value == "site_name" -# assert site_node.shortname.value == "site_shortname" -# country_node = site_node.parent.get() -# assert country_node.name.value == "country_name" -# assert country_node.shortname.value == "country_shortname" + async def test_tracking_mode(self, client: InfrahubClient, base_dataset) -> None: + tag_names = ["BLUE", "RED", "YELLOW"] + person_name = "TrackingTestPerson" + + async def create_person_with_tags(clt: InfrahubClient, nbr_tags: int) -> None: + tags = [] + for idx in range(nbr_tags): + obj = await clt.create(kind="BuiltinTag", name=f"tracking-{tag_names[idx]}") + await obj.save(allow_upsert=True) + tags.append(obj) + + person = await clt.create(kind=TESTING_PERSON, name=person_name, tags=tags) + await person.save(allow_upsert=True) + + # First execution, we create one person with 3 tags + nbr_tags = 3 + async with client.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt: + await create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client.mode == InfrahubClientMode.DEFAULT + group = await client.get( + kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 4 # 1 person + 3 tags + + # Second execution, we 
create one person with 2 tags but we don't delete the third one + nbr_tags = 2 + async with client.start_tracking(params={"person_name": person_name}, delete_unused_nodes=False) as clt: + await create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client.mode == InfrahubClientMode.DEFAULT + group = await client.get( + kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 3 # 1 person + 2 tags (third tag still exists but not in group) + + # Third execution, we create one person with 1 tag and we delete the second one + nbr_tags = 1 + async with client.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt: + await create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client.mode == InfrahubClientMode.DEFAULT + group = await client.get( + kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 2 # 1 person + 1 tag + + # Fourth execution, validate that the group will not be updated if there is an exception + nbr_tags = 3 + with pytest.raises(ValueError): + async with client.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt: + await create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + raise ValueError("something happened") + + # Group should still have 2 members since the exception caused a rollback + group = await client.get( + kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 2 + + @pytest.mark.xfail(reason="https://github.com/opsmill/infrahub-sdk-python/issues/733") + async def test_recorder_with_playback_rewrite_host( + self, base_dataset: None, tmp_path: Path, infrahub_port: int + ) -> None: + # Create a fresh client for recording to ensure clean state (no cached schema) + recorder_config = Config( + 
username="admin", + password="infrahub", + address=f"http://localhost:{infrahub_port}", + custom_recorder=JSONRecorder(host="recorder-test", directory=str(tmp_path)), + ) + recorder_client = InfrahubClient(config=recorder_config) + + query = "query { BuiltinTag { edges { node { id name { value } } } } }" + result = await recorder_client.execute_graphql(query=query) + + playback_config = JSONPlayback(directory=str(tmp_path)) + config = Config(address=f"http://recorder-test:{infrahub_port}", requester=playback_config.async_request) + playback = InfrahubClient(config=config) + recorded_result = await playback.execute_graphql(query=query) + + assert result == recorded_result + assert result.get("BuiltinTag", {}).get("edges") is not None + + +class TestHierarchicalSchema(TestInfrahubDockerClient): + @pytest.fixture(scope="class") + async def load_hierarchical_schema(self, client: InfrahubClient, hierarchical_schema: dict) -> None: + resp = await client.schema.load(schemas=[hierarchical_schema], wait_until_converged=True) + assert resp.errors == {} + + async def test_hierarchical(self, client: InfrahubClient, load_hierarchical_schema: None) -> None: + location_country = await client.create( + kind="LocationCountry", name="country_name", shortname="country_shortname" + ) + await location_country.save() + + location_site = await client.create( + kind="LocationSite", name="site_name", shortname="site_shortname", parent=location_country + ) + await location_site.save() + + nodes = await client.all(kind="LocationSite", prefetch_relationships=True, populate_store=True) + assert len(nodes) == 1 + site_node = nodes[0] + assert site_node.name.value == "site_name" + assert site_node.shortname.value == "site_shortname" + + country_node = site_node.parent.get() + assert country_node.name.value == "country_name" diff --git a/tests/integration/test_infrahub_client_sync.py b/tests/integration/test_infrahub_client_sync.py index d5b9ad48..f156983c 100644 --- 
a/tests/integration/test_infrahub_client_sync.py +++ b/tests/integration/test_infrahub_client_sync.py @@ -1,320 +1,380 @@ -# from __future__ import annotations -# -# from typing import TYPE_CHECKING -# -# import pytest -# from infrahub.core.initialization import create_branch -# from infrahub.core.node import Node -# from infrahub.server import app -# -# from infrahub_sdk import Config, InfrahubClientSync -# from infrahub_sdk.branch import BranchData -# from infrahub_sdk.constants import InfrahubClientMode -# from infrahub_sdk.exceptions import BranchNotFoundError -# from infrahub_sdk.node import InfrahubNodeSync -# from infrahub_sdk.playback import JSONPlayback -# from infrahub_sdk.recorder import JSONRecorder -# from infrahub_sdk.schema import ProfileSchema -# -# from .conftest import InfrahubTestClient -# -# if TYPE_CHECKING: -# from pathlib import Path -# -# from infrahub.database import InfrahubDatabase -# -# -# -# -# -# class TestInfrahubClientSync: -# @pytest.fixture(scope="class") -# async def test_client(self) -> InfrahubTestClient: -# return InfrahubTestClient(app) -# -# @pytest.fixture -# def client(self, test_client: InfrahubTestClient): -# config = Config( -# username="admin", -# password="infrahub", -# sync_requester=test_client.sync_request, -# ) -# return InfrahubClientSync(config=config) -# -# @pytest.fixture(scope="class") -# async def base_dataset(self, db: InfrahubDatabase, test_client: InfrahubTestClient, builtin_org_schema): -# config = Config(username="admin", password="infrahub", sync_requester=test_client.sync_request) -# client = InfrahubClientSync(config=config) -# response = client.schema.load(schemas=[builtin_org_schema]) -# assert not response.errors -# -# await create_branch(branch_name="branch01", db=db) -# -# query_string = """ -# query { -# branch { -# id -# name -# } -# } -# """ -# obj1 = await Node.init(schema="CoreGraphQLQuery", db=db) -# await obj1.new(db=db, name="test_query2", description="test query", query=query_string) -# 
await obj1.save(db=db) -# -# obj2 = await Node.init(schema="CoreRepository", db=db) -# await obj2.new( -# db=db, -# name="repository1", -# description="test repository", -# location="git@github.com:mock/test.git", -# ) -# await obj2.save(db=db) -# -# obj3 = await Node.init(schema="CoreTransformJinja2", db=db) -# await obj3.new( -# db=db, -# name="rfile1", -# description="test rfile", -# template_path="mytemplate.j2", -# repository=obj2, -# query=obj1, -# ) -# await obj3.save(db=db) -# -# obj4 = await Node.init(schema="CoreTransformPython", db=db) -# await obj4.new( -# db=db, -# name="transform01", -# description="test transform01", -# file_path="mytransformation.py", -# class_name="Transform01", -# query=obj1, -# repository=obj2, -# ) -# await obj4.save(db=db) -# -# async def test_query_branches(self, client: InfrahubClientSync, init_db_base, base_dataset): -# branches = client.branch.all() -# main = client.branch.get(branch_name="main") -# -# with pytest.raises(BranchNotFoundError): -# client.branch.get(branch_name="not-found") -# -# assert main.name == "main" -# assert "main" in branches -# assert "branch01" in branches -# -# async def test_branch_delete(self, client: InfrahubClientSync, init_db_base, base_dataset, db): -# async_branch = "async-delete-branch" -# await create_branch(branch_name=async_branch, db=db) -# -# pre_delete = client.branch.all() -# client.branch.delete(async_branch) -# post_delete = client.branch.all() -# assert async_branch in pre_delete.keys() -# assert async_branch not in post_delete.keys() -# -# async def test_get_all(self, client: InfrahubClientSync, init_db_base, base_dataset): -# obj1 = client.create(kind="BuiltinLocation", name="jfk1", description="new york", type="site") -# obj1.save() -# -# obj2 = client.create(kind="BuiltinLocation", name="sfo1", description="san francisco", type="site") -# obj2.save() -# -# nodes = client.all(kind="BuiltinLocation") -# assert len(nodes) == 2 -# assert isinstance(nodes[0], InfrahubNodeSync) -# 
assert sorted([node.name.value for node in nodes]) == ["jfk1", "sfo1"] # type: ignore[attr-defined] -# -# async def test_get_one(self, client: InfrahubClientSync, init_db_base, base_dataset): -# obj1 = client.create(kind="BuiltinLocation", name="jfk2", description="new york", type="site") -# obj1.save() -# -# obj2 = client.create(kind="BuiltinLocation", name="sfo2", description="san francisco", type="site") -# obj2.save() -# -# node1 = client.get(kind="BuiltinLocation", id=obj1.id) -# assert isinstance(node1, InfrahubNodeSync) -# assert node1.name.value == "jfk2" # type: ignore[attr-defined] -# -# node2 = client.get(kind="BuiltinLocation", id="jfk2") -# assert isinstance(node2, InfrahubNodeSync) -# assert node2.name.value == "jfk2" # type: ignore[attr-defined] -# -# async def test_filters_partial_match(self, client: InfrahubClientSync, init_db_base, base_dataset): -# nodes = client.filters(kind="BuiltinLocation", name__value="jfk") -# assert not nodes -# -# nodes = client.filters(kind="BuiltinLocation", partial_match=True, name__value="jfk") -# assert len(nodes) == 2 -# assert isinstance(nodes[0], InfrahubNodeSync) -# assert sorted([node.name.value for node in nodes]) == ["jfk1", "jfk2"] # type: ignore[attr-defined] -# -# async def test_get_generic(self, client: InfrahubClientSync, init_db_base): -# nodes = client.all(kind="CoreNode") -# assert len(nodes) -# -# async def test_get_generic_fragment(self, client: InfrahubClientSync, init_db_base, base_dataset): -# nodes = client.all(kind="CoreGenericAccount", fragment=True, exclude=["type"]) -# assert len(nodes) -# assert nodes[0].typename == "CoreAccount" -# assert nodes[0].name.value is not None # type: ignore[attr-defined] -# -# async def test_get_generic_filter_source(self, client: InfrahubClientSync, init_db_base, base_dataset): -# admin = client.get(kind="CoreAccount", name__value="admin") -# -# obj1 = client.create( -# kind="BuiltinLocation", name={"value": "jfk3", "source": admin.id}, description="new york", 
type="site" -# ) -# obj1.save() -# -# nodes = client.filters(kind="CoreNode", any__source__id=admin.id) -# assert len(nodes) == 1 -# assert nodes[0].typename == "BuiltinLocation" -# assert nodes[0].id == obj1.id -# -# async def test_get_related_nodes(self, client: InfrahubClientSync, init_db_base, base_dataset): -# nodes = client.all(kind="CoreRepository") -# assert len(nodes) == 1 -# repo = nodes[0] -# -# assert repo.transformations.peers == [] # type: ignore[attr-defined] -# repo.transformations.fetch() # type: ignore[attr-defined] -# assert len(repo.transformations.peers) == 2 # type: ignore[attr-defined] -# -# def test_tracking_mode(self, client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset): -# tag_names = ["BLUE", "RED", "YELLOW"] -# orgname = "Acme" -# -# def create_org_with_tag(clt: InfrahubClientSync, nbr_tags: int): -# tags = [] -# for idx in range(nbr_tags): -# obj = clt.create(kind="BuiltinTag", name=f"tracking-{tag_names[idx]}") -# obj.save(allow_upsert=True) -# tags.append(obj) -# -# org = clt.create(kind="TestOrganization", name=orgname, tags=tags) -# org.save(allow_upsert=True) -# -# # First execution, we create one org with 3 tags -# nbr_tags = 3 -# with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 4 -# tags = client.all(kind="BuiltinTag") -# assert len(tags) == 3 -# -# # Second execution, we create one org with 2 tags but we don't delete the third one -# nbr_tags = 2 -# with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=False) as clt: -# create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = client.get( -# 
kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 3 -# tags = client.all(kind="BuiltinTag") -# assert len(tags) == 3 -# -# # Third execution, we create one org with 1 tag and we delete the second one -# nbr_tags = 1 -# with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 2 -# -# tags = client.all(kind="BuiltinTag") -# assert len(tags) == 2 -# -# # Forth one, validate that the group will not be updated if there is an exception -# nbr_tags = 3 -# with pytest.raises(ValueError): -# with client.start_tracking(params={"orgname": orgname}, delete_unused_nodes=True) as clt: -# create_org_with_tag(clt=clt, nbr_tags=nbr_tags) -# raise ValueError("something happened") -# -# assert client.mode == InfrahubClientMode.DEFAULT -# group = client.get( -# kind="CoreStandardGroup", name__value=client.group_context._generate_group_name(), include=["members"] -# ) -# assert len(group.members.peers) == 2 -# -# def test_recorder_with_playback( -# self, client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset, tmp_path: Path -# ): -# client.config.custom_recorder = JSONRecorder(directory=str(tmp_path)) -# nodes = client.all(kind="CoreRepository") -# -# playback_config = JSONPlayback(directory=str(tmp_path)) -# config = Config( -# address=client.config.address, -# sync_requester=playback_config.sync_request, -# ) -# playback = InfrahubClientSync(config=config) -# recorded_nodes = playback.all(kind="CoreRepository") -# -# assert len(nodes) == 1 -# assert nodes == recorded_nodes -# assert recorded_nodes[0].name.value == "repository1" -# -# def test_profile(self, 
client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset): -# profile_schema_kind = "ProfileBuiltinStatus" -# profile_schema = client.schema.get(kind=profile_schema_kind) -# assert isinstance(profile_schema, ProfileSchema) -# -# profile1 = client.create( -# kind=profile_schema_kind, -# profile_name="profile1", -# profile_priority=1000, -# description="description in profile", -# ) -# profile1.save() -# -# obj = client.create(kind="BuiltinStatus", name="planned", profiles=[profile1]) -# obj.save() -# -# obj1 = client.get(kind="BuiltinStatus", id=obj.id) -# assert obj1.description.value == "description in profile" -# -# def test_create_branch(self, client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset): -# branch = client.branch.create(branch_name="new-branch-1") -# assert isinstance(branch, BranchData) -# assert branch.id is not None -# -# def test_create_branch_async(self, client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset): -# task_id = client.branch.create(branch_name="new-branch-2", wait_until_completion=False) -# assert isinstance(task_id, str) -# -# # See issue #148. 
-# def test_hierarchical( -# self, client: InfrahubClientSync, db: InfrahubDatabase, init_db_base, base_dataset, hierarchical_schema -# ): -# client.schema.load(schemas=[hierarchical_schema]) -# -# location_country = client.create(kind="LocationCountry", name="country_name", shortname="country_shortname") -# location_country.save() -# -# location_site = client.create( -# kind="LocationSite", name="site_name", shortname="site_shortname", parent=location_country -# ) -# location_site.save() -# -# nodes = client.all(kind="LocationSite", prefetch_relationships=True, populate_store=True) -# assert len(nodes) == 1 -# site_node = nodes[0] -# assert site_node.name.value == "site_name" -# assert site_node.shortname.value == "site_shortname" -# country_node = site_node.parent.get() -# assert country_node.name.value == "country_name" -# assert country_node.shortname.value == "country_shortname" +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest + +from infrahub_sdk import Config, InfrahubClientSync +from infrahub_sdk.branch import BranchData +from infrahub_sdk.constants import InfrahubClientMode +from infrahub_sdk.exceptions import BranchNotFoundError, URLNotFoundError +from infrahub_sdk.node import InfrahubNodeSync +from infrahub_sdk.playback import JSONPlayback +from infrahub_sdk.recorder import JSONRecorder +from infrahub_sdk.schema import GenericSchema, NodeSchema, ProfileSchemaAPI +from infrahub_sdk.task.models import Task, TaskFilter, TaskLog, TaskState +from infrahub_sdk.testing.docker import TestInfrahubDockerClient +from infrahub_sdk.testing.schemas.animal import TESTING_ANIMAL, TESTING_CAT, TESTING_DOG, TESTING_PERSON, SchemaAnimal +from infrahub_sdk.types import Order + +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient + from infrahub_sdk.node import InfrahubNode + + +class TestInfrahubClientSync(TestInfrahubDockerClient, SchemaAnimal): + @pytest.fixture(scope="class") + async def 
base_dataset( + self, + client: InfrahubClient, + load_schema: None, + person_liam: InfrahubNode, + person_ethan: InfrahubNode, + person_sophia: InfrahubNode, + cat_luna: InfrahubNode, + cat_bella: InfrahubNode, + dog_daisy: InfrahubNode, + dog_rocky: InfrahubNode, + ) -> None: + await client.branch.create(branch_name="sync-branch01") + + def test_query_branches(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + branches = client_sync.branch.all() + main = client_sync.branch.get(branch_name="main") + + with pytest.raises(BranchNotFoundError): + client_sync.branch.get(branch_name="not-found") + + assert main.name == "main" + assert "main" in branches + assert "sync-branch01" in branches + + def test_branch_delete(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + sync_branch = "sync-delete-branch" + client_sync.branch.create(branch_name=sync_branch) + pre_delete = client_sync.branch.all() + client_sync.branch.delete(sync_branch) + post_delete = client_sync.branch.all() + assert sync_branch in pre_delete + assert sync_branch not in post_delete + + def test_get_all(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.all(kind=TESTING_CAT) + assert len(nodes) == 2 + assert isinstance(nodes[0], InfrahubNodeSync) + assert [node.name.value for node in nodes] == ["Bella", "Luna"] + + def test_get_all_no_order(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.all(kind=TESTING_CAT, order=Order(disable=True)) + assert len(nodes) == 2 + assert isinstance(nodes[0], InfrahubNodeSync) + assert {node.name.value for node in nodes} == {"Bella", "Luna"} + + def test_get_filters_no_order(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.filters(kind=TESTING_CAT, order=Order(disable=True)) + assert len(nodes) == 2 + assert isinstance(nodes[0], InfrahubNodeSync) + assert {node.name.value for node in nodes} == {"Bella", "Luna"} + + def 
test_get_one( + self, client_sync: InfrahubClientSync, base_dataset: None, cat_luna: InfrahubNode, person_sophia: InfrahubNode + ) -> None: + node1 = client_sync.get(kind=TESTING_CAT, id=cat_luna.id) + assert isinstance(node1, InfrahubNodeSync) + assert node1.name.value == "Luna" + + node2 = client_sync.get(kind=TESTING_PERSON, id=person_sophia.id) + assert isinstance(node2, InfrahubNodeSync) + assert node2.name.value == "Sophia Walker" + + def test_filters_partial_match(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.filters(kind=TESTING_PERSON, name__value="Walker") + assert not nodes + + nodes = client_sync.filters(kind=TESTING_PERSON, partial_match=True, name__value="Walker") + assert len(nodes) == 2 + assert isinstance(nodes[0], InfrahubNodeSync) + assert sorted([node.name.value for node in nodes]) == ["Liam Walker", "Sophia Walker"] + + def test_get_generic(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.all(kind=TESTING_ANIMAL) + assert len(nodes) == 4 + + def test_get_generic_fragment(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nodes = client_sync.all(kind=TESTING_ANIMAL, fragment=True) + assert len(nodes) + assert nodes[0].typename in {TESTING_DOG, TESTING_CAT} + assert nodes[0].breed.value is not None + + def test_get_generic_filter_source( + self, client_sync: InfrahubClientSync, base_dataset: None, person_liam: InfrahubNode + ) -> None: + admin = client_sync.get(kind="CoreAccount", name__value="admin") + + obj = client_sync.create( + kind=TESTING_CAT, + name={"value": "SyncSourceFilterCat", "source": admin.id}, + breed="Siamese", + owner=person_liam, + ) + obj.save() + + nodes = client_sync.filters(kind="CoreNode", any__source__id=admin.id) + assert len(nodes) == 1 + assert nodes[0].typename == TESTING_CAT + assert nodes[0].id == obj.id + + def test_get_related_nodes( + self, client_sync: InfrahubClientSync, base_dataset: None, person_ethan: 
InfrahubNode + ) -> None: + ethan = client_sync.get(kind=TESTING_PERSON, id=person_ethan.id) + assert ethan + + assert ethan.animals.peers == [] + ethan.animals.fetch() + assert len(ethan.animals.peers) == 3 + + def test_count(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + count = client_sync.count(kind=TESTING_PERSON) + assert count == 3 + + def test_count_with_filter(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + count = client_sync.count(kind=TESTING_PERSON, name__values=["Liam Walker", "Ethan Carter"]) + assert count == 2 + + def test_profile(self, client_sync: InfrahubClientSync, base_dataset: None, person_liam: InfrahubNode) -> None: + profile_schema_kind = f"Profile{TESTING_DOG}" + profile_schema = client_sync.schema.get(kind=profile_schema_kind) + assert isinstance(profile_schema, ProfileSchemaAPI) + + profile1 = client_sync.create( + kind=profile_schema_kind, profile_name="sync-profile1", profile_priority=1000, color="#222222" + ) + profile1.save() + + obj = client_sync.create( + kind=TESTING_DOG, name="Sync-Sparky", breed="Poodle", owner=person_liam, profiles=[profile1] + ) + obj.save() + + obj1 = client_sync.get(kind=TESTING_DOG, id=obj.id) + assert obj1.color.value == "#222222" + + @pytest.mark.xfail(reason="Require Infrahub v1.7") + def test_profile_relationship_is_from_profile( + self, client_sync: InfrahubClientSync, base_dataset: None, person_liam: InfrahubNode + ) -> None: + tag = client_sync.create(kind="BuiltinTag", name="sync-profile-tag-test") + tag.save() + + profile_schema_kind = f"Profile{TESTING_PERSON}" + profile = client_sync.create( + kind=profile_schema_kind, profile_name="sync-person-profile-with-tag", profile_priority=1000, tags=[tag] + ) + profile.save() + + person = client_sync.create( + kind=TESTING_PERSON, name="Sync Profile Relationship Test Person", profiles=[profile] + ) + person.save() + + fetched_person = client_sync.get(kind=TESTING_PERSON, id=person.id, property=True, 
include=["tags"]) + assert fetched_person.tags.initialized + assert len(fetched_person.tags.peers) == 1 + assert fetched_person.tags.peers[0].id == tag.id + assert fetched_person.tags.peers[0].is_from_profile + assert fetched_person.tags.is_from_profile + + def test_create_branch(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + branch = client_sync.branch.create(branch_name="sync-new-branch-1") + assert isinstance(branch, BranchData) + assert branch.id is not None + + def test_create_branch_async(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + task_id = client_sync.branch.create(branch_name="sync-new-branch-2", wait_until_completion=False) + assert isinstance(task_id, str) + + def test_query_unexisting_branch(self, client_sync: InfrahubClientSync) -> None: + with pytest.raises(URLNotFoundError, match=r"/graphql/unexisting` not found."): + client_sync.execute_graphql(query="unused", branch_name="unexisting") + + def test_create_generic_rel_with_hfid( + self, + client_sync: InfrahubClientSync, + base_dataset: None, + cat_luna: InfrahubNode, + person_sophia: InfrahubNode, + schema_animal: GenericSchema, + schema_cat: NodeSchema, + ) -> None: + # See https://github.com/opsmill/infrahub-sdk-python/issues/277 + assert schema_animal.human_friendly_id != schema_cat.human_friendly_id, ( + "Inherited node schema should have a different hfid than generic one for this test to be relevant" + ) + person_sophia_sync = client_sync.get(kind=TESTING_PERSON, id=person_sophia.id) + person_sophia_sync.favorite_animal = {"hfid": cat_luna.hfid, "kind": TESTING_CAT} + person_sophia_sync.save() + person_sophia_sync = client_sync.get(kind=TESTING_PERSON, id=person_sophia.id, prefetch_relationships=True) + assert person_sophia_sync.favorite_animal.id == cat_luna.id + + # Ensure that nullify it will remove the relationship related node + person_sophia_sync.favorite_animal = None + person_sophia_sync.save() + person_sophia_sync = 
client_sync.get(kind=TESTING_PERSON, id=person_sophia.id, prefetch_relationships=True) + assert not person_sophia_sync.favorite_animal.id + + def test_task_query(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + nbr_tasks = client_sync.task.count() + assert nbr_tasks + + tasks = client_sync.task.filter(filter=TaskFilter(state=[TaskState.COMPLETED])) + assert tasks + task_ids = [task.id for task in tasks] + + # Query Tasks using Parallel mode + tasks_parallel = client_sync.task.filter(filter=TaskFilter(state=[TaskState.COMPLETED]), parallel=True) + assert tasks_parallel + task_parallel_ids = [task.id for task in tasks_parallel] + + # Additional tasks might have been completed between the two queries + # validate that we get at least as many tasks as in the first query + # and that all task IDs from the first query are present in the second one + assert len(tasks_parallel) >= len(tasks) + assert set(task_ids).issubset(set(task_parallel_ids)) + + # Query Tasks by ID + tasks_parallel_filtered = client_sync.task.filter(filter=TaskFilter(ids=task_ids[:2]), parallel=True) + assert tasks_parallel_filtered + assert len(tasks_parallel_filtered) == 2 + + # Query individual Task + task = client_sync.task.get(id=tasks[0].id) + assert task + assert isinstance(task, Task) + assert task.logs == [] + + # Wait for Task completion + task = client_sync.task.wait_for_completion(id=tasks[0].id) + assert task + assert isinstance(task, Task) + + # Query Tasks with logs + tasks = client_sync.task.filter(filter=TaskFilter(state=[TaskState.COMPLETED]), include_logs=True) + all_logs = [log for task in tasks for log in task.logs] + assert all_logs + assert isinstance(all_logs[0], TaskLog) + assert all_logs[0].message + assert all_logs[0].timestamp + assert all_logs[0].severity + + def test_tracking_mode(self, client_sync: InfrahubClientSync, base_dataset: None) -> None: + tag_names = ["BLUE", "RED", "YELLOW"] + person_name = "SyncTrackingTestPerson" + + def 
create_person_with_tags(clt: InfrahubClientSync, nbr_tags: int) -> None: + tags = [] + for idx in range(nbr_tags): + obj = clt.create(kind="BuiltinTag", name=f"sync-tracking-{tag_names[idx]}") + obj.save(allow_upsert=True) + tags.append(obj) + + person = clt.create(kind=TESTING_PERSON, name=person_name, tags=tags) + person.save(allow_upsert=True) + + # First execution, we create one person with 3 tags + nbr_tags = 3 + with client_sync.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt: + create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client_sync.mode == InfrahubClientMode.DEFAULT + group = client_sync.get( + kind="CoreStandardGroup", name__value=client_sync.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 4 # 1 person + 3 tags + + # Second execution, we create one person with 2 tags but we don't delete the third one + nbr_tags = 2 + with client_sync.start_tracking(params={"person_name": person_name}, delete_unused_nodes=False) as clt: + create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client_sync.mode == InfrahubClientMode.DEFAULT + group = client_sync.get( + kind="CoreStandardGroup", name__value=client_sync.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 3 # 1 person + 2 tags (third tag still exists but not in group) + + # Third execution, we create one person with 1 tag and we delete the second one + nbr_tags = 1 + with client_sync.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt: + create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + + assert client_sync.mode == InfrahubClientMode.DEFAULT + group = client_sync.get( + kind="CoreStandardGroup", name__value=client_sync.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 2 # 1 person + 1 tag + + # Fourth execution, validate that the group will not be updated if there is 
an exception + nbr_tags = 3 + with ( + pytest.raises(ValueError), + client_sync.start_tracking(params={"person_name": person_name}, delete_unused_nodes=True) as clt, + ): + create_person_with_tags(clt=clt, nbr_tags=nbr_tags) + raise ValueError("something happened") + + # Group should still have 2 members since the exception caused a rollback + group = client_sync.get( + kind="CoreStandardGroup", name__value=client_sync.group_context._generate_group_name(), include=["members"] + ) + assert len(group.members.peers) == 2 + + @pytest.mark.xfail(reason="https://github.com/opsmill/infrahub-sdk-python/issues/733") + def test_recorder_with_playback_rewrite_host(self, base_dataset: None, tmp_path: Path, infrahub_port: int) -> None: + # Create a fresh client for recording to ensure clean state (no cached schema) + recorder_config = Config( + username="admin", + password="infrahub", + address=f"http://localhost:{infrahub_port}", + custom_recorder=JSONRecorder(host="recorder-test", directory=str(tmp_path)), + ) + recorder_client = InfrahubClientSync(config=recorder_config) + + query = "query { BuiltinTag { edges { node { id name { value } } } } }" + result = recorder_client.execute_graphql(query=query) + + playback_config = JSONPlayback(directory=str(tmp_path)) + config = Config(address=f"http://recorder-test:{infrahub_port}", sync_requester=playback_config.sync_request) + playback = InfrahubClientSync(config=config) + recorded_result = playback.execute_graphql(query=query) + + assert result == recorded_result + assert result.get("BuiltinTag", {}).get("edges") is not None + + +class TestHierarchicalSchema(TestInfrahubDockerClient): + @pytest.fixture(scope="class") + async def load_hierarchical_schema(self, client_sync: InfrahubClientSync, hierarchical_schema: dict) -> None: + resp = client_sync.schema.load(schemas=[hierarchical_schema], wait_until_converged=True) + assert resp.errors == {} + + async def test_hierarchical(self, client_sync: InfrahubClientSync, 
load_hierarchical_schema: None) -> None: + location_country = client_sync.create( + kind="LocationCountry", name="country_name", shortname="country_shortname" + ) + location_country.save() + + location_site = client_sync.create( + kind="LocationSite", name="site_name", shortname="site_shortname", parent=location_country + ) + location_site.save() + + nodes = client_sync.all(kind="LocationSite", prefetch_relationships=True, populate_store=True) + assert len(nodes) == 1 + site_node = nodes[0] + assert site_node.name.value == "site_name" + assert site_node.shortname.value == "site_shortname" + + country_node = site_node.parent.get() + assert country_node.name.value == "country_name" From 7974ac8a1ea0f6a734f240fe3e640033b0ffccfe Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 13:14:08 +0100 Subject: [PATCH 07/10] Convert import/export tests --- tests/integration/test_export_import.py | 872 +++++++++--------------- 1 file changed, 327 insertions(+), 545 deletions(-) diff --git a/tests/integration/test_export_import.py b/tests/integration/test_export_import.py index 846a4803..a138728f 100644 --- a/tests/integration/test_export_import.py +++ b/tests/integration/test_export_import.py @@ -1,545 +1,327 @@ -# from pathlib import Path -# from typing import Any, Dict -# -# import pytest -# import ujson -# -# from infrahub_sdk import InfrahubClient -# from infrahub_sdk.ctl.exporter import LineDelimitedJSONExporter -# from infrahub_sdk.ctl.importer import LineDelimitedJSONImporter -# from infrahub_sdk.exceptions import SchemaNotFoundError -# from infrahub_sdk.transfer.exceptions import TransferFileNotFoundError -# from infrahub_sdk.transfer.schema_sorter import InfrahubSchemaTopologicalSorter -# from tests.helpers.test_app import TestInfrahubApp -# -# PERSON_KIND = "TestingPerson" -# POOL_KIND = "TestingPool" -# CAR_KIND = "TestingCar" -# MANUFACTURER_KIND = "TestingManufacturer" -# TAG_KIND = "TestingTag" -# -# -# -# -# class 
TestSchemaExportImportBase(TestInfrahubApp): -# @pytest.fixture(scope="class") -# def temporary_directory(self, tmp_path_factory) -> Path: -# return tmp_path_factory.mktemp("infrahub-integration-tests") -# -# @pytest.fixture(scope="class") -# def schema_person_base(self) -> Dict[str, Any]: -# return { -# "name": "Person", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Person", -# "attributes": [ -# {"name": "name", "kind": "Text"}, -# {"name": "description", "kind": "Text", "optional": True}, -# {"name": "height", "kind": "Number", "optional": True}, -# ], -# "relationships": [ -# {"name": "cars", "kind": "Generic", "optional": True, "peer": "TestingCar", "cardinality": "many"} -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema_car_base(self) -> Dict[str, Any]: -# return { -# "name": "Car", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Car", -# "attributes": [ -# {"name": "name", "kind": "Text"}, -# {"name": "description", "kind": "Text", "optional": True}, -# {"name": "color", "kind": "Text"}, -# ], -# "relationships": [ -# { -# "name": "owner", -# "kind": "Attribute", -# "optional": False, -# "peer": "TestingPerson", -# "cardinality": "one", -# }, -# { -# "name": "manufacturer", -# "kind": "Attribute", -# "optional": False, -# "peer": "TestingManufacturer", -# "cardinality": "one", -# "identifier": "car__manufacturer", -# }, -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema_manufacturer_base(self) -> Dict[str, Any]: -# return { -# "name": "Manufacturer", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Manufacturer", -# "attributes": [{"name": "name", "kind": "Text"}, {"name": "description", "kind": "Text", "optional": True}], -# "relationships": [ -# { -# "name": "cars", -# "kind": "Generic", -# "optional": True, -# "peer": "TestingCar", -# "cardinality": "many", -# "identifier": "car__manufacturer", -# }, -# { -# "name": "customers", -# "kind": "Generic", -# "optional": True, 
-# "peer": "TestingPerson", -# "cardinality": "many", -# "identifier": "person__manufacturer", -# }, -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema_tag_base(self) -> Dict[str, Any]: -# return { -# "name": "Tag", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Testing Tag", -# "attributes": [{"name": "name", "kind": "Text"}], -# "relationships": [ -# {"name": "cars", "kind": "Generic", "optional": True, "peer": "TestingCar", "cardinality": "many"}, -# { -# "name": "persons", -# "kind": "Generic", -# "optional": True, -# "peer": "TestingPerson", -# "cardinality": "many", -# }, -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema(self, schema_car_base, schema_person_base, schema_manufacturer_base, schema_tag_base) -> Dict[str, Any]: -# return { -# "version": "1.0", -# "nodes": [schema_person_base, schema_car_base, schema_manufacturer_base, schema_tag_base], -# } -# -# @pytest.fixture(scope="class") -# async def initial_dataset(self, client: InfrahubClient, schema): -# await client.schema.load(schemas=[schema]) -# -# john = await client.create( -# kind=PERSON_KIND, data=dict(name="John", height=175, description="The famous Joe Doe") -# ) -# await john.save() -# -# jane = await client.create( -# kind=PERSON_KIND, data=dict(name="Jane", height=165, description="The famous Jane Doe") -# ) -# await jane.save() -# -# honda = await client.create(kind=MANUFACTURER_KIND, data=dict(name="honda", description="Honda Motor Co., Ltd")) -# await honda.save() -# -# renault = await client.create( -# kind=MANUFACTURER_KIND, -# data=dict(name="renault", description="Groupe Renault is a French multinational automobile manufacturer"), -# ) -# await renault.save() -# -# accord = await client.create( -# kind=CAR_KIND, -# data=dict(name="accord", description="Honda Accord", color="#3443eb", manufacturer=honda, owner=jane), -# ) -# await accord.save() -# -# civic = await client.create( -# kind=CAR_KIND, -# data=dict(name="civic", description="Honda 
Civic", color="#c9eb34", manufacturer=honda, owner=jane), -# ) -# await civic.save() -# -# megane = await client.create( -# kind=CAR_KIND, -# data=dict(name="Megane", description="Renault Megane", color="#c93420", manufacturer=renault, owner=john), -# ) -# await megane.save() -# -# blue = await client.create(kind=TAG_KIND, data=dict(name="blue", cars=[accord, civic], persons=[jane])) -# await blue.save() -# -# red = await client.create(kind=TAG_KIND, data=dict(name="red", persons=[john])) -# await red.save() -# -# objs = { -# "john": john.id, -# "jane": jane.id, -# "honda": honda.id, -# "renault": renault.id, -# "accord": accord.id, -# "civic": civic.id, -# "megane": megane.id, -# "blue": blue.id, -# "red": red.id, -# } -# -# return objs -# -# def reset_export_directory(self, temporary_directory: Path): -# for file in temporary_directory.iterdir(): -# if file.is_file(): -# file.unlink() -# -# async def test_step01_export_no_schema(self, client: InfrahubClient, temporary_directory: Path): -# exporter = LineDelimitedJSONExporter(client=client) -# await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) -# -# nodes_file = temporary_directory / "nodes.json" -# relationships_file = temporary_directory / "relationships.json" -# -# # Export should create files even if they do not really hold any data -# assert nodes_file.exists() -# assert relationships_file.exists() -# -# # Verify that only the admin account has been exported -# with nodes_file.open() as f: -# admin_account_node_dump = ujson.loads(f.readline()) -# assert admin_account_node_dump -# assert admin_account_node_dump["kind"] == "CoreAccount" -# assert ujson.loads(admin_account_node_dump["graphql_json"])["name"]["value"] == "admin" -# -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert relationships_dump -# -# async def test_step02_import_no_schema(self, client: InfrahubClient, temporary_directory: Path): -# importer = 
LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # Schema should not be present -# for kind in (PERSON_KIND, CAR_KIND, MANUFACTURER_KIND, TAG_KIND): -# with pytest.raises(SchemaNotFoundError): -# await client.all(kind=kind) -# -# # Cleanup for next tests -# self.reset_export_directory(temporary_directory) -# -# async def test_step03_export_empty_dataset(self, client: InfrahubClient, temporary_directory: Path, schema): -# await client.schema.load(schemas=[schema]) -# -# exporter = LineDelimitedJSONExporter(client=client) -# await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) -# -# nodes_file = temporary_directory / "nodes.json" -# relationships_file = temporary_directory / "relationships.json" -# -# # Export should create files even if they do not really hold any data -# assert nodes_file.exists() -# assert relationships_file.exists() -# -# # Verify that only the admin account has been exported -# with nodes_file.open() as f: -# admin_account_node_dump = ujson.loads(f.readline()) -# assert admin_account_node_dump -# assert admin_account_node_dump["kind"] == "CoreAccount" -# assert ujson.loads(admin_account_node_dump["graphql_json"])["name"]["value"] == "admin" -# -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert relationships_dump -# -# async def test_step04_import_empty_dataset(self, client: InfrahubClient, temporary_directory: Path, schema): -# await client.schema.load(schemas=[schema]) -# -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # No data for any kind should be retrieved -# for kind in (PERSON_KIND, CAR_KIND, MANUFACTURER_KIND, TAG_KIND): -# assert not await client.all(kind=kind) -# -# # Cleanup for next tests 
-# self.reset_export_directory(temporary_directory) -# -# async def test_step05_export_initial_dataset( -# self, client: InfrahubClient, temporary_directory: Path, initial_dataset -# ): -# exporter = LineDelimitedJSONExporter(client=client) -# await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) -# -# nodes_file = temporary_directory / "nodes.json" -# relationships_file = temporary_directory / "relationships.json" -# -# # Export should create files -# assert nodes_file.exists() -# assert relationships_file.exists() -# -# # Verify that nodes have been exported -# nodes_dump = [] -# with nodes_file.open() as reader: -# while line := reader.readline(): -# nodes_dump.append(ujson.loads(line)) -# assert len(nodes_dump) == len(initial_dataset) + 5 # add number to account for default data -# -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert relationships_dump -# -# async def test_step06_import_initial_dataset(self, client: InfrahubClient, temporary_directory: Path, schema): -# await client.schema.load(schemas=[schema]) -# -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # Each kind must have nodes -# for kind in (PERSON_KIND, CAR_KIND, MANUFACTURER_KIND, TAG_KIND): -# assert await client.all(kind=kind) -# -# async def test_step07_import_initial_dataset_with_existing_data( -# self, client: InfrahubClient, temporary_directory: Path, initial_dataset -# ): -# # Count existing nodes -# counters: Dict[str, int] = {} -# for kind in (PERSON_KIND, CAR_KIND, MANUFACTURER_KIND, TAG_KIND): -# nodes = await client.all(kind=kind) -# counters[kind] = len(nodes) -# -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # Nodes must not 
be duplicated -# for kind in (PERSON_KIND, CAR_KIND, MANUFACTURER_KIND, TAG_KIND): -# nodes = await client.all(kind=kind) -# assert len(nodes) == counters[kind] -# -# # Cleanup for next tests -# self.reset_export_directory(temporary_directory) -# -# async def test_step99_import_wrong_drectory(self, client: InfrahubClient): -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# # Using a directory that does not exist, should lead to exception -# with pytest.raises(TransferFileNotFoundError): -# await importer.import_data(import_directory=Path("this_directory_does_not_exist"), branch="main") -# -# -# class TestSchemaExportImportManyRelationships(TestInfrahubApp): -# @pytest.fixture(scope="class") -# def temporary_directory(self, tmp_path_factory) -> Path: -# return tmp_path_factory.mktemp("infrahub-integration-tests") -# -# @pytest.fixture(scope="class") -# def schema_pool_base(self) -> Dict[str, Any]: -# return { -# "name": "Pool", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Pool", -# "attributes": [{"name": "name", "kind": "Text"}, {"name": "description", "kind": "Text", "optional": True}], -# "relationships": [ -# { -# "name": "cars", -# "kind": "Attribute", -# "optional": True, -# "peer": "TestingCar", -# "cardinality": "many", -# "identifier": "car__pool", -# } -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema_car_base(self) -> Dict[str, Any]: -# return { -# "name": "Car", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Car", -# "attributes": [ -# {"name": "name", "kind": "Text"}, -# {"name": "description", "kind": "Text", "optional": True}, -# {"name": "color", "kind": "Text"}, -# ], -# "relationships": [ -# { -# "name": "pools", -# "kind": "Attribute", -# "optional": True, -# "peer": "TestingPool", -# "cardinality": "many", -# "identifier": "car__pool", -# }, -# { -# "name": "manufacturer", -# "kind": "Attribute", -# "optional": False, -# "peer": 
"TestingManufacturer", -# "cardinality": "one", -# "identifier": "car__manufacturer", -# }, -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema_manufacturer_base(self) -> Dict[str, Any]: -# return { -# "name": "Manufacturer", -# "namespace": "Testing", -# "include_in_menu": True, -# "label": "Manufacturer", -# "attributes": [{"name": "name", "kind": "Text"}, {"name": "description", "kind": "Text", "optional": True}], -# "relationships": [ -# { -# "name": "cars", -# "kind": "Generic", -# "optional": True, -# "peer": "TestingCar", -# "cardinality": "many", -# "identifier": "car__manufacturer", -# } -# ], -# } -# -# @pytest.fixture(scope="class") -# def schema(self, schema_car_base, schema_pool_base, schema_manufacturer_base) -> Dict[str, Any]: -# return { -# "version": "1.0", -# "nodes": [schema_pool_base, schema_car_base, schema_manufacturer_base], -# } -# -# @pytest.fixture(scope="class") -# async def initial_dataset(self, client: InfrahubClient, schema): -# await client.schema.load(schemas=[schema]) -# -# bmw = await client.create( -# kind=MANUFACTURER_KIND, -# data=dict( -# name="BMW", -# description="Bayerische Motoren Werke AG is a German multinational manufacturer of luxury vehicles and motorcycles", -# ), -# ) -# await bmw.save() -# -# fiat = await client.create( -# kind=MANUFACTURER_KIND, -# data=dict(name="Fiat", description="Fiat Automobiles S.p.A. 
is an Italian automobile manufacturer"), -# ) -# await fiat.save() -# -# five_series = await client.create( -# kind=CAR_KIND, data=dict(name="5 series", description="BMW 5 series", color="#000000", manufacturer=bmw) -# ) -# await five_series.save() -# -# five_hundred = await client.create( -# kind=CAR_KIND, data=dict(name="500", description="Fiat 500", color="#540302", manufacturer=fiat) -# ) -# await five_hundred.save() -# -# premium = await client.create( -# kind=POOL_KIND, data=dict(name="Premium", description="Premium cars", cars=[five_series]) -# ) -# await premium.save() -# -# compact = await client.create( -# kind=POOL_KIND, data=dict(name="Compact", description="Compact cars", cars=[five_hundred]) -# ) -# await compact.save() -# -# sedan = await client.create( -# kind=POOL_KIND, data=dict(name="Sedan", description="Sedan cars", cars=[five_series]) -# ) -# await sedan.save() -# -# city_cars = await client.create( -# kind=POOL_KIND, data=dict(name="City", description="City cars", cars=[five_hundred]) -# ) -# await city_cars.save() -# -# objs = { -# "bmw": bmw.id, -# "fiat": fiat.id, -# "5series": five_series.id, -# "500": five_hundred.id, -# "premium": premium.id, -# "compact": compact.id, -# "sedan": sedan.id, -# "city_cars": city_cars.id, -# } -# -# return objs -# -# def reset_export_directory(self, temporary_directory: Path): -# for file in temporary_directory.iterdir(): -# if file.is_file(): -# file.unlink() -# -# async def test_step01_export_initial_dataset( -# self, client: InfrahubClient, temporary_directory: Path, initial_dataset -# ): -# exporter = LineDelimitedJSONExporter(client=client) -# await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) -# -# nodes_file = temporary_directory / "nodes.json" -# relationships_file = temporary_directory / "relationships.json" -# -# # Export should create files -# assert nodes_file.exists() -# assert relationships_file.exists() -# -# # Verify that nodes have been exported -# 
nodes_dump = [] -# with nodes_file.open() as reader: -# while line := reader.readline(): -# nodes_dump.append(ujson.loads(line)) -# assert len(nodes_dump) == len(initial_dataset) + 5 # add number to account for default data -# -# # Make sure there are as many relationships as there are in the database -# relationship_count = 0 -# for node in await client.all(kind=POOL_KIND): -# await node.cars.fetch() -# relationship_count += len(node.cars.peers) -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert len(relationships_dump) == relationship_count + 1 # add number to account for default data -# -# async def test_step02_import_initial_dataset(self, client: InfrahubClient, temporary_directory: Path, schema): -# await client.schema.load(schemas=[schema]) -# -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # Each kind must have nodes -# for kind in (POOL_KIND, CAR_KIND, MANUFACTURER_KIND): -# assert await client.all(kind=kind) -# -# # Make sure relationships were properly imported -# relationship_count = 0 -# for node in await client.all(kind=POOL_KIND): -# await node.cars.fetch() -# relationship_count += len(node.cars.peers) -# relationships_file = temporary_directory / "relationships.json" -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert len(relationships_dump) == relationship_count + 1 # add number to account for default data -# -# async def test_step03_import_initial_dataset_with_existing_data( -# self, client: InfrahubClient, temporary_directory: Path, initial_dataset -# ): -# importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) -# await importer.import_data(import_directory=temporary_directory, branch="main") -# -# # Each kind must have nodes -# for kind in (POOL_KIND, CAR_KIND, MANUFACTURER_KIND): -# assert await 
client.all(kind=kind) -# -# # Make sure relationships were properly imported -# relationship_count = 0 -# for node in await client.all(kind=POOL_KIND): -# await node.cars.fetch() -# relationship_count += len(node.cars.peers) -# relationships_file = temporary_directory / "relationships.json" -# relationships_dump = ujson.loads(relationships_file.read_text()) -# assert len(relationships_dump) == relationship_count + 1 # add number to account for default data -# -# # Cleanup for next tests -# self.reset_export_directory(temporary_directory) +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import pytest +import ujson + +from infrahub_sdk.exceptions import SchemaNotFoundError +from infrahub_sdk.testing.docker import TestInfrahubDockerClient +from infrahub_sdk.testing.schemas.car_person import TESTING_CAR, TESTING_MANUFACTURER, TESTING_PERSON, SchemaCarPerson +from infrahub_sdk.transfer.exceptions import TransferFileNotFoundError +from infrahub_sdk.transfer.exporter.json import LineDelimitedJSONExporter +from infrahub_sdk.transfer.importer.json import LineDelimitedJSONImporter +from infrahub_sdk.transfer.schema_sorter import InfrahubSchemaTopologicalSorter + +if TYPE_CHECKING: + from pytest import TempPathFactory + + from infrahub_sdk import InfrahubClient + from infrahub_sdk.node import InfrahubNode + from infrahub_sdk.schema import SchemaRoot + + +class TestSchemaExportImportBase(TestInfrahubDockerClient, SchemaCarPerson): + @pytest.fixture(scope="class") + def temporary_directory(self, tmp_path_factory: TempPathFactory) -> Path: + return tmp_path_factory.mktemp("infrahub-integration-tests") + + @pytest.fixture(scope="class") + async def load_schema(self, client: InfrahubClient, schema_base: SchemaRoot) -> None: + resp = await client.schema.load(schemas=[schema_base.to_schema_dict()], wait_until_converged=True) + assert resp.errors == {} + + @pytest.fixture(scope="class") + async def initial_dataset( + self, + 
client: InfrahubClient, + load_schema: None, + person_joe: InfrahubNode, + person_jane: InfrahubNode, + tag_blue: InfrahubNode, + tag_red: InfrahubNode, + ) -> dict[str, Any]: + honda = await client.create(kind=TESTING_MANUFACTURER, name="Honda", description="Honda Motor Co., Ltd") + await honda.save() + + renault = await client.create(kind=TESTING_MANUFACTURER, name="Renault", description="Groupe Renault") + await renault.save() + + accord = await client.create( + kind=TESTING_CAR, + name="Accord", + description="Honda Accord", + color="#3443eb", + manufacturer=honda, + owner=person_jane, + ) + await accord.save() + + civic = await client.create( + kind=TESTING_CAR, + name="Civic", + description="Honda Civic", + color="#c9eb34", + manufacturer=honda, + owner=person_jane, + ) + await civic.save() + + megane = await client.create( + kind=TESTING_CAR, + name="Megane", + description="Renault Megane", + color="#c93420", + manufacturer=renault, + owner=person_joe, + ) + await megane.save() + + await accord.tags.fetch() + accord.tags.add(tag_blue) + await accord.save() + + await civic.tags.fetch() + civic.tags.add(tag_blue) + await civic.save() + + return { + "joe": person_joe.id, + "jane": person_jane.id, + "honda": honda.id, + "renault": renault.id, + "accord": accord.id, + "civic": civic.id, + "megane": megane.id, + "blue": tag_blue.id, + "red": tag_red.id, + } + + def reset_export_directory(self, temporary_directory: Path) -> None: + for file in temporary_directory.iterdir(): + if file.is_file(): + file.unlink() + + async def test_step01_export_no_schema(self, client: InfrahubClient, temporary_directory: Path) -> None: + exporter = LineDelimitedJSONExporter(client=client) + await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) + + nodes_file = temporary_directory / "nodes.json" + relationships_file = temporary_directory / "relationships.json" + + assert nodes_file.exists() + assert relationships_file.exists() + + admin_found = 
False + with nodes_file.open() as f: + for line in f: + node_dump = ujson.loads(line) + if node_dump.get("kind") == "CoreAccount": + graphql_data = ujson.loads(node_dump["graphql_json"]) + if graphql_data.get("name", {}).get("value") == "admin": + admin_found = True + break + assert admin_found, "Admin account not found in exported nodes" + + async def test_step02_import_no_schema(self, client: InfrahubClient, temporary_directory: Path) -> None: + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + await importer.import_data(import_directory=temporary_directory, branch="main") + + for kind in (TESTING_PERSON, TESTING_CAR, TESTING_MANUFACTURER): + with pytest.raises(SchemaNotFoundError): + await client.all(kind=kind) + + self.reset_export_directory(temporary_directory) + + async def test_step03_export_initial_dataset( + self, client: InfrahubClient, temporary_directory: Path, initial_dataset: dict[str, Any] + ) -> None: + exporter = LineDelimitedJSONExporter(client=client) + await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) + + nodes_file = temporary_directory / "nodes.json" + relationships_file = temporary_directory / "relationships.json" + + assert nodes_file.exists() + assert relationships_file.exists() + + nodes_dump = [] + with nodes_file.open() as reader: + while line := reader.readline(): + nodes_dump.append(ujson.loads(line)) + assert len(nodes_dump) >= len(initial_dataset) + + relationships_dump = ujson.loads(relationships_file.read_text()) + assert relationships_dump + + async def test_step04_import_initial_dataset( + self, client: InfrahubClient, temporary_directory: Path, load_schema: None + ) -> None: + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + await importer.import_data(import_directory=temporary_directory, branch="main") + + for kind in (TESTING_PERSON, TESTING_CAR, TESTING_MANUFACTURER): + 
assert await client.all(kind=kind) + + async def test_step05_import_initial_dataset_with_existing_data( + self, client: InfrahubClient, temporary_directory: Path, initial_dataset: dict[str, Any] + ) -> None: + counters: dict[str, int] = {} + for kind in (TESTING_PERSON, TESTING_CAR, TESTING_MANUFACTURER): + nodes = await client.all(kind=kind) + counters[kind] = len(nodes) + + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + await importer.import_data(import_directory=temporary_directory, branch="main") + + for kind in (TESTING_PERSON, TESTING_CAR, TESTING_MANUFACTURER): + nodes = await client.all(kind=kind) + assert len(nodes) == counters[kind] + + self.reset_export_directory(temporary_directory) + + async def test_step99_import_wrong_directory(self, client: InfrahubClient) -> None: + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + with pytest.raises(TransferFileNotFoundError): + await importer.import_data(import_directory=Path("this_directory_does_not_exist"), branch="main") + + +class TestSchemaExportImportManyRelationships(TestInfrahubDockerClient, SchemaCarPerson): + @pytest.fixture(scope="class") + def temporary_directory(self, tmp_path_factory: TempPathFactory) -> Path: + return tmp_path_factory.mktemp("infrahub-integration-tests-many") + + @pytest.fixture(scope="class") + async def load_schema(self, client: InfrahubClient, schema_base: SchemaRoot) -> None: + resp = await client.schema.load(schemas=[schema_base.to_schema_dict()], wait_until_converged=True) + assert resp.errors == {} + + @pytest.fixture(scope="class") + async def initial_dataset( + self, + client: InfrahubClient, + load_schema: None, + person_joe: InfrahubNode, + person_jane: InfrahubNode, + tag_blue: InfrahubNode, + tag_red: InfrahubNode, + tag_green: InfrahubNode, + ) -> dict[str, Any]: + bmw = await client.create( + kind=TESTING_MANUFACTURER, + name="BMW", + 
description="Bayerische Motoren Werke AG is a German multinational manufacturer", + ) + await bmw.save() + + fiat = await client.create( + kind=TESTING_MANUFACTURER, + name="Fiat", + description="Fiat Automobiles S.p.A. is an Italian automobile manufacturer", + ) + await fiat.save() + + five_series = await client.create( + kind=TESTING_CAR, + name="5 series", + description="BMW 5 series", + color="#000000", + manufacturer=bmw, + owner=person_joe, + ) + await five_series.save() + + five_hundred = await client.create( + kind=TESTING_CAR, + name="500", + description="Fiat 500", + color="#540302", + manufacturer=fiat, + owner=person_jane, + ) + await five_hundred.save() + + await five_series.tags.fetch() + five_series.tags.add(tag_blue) + five_series.tags.add(tag_green) + await five_series.save() + + await five_hundred.tags.fetch() + five_hundred.tags.add(tag_red) + five_hundred.tags.add(tag_green) + await five_hundred.save() + + return { + "bmw": bmw.id, + "fiat": fiat.id, + "5series": five_series.id, + "500": five_hundred.id, + "blue": tag_blue.id, + "red": tag_red.id, + "green": tag_green.id, + } + + def reset_export_directory(self, temporary_directory: Path) -> None: + for file in temporary_directory.iterdir(): + if file.is_file(): + file.unlink() + + async def test_step01_export_initial_dataset( + self, client: InfrahubClient, temporary_directory: Path, initial_dataset: dict[str, Any] + ) -> None: + exporter = LineDelimitedJSONExporter(client=client) + await exporter.export(export_directory=temporary_directory, branch="main", namespaces=[]) + + nodes_file = temporary_directory / "nodes.json" + relationships_file = temporary_directory / "relationships.json" + + assert nodes_file.exists() + assert relationships_file.exists() + + nodes_dump = [] + with nodes_file.open() as reader: + while line := reader.readline(): + nodes_dump.append(ujson.loads(line)) + assert len(nodes_dump) >= len(initial_dataset) + + relationships_dump = 
ujson.loads(relationships_file.read_text()) + assert relationships_dump + + async def test_step02_import_initial_dataset( + self, client: InfrahubClient, temporary_directory: Path, initial_dataset: dict[str, Any] + ) -> None: + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + await importer.import_data(import_directory=temporary_directory, branch="main") + + for kind in (TESTING_CAR, TESTING_MANUFACTURER): + assert await client.all(kind=kind) + + relationship_count = 0 + for node in await client.all(kind=TESTING_CAR): + await node.tags.fetch() + relationship_count += len(node.tags.peers) + assert relationship_count >= 4 + + async def test_step03_import_initial_dataset_with_existing_data( + self, client: InfrahubClient, temporary_directory: Path, initial_dataset: dict[str, Any] + ) -> None: + relationship_count_before = 0 + for node in await client.all(kind=TESTING_CAR): + await node.tags.fetch() + relationship_count_before += len(node.tags.peers) + + importer = LineDelimitedJSONImporter(client=client, topological_sorter=InfrahubSchemaTopologicalSorter()) + await importer.import_data(import_directory=temporary_directory, branch="main") + + for kind in (TESTING_CAR, TESTING_MANUFACTURER): + assert await client.all(kind=kind) + + relationship_count_after = 0 + for node in await client.all(kind=TESTING_CAR): + await node.tags.fetch() + relationship_count_after += len(node.tags.peers) + + assert relationship_count_after == relationship_count_before + + self.reset_export_directory(temporary_directory) From 4c7fcac511a0c7c14f2663419836d91304443b5a Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 13:31:57 +0100 Subject: [PATCH 08/10] Clean ruff ANN001 for integration tests --- pyproject.toml | 10 +-- tests/integration/test_convert_object_type.py | 9 ++- tests/integration/test_infrahub_client.py | 65 +++++++++++-------- .../integration/test_infrahub_client_sync.py | 6 +- 
tests/integration/test_infrahubctl.py | 26 ++++---- tests/integration/test_node.py | 54 +++++++++------ tests/integration/test_repository.py | 4 +- 7 files changed, 103 insertions(+), 71 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 59fa2ad5..b90696c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -347,13 +347,9 @@ max-complexity = 17 "tests/unit/sdk/test_object_store.py" = ["ANN001"] # 7 errors "tests/unit/sdk/graphql/test_query.py" = ["ANN001"] # 7 errors -# tests/integration/ - 60 errors total -"tests/integration/test_infrahub_client.py" = ["ANN001", "PLR0904"] # 32 errors -"tests/integration/test_infrahub_client_sync.py" = ["ANN001", "PLR0904"] # 32 errors -"tests/integration/test_node.py" = ["ANN001"] # 15 errors -"tests/integration/test_infrahubctl.py" = ["ANN001"] # 9 errors -"tests/integration/test_convert_object_type.py" = ["ANN001"] # 3 errors -"tests/integration/test_repository.py" = ["ANN001"] # 1 error +# tests/integration/ +"tests/integration/test_infrahub_client.py" = ["PLR0904"] +"tests/integration/test_infrahub_client_sync.py" = ["PLR0904"] "tasks.py" = [ "PLC0415", # `import` should be at the top-level of a file diff --git a/tests/integration/test_convert_object_type.py b/tests/integration/test_convert_object_type.py index 7aee141a..f5581e2f 100644 --- a/tests/integration/test_convert_object_type.py +++ b/tests/integration/test_convert_object_type.py @@ -1,7 +1,7 @@ from __future__ import annotations import uuid -from typing import Any +from typing import TYPE_CHECKING, Any import pytest @@ -9,6 +9,9 @@ from infrahub_sdk.testing.docker import TestInfrahubDockerClient from tests.constants import CLIENT_TYPE_ASYNC, CLIENT_TYPES +if TYPE_CHECKING: + from infrahub_sdk import InfrahubClient, InfrahubClientSync + SCHEMA: dict[str, Any] = { "version": "1.0", "generics": [ @@ -63,7 +66,9 @@ class TestConvertObjectType(TestInfrahubDockerClient): @pytest.mark.parametrize("client_type", CLIENT_TYPES) - async def 
test_convert_object_type(self, client, client_sync, client_type) -> None: + async def test_convert_object_type( + self, client: InfrahubClient, client_sync: InfrahubClientSync, client_type: str + ) -> None: resp = await client.schema.load(schemas=[SCHEMA], wait_until_converged=True) assert not resp.errors diff --git a/tests/integration/test_infrahub_client.py b/tests/integration/test_infrahub_client.py index b67b022a..2fe9d801 100644 --- a/tests/integration/test_infrahub_client.py +++ b/tests/integration/test_infrahub_client.py @@ -2,6 +2,7 @@ from collections.abc import AsyncGenerator from pathlib import Path +from typing import Any import pytest @@ -24,14 +25,14 @@ class TestInfrahubNode(TestInfrahubDockerClient, SchemaAnimal): async def base_dataset( self, client: InfrahubClient, - load_schema, - person_liam, - person_ethan, - person_sophia, - cat_luna, - cat_bella, - dog_daisy, - dog_rocky, + load_schema: None, + person_liam: InfrahubNode, + person_ethan: InfrahubNode, + person_sophia: InfrahubNode, + cat_luna: InfrahubNode, + cat_bella: InfrahubNode, + dog_daisy: InfrahubNode, + dog_rocky: InfrahubNode, ) -> None: await client.branch.create(branch_name="branch01") @@ -42,7 +43,7 @@ async def set_pagination_size3(self, client: InfrahubClient) -> AsyncGenerator: yield client.pagination_size = original_pagination_size - async def test_query_branches(self, client: InfrahubClient, base_dataset) -> None: + async def test_query_branches(self, client: InfrahubClient, base_dataset: None) -> None: branches = await client.branch.all() main = await client.branch.get(branch_name="main") @@ -53,7 +54,7 @@ async def test_query_branches(self, client: InfrahubClient, base_dataset) -> Non assert "main" in branches assert "branch01" in branches - async def test_branch_delete(self, client: InfrahubClient, base_dataset) -> None: + async def test_branch_delete(self, client: InfrahubClient, base_dataset: None) -> None: async_branch = "async-delete-branch" await 
client.branch.create(branch_name=async_branch) pre_delete = await client.branch.all() @@ -62,25 +63,27 @@ async def test_branch_delete(self, client: InfrahubClient, base_dataset) -> None assert async_branch in pre_delete assert async_branch not in post_delete - async def test_get_all(self, client: InfrahubClient, base_dataset) -> None: + async def test_get_all(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.all(kind=TESTING_CAT) assert len(nodes) == 2 assert isinstance(nodes[0], InfrahubNode) assert [node.name.value for node in nodes] == ["Bella", "Luna"] - async def test_get_all_no_order(self, client: InfrahubClient, base_dataset) -> None: + async def test_get_all_no_order(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.all(kind=TESTING_CAT, order=Order(disable=True)) assert len(nodes) == 2 assert isinstance(nodes[0], InfrahubNode) assert {node.name.value for node in nodes} == {"Bella", "Luna"} - async def test_get_filters_no_order(self, client: InfrahubClient, base_dataset) -> None: + async def test_get_filters_no_order(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.filters(kind=TESTING_CAT, order=Order(disable=True)) assert len(nodes) == 2 assert isinstance(nodes[0], InfrahubNode) assert {node.name.value for node in nodes} == {"Bella", "Luna"} - async def test_get_one(self, client: InfrahubClient, base_dataset, cat_luna, person_sophia) -> None: + async def test_get_one( + self, client: InfrahubClient, base_dataset: None, cat_luna: InfrahubNode, person_sophia: InfrahubNode + ) -> None: node1 = await client.get(kind=TESTING_CAT, id=cat_luna.id) assert isinstance(node1, InfrahubNode) assert node1.name.value == "Luna" @@ -89,7 +92,7 @@ async def test_get_one(self, client: InfrahubClient, base_dataset, cat_luna, per assert isinstance(node2, InfrahubNode) assert node2.name.value == "Sophia Walker" - async def test_filters_partial_match(self, client: InfrahubClient, 
base_dataset) -> None: + async def test_filters_partial_match(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.filters(kind=TESTING_PERSON, name__value="Walker") assert not nodes @@ -98,17 +101,19 @@ async def test_filters_partial_match(self, client: InfrahubClient, base_dataset) assert isinstance(nodes[0], InfrahubNode) assert sorted([node.name.value for node in nodes]) == ["Liam Walker", "Sophia Walker"] - async def test_get_generic(self, client: InfrahubClient, base_dataset) -> None: + async def test_get_generic(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.all(kind=TESTING_ANIMAL) assert len(nodes) == 4 - async def test_get_generic_fragment(self, client: InfrahubClient, base_dataset) -> None: + async def test_get_generic_fragment(self, client: InfrahubClient, base_dataset: None) -> None: nodes = await client.all(kind=TESTING_ANIMAL, fragment=True) assert len(nodes) assert nodes[0].typename in {TESTING_DOG, TESTING_CAT} assert nodes[0].breed.value is not None - async def test_get_generic_filter_source(self, client: InfrahubClient, base_dataset, person_liam) -> None: + async def test_get_generic_filter_source( + self, client: InfrahubClient, base_dataset: None, person_liam: InfrahubNode + ) -> None: admin = await client.get(kind="CoreAccount", name__value="admin") obj = await client.create( @@ -121,7 +126,9 @@ async def test_get_generic_filter_source(self, client: InfrahubClient, base_data assert nodes[0].typename == TESTING_CAT assert nodes[0].id == obj.id - async def test_get_related_nodes(self, client: InfrahubClient, base_dataset, person_ethan) -> None: + async def test_get_related_nodes( + self, client: InfrahubClient, base_dataset: None, person_ethan: InfrahubNode + ) -> None: ethan = await client.get(kind=TESTING_PERSON, id=person_ethan.id) assert ethan @@ -129,15 +136,15 @@ async def test_get_related_nodes(self, client: InfrahubClient, base_dataset, per await ethan.animals.fetch() assert 
len(ethan.animals.peers) == 3 - async def test_count(self, client: InfrahubClient, base_dataset) -> None: + async def test_count(self, client: InfrahubClient, base_dataset: None) -> None: count = await client.count(kind=TESTING_PERSON) assert count == 3 - async def test_count_with_filter(self, client: InfrahubClient, base_dataset) -> None: + async def test_count_with_filter(self, client: InfrahubClient, base_dataset: None) -> None: count = await client.count(kind=TESTING_PERSON, name__values=["Liam Walker", "Ethan Carter"]) assert count == 2 - async def test_profile(self, client: InfrahubClient, base_dataset, person_liam) -> None: + async def test_profile(self, client: InfrahubClient, base_dataset: None, person_liam: InfrahubNode) -> None: profile_schema_kind = f"Profile{TESTING_DOG}" profile_schema = await client.schema.get(kind=profile_schema_kind) assert isinstance(profile_schema, ProfileSchemaAPI) @@ -157,7 +164,7 @@ async def test_profile(self, client: InfrahubClient, base_dataset, person_liam) @pytest.mark.xfail(reason="Require Infrahub v1.7") async def test_profile_relationship_is_from_profile( - self, client: InfrahubClient, base_dataset, person_liam + self, client: InfrahubClient, base_dataset: None, person_liam: InfrahubNode ) -> None: tag = await client.create(kind="BuiltinTag", name="profile-tag-test") await tag.save() @@ -178,12 +185,12 @@ async def test_profile_relationship_is_from_profile( assert fetched_person.tags.peers[0].is_from_profile assert fetched_person.tags.is_from_profile - async def test_create_branch(self, client: InfrahubClient, base_dataset) -> None: + async def test_create_branch(self, client: InfrahubClient, base_dataset: None) -> None: branch = await client.branch.create(branch_name="new-branch-1") assert isinstance(branch, BranchData) assert branch.id is not None - async def test_create_branch_async(self, client: InfrahubClient, base_dataset) -> None: + async def test_create_branch_async(self, client: InfrahubClient, base_dataset: 
None) -> None:
        task_id = await client.branch.create(branch_name="new-branch-2", wait_until_completion=False)
        assert isinstance(task_id, str)
@@ -215,7 +222,9 @@ async def test_create_generic_rel_with_hfid(
         person_sophia = await client.get(kind=TESTING_PERSON, id=person_sophia.id, prefetch_relationships=True)
         assert not person_sophia.favorite_animal.id
-    async def test_task_query(self, client: InfrahubClient, base_dataset, set_pagination_size3) -> None:
+    async def test_task_query(
+        self, client: InfrahubClient, base_dataset: None, set_pagination_size3: None
+    ) -> None:
        nbr_tasks = await client.task.count()
        assert nbr_tasks
@@ -259,7 +268,7 @@ async def test_task_query(self, client: InfrahubClient, base_dataset, set_pagina
         assert all_logs[0].timestamp
         assert all_logs[0].severity
-    async def test_tracking_mode(self, client: InfrahubClient, base_dataset) -> None:
+    async def test_tracking_mode(self, client: InfrahubClient, base_dataset: None) -> None:
        tag_names = ["BLUE", "RED", "YELLOW"]
        person_name = "TrackingTestPerson"
@@ -346,7 +355,7 @@ async def test_recorder_with_playback_rewrite_host(
 class TestHierarchicalSchema(TestInfrahubDockerClient):
     @pytest.fixture(scope="class")
-    async def load_hierarchical_schema(self, client: InfrahubClient, hierarchical_schema: dict) -> None:
+    async def load_hierarchical_schema(self, client: InfrahubClient, hierarchical_schema: dict[str, Any]) -> None:
        resp = await client.schema.load(schemas=[hierarchical_schema], wait_until_converged=True)
        assert resp.errors == {}
diff --git a/tests/integration/test_infrahub_client_sync.py b/tests/integration/test_infrahub_client_sync.py
index f156983c..f13bad5b 100644
--- a/tests/integration/test_infrahub_client_sync.py
+++ b/tests/integration/test_infrahub_client_sync.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 import pytest
@@ -355,7 +355,9 @@ def 
test_recorder_with_playback_rewrite_host(self, base_dataset: None, tmp_path: class TestHierarchicalSchema(TestInfrahubDockerClient): @pytest.fixture(scope="class") - async def load_hierarchical_schema(self, client_sync: InfrahubClientSync, hierarchical_schema: dict) -> None: + async def load_hierarchical_schema( + self, client_sync: InfrahubClientSync, hierarchical_schema: dict[str, Any] + ) -> None: resp = client_sync.schema.load(schemas=[hierarchical_schema], wait_until_converged=True) assert resp.errors == {} diff --git a/tests/integration/test_infrahubctl.py b/tests/integration/test_infrahubctl.py index a4fa197b..f9b47c42 100644 --- a/tests/integration/test_infrahubctl.py +++ b/tests/integration/test_infrahubctl.py @@ -4,7 +4,6 @@ import os import shutil import tempfile -from collections.abc import Generator from pathlib import Path from typing import TYPE_CHECKING @@ -20,7 +19,10 @@ from tests.helpers.utils import change_directory, strip_color if TYPE_CHECKING: + from collections.abc import Generator + from infrahub_sdk import InfrahubClient + from infrahub_sdk.node import InfrahubNode FIXTURE_BASE_DIR = Path(Path(Path(__file__).resolve()).parent / ".." 
/ "fixtures")
@@ -33,20 +35,20 @@ class TestInfrahubCtl(TestInfrahubDockerClient, SchemaAnimal):
     async def base_dataset(
         self,
         client: InfrahubClient,
-        load_schema,
-        person_liam,
-        person_ethan,
-        person_sophia,
-        cat_luna,
-        cat_bella,
-        dog_daisy,
-        dog_rocky,
-        ctl_client_config,
+        load_schema: None,
+        person_liam: InfrahubNode,
+        person_ethan: InfrahubNode,
+        person_sophia: InfrahubNode,
+        cat_luna: InfrahubNode,
+        cat_bella: InfrahubNode,
+        dog_daisy: InfrahubNode,
+        dog_rocky: InfrahubNode,
+        ctl_client_config: None,
     ) -> None:
        await client.branch.create(branch_name="branch01")
@pytest.fixture(scope="class")
-    def repository(self) -> Generator[str]:
+    def repository(self) -> Generator[str, None, None]:
        temp_dir = tempfile.mkdtemp()
try:
@@ -61,7 +63,7 @@ def repository(self) -> Generator[str]:
             shutil.rmtree(temp_dir)
@pytest.fixture(scope="class")
-    def ctl_client_config(self, client: InfrahubClient) -> Generator:
+    def ctl_client_config(self, client: InfrahubClient) -> Generator[None, None, None]:
        load_configuration(value="infrahubctl.toml")
        assert config.SETTINGS._settings
        config.SETTINGS._settings.server_address = client.config.address
diff --git a/tests/integration/test_node.py b/tests/integration/test_node.py
index ded99ac1..eb629e6e 100644
--- a/tests/integration/test_node.py
+++ b/tests/integration/test_node.py
@@ -50,10 +50,15 @@ async def test_node_delete(self, client: InfrahubClient, initial_schema: None) -
     await client.get(kind=TESTING_MANUFACTURER, id=obj.id)
async def test_node_create_with_relationships(
-        self, default_branch: str, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe
+        self,
+        default_branch: str,
+        client: InfrahubClient,
+        initial_schema: None,
+        manufacturer_mercedes: InfrahubNode,
+        person_joe: InfrahubNode,
     ) -> None:
        node = await client.create(
-            kind=TESTING_CAR, name="Tiguan", color="Black", manufacturer=manufacturer_mercedes.id, owner=person_joe.id
+            kind=TESTING_CAR, 
name="CLS", color="Black", manufacturer=manufacturer_mercedes.id, owner=person_joe.id ) await node.save() assert node.id is not None @@ -67,13 +72,13 @@ async def test_node_create_with_relationships_using_related_node( default_branch: str, client: InfrahubClient, initial_schema: None, - manufacturer_mercedes, - car_golf, - person_joe, + manufacturer_mercedes: InfrahubNode, + car_golf: InfrahubNode, + person_joe: InfrahubNode, ) -> None: related_node = car_golf.owner node = await client.create( - kind=TESTING_CAR, name="Tiguan", color="Black", manufacturer=manufacturer_mercedes, owner=related_node + kind=TESTING_CAR, name="CLS", color="Black", manufacturer=manufacturer_mercedes, owner=related_node ) await node.save(allow_upsert=True) assert node.id is not None @@ -89,13 +94,13 @@ async def test_node_filters_include( default_branch: str, client: InfrahubClient, initial_schema: None, - manufacturer_mercedes, - person_joe, - tag_red, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + tag_red: InfrahubNode, ) -> None: car = await client.create( kind=TESTING_CAR, - name="Tiguan2", + name="CLS AMG", color="Black", manufacturer=manufacturer_mercedes, owner=person_joe, @@ -191,13 +196,13 @@ async def test_node_update( default_branch: str, client: InfrahubClient, initial_schema: None, - manufacturer_mercedes, - person_joe, - person_jane, - car_golf, - tag_blue, - tag_red, - tag_green, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + person_jane: InfrahubNode, + car_golf: InfrahubNode, + tag_blue: InfrahubNode, + tag_red: InfrahubNode, + tag_green: InfrahubNode, ) -> None: car_golf.color.value = "White" await car_golf.tags.fetch() @@ -220,7 +225,12 @@ async def test_node_update( assert sorted([tag.id for tag in car3.tags.peers]) == sorted([tag_green.id, tag_blue.id]) async def test_relationship_manager_errors_without_fetch( - self, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe, tag_blue + self, + client: 
InfrahubClient, + initial_schema: None, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + tag_blue: InfrahubNode, ) -> None: car = await client.create( kind=TESTING_CAR, name="UnfetchedCar", color="Blue", manufacturer=manufacturer_mercedes, owner=person_joe @@ -239,7 +249,13 @@ async def test_relationship_manager_errors_without_fetch( assert {t.id for t in car.tags.peers} == {tag_blue.id} async def test_relationships_not_overwritten( - self, client: InfrahubClient, initial_schema: None, manufacturer_mercedes, person_joe, tag_blue, tag_red + self, + client: InfrahubClient, + initial_schema: None, + manufacturer_mercedes: InfrahubNode, + person_joe: InfrahubNode, + tag_blue: InfrahubNode, + tag_red: InfrahubNode, ) -> None: car = await client.create( kind=TESTING_CAR, diff --git a/tests/integration/test_repository.py b/tests/integration/test_repository.py index 37bc7089..942e3bc4 100644 --- a/tests/integration/test_repository.py +++ b/tests/integration/test_repository.py @@ -7,11 +7,13 @@ from infrahub_sdk.utils import get_fixtures_dir if TYPE_CHECKING: + from pathlib import Path + from infrahub_sdk import InfrahubClient class TestInfrahubRepository(TestInfrahubDockerClient): - async def test_add_repository(self, client: InfrahubClient, remote_repos_dir) -> None: + async def test_add_repository(self, client: InfrahubClient, remote_repos_dir: Path) -> None: src_directory = get_fixtures_dir() / "integration/mock_repo" repo = GitRepo(name="mock_repo", src_directory=src_directory, dst_directory=remote_repos_dir) commit = repo._repo.git[repo._repo.git.head()] From c5b7aa62603a27734f5b6720a5759f4a205ce805 Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 14:19:22 +0100 Subject: [PATCH 09/10] Add changelog record --- changelog/187.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/187.fixed.md diff --git a/changelog/187.fixed.md b/changelog/187.fixed.md new file mode 100644 index 00000000..1911c8dc --- 
/dev/null +++ b/changelog/187.fixed.md @@ -0,0 +1 @@ +Rewrite and re-enable integration tests \ No newline at end of file From 0c798dc7b2a1e832a7f95945917247630e0ed61f Mon Sep 17 00:00:00 2001 From: Guillaume Mazoyer Date: Fri, 2 Jan 2026 14:29:47 +0100 Subject: [PATCH 10/10] Unneeded async --- tests/integration/test_infrahub_client_sync.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_infrahub_client_sync.py b/tests/integration/test_infrahub_client_sync.py index f13bad5b..472c3378 100644 --- a/tests/integration/test_infrahub_client_sync.py +++ b/tests/integration/test_infrahub_client_sync.py @@ -355,13 +355,11 @@ def test_recorder_with_playback_rewrite_host(self, base_dataset: None, tmp_path: class TestHierarchicalSchema(TestInfrahubDockerClient): @pytest.fixture(scope="class") - async def load_hierarchical_schema( - self, client_sync: InfrahubClientSync, hierarchical_schema: dict[str, Any] - ) -> None: + def load_hierarchical_schema(self, client_sync: InfrahubClientSync, hierarchical_schema: dict[str, Any]) -> None: resp = client_sync.schema.load(schemas=[hierarchical_schema], wait_until_converged=True) assert resp.errors == {} - async def test_hierarchical(self, client_sync: InfrahubClientSync, load_hierarchical_schema: None) -> None: + def test_hierarchical(self, client_sync: InfrahubClientSync, load_hierarchical_schema: None) -> None: location_country = client_sync.create( kind="LocationCountry", name="country_name", shortname="country_shortname" )