From b3141c4c55e9d2f72195a44533557e46011c2f32 Mon Sep 17 00:00:00 2001 From: Baptiste Date: Sat, 27 Dec 2025 12:08:09 +0100 Subject: [PATCH 1/4] Graphql graph CTL command --- infrahub_sdk/ctl/graphql.py | 101 ++- infrahub_sdk/query_analyzer.py | 760 ++++++++++++++++++ infrahub_sdk/schema/main.py | 66 ++ .../unit/sdk/test_infrahub_query_analyzer.py | 182 +++++ 4 files changed, 1108 insertions(+), 1 deletion(-) create mode 100644 infrahub_sdk/query_analyzer.py create mode 100644 tests/unit/sdk/test_infrahub_query_analyzer.py diff --git a/infrahub_sdk/ctl/graphql.py b/infrahub_sdk/ctl/graphql.py index 6229d42a..10dbf1d8 100644 --- a/infrahub_sdk/ctl/graphql.py +++ b/infrahub_sdk/ctl/graphql.py @@ -16,13 +16,15 @@ ) from ariadne_codegen.settings import ClientSettings, CommentsStrategy from ariadne_codegen.utils import ast_to_str -from graphql import DefinitionNode, GraphQLSchema, NoUnusedFragmentsRule, parse, specified_rules, validate +from graphql import DefinitionNode, GraphQLSchema, NoUnusedFragmentsRule, build_schema, parse, specified_rules, validate from rich.console import Console from ..async_typer import AsyncTyper from ..ctl.client import initialize_client from ..ctl.utils import catch_exception from ..graphql.utils import insert_fragments_inline, remove_fragment_import +from ..query_analyzer import InfrahubQueryAnalyzer +from ..schema import BranchSchema from .parameters import CONFIG_PARAM app = AsyncTyper() @@ -181,3 +183,100 @@ async def generate_return_types( for file_name in package_generator._result_types_files: console.print(f"[green]Generated {file_name} in {directory}") + + +@app.command() +@catch_exception(console=console) +async def check( + query: Path | None = typer.Argument( + None, help="Path to the GraphQL query file or directory. Defaults to current directory if not specified." + ), + branch: str = typer.Option(None, help="Branch to use for schema."), + _: str = CONFIG_PARAM, +) -> None: + """Check if GraphQL queries target single or multiple objects. + + A single-target query is one that will return at most one object per query operation. + This is determined by checking if the query uses uniqueness constraints (like filtering by ID or name). + + Multi-target queries may return multiple objects and should be used with caution in artifact definitions. + """ + query = Path.cwd() if query is None else query + + try: + gql_files = find_gql_files(query) + except FileNotFoundError as exc: + console.print(f"[red]{exc}") + raise typer.Exit(1) from exc + + if not gql_files: + console.print(f"[red]No .gql files found in: {query}") + raise typer.Exit(1) + + client = initialize_client() + + schema_data = await client.schema.all(branch=branch) + branch_schema = BranchSchema(hash="", nodes=schema_data) + + graphql_schema_text = await client.schema.get_graphql_schema() + graphql_schema = build_schema(graphql_schema_text) + + total_files = len(gql_files) + console.print(f"[bold]Checking {total_files} GraphQL file{'s' if total_files > 1 else ''}...[/bold]") + console.print() + + single_target_count = 0 + multi_target_count = 0 + error_count = 0 + + for idx, query_file in enumerate(gql_files, 1): + query_content = query_file.read_text(encoding="utf-8") + + analyzer = InfrahubQueryAnalyzer( + query=query_content, + schema_branch=branch_schema, + schema=graphql_schema, + ) + + console.print(f"[dim]{'─' * 60}[/dim]") + console.print(f"[bold cyan][{idx}/{total_files}][/bold cyan] {query_file}") + + is_valid, errors = analyzer.is_valid + if not is_valid: + console.print("[red] Validation failed:[/red]") + for error in errors or []: + console.print(f" - {error.message}") + error_count += 1 + continue + + report = analyzer.query_report + console.print(f"[bold] Top-level kinds:[/bold] {', '.join(report.top_level_kinds) or 'None'}") + + if not report.top_level_kinds: + console.print("[yellow] Warning: No Infrahub models found in query.[/yellow]") + console.print(" The query may reference types not in the schema, or only use non-model fields.") + error_count += 1 + continue + + if report.only_has_unique_targets: + console.print("[green] Result: Single-target query (good)[/green]") + console.print(" This query targets unique nodes, enabling selective artifact regeneration.") + single_target_count += 1 + else: + console.print("[yellow] Result: Multi-target query[/yellow]") + console.print(" May cause excessive artifact regeneration. Fix: filter by ID or unique attribute.") + multi_target_count += 1 + + console.print(f"[dim]{'─' * 60}[/dim]") + console.print() + console.print("[bold]Summary:[/bold]") + if single_target_count: + console.print(f" [green]{single_target_count} single-target[/green]") + if multi_target_count: + console.print(f" [yellow]{multi_target_count} multi-target[/yellow]") + console.print(" See: https://docs.infrahub.app/topics/graphql") + if error_count: + console.print(f" [red]{error_count} errors[/red]") + + if error_count: + raise typer.Exit(1) diff --git a/infrahub_sdk/query_analyzer.py b/infrahub_sdk/query_analyzer.py new file mode 100644 index 00000000..fc32a463 --- /dev/null +++ b/infrahub_sdk/query_analyzer.py @@ -0,0 +1,760 @@ +from __future__ import annotations + +from collections import deque +from copy import deepcopy +from dataclasses import dataclass, field +from enum import StrEnum +from functools import cached_property +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable + +from graphql import ( + FieldNode, + FragmentDefinitionNode, + FragmentSpreadNode, + GraphQLSchema, + InlineFragmentNode, + ListTypeNode, + NamedTypeNode, + NonNullTypeNode, + OperationDefinitionNode, + OperationType, + SelectionSetNode, + TypeNode, +) +from graphql.language.ast import ( + BooleanValueNode, + ConstListValueNode, + ConstObjectValueNode, + EnumValueNode, + FloatValueNode, + IntValueNode, + ListValueNode, + NullValueNode, + ObjectValueNode, + StringValueNode, + ValueNode, + VariableNode, +) + +from .analyzer import GraphQLQueryAnalyzer +from .schema import ( + GenericSchemaAPI, + NodeSchemaAPI, + ProfileSchemaAPI, + RelationshipCardinality, + TemplateSchemaAPI, +) + +if TYPE_CHECKING: + from .schema import BranchSchema + +# Type alias for schema types returned by the SDK +MainSchemaTypesAPI = NodeSchemaAPI | GenericSchemaAPI | ProfileSchemaAPI | TemplateSchemaAPI + + +@runtime_checkable +class SchemaBranchProtocol(Protocol): + """Protocol defining the minimal interface required from a schema branch.""" + + @property + def node_names(self) -> list[str]: ... + + @property + def generic_names(self) -> list[str]: ... + + @property + def profile_names(self) -> list[str]: ... + + def get(self, name: str, duplicate: bool = False) -> MainSchemaTypesAPI: ... + + def get_node(self, name: str, duplicate: bool = False) -> NodeSchemaAPI: ... + + def get_generic(self, name: str, duplicate: bool = False) -> GenericSchemaAPI: ... + + def get_profile(self, name: str, duplicate: bool = False) -> ProfileSchemaAPI: ... + + +class MutateAction(StrEnum): + CREATE = "create" + DELETE = "delete" + UPDATE = "update" + + +class ContextType(StrEnum): + EDGE = "edge" + NODE = "node" + DIRECT = "direct" + OBJECT = "object" + + @classmethod + def from_operation(cls, operation: OperationType) -> ContextType: + match operation: + case OperationType.QUERY: + return cls.EDGE + case OperationType.MUTATION: + return cls.OBJECT + case OperationType.SUBSCRIPTION: + return cls.EDGE + + @classmethod + def from_relationship_cardinality(cls, cardinality: str) -> ContextType: + if cardinality in {RelationshipCardinality.MANY, "many"}: + return cls.EDGE + return cls.NODE + + +class GraphQLOperationType(StrEnum): + QUERY = "query" + MUTATION = "mutation" + SUBSCRIPTION = "subscription" + UNDEFINED = "undefined" + + @classmethod + def from_operation(cls, operation: OperationType) -> GraphQLOperationType: + match operation: + case OperationType.QUERY: + return cls.QUERY + case OperationType.MUTATION: + return cls.MUTATION + case OperationType.SUBSCRIPTION: + return cls.SUBSCRIPTION + + +@dataclass +class GraphQLSelectionSet: + field_nodes: list[FieldNode] + fragment_spread_nodes: list[FragmentSpreadNode] + inline_fragment_nodes: list[InlineFragmentNode] + + +@dataclass +class GraphQLArgumentInfo: + name: str + value: Any + kind: str + + @property + def is_variable(self) -> bool: + return self.kind == "variable" + + @property + def as_variable_name(self) -> str: + """Return the name without a $ prefix""" + return str(self.value).removeprefix("$") + + @property + def fields(self) -> list[str]: + if self.kind != "object_value" or not isinstance(self.value, dict): + return [] + return sorted(self.value.keys()) + + +@dataclass +class ObjectAccess: + attributes: set[str] = field(default_factory=set) + relationships: set[str] = field(default_factory=set) + + +@dataclass +class GraphQLVariableInfo: + name: str + type: str + required: bool + is_list: bool = False + inner_required: bool = False + default: Any | None = None + + +@dataclass +class GraphQLQueryModel: + model: MainSchemaTypesAPI + root: bool + arguments: list[GraphQLArgumentInfo] + attributes: set[str] + relationships: set[str] + mutate_actions: list[MutateAction] = field(default_factory=list) + + +@dataclass +class GraphQLQueryNode: + path: str + operation: GraphQLOperationType = field(default=GraphQLOperationType.UNDEFINED) + arguments: list[GraphQLArgumentInfo] = field(default_factory=list) + variables: list[GraphQLVariableInfo] = field(default_factory=list) + context_type: ContextType = field(default=ContextType.EDGE) + parent: GraphQLQueryNode | None = field(default=None) + children: list[GraphQLQueryNode] = field(default_factory=list) + infrahub_model: MainSchemaTypesAPI | None = field(default=None) + infrahub_node_models: list[MainSchemaTypesAPI] = field(default_factory=list) + infrahub_attributes: set[str] = field(default_factory=set) + infrahub_relationships: set[str] = field(default_factory=set) + field_node: FieldNode | None = field(default=None) + mutate_actions: list[MutateAction] = field(default_factory=list) + + def context_model(self) -> MainSchemaTypesAPI | None: + """Return the closest Infrahub object by going up in the tree""" + if self.infrahub_model: + return self.infrahub_model + if self.parent: + return self.parent.context_model() + + return None + + def context_path(self) -> str: + """Return the relative path for the current context with the closest Infrahub object as the root""" + if self.infrahub_model: + return f"/{self.path}" + if self.parent: + return f"{self.parent.context_path()}/{self.path}" + return self.path + + def properties_path(self) -> str: + """Indicate the expected path to where Infrahub attributes and relationships would be defined.""" + if self.infrahub_model: + match self.context_type: + case ContextType.DIRECT: + return f"/{self.path}" + case ContextType.EDGE: + return f"/{self.path}/edges/node" + case ContextType.NODE: + return f"/{self.path}/node" + case ContextType.OBJECT: + return f"/{self.path}/object" + if self.parent: + return self.parent.properties_path() + + return self.path + + def full_path(self) -> str: + """Return the full path within the tree for the current context.""" + if self.parent: + return f"{self.parent.full_path()}/{self.path}" + return self.path + + @property + def at_root(self) -> bool: + return not self.parent + + @property + def in_property_level(self) -> bool: + """Indicate if properties, i.e., attributes and relationships could exist at this level.""" + return self.context_path() == self.properties_path() + + def append_attribute(self, attribute: str) -> None: + """Add attributes to the closes parent Infrahub object.""" + if self.infrahub_model: + self.infrahub_attributes.add(attribute) + elif self.parent: + self.parent.append_attribute(attribute=attribute) + + def append_relationship(self, relationship: str) -> None: + """Add relationships to the closes parent Infrahub object.""" + if self.infrahub_model: + self.infrahub_relationships.add(relationship) + elif self.parent: + self.parent.append_relationship(relationship=relationship) + + def get_models(self) -> list[GraphQLQueryModel]: + """Return all models defined on this node along with child nodes""" + models: list[GraphQLQueryModel] = [] + if self.infrahub_model: + models.append( + GraphQLQueryModel( + model=self.infrahub_model, + root=self.at_root, + arguments=self.arguments, + attributes=self.infrahub_attributes, + relationships=self.infrahub_relationships, + mutate_actions=self.mutate_actions, + ) + ) + for used_by in self.infrahub_node_models: + models.append( + GraphQLQueryModel( + model=used_by, + root=self.at_root, + arguments=self.arguments, + attributes=self.infrahub_attributes, + relationships=self.infrahub_relationships, + mutate_actions=self.mutate_actions, + ) + ) + + for child in self.children: + models.extend(child.get_models()) + return models + + +@dataclass +class GraphQLQueryReport: + queries: list[GraphQLQueryNode] + + @property + def impacted_models(self) -> list[str]: + """Return a list of all Infrahub objects that are impacted by queries within the request""" + models: set[str] = set() + for query in self.queries: + query_models = query.get_models() + models.update([query_model.model.kind for query_model in query_models]) + + return sorted(models) + + @cached_property + def requested_read(self) -> dict[str, ObjectAccess]: + """Return Infrahub objects and the fields (attributes and relationships) that this query would attempt to read""" + access: dict[str, ObjectAccess] = {} + for query in self.queries: + query_models = query.get_models() + for query_model in query_models: + if query_model.model.kind not in access: + access[query_model.model.kind] = ObjectAccess() + access[query_model.model.kind].attributes.update(query_model.attributes) + access[query_model.model.kind].relationships.update(query_model.relationships) + + return access + + def fields_by_kind(self, kind: str) -> list[str]: + fields: list[str] = [] + if access := self.requested_read.get(kind): + fields.extend(list(access.attributes)) + fields.extend(list(access.relationships)) + + return fields + + @cached_property + def variables(self) -> list[GraphQLVariableInfo]: + """Return input variables defined on the query document + + All subqueries will use the same document level queries, + so only the first entry is required + """ + if self.queries: + return self.queries[0].variables + return [] + + def required_argument(self, argument: GraphQLArgumentInfo) -> bool: + if argument.name == "ids" and argument.kind == "list_value": + for variable in self.variables: + if f"['${variable.name}']" == argument.as_variable_name and variable.required: + return True + + return False + + if not argument.is_variable: + # If the argument isn't a variable it would have been + # statically defined in the input and as such required + return True + for variable in self.variables: + if variable.name == argument.as_variable_name and variable.required: + return True + + return False + + @cached_property + def top_level_kinds(self) -> list[str]: + return [query.infrahub_model.kind for query in self.queries if query.infrahub_model] + + @cached_property + def kind_action_map(self) -> dict[str, set[MutateAction]]: + access: dict[str, set[MutateAction]] = {} + root_models: set[str] = set() + includes_mutations: bool = False + for query in self.queries: + query_models = query.get_models() + for query_model in query_models: + if query_model.mutate_actions: + includes_mutations = True + if includes_mutations: + if query_model.model.kind not in access: + access[query_model.model.kind] = set() + if query_model.root: + root_models.add(query_model.model.kind) + access[query_model.model.kind].update(query_model.mutate_actions) + + # Until we properly analyze the data payload it is assumed that the required permissions on non-root objects is update + # the idea around this is that at this point even if we only return data from a relationship without actually updating + # that relationship we'd still expect to have UPDATE permissions on that related object. However, this is still a step + # in the right direction as we'd previously require the same permissions as that of the base object so this is still + # more correct. + for node_kind, node_actions in access.items(): + if node_kind not in root_models: + node_actions.add(MutateAction.UPDATE) + + return access + + @property + def only_has_unique_targets(self) -> bool: + """Indicate if the query document is defined so that it will return a single root level object""" + if not self.queries: + return False + + for query in self.queries: + targets_single_query = False + if query.infrahub_model and query.infrahub_model.uniqueness_constraints: + for argument in query.arguments: + if [[argument.name]] == query.infrahub_model.uniqueness_constraints: + if self.required_argument(argument=argument): + targets_single_query = True + elif argument.name == "ids" and self.required_argument(argument=argument): + targets_single_query = True + + if not targets_single_query: + return False + + return True + + +class InfrahubQueryAnalyzer(GraphQLQueryAnalyzer): + """GraphQL query analyzer for Infrahub queries. + + This analyzer extends the base GraphQLQueryAnalyzer to provide Infrahub-specific + analysis capabilities, including: + - Identifying which Infrahub models are referenced in the query + - Tracking which attributes and relationships are accessed + - Determining if a query targets single or multiple objects + """ + + def __init__( + self, + query: str, + schema_branch: SchemaBranchProtocol | BranchSchema, + schema: GraphQLSchema | None = None, + query_variables: dict[str, Any] | None = None, + operation_name: str | None = None, + ) -> None: + self.schema_branch = schema_branch + self.operation_name = operation_name + self.query_variables: dict[str, Any] = query_variables or {} + self._named_fragments: dict[str, GraphQLQueryNode] = {} + self._fragment_dependencies: dict[str, set[str]] = {} + super().__init__(query=query, schema=schema) + + @property + def operation_names(self) -> list[str]: + return [operation.name for operation in self.operations if operation.name is not None] + + @cached_property + def _fragment_definitions(self) -> list[FragmentDefinitionNode]: + return [ + definition for definition in self.document.definitions if isinstance(definition, FragmentDefinitionNode) + ] + + @cached_property + def _operation_definitions(self) -> list[OperationDefinitionNode]: + return [ + definition for definition in self.document.definitions if isinstance(definition, OperationDefinitionNode) + ] + + def get_named_fragment_with_parent(self, name: str, parent: GraphQLQueryNode) -> GraphQLQueryNode: + """Return a copy of the named fragment and attach it to a parent. + + We return a copy of the object as a named fragment could be used by multiple queries and as we're + generally working with references to objects we wouldn't want to override the parent of a previously + assigned object + """ + named_fragment = deepcopy(self._named_fragments[name]) + named_fragment.parent = parent + return named_fragment + + @cached_property + def query_report(self) -> GraphQLQueryReport: + self._populate_named_fragments() + operations = self._get_operations() + + return GraphQLQueryReport(queries=operations) + + def _get_operations(self) -> list[GraphQLQueryNode]: + operations: list[GraphQLQueryNode] = [] + for operation_definition in self._operation_definitions: + selections = self._get_selections(selection_set=operation_definition.selection_set) + + for field_node in selections.field_nodes: + schema_model: MainSchemaTypesAPI + infrahub_node_models: list[MainSchemaTypesAPI] = [] + model_name = self._get_model_name(node=field_node, operation_definition=operation_definition) + + if model_name in self.schema_branch.node_names: + schema_model = self.schema_branch.get_node(name=model_name, duplicate=False) + elif model_name in self.schema_branch.generic_names: + schema_model = self.schema_branch.get_generic(name=model_name, duplicate=False) + infrahub_node_models = [ + self.schema_branch.get(name=used_by, duplicate=False) for used_by in schema_model.used_by + ] + elif model_name in self.schema_branch.profile_names: + schema_model = self.schema_branch.get_profile(name=model_name, duplicate=False) + else: + continue + + operational_node = GraphQLQueryNode( + operation=GraphQLOperationType.from_operation(operation=operation_definition.operation), + path=schema_model.kind, + infrahub_model=schema_model, + infrahub_node_models=infrahub_node_models, + mutate_actions=self._get_model_mutations( + node=field_node, operation_definition=operation_definition + ), + context_type=ContextType.from_operation(operation=operation_definition.operation), + arguments=self._parse_arguments(field_node=field_node), + variables=self._get_variables(operation=operation_definition), + ) + + if field_node.selection_set: + selections = self._get_selections(selection_set=field_node.selection_set) + for selection_field_node in selections.field_nodes: + operational_node.children.append( + self._populate_field_node(node=selection_field_node, query_node=operational_node) + ) + operations.append(operational_node) + return operations + + @staticmethod + def _get_model_name(node: FieldNode, operation_definition: OperationDefinitionNode) -> str: + if operation_definition.operation == OperationType.MUTATION and node.name.value.endswith( + ("Create", "Delete", "Update", "Upsert") + ): + return node.name.value[:-6] + return node.name.value + + @staticmethod + def _get_model_mutations(node: FieldNode, operation_definition: OperationDefinitionNode) -> list[MutateAction]: + if operation_definition.operation == OperationType.MUTATION: + if node.name.value.endswith("Create"): + return [MutateAction.CREATE] + if node.name.value.endswith("Delete"): + return [MutateAction.DELETE] + if node.name.value.endswith("Update"): + return [MutateAction.UPDATE] + if node.name.value.endswith("Upsert"): + return [MutateAction.CREATE, MutateAction.UPDATE] + return [] + + @property + def _sorted_fragment_definitions(self) -> list[FragmentDefinitionNode]: + """Sort fragments so that we start processing fragments that don't depend on other fragments""" + dependencies = deepcopy(self._fragment_dependencies) + + independent_fragments = deque([frag for frag, deps in dependencies.items() if not deps]) + + sorted_fragments = [] + + while independent_fragments: + fragment_name = independent_fragments.popleft() + sorted_fragments.append(fragment_name) + + for dependent, deps in dependencies.items(): + if fragment_name in deps: + deps.remove(fragment_name) + if not deps: + independent_fragments.append(dependent) + + if len(sorted_fragments) != len(self._fragment_dependencies): + raise ValueError("Circular fragment dependency detected.") + + fragment_name_to_definition = {frag.name.value: frag for frag in self._fragment_definitions} + return [fragment_name_to_definition[name] for name in sorted_fragments] + + def _populate_fragment_dependency(self, name: str, selection_set: SelectionSetNode | None) -> None: + if selection_set: + for selection in selection_set.selections: + if isinstance(selection, FragmentSpreadNode): + self._fragment_dependencies[name].add(selection.name.value) + elif isinstance(selection, FieldNode): + self._populate_fragment_dependency(name=name, selection_set=selection.selection_set) + elif isinstance(selection, InlineFragmentNode): + self._populate_fragment_dependency(name=name, selection_set=selection.selection_set) + + def _populate_fragment_dependencies(self) -> None: + for fragment in self._fragment_definitions: + fragment_name = fragment.name.value + self._fragment_dependencies[fragment_name] = set() + self._populate_fragment_dependency(name=fragment_name, selection_set=fragment.selection_set) + + def _populate_named_fragments(self) -> None: + self._populate_fragment_dependencies() + self._named_fragments = {} + + for fragment_definition in self._sorted_fragment_definitions: + fragment_name = fragment_definition.name.value + condition_name = fragment_definition.type_condition.name.value + selections = self._get_selections(selection_set=fragment_definition.selection_set) + + try: + infrahub_model = self.schema_branch.get(name=condition_name, duplicate=False) + except KeyError: + infrahub_model = None + + named_fragment = GraphQLQueryNode( + path=fragment_definition.type_condition.name.value, + context_type=ContextType.DIRECT, + infrahub_model=infrahub_model, + ) + for field_node in selections.field_nodes: + named_fragment.children.append(self._populate_field_node(node=field_node, query_node=named_fragment)) + for inline_fragment_node in selections.inline_fragment_nodes: + named_fragment.children.append( + self._populate_inline_fragment_node(node=inline_fragment_node, query_node=named_fragment) + ) + + self._named_fragments[fragment_name] = named_fragment + + def _populate_field_node(self, node: FieldNode, query_node: GraphQLQueryNode) -> GraphQLQueryNode: + context_type = query_node.context_type + infrahub_model = None + infrahub_node_models: list[MainSchemaTypesAPI] = [] + if query_node.in_property_level: + if model := query_node.context_model(): + if node.name.value in model.attribute_names or node.name.value == "display_label": + query_node.append_attribute(attribute=node.name.value) + elif node.name.value in model.relationship_names: + rel = model.get_relationship_or_none(name=node.name.value) + if rel: + infrahub_model = self.schema_branch.get(name=rel.peer, duplicate=False) + if isinstance(infrahub_model, GenericSchemaAPI): + infrahub_node_models = [ + self.schema_branch.get(name=used_by, duplicate=False) + for used_by in infrahub_model.used_by + ] + + context_type = ContextType.from_relationship_cardinality(cardinality=rel.cardinality) + query_node.append_relationship(relationship=node.name.value) + + current_node = GraphQLQueryNode( + parent=query_node, + path=node.name.value, + context_type=context_type, + infrahub_model=infrahub_model, + infrahub_node_models=infrahub_node_models, + arguments=self._parse_arguments(field_node=node), + ) + + if node.selection_set: + selections = self._get_selections(selection_set=node.selection_set) + for field_node in selections.field_nodes: + current_node.children.append(self._populate_field_node(node=field_node, query_node=current_node)) + for inline_fragment_node in selections.inline_fragment_nodes: + current_node.children.append( + self._populate_inline_fragment_node(node=inline_fragment_node, query_node=current_node) + ) + for fragment_spread_node in selections.fragment_spread_nodes: + current_node.children.append( + self._populate_fragment_spread_node(node=fragment_spread_node, query_node=current_node) + ) + + return current_node + + def _populate_inline_fragment_node( + self, node: InlineFragmentNode, query_node: GraphQLQueryNode + ) -> GraphQLQueryNode: + infrahub_model = self.schema_branch.get(name=node.type_condition.name.value, duplicate=False) + context_type = ContextType.DIRECT + current_node = GraphQLQueryNode( + parent=query_node, + path=node.type_condition.name.value, + context_type=context_type, + infrahub_model=infrahub_model, + ) + if node.selection_set: + selections = self._get_selections(selection_set=node.selection_set) + for field_node in selections.field_nodes: + current_node.children.append(self._populate_field_node(node=field_node, query_node=current_node)) + for inline_fragment_node in selections.inline_fragment_nodes: + current_node.children.append( + self._populate_inline_fragment_node(node=inline_fragment_node, query_node=current_node) + ) + + return current_node + + def _populate_fragment_spread_node( + self, node: FragmentSpreadNode, query_node: GraphQLQueryNode + ) -> GraphQLQueryNode: + return self.get_named_fragment_with_parent(name=node.name.value, parent=query_node) + + @staticmethod + def _get_selections(selection_set: SelectionSetNode) -> GraphQLSelectionSet: + return GraphQLSelectionSet( + field_nodes=[selection for selection in selection_set.selections if isinstance(selection, FieldNode)], + fragment_spread_nodes=[ + selection for selection in selection_set.selections if isinstance(selection, FragmentSpreadNode) + ], + inline_fragment_nodes=[ + selection for selection in selection_set.selections if isinstance(selection, InlineFragmentNode) + ], + ) + + def _get_variables(self, operation: OperationDefinitionNode) -> list[GraphQLVariableInfo]: + variables: list[GraphQLVariableInfo] = [] + + for variable in operation.variable_definitions or []: + type_node: TypeNode = variable.type + required = False + is_list = False + inner_required = False + + if isinstance(type_node, NonNullTypeNode): + required = True + type_node = type_node.type + + if isinstance(type_node, ListTypeNode): + is_list = True + inner_type = type_node.type + + if isinstance(inner_type, NonNullTypeNode): + inner_required = True + inner_type = inner_type.type + + if isinstance(inner_type, NamedTypeNode): + type_name = inner_type.name.value + else: + raise TypeError(f"Unsupported inner type node: {inner_type}") + elif isinstance(type_node, NamedTypeNode): + type_name = type_node.name.value + else: + raise TypeError(f"Unsupported type node: {type_node}") + + variables.append( + GraphQLVariableInfo( + name=variable.variable.name.value, + type=type_name, + required=required, + is_list=is_list, + inner_required=inner_required, + default=self._parse_value(variable.default_value) if variable.default_value else None, + ) + ) + + return variables + + def _parse_arguments(self, field_node: FieldNode) -> list[GraphQLArgumentInfo]: + return [ + GraphQLArgumentInfo( + name=argument.name.value, + value=self._parse_value(argument.value), + kind=argument.value.kind, + ) + for argument in field_node.arguments + ] + + def _parse_value(self, node: ValueNode) -> Any: + match node: + case VariableNode(): + value: Any = f"${node.name.value}" + case IntValueNode(): + value = int(node.value) + case FloatValueNode(): + value = float(node.value) + case StringValueNode(): + value = node.value + case BooleanValueNode(): + value = node.value + case NullValueNode(): + value = None + case EnumValueNode(): + value = node.value + case ListValueNode() | ConstListValueNode(): + value = [self._parse_value(item) for item in node.values] + case ObjectValueNode() | ConstObjectValueNode(): + value = {field.name.value: self._parse_value(field.value) for field in node.fields} + case _: + raise TypeError(f"Unsupported value node: {node}") + + return value diff --git a/infrahub_sdk/schema/main.py b/infrahub_sdk/schema/main.py index 34a35177..2c2ca60b 100644 --- a/infrahub_sdk/schema/main.py +++ b/infrahub_sdk/schema/main.py @@ -364,6 +364,72 @@ class BranchSchema(BaseModel): default_factory=dict ) + @property + def node_names(self) -> list[str]: + """Return names of all NodeSchema objects.""" + return [k for k, v in self.nodes.items() if isinstance(v, NodeSchemaAPI)] + + @property + def generic_names(self) -> list[str]: + """Return names of all GenericSchema objects.""" + return [k for k, v in self.nodes.items() if isinstance(v, GenericSchemaAPI)] + + @property + def profile_names(self) -> list[str]: + """Return names of all ProfileSchema objects.""" + return [k for k, v in self.nodes.items() if isinstance(v, ProfileSchemaAPI)] + + def get( + self, + name: str, + duplicate: bool = False, # noqa: ARG002 + ) -> GenericSchemaAPI | NodeSchemaAPI | ProfileSchemaAPI | TemplateSchemaAPI: + """Get a schema by name. + + Args: + name: The schema kind name to look up. + duplicate: Unused, kept for API compatibility with backend SchemaBranch. + """ + if name not in self.nodes: + raise KeyError(f"Schema '{name}' not found") + return self.nodes[name] + + def get_node(self, name: str, duplicate: bool = False) -> NodeSchemaAPI: # noqa: ARG002 + """Get a NodeSchema by name. + + Args: + name: The schema kind name to look up. + duplicate: Unused, kept for API compatibility with backend SchemaBranch. + """ + schema = self.get(name) + if not isinstance(schema, NodeSchemaAPI): + raise TypeError(f"Schema '{name}' is not a NodeSchema") + return schema + + def get_generic(self, name: str, duplicate: bool = False) -> GenericSchemaAPI: # noqa: ARG002 + """Get a GenericSchema by name. + + Args: + name: The schema kind name to look up. + duplicate: Unused, kept for API compatibility with backend SchemaBranch. + """ + schema = self.get(name) + if not isinstance(schema, GenericSchemaAPI): + raise TypeError(f"Schema '{name}' is not a GenericSchema") + return schema + + def get_profile(self, name: str, duplicate: bool = False) -> ProfileSchemaAPI: # noqa: ARG002 + """Get a ProfileSchema by name. + + Args: + name: The schema kind name to look up. + duplicate: Unused, kept for API compatibility with backend SchemaBranch. + """ + schema = self.get(name) + if not isinstance(schema, ProfileSchemaAPI): + raise TypeError(f"Schema '{name}' is not a ProfileSchema") + return schema + @classmethod def from_api_response(cls, data: MutableMapping[str, Any]) -> Self: """ diff --git a/tests/unit/sdk/test_infrahub_query_analyzer.py b/tests/unit/sdk/test_infrahub_query_analyzer.py new file mode 100644 index 00000000..52efcf59 --- /dev/null +++ b/tests/unit/sdk/test_infrahub_query_analyzer.py @@ -0,0 +1,182 @@ +import pytest + +from infrahub_sdk.query_analyzer import GraphQLQueryReport, InfrahubQueryAnalyzer +from infrahub_sdk.schema import BranchSchema, NodeSchema, NodeSchemaAPI + + +@pytest.fixture +def tag_schema_with_uniqueness() -> NodeSchemaAPI: + """Tag schema with uniqueness constraints on name__value (matching GraphQL filter format).""" + data = { + "name": "Tag", + "namespace": "Builtin", + "default_filter": "name__value", + # The uniqueness_constraints must match the GraphQL argument name format + "uniqueness_constraints": [["name__value"]], + "attributes": [ + {"name": "name", "kind": "Text", "unique": True}, + {"name": "description", "kind": "Text", "optional": True}, + ], + } + return NodeSchema(**data).convert_api() + + +@pytest.fixture +def branch_schema_with_tag(tag_schema_with_uniqueness: NodeSchemaAPI) -> BranchSchema: + """A BranchSchema containing only the BuiltinTag schema.""" + return BranchSchema(hash="test", nodes={"BuiltinTag": tag_schema_with_uniqueness}) + + +class TestGraphQLQueryReportOnlyHasUniqueTargets: + """Tests for GraphQLQueryReport.only_has_unique_targets property.""" + + def test_empty_queries_returns_false(self) -> None: + """When there are no queries, only_has_unique_targets should return False.""" + report = GraphQLQueryReport(queries=[]) + assert report.only_has_unique_targets is False + + def test_query_without_filter_returns_false( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A query without any filter should return False (multi-target).""" + query = """ + query BuiltinTag { + BuiltinTag { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + assert report.only_has_unique_targets is False + + def test_query_with_required_unique_filter_returns_true( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A query with a required filter on a unique field should return True.""" + query = """ + query BuiltinTag($name: String!) { + BuiltinTag(name__value: $name) { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + assert report.only_has_unique_targets is True + + def test_query_with_optional_unique_filter_returns_false( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A query with an optional filter should return False (variable might not be provided).""" + query = """ + query BuiltinTag($name: String) { + BuiltinTag(name__value: $name) { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + assert report.only_has_unique_targets is False + + def test_query_with_static_unique_filter_returns_true( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A query with a static (non-variable) filter on unique field should return True.""" + query = """ + query { + BuiltinTag(name__value: "my-tag") { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + assert report.only_has_unique_targets is True + + def test_query_with_required_ids_filter_returns_true( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A query filtering by ids with a required variable should return True.""" + query = """ + query BuiltinTag($ids: [ID]!) { + BuiltinTag(ids: $ids) { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + assert report.only_has_unique_targets is True + + +class TestGraphQLQueryReportTopLevelKinds: + """Tests for GraphQLQueryReport.top_level_kinds property.""" + + def test_empty_queries_returns_empty_list(self) -> None: + """When there are no queries, top_level_kinds should return empty list.""" + report = GraphQLQueryReport(queries=[]) + assert report.top_level_kinds == [] + + def test_single_query_returns_kind( + self, branch_schema_with_tag: BranchSchema + ) -> None: + """A single query should return its kind in top_level_kinds.""" + query = """ + query { + BuiltinTag { + edges { node { id } } + } + } + """ + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema_with_tag, + ) + report = analyzer.query_report + + assert report.top_level_kinds == ["BuiltinTag"] + + def test_unknown_kind_not_in_top_level_kinds(self) -> None: + """A query for an unknown kind should not appear in top_level_kinds.""" + query = """ + query { + UnknownKind { + edges { node { id } } + } + } + """ + branch_schema = BranchSchema(hash="test", nodes={}) + analyzer = InfrahubQueryAnalyzer( + query=query, + schema_branch=branch_schema, + ) + report = analyzer.query_report + + assert report.top_level_kinds == [] From 359d04465da336d06495f415e1e3e72bf8c94fda Mon Sep 17 00:00:00 2001 From: Baptiste Date: Sat, 27 Dec 2025 12:27:25 +0100 Subject: [PATCH 2/4] Split to many methods so ruff is happy --- infrahub_sdk/ctl/graphql.py | 131 ++++++++++++++++++++++-------------- 1 file changed, 81 insertions(+), 50 deletions(-) diff --git a/infrahub_sdk/ctl/graphql.py b/infrahub_sdk/ctl/graphql.py index 10dbf1d8..4bf15ec4 100644 --- a/infrahub_sdk/ctl/graphql.py +++ b/infrahub_sdk/ctl/graphql.py @@ -27,6 +27,83 @@ from ..schema import BranchSchema from .parameters import CONFIG_PARAM + +class CheckResults: + """Container for check command results.""" + + def __init__(self) -> None: + self.single_target_count = 0 + self.multi_target_count = 0 + self.error_count = 0 + + +def _print_query_result(console: Console, report: object, results: CheckResults) -> None: + """Print the result for a single query analysis.""" + if report.only_has_unique_targets: + console.print("[green] Result: Single-target query (good)[/green]") + console.print(" This query targets unique nodes, enabling selective artifact regeneration.") + results.single_target_count += 1 + else: + console.print("[yellow] Result: Multi-target query[/yellow]") + console.print(" May cause excessive artifact regeneration. Fix: filter by ID or unique attribute.") + results.multi_target_count += 1 + + +def _analyze_query_file( + console: Console, + query_file: Path, + branch_schema: BranchSchema, + graphql_schema: GraphQLSchema, + idx: int, + total_files: int, + results: CheckResults, +) -> None: + """Analyze a single GraphQL query file and print results.""" + query_content = query_file.read_text(encoding="utf-8") + + analyzer = InfrahubQueryAnalyzer( + query=query_content, + schema_branch=branch_schema, + schema=graphql_schema, + ) + + console.print(f"[dim]{'─' * 60}[/dim]") + console.print(f"[bold cyan][{idx}/{total_files}][/bold cyan] {query_file}") + + is_valid, errors = analyzer.is_valid + if not is_valid: + console.print("[red] Validation failed:[/red]") + for error in errors or []: + console.print(f" - {error.message}") + results.error_count += 1 + return + + report = analyzer.query_report + console.print(f"[bold] Top-level kinds:[/bold] {', '.join(report.top_level_kinds) or 'None'}") + + if not report.top_level_kinds: + console.print("[yellow] Warning: No Infrahub models found in query.[/yellow]") + console.print(" The query may reference types not in the schema, or only use non-model fields.") + results.error_count += 1 + return + + _print_query_result(console, report, results) + + +def _print_summary(console: Console, results: CheckResults) -> None: + """Print the summary of check results.""" + console.print(f"[dim]{'─' * 60}[/dim]") + console.print() + console.print("[bold]Summary:[/bold]") + if results.single_target_count: + console.print(f" [green]{results.single_target_count} single-target[/green]") + if results.multi_target_count: + console.print(f" [yellow]{results.multi_target_count} multi-target[/yellow]") + console.print(" See: https://docs.infrahub.app/topics/graphql") + if results.error_count: + console.print(f" [red]{results.error_count} errors[/red]") + + app = AsyncTyper() console = Console() @@ -225,58 +302,12 @@ async def check( console.print(f"[bold]Checking {total_files} GraphQL file{'s' if total_files > 1 else ''}...[/bold]") console.print() - single_target_count = 0 - multi_target_count = 0 - error_count = 0 + results = CheckResults() for idx, query_file in enumerate(gql_files, 1): - query_content = query_file.read_text(encoding="utf-8") - - analyzer = InfrahubQueryAnalyzer( - query=query_content, - schema_branch=branch_schema, - schema=graphql_schema, - ) - - console.print(f"[dim]{'─' * 60}[/dim]") - console.print(f"[bold cyan][{idx}/{total_files}][/bold cyan] {query_file}") - - is_valid, errors = analyzer.is_valid - if not is_valid: - console.print("[red] Validation failed:[/red]") - for error in errors or []: - console.print(f" - {error.message}") - error_count += 1 - continue - - report = analyzer.query_report - console.print(f"[bold] Top-level kinds:[/bold] {', '.join(report.top_level_kinds) or 'None'}") - - if not report.top_level_kinds: - console.print("[yellow] Warning: No Infrahub models found in query.[/yellow]") - console.print(" The query may reference types not in the schema, or only use non-model fields.") - error_count += 1 - continue - - if report.only_has_unique_targets: - console.print("[green] Result: Single-target query (good)[/green]") - console.print(" This query targets unique nodes, enabling selective artifact regeneration.") - single_target_count += 1 - else: - console.print("[yellow] Result: Multi-target query[/yellow]") - console.print(" May cause excessive artifact regeneration. Fix: filter by ID or unique attribute.") - multi_target_count += 1 + _analyze_query_file(console, query_file, branch_schema, graphql_schema, idx, total_files, results) - console.print(f"[dim]{'─' * 60}[/dim]") - console.print() - console.print("[bold]Summary:[/bold]") - if single_target_count: - console.print(f" [green]{single_target_count} single-target[/green]") - if multi_target_count: - console.print(f" [yellow]{multi_target_count} multi-target[/yellow]") - console.print(" See: https://docs.infrahub.app/topics/graphql") - if error_count: - console.print(f" [red]{error_count} errors[/red]") + _print_summary(console, results) - if error_count: + if results.error_count: raise typer.Exit(1) From e40b48bceebe5ec5a4d7081fb4c43caf6349de1a Mon Sep 17 00:00:00 2001 From: Baptiste Date: Sat, 27 Dec 2025 12:30:07 +0100 Subject: [PATCH 3/4] Reformat by ruff --- .../unit/sdk/test_infrahub_query_analyzer.py | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/tests/unit/sdk/test_infrahub_query_analyzer.py b/tests/unit/sdk/test_infrahub_query_analyzer.py index 52efcf59..6b5a51f7 100644 --- a/tests/unit/sdk/test_infrahub_query_analyzer.py +++ b/tests/unit/sdk/test_infrahub_query_analyzer.py @@ -35,9 +35,7 @@ def test_empty_queries_returns_false(self) -> None: report = GraphQLQueryReport(queries=[]) assert report.only_has_unique_targets is False - def test_query_without_filter_returns_false( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_query_without_filter_returns_false(self, branch_schema_with_tag: BranchSchema) -> None: """A query without any filter should return False (multi-target).""" query = """ query BuiltinTag { @@ -55,9 +53,7 @@ def test_query_without_filter_returns_false( assert report.top_level_kinds == ["BuiltinTag"] assert report.only_has_unique_targets is False - def test_query_with_required_unique_filter_returns_true( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_query_with_required_unique_filter_returns_true(self, branch_schema_with_tag: BranchSchema) -> None: """A query with a required filter on a unique field should return True.""" query = """ query BuiltinTag($name: String!) { @@ -75,9 +71,7 @@ def test_query_with_required_unique_filter_returns_true( assert report.top_level_kinds == ["BuiltinTag"] assert report.only_has_unique_targets is True - def test_query_with_optional_unique_filter_returns_false( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_query_with_optional_unique_filter_returns_false(self, branch_schema_with_tag: BranchSchema) -> None: """A query with an optional filter should return False (variable might not be provided).""" query = """ query BuiltinTag($name: String) { @@ -95,9 +89,7 @@ def test_query_with_optional_unique_filter_returns_false( assert report.top_level_kinds == ["BuiltinTag"] assert report.only_has_unique_targets is False - def test_query_with_static_unique_filter_returns_true( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_query_with_static_unique_filter_returns_true(self, branch_schema_with_tag: BranchSchema) -> None: """A query with a static (non-variable) filter on unique field should return True.""" query = """ query { @@ -115,9 +107,7 @@ def test_query_with_static_unique_filter_returns_true( assert report.top_level_kinds == ["BuiltinTag"] assert report.only_has_unique_targets is True - def test_query_with_required_ids_filter_returns_true( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_query_with_required_ids_filter_returns_true(self, branch_schema_with_tag: BranchSchema) -> None: """A query filtering by ids with a required variable should return True.""" query = """ query BuiltinTag($ids: [ID]!) { @@ -144,9 +134,7 @@ def test_empty_queries_returns_empty_list(self) -> None: report = GraphQLQueryReport(queries=[]) assert report.top_level_kinds == [] - def test_single_query_returns_kind( - self, branch_schema_with_tag: BranchSchema - ) -> None: + def test_single_query_returns_kind(self, branch_schema_with_tag: BranchSchema) -> None: """A single query should return its kind in top_level_kinds.""" query = """ query { From 68ad67638d36b9d588c7ddcb9a45912fdcb671ae Mon Sep 17 00:00:00 2001 From: Baptiste Date: Mon, 29 Dec 2025 16:33:53 +0100 Subject: [PATCH 4/4] Lint and doc --- docs/docs/infrahubctl/infrahubctl-graphql.mdx | 26 +++++++++++++++++++ infrahub_sdk/ctl/graphql.py | 13 ++++++---- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/docs/docs/infrahubctl/infrahubctl-graphql.mdx b/docs/docs/infrahubctl/infrahubctl-graphql.mdx index 180bd2b8..853c56b9 100644 --- a/docs/docs/infrahubctl/infrahubctl-graphql.mdx +++ b/docs/docs/infrahubctl/infrahubctl-graphql.mdx @@ -16,9 +16,35 @@ $ infrahubctl graphql [OPTIONS] COMMAND [ARGS]... **Commands**: +* `check`: Check if GraphQL queries target single or... * `export-schema`: Export the GraphQL schema to a file. * `generate-return-types`: Create Pydantic Models for GraphQL query... +## `infrahubctl graphql check` + +Check if GraphQL queries target single or multiple objects. + +A single-target query is one that will return at most one object per query operation. +This is determined by checking if the query uses uniqueness constraints (like filtering by ID or name). + +Multi-target queries may return multiple objects and should be used with caution in artifact definitions. + +**Usage**: + +```console +$ infrahubctl graphql check [OPTIONS] [QUERY] +``` + +**Arguments**: + +* `[QUERY]`: Path to the GraphQL query file or directory. Defaults to current directory if not specified. + +**Options**: + +* `--branch TEXT`: Branch to use for schema. +* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml] +* `--help`: Show this message and exit. + ## `infrahubctl graphql export-schema` Export the GraphQL schema to a file. diff --git a/infrahub_sdk/ctl/graphql.py b/infrahub_sdk/ctl/graphql.py index 4bf15ec4..11b7e0ed 100644 --- a/infrahub_sdk/ctl/graphql.py +++ b/infrahub_sdk/ctl/graphql.py @@ -3,6 +3,7 @@ import ast from collections import defaultdict from pathlib import Path +from typing import TYPE_CHECKING import typer from ariadne_codegen.client_generators.package import PackageGenerator, get_package_generator @@ -23,10 +24,12 @@ from ..ctl.client import initialize_client from ..ctl.utils import catch_exception from ..graphql.utils import insert_fragments_inline, remove_fragment_import -from ..query_analyzer import InfrahubQueryAnalyzer -from ..schema import BranchSchema +from ..query_analyzer import GraphQLQueryReport, InfrahubQueryAnalyzer from .parameters import CONFIG_PARAM +if TYPE_CHECKING: + from ..schema import BranchSchema + class CheckResults: """Container for check command results.""" @@ -37,7 +40,7 @@ def __init__(self) -> None: self.error_count = 0 -def _print_query_result(console: Console, report: object, results: CheckResults) -> None: +def _print_query_result(console: Console, report: GraphQLQueryReport, results: CheckResults) -> None: """Print the result for a single query analysis.""" if report.only_has_unique_targets: console.print("[green] Result: Single-target query (good)[/green]") @@ -292,8 +295,8 @@ async def check( client = initialize_client() - schema_data = await client.schema.all(branch=branch) - branch_schema = BranchSchema(hash="", nodes=schema_data) + await client.schema.all(branch=branch) + branch_schema = client.schema.cache[branch or client.default_branch] graphql_schema_text = await client.schema.get_graphql_schema() graphql_schema = build_schema(graphql_schema_text)