diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 7da55ecbc031..ea1a3af9971c 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -13,7 +13,7 @@ """ import traceback from abc import ABC, abstractmethod -from typing import Any, Iterable, List, Optional, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple from pydantic import BaseModel, Field from sqlalchemy.engine import Inspector @@ -596,7 +596,6 @@ def get_database_owner_ref( EntityReferenceList with owner or None """ try: - # Priority 1: Use ownerConfig if configured if ( hasattr(self.source_config, "ownerConfig") and self.source_config.ownerConfig @@ -609,6 +608,25 @@ def get_database_owner_ref( parent_owner=None, # Database is top level ) if owner_ref: + # After resolving owner, fetch and cache the database entity for schema inheritance + # This avoids API calls when processing schemas + try: + database_fqn = fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.get().database_service, + database_name=database_name, + ) + database_entity = self.metadata.get_by_name( + entity=Database, + fqn=database_fqn, + fields=["owners"], + ) + if database_entity: + self.context.get().upsert(key="database_entity", value=database_entity) + except Exception as cache_exc: + logger.debug(f"Could not cache database entity: {cache_exc}") + return owner_ref except Exception as exc: @@ -634,6 +652,8 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList """ try: parent_owner = None + + # Get database_entity from context (should be cached by get_database_owner_ref) database_entity = getattr(self.context.get(), "database_entity", None) if database_entity: db_owners = database_entity.owners @@ -654,6 +674,26 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList parent_owner=parent_owner, ) if owner_ref: + # After resolving owner, fetch and cache the schema entity for table inheritance + # This avoids API calls when processing tables + try: + schema_fqn_full = fqn.build( + metadata=self.metadata, + entity_type=DatabaseSchema, + service_name=self.context.get().database_service, + database_name=self.context.get().database, + schema_name=schema_name, + ) + schema_entity = self.metadata.get_by_name( + entity=DatabaseSchema, + fqn=schema_fqn_full, + fields=["owners"], + ) + if schema_entity: + self.context.get().upsert(key="database_schema_entity", value=schema_entity) + except Exception as cache_exc: + logger.debug(f"Could not cache schema entity: {cache_exc}") + return owner_ref except Exception as exc: @@ -679,6 +719,8 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]: """ try: parent_owner = None + + # Get database_schema_entity from context (should be cached by get_schema_owner_ref) database_schema_entity = getattr( self.context.get(), "database_schema_entity", None )