Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import traceback
from abc import ABC, abstractmethod
from typing import Any, Iterable, List, Optional, Set, Tuple
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

from pydantic import BaseModel, Field
from sqlalchemy.engine import Inspector
Expand Down Expand Up @@ -596,7 +596,6 @@ def get_database_owner_ref(
EntityReferenceList with owner or None
"""
try:
# Priority 1: Use ownerConfig if configured
if (
hasattr(self.source_config, "ownerConfig")
and self.source_config.ownerConfig
Expand All @@ -609,6 +608,25 @@ def get_database_owner_ref(
parent_owner=None, # Database is top level
)
if owner_ref:
# After resolving owner, fetch and cache the database entity for schema inheritance
# This avoids API calls when processing schemas
try:
database_fqn = fqn.build(
metadata=self.metadata,
entity_type=Database,
service_name=self.context.get().database_service,
database_name=database_name,
)
database_entity = self.metadata.get_by_name(
entity=Database,
fqn=database_fqn,
fields=["owners"],
)
if database_entity:
self.context.get().upsert(key="database_entity", value=database_entity)
except Exception as cache_exc:
logger.debug(f"Could not cache database entity: {cache_exc}")

return owner_ref

except Exception as exc:
Expand All @@ -634,6 +652,8 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
"""
try:
parent_owner = None

# Get database_entity from context (should be cached by get_database_owner_ref)
database_entity = getattr(self.context.get(), "database_entity", None)
if database_entity:
db_owners = database_entity.owners
Expand All @@ -654,6 +674,26 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
parent_owner=parent_owner,
)
if owner_ref:
# After resolving owner, fetch and cache the schema entity for table inheritance
# This avoids API calls when processing tables
try:
schema_fqn_full = fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=schema_name,
)
schema_entity = self.metadata.get_by_name(
entity=DatabaseSchema,
fqn=schema_fqn_full,
fields=["owners"],
)
if schema_entity:
self.context.get().upsert(key="database_schema_entity", value=schema_entity)
except Exception as cache_exc:
logger.debug(f"Could not cache schema entity: {cache_exc}")

return owner_ref

except Exception as exc:
Expand All @@ -679,6 +719,8 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
"""
try:
parent_owner = None

# Get database_schema_entity from context (should be cached by get_schema_owner_ref)
database_schema_entity = getattr(
self.context.get(), "database_schema_entity", None
)
Expand Down
Loading