From 671e22f6a2c2e536b86003aa40bd1a4d83d784bf Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 03:44:29 +0000 Subject: [PATCH 1/7] Refactor: Resolve schema owner from ownerConfig Co-authored-by: yourton.ma --- .../source/database/database_service.py | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 7da55ecbc031..0f5ce9c8088a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -679,13 +679,34 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: """ try: parent_owner = None - database_schema_entity = getattr( - self.context.get(), "database_schema_entity", None - ) - if database_schema_entity: - schema_owners = database_schema_entity.owners - if schema_owners and schema_owners.root: - parent_owner = schema_owners.root[0].name + + # Get parent owner from schema's ownerConfig resolution + # We need to resolve the schema's owner first to enable proper inheritance + if ( + hasattr(self.source_config, "ownerConfig") + and self.source_config.ownerConfig + ): + schema_fqn = f"{self.context.get().database}.{self.context.get().database_schema}" + + # First, get the database owner for schema inheritance + database_owner = None + database_entity = getattr(self.context.get(), "database_entity", None) + if database_entity: + db_owners = database_entity.owners + if db_owners and db_owners.root: + database_owner = db_owners.root[0].name + + # Resolve schema owner (which may inherit from database) + schema_owner_ref = get_owner_from_config( + metadata=self.metadata, + owner_config=self.source_config.ownerConfig, + entity_type="databaseSchema", + entity_name=schema_fqn, + parent_owner=database_owner, + ) + + if schema_owner_ref and schema_owner_ref.root: + parent_owner = schema_owner_ref.root[0].name table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}" From 8a7b08ba573fc894ec70f5a9ad8e32cdb785bfdb Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 03:51:20 +0000 Subject: [PATCH 2/7] Refactor schema owner resolution for inheritance Co-authored-by: yourton.ma --- .../source/database/database_service.py | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 0f5ce9c8088a..518022466b8a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -680,33 +680,31 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: try: parent_owner = None - # Get parent owner from schema's ownerConfig resolution - # We need to resolve the schema's owner first to enable proper inheritance + # Get parent owner from schema entity + # Since schema entity is not stored in context, we fetch it from the API if ( hasattr(self.source_config, "ownerConfig") and self.source_config.ownerConfig ): - schema_fqn = f"{self.context.get().database}.{self.context.get().database_schema}" - - # First, get the database owner for schema inheritance - database_owner = None - database_entity = getattr(self.context.get(), "database_entity", None) - if database_entity: - db_owners = database_entity.owners - if db_owners and db_owners.root: - database_owner = db_owners.root[0].name - - # Resolve schema owner (which may inherit from database) - schema_owner_ref = get_owner_from_config( + schema_fqn = fqn.build( metadata=self.metadata, - owner_config=self.source_config.ownerConfig, - entity_type="databaseSchema", - entity_name=schema_fqn, - parent_owner=database_owner, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=self.context.get().database_schema, ) - if schema_owner_ref and schema_owner_ref.root: - parent_owner = schema_owner_ref.root[0].name + # Fetch schema entity to get its resolved owner + try: + schema_entity = self.metadata.get_by_name( + entity=DatabaseSchema, + fqn=schema_fqn, + fields=["owners"], + ) + if schema_entity and schema_entity.owners and schema_entity.owners.root: + parent_owner = schema_entity.owners.root[0].name + except Exception as exc: + logger.debug(f"Could not fetch schema entity for owner inheritance: {exc}") table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}" From c5f63d9059ed280b27c16ff9b81b285c9428acc7 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 03:55:31 +0000 Subject: [PATCH 3/7] feat: Cache schema owners to optimize table ingestion Co-authored-by: yourton.ma --- .../source/database/database_service.py | 36 ++++++++----------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 518022466b8a..998da0191332 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -13,7 +13,7 @@ """ import traceback from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Optional, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from pydantic import BaseModel, Field from sqlalchemy.engine import Inspector @@ -225,6 +225,8 @@ class DatabaseServiceSource( stored_procedure_source_state: Set = set() database_entity_source_state: Set = set() schema_entity_source_state: Set = set() + # Cache for schema owners to avoid repeated API calls during table ingestion + schema_owner_cache: Dict[str, Optional[str]] = {} # Big union of types we want to fetch dynamically service_connection: DatabaseConnection.model_fields["config"].annotation @@ -654,6 +656,9 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList parent_owner=parent_owner, ) if owner_ref: + # Cache the resolved owner for table inheritance + if owner_ref.root: + self.schema_owner_cache[schema_fqn] = owner_ref.root[0].name return owner_ref except Exception as exc: @@ -680,31 +685,20 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: try: parent_owner = None - # Get parent owner from schema entity - # Since schema entity is not stored in context, we fetch it from the API + # Get parent owner from cached schema owner + # The schema owner was cached when the schema was processed if ( hasattr(self.source_config, "ownerConfig") and self.source_config.ownerConfig ): - schema_fqn = fqn.build( - metadata=self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=self.context.get().database_schema, - ) + schema_fqn = f"{self.context.get().database}.{self.context.get().database_schema}" - # Fetch schema entity to get its resolved owner - try: - schema_entity = self.metadata.get_by_name( - entity=DatabaseSchema, - fqn=schema_fqn, - fields=["owners"], - ) - if schema_entity and schema_entity.owners and schema_entity.owners.root: - parent_owner = schema_entity.owners.root[0].name - except Exception as exc: - logger.debug(f"Could not fetch schema entity for owner inheritance: {exc}") + # Use cached schema owner if available + parent_owner = self.schema_owner_cache.get(schema_fqn) + + logger.debug( + f"Table '{table_name}': Using cached schema owner from '{schema_fqn}': {parent_owner}" + ) table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}" From 7657c85915463b189e7f1316329b8d96fcd561ea Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 03:58:51 +0000 Subject: [PATCH 4/7] Cache database owner for schema inheritance Co-authored-by: yourton.ma --- .../ingestion/source/database/database_service.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 998da0191332..73f9a808691a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -611,6 +611,9 @@ def get_database_owner_ref( parent_owner=None, # Database is top level ) if owner_ref: + # Cache the resolved owner for schema inheritance + if owner_ref.root: + self.schema_owner_cache[database_name] = owner_ref.root[0].name return owner_ref except Exception as exc: @@ -635,12 +638,14 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList EntityReferenceList with owner or None """ try: + # Get parent owner from cached database owner parent_owner = None - database_entity = getattr(self.context.get(), "database_entity", None) - if database_entity: - db_owners = database_entity.owners - if db_owners and db_owners.root: - parent_owner = db_owners.root[0].name + database_name = self.context.get().database + if database_name: + parent_owner = self.schema_owner_cache.get(database_name) + logger.debug( + f"Schema '{schema_name}': Using cached database owner from '{database_name}': {parent_owner}" + ) schema_fqn = f"{self.context.get().database}.{schema_name}" From 0cc7bca8968b7fa136280dff7cb3a50dc4ee69b3 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 04:05:09 +0000 Subject: [PATCH 5/7] Refactor: Use context for owner inheritance, remove cache Co-authored-by: yourton.ma --- .../metadata/ingestion/api/topology_runner.py | 14 ++++++ .../source/database/database_service.py | 43 ++++++------------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index f13d9a0668d7..065cb9068eba 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -446,6 +446,20 @@ def yield_and_update_context( ) self.context.get().update_context_name(stage=stage, right=right) + + # Store the entity in context for downstream inheritance + # After yielding the request to sink, fetch the entity from API to get the complete entity + # with all resolved fields (like owners) for child entities to inherit from + if stage.context and not same_fingerprint: + # Fetch the entity after it has been created/updated by the sink + entity = self.metadata.get_by_name( + entity=stage.type_, + fqn=entity_fqn, + fields=["owners"], # Only fetch owners field for performance + ) + if entity: + entity_context_key = f"{stage.context}_entity" + self.context.get().upsert(key=entity_context_key, value=entity) @yield_and_update_context.register def _( diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 73f9a808691a..4a4a2628bda8 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -225,8 +225,6 @@ class DatabaseServiceSource( stored_procedure_source_state: Set = set() database_entity_source_state: Set = set() schema_entity_source_state: Set = set() - # Cache for schema owners to avoid repeated API calls during table ingestion - schema_owner_cache: Dict[str, Optional[str]] = {} # Big union of types we want to fetch dynamically service_connection: DatabaseConnection.model_fields["config"].annotation @@ -598,7 +596,6 @@ def get_database_owner_ref( EntityReferenceList with owner or None """ try: - # Priority 1: Use ownerConfig if configured if ( hasattr(self.source_config, "ownerConfig") and self.source_config.ownerConfig @@ -611,9 +608,6 @@ def get_database_owner_ref( parent_owner=None, # Database is top level ) if owner_ref: - # Cache the resolved owner for schema inheritance - if owner_ref.root: - self.schema_owner_cache[database_name] = owner_ref.root[0].name return owner_ref except Exception as exc: @@ -638,14 +632,12 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList EntityReferenceList with owner or None """ try: - # Get parent owner from cached database owner parent_owner = None - database_name = self.context.get().database - if database_name: - parent_owner = self.schema_owner_cache.get(database_name) - logger.debug( - f"Schema '{schema_name}': Using cached database owner from '{database_name}': {parent_owner}" - ) + database_entity = getattr(self.context.get(), "database_entity", None) + if database_entity: + db_owners = database_entity.owners + if db_owners and db_owners.root: + parent_owner = db_owners.root[0].name schema_fqn = f"{self.context.get().database}.{schema_name}" @@ -661,9 +653,6 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList parent_owner=parent_owner, ) if owner_ref: - # Cache the resolved owner for table inheritance - if owner_ref.root: - self.schema_owner_cache[schema_fqn] = owner_ref.root[0].name return owner_ref except Exception as exc: @@ -689,21 +678,13 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: """ try: parent_owner = None - - # Get parent owner from cached schema owner - # The schema owner was cached when the schema was processed - if ( - hasattr(self.source_config, "ownerConfig") - and self.source_config.ownerConfig - ): - schema_fqn = f"{self.context.get().database}.{self.context.get().database_schema}" - - # Use cached schema owner if available - parent_owner = self.schema_owner_cache.get(schema_fqn) - - logger.debug( - f"Table '{table_name}': Using cached schema owner from '{schema_fqn}': {parent_owner}" - ) + database_schema_entity = getattr( + self.context.get(), "database_schema_entity", None + ) + if database_schema_entity: + schema_owners = database_schema_entity.owners + if schema_owners and schema_owners.root: + parent_owner = schema_owners.root[0].name table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}" From 500f8486b42770c995052340c47365a21b372f77 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 04:06:33 +0000 Subject: [PATCH 6/7] Refactor: Cache database and schema entities in context Co-authored-by: yourton.ma --- .../metadata/ingestion/api/topology_runner.py | 14 ------ .../source/database/database_service.py | 47 +++++++++++++++++++ 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 065cb9068eba..f13d9a0668d7 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -446,20 +446,6 @@ def yield_and_update_context( ) self.context.get().update_context_name(stage=stage, right=right) - - # Store the entity in context for downstream inheritance - # After yielding the request to sink, fetch the entity from API to get the complete entity - # with all resolved fields (like owners) for child entities to inherit from - if stage.context and not same_fingerprint: - # Fetch the entity after it has been created/updated by the sink - entity = self.metadata.get_by_name( - entity=stage.type_, - fqn=entity_fqn, - fields=["owners"], # Only fetch owners field for performance - ) - if entity: - entity_context_key = f"{stage.context}_entity" - self.context.get().upsert(key=entity_context_key, value=entity) @yield_and_update_context.register def _( diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 4a4a2628bda8..2a10b1560e02 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -633,7 +633,30 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList """ try: parent_owner = None + + # Try to get database_entity from context database_entity = getattr(self.context.get(), "database_entity", None) + + # If not in context, fetch from API and cache it + if database_entity is None and self.context.get().database: + try: + database_fqn = fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + ) + database_entity = self.metadata.get_by_name( + entity=Database, + fqn=database_fqn, + fields=["owners"], + ) + if database_entity: + # Cache in context for future use + self.context.get().upsert(key="database_entity", value=database_entity) + except Exception as exc: + logger.debug(f"Could not fetch database entity: {exc}") + if database_entity: db_owners = database_entity.owners if db_owners and db_owners.root: @@ -678,9 +701,33 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: """ try: parent_owner = None + + # Try to get database_schema_entity from context database_schema_entity = getattr( self.context.get(), "database_schema_entity", None ) + + # If not in context, fetch from API and cache it + if database_schema_entity is None and self.context.get().database_schema: + try: + schema_fqn = fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=self.context.get().database_schema, + ) + database_schema_entity = self.metadata.get_by_name( + entity=DatabaseSchema, + fqn=schema_fqn, + fields=["owners"], + ) + if database_schema_entity: + # Cache in context for future use + self.context.get().upsert(key="database_schema_entity", value=database_schema_entity) + except Exception as exc: + logger.debug(f"Could not fetch schema entity: {exc}") + if database_schema_entity: schema_owners = database_schema_entity.owners if schema_owners and schema_owners.root: From 60468414817b3beeb66fd993970931bdd0a119a8 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sun, 12 Oct 2025 04:09:24 +0000 Subject: [PATCH 7/7] Refactor: Cache database and schema entities for performance Co-authored-by: yourton.ma --- .../source/database/database_service.py | 86 +++++++++---------- 1 file changed, 41 insertions(+), 45 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 2a10b1560e02..ea1a3af9971c 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -608,6 +608,25 @@ def get_database_owner_ref( parent_owner=None, # Database is top level ) if owner_ref: + # After resolving owner, fetch and cache the database entity for schema inheritance + # This avoids API calls when processing schemas + try: + database_fqn = fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=database_name, + ) + database_entity = self.metadata.get_by_name( + entity=Database, + fqn=database_fqn, + fields=["owners"], + ) + if database_entity: + self.context.get().upsert(key="database_entity", value=database_entity) + except Exception as cache_exc: + logger.debug(f"Could not cache database entity: {cache_exc}") + return owner_ref except Exception as exc: @@ -634,29 +653,8 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList try: parent_owner = None - # Try to get database_entity from context + # Get database_entity from context (should be cached by get_database_owner_ref) database_entity = getattr(self.context.get(), "database_entity", None) - - # If not in context, fetch from API and cache it - if database_entity is None and self.context.get().database: - try: - database_fqn = fqn.build( - metadata=self.metadata, - entity_type=Database, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - ) - database_entity = self.metadata.get_by_name( - entity=Database, - fqn=database_fqn, - fields=["owners"], - ) - if database_entity: - # Cache in context for future use - self.context.get().upsert(key="database_entity", value=database_entity) - except Exception as exc: - logger.debug(f"Could not fetch database entity: {exc}") - if database_entity: db_owners = database_entity.owners if db_owners and db_owners.root: @@ -676,6 +674,26 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList parent_owner=parent_owner, ) if owner_ref: + # After resolving owner, fetch and cache the schema entity for table inheritance + # This avoids API calls when processing tables + try: + schema_fqn_full = fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=schema_name, + ) + schema_entity = self.metadata.get_by_name( + entity=DatabaseSchema, + fqn=schema_fqn_full, + fields=["owners"], + ) + if schema_entity: + self.context.get().upsert(key="database_schema_entity", value=schema_entity) + except Exception as cache_exc: + logger.debug(f"Could not cache schema entity: {cache_exc}") + return owner_ref except Exception as exc: @@ -702,32 +720,10 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: try: parent_owner = None - # Try to get database_schema_entity from context + # Get database_schema_entity from context (should be cached by get_schema_owner_ref) database_schema_entity = getattr( self.context.get(), "database_schema_entity", None ) - - # If not in context, fetch from API and cache it - if database_schema_entity is None and self.context.get().database_schema: - try: - schema_fqn = fqn.build( - metadata=self.metadata, - entity_type=DatabaseSchema, - service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=self.context.get().database_schema, - ) - database_schema_entity = self.metadata.get_by_name( - entity=DatabaseSchema, - fqn=schema_fqn, - fields=["owners"], - ) - if database_schema_entity: - # Cache in context for future use - self.context.get().upsert(key="database_schema_entity", value=database_schema_entity) - except Exception as exc: - logger.debug(f"Could not fetch schema entity: {exc}") - if database_schema_entity: schema_owners = database_schema_entity.owners if schema_owners and schema_owners.root: