Skip to content

Commit 0cc7bca

Browse files
Refactor: Use context for owner inheritance, remove cache
Co-authored-by: yourton.ma <yourton.ma@gmail.com>
1 parent 7657c85 commit 0cc7bca

File tree

2 files changed

+26
-31
lines changed

2 files changed

+26
-31
lines changed

ingestion/src/metadata/ingestion/api/topology_runner.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,20 @@ def yield_and_update_context(
446446
)
447447

448448
self.context.get().update_context_name(stage=stage, right=right)
449+
450+
# Store the entity in context for downstream inheritance
451+
# After yielding the request to sink, fetch the entity from API to get the complete entity
452+
# with all resolved fields (like owners) for child entities to inherit from
453+
if stage.context and not same_fingerprint:
454+
# Fetch the entity after it has been created/updated by the sink
455+
entity = self.metadata.get_by_name(
456+
entity=stage.type_,
457+
fqn=entity_fqn,
458+
fields=["owners"], # Only fetch owners field for performance
459+
)
460+
if entity:
461+
entity_context_key = f"{stage.context}_entity"
462+
self.context.get().upsert(key=entity_context_key, value=entity)
449463

450464
@yield_and_update_context.register
451465
def _(

ingestion/src/metadata/ingestion/source/database/database_service.py

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,6 @@ class DatabaseServiceSource(
225225
stored_procedure_source_state: Set = set()
226226
database_entity_source_state: Set = set()
227227
schema_entity_source_state: Set = set()
228-
# Cache for schema owners to avoid repeated API calls during table ingestion
229-
schema_owner_cache: Dict[str, Optional[str]] = {}
230228
# Big union of types we want to fetch dynamically
231229
service_connection: DatabaseConnection.model_fields["config"].annotation
232230

@@ -598,7 +596,6 @@ def get_database_owner_ref(
598596
EntityReferenceList with owner or None
599597
"""
600598
try:
601-
# Priority 1: Use ownerConfig if configured
602599
if (
603600
hasattr(self.source_config, "ownerConfig")
604601
and self.source_config.ownerConfig
@@ -611,9 +608,6 @@ def get_database_owner_ref(
611608
parent_owner=None, # Database is top level
612609
)
613610
if owner_ref:
614-
# Cache the resolved owner for schema inheritance
615-
if owner_ref.root:
616-
self.schema_owner_cache[database_name] = owner_ref.root[0].name
617611
return owner_ref
618612

619613
except Exception as exc:
@@ -638,14 +632,12 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
638632
EntityReferenceList with owner or None
639633
"""
640634
try:
641-
# Get parent owner from cached database owner
642635
parent_owner = None
643-
database_name = self.context.get().database
644-
if database_name:
645-
parent_owner = self.schema_owner_cache.get(database_name)
646-
logger.debug(
647-
f"Schema '{schema_name}': Using cached database owner from '{database_name}': {parent_owner}"
648-
)
636+
database_entity = getattr(self.context.get(), "database_entity", None)
637+
if database_entity:
638+
db_owners = database_entity.owners
639+
if db_owners and db_owners.root:
640+
parent_owner = db_owners.root[0].name
649641

650642
schema_fqn = f"{self.context.get().database}.{schema_name}"
651643

@@ -661,9 +653,6 @@ def get_schema_owner_ref(self, schema_name: str) -> Optional[EntityReferenceList
661653
parent_owner=parent_owner,
662654
)
663655
if owner_ref:
664-
# Cache the resolved owner for table inheritance
665-
if owner_ref.root:
666-
self.schema_owner_cache[schema_fqn] = owner_ref.root[0].name
667656
return owner_ref
668657

669658
except Exception as exc:
@@ -689,21 +678,13 @@ def get_owner_ref(self, table_name: str) -> Optional[EntityReferenceList]:
689678
"""
690679
try:
691680
parent_owner = None
692-
693-
# Get parent owner from cached schema owner
694-
# The schema owner was cached when the schema was processed
695-
if (
696-
hasattr(self.source_config, "ownerConfig")
697-
and self.source_config.ownerConfig
698-
):
699-
schema_fqn = f"{self.context.get().database}.{self.context.get().database_schema}"
700-
701-
# Use cached schema owner if available
702-
parent_owner = self.schema_owner_cache.get(schema_fqn)
703-
704-
logger.debug(
705-
f"Table '{table_name}': Using cached schema owner from '{schema_fqn}': {parent_owner}"
706-
)
681+
database_schema_entity = getattr(
682+
self.context.get(), "database_schema_entity", None
683+
)
684+
if database_schema_entity:
685+
schema_owners = database_schema_entity.owners
686+
if schema_owners and schema_owners.root:
687+
parent_owner = schema_owners.root[0].name
707688

708689
table_fqn = f"{self.context.get().database}.{self.context.get().database_schema}.{table_name}"
709690

0 commit comments

Comments
 (0)