From 3e0056510188eb5ee079ec851efd0c9b35a4b970 Mon Sep 17 00:00:00 2001
From: kraysent
Date: Thu, 13 Nov 2025 10:26:49 +0000
Subject: [PATCH 1/2] #329: rename objects to records

---
 .../layer0/{objects.py => records.py}              | 48 ++++++++---------
 app/data/repositories/layer0/repository.py         | 14 ++---
 app/data/repositories/layer0/tables.py             |  4 +-
 app/data/repositories/layer1.py                    | 52 +++++++++----------
 app/data/repositories/layer2/repository.py         | 36 ++++++-------
 app/domain/homogenization/apply.py                 |  2 +-
 .../V016__rename_objects_to_records.sql            | 41 ++++++++++++++
 7 files changed, 118 insertions(+), 79 deletions(-)
 rename app/data/repositories/layer0/{objects.py => records.py} (82%)
 create mode 100644 postgres/migrations/V016__rename_objects_to_records.sql

diff --git a/app/data/repositories/layer0/objects.py b/app/data/repositories/layer0/records.py
similarity index 82%
rename from app/data/repositories/layer0/objects.py
rename to app/data/repositories/layer0/records.py
index 2475b90d..31de53e5 100644
--- a/app/data/repositories/layer0/objects.py
+++ b/app/data/repositories/layer0/records.py
@@ -7,12 +7,12 @@
 from app.lib.web.errors import DatabaseError
 
 
-class Layer0ObjectRepository(postgres.TransactionalPGRepository):
+class Layer0RecordRepository(postgres.TransactionalPGRepository):
     def register_records(self, table_id: int, record_ids: list[str]) -> None:
         if len(record_ids) == 0:
             raise RuntimeError("no records to upsert")
 
-        query = "INSERT INTO layer0.objects (id, table_id) VALUES "
+        query = "INSERT INTO layer0.records (id, table_id) VALUES "
         params = []
         values = []
 
@@ -37,8 +37,8 @@ def get_processed_records(
         where_stmnt = []
 
         join_tables = """
-        FROM layer0.objects AS o
-        JOIN layer0.crossmatch AS c ON o.id = c.object_id
+        FROM layer0.records AS o
+        JOIN layer0.crossmatch AS c ON o.id = c.record_id
         """
 
         if table_name is not None:
@@ -70,7 +70,7 @@ def get_processed_records(
 
         rows = self._storage.query(query, params=params)
 
-        objects = []
+        records = []
 
         for row in rows:
             status = row["status"]
@@ -84,7 +84,7 @@ def get_processed_records(
             else:
                 ci_result = model.CIResultObjectCollision(pgcs=metadata["possible_matches"])
 
-            objects.append(
+            records.append(
                 model.RecordCrossmatch(
                     model.Record(
                         row["id"],
@@ -94,7 +94,7 @@ def get_processed_records(
                 )
             )
 
-        return objects
+        return records
 
     def get_table_statistics(self, table_name: str) -> model.TableStatistics:
         table_id_row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
@@ -109,7 +109,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
             """
             SELECT COALESCE(status, 'unprocessed') AS status, COUNT(1)
             FROM layer0.crossmatch AS p
-            RIGHT JOIN layer0.objects AS o ON p.object_id = o.id
+            RIGHT JOIN layer0.records AS o ON p.record_id = o.id
             WHERE o.table_id = %s
             GROUP BY status""",
            params=[table_id],
@@ -118,7 +118,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
            self._storage.query_one,
            """
            SELECT COUNT(1) AS cnt, MAX(t.modification_dt) AS modification_dt
-            FROM layer0.objects AS o
+            FROM layer0.records AS o
            JOIN layer0.tables AS t ON o.table_id = t.id
            WHERE table_id = %s""",
            params=[table_id],
@@ -143,11 +143,11 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
        )
 
     def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
-        query = "INSERT INTO layer0.crossmatch (object_id, status, metadata) VALUES "
+        query = "INSERT INTO layer0.crossmatch (record_id, status, metadata) VALUES "
         params = []
         values = []
 
-        for object_id, result in data.items():
+        for record_id, result in data.items():
            values.append("(%s, %s, %s)")
 
            status = None
@@ -165,44 +165,44 @@ def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
 
             meta = {"possible_matches": possible_pgcs}
 
-            params.extend([object_id, status, json.dumps(meta)])
+            params.extend([record_id, status, json.dumps(meta)])
 
         query += ",".join(values)
-        query += " ON CONFLICT (object_id) DO UPDATE SET status = EXCLUDED.status, metadata = EXCLUDED.metadata"
+        query += " ON CONFLICT (record_id) DO UPDATE SET status = EXCLUDED.status, metadata = EXCLUDED.metadata"
 
         self._storage.exec(query, params=params)
 
     def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
         pgcs_to_insert: dict[str, int] = {}
-        new_objects = [object_id for object_id, pgc in pgcs.items() if pgc is None]
+        new_records = [record_id for record_id, pgc in pgcs.items() if pgc is None]
 
-        if new_objects:
+        if new_records:
             result = self._storage.query(
                 f"""INSERT INTO common.pgc
-                VALUES {",".join(["(DEFAULT)"] * len(new_objects))}
+                VALUES {",".join(["(DEFAULT)"] * len(new_records))}
                 RETURNING id""",
             )
             ids = [row["id"] for row in result]
 
-            for object_id, pgc_id in zip(new_objects, ids, strict=False):
-                pgcs_to_insert[object_id] = pgc_id
+            for record_id, pgc_id in zip(new_records, ids, strict=False):
+                pgcs_to_insert[record_id] = pgc_id
 
-        for object_id, pgc in pgcs.items():
+        for record_id, pgc in pgcs.items():
             if pgc is not None:
-                pgcs_to_insert[object_id] = pgc
+                pgcs_to_insert[record_id] = pgc
 
         if pgcs_to_insert:
-            update_query = "UPDATE layer0.objects SET pgc = v.pgc FROM (VALUES "
+            update_query = "UPDATE layer0.records SET pgc = v.pgc FROM (VALUES "
             params = []
             values = []
 
-            for object_id, pgc_id in pgcs_to_insert.items():
+            for record_id, pgc_id in pgcs_to_insert.items():
                 values.append("(%s, %s)")
-                params.extend([object_id, pgc_id])
+                params.extend([record_id, pgc_id])
 
             update_query += ",".join(values)
-            update_query += ") AS v(object_id, pgc) WHERE layer0.objects.id = v.object_id"
+            update_query += ") AS v(record_id, pgc) WHERE layer0.records.id = v.record_id"
 
             self._storage.exec(update_query, params=params)
diff --git a/app/data/repositories/layer0/repository.py b/app/data/repositories/layer0/repository.py
index fd64783e..9c6b3c00 100644
--- a/app/data/repositories/layer0/repository.py
+++ b/app/data/repositories/layer0/repository.py
@@ -2,7 +2,7 @@
 from astropy import table
 
 from app.data import model
-from app.data.repositories.layer0 import homogenization, modifiers, objects, tables
+from app.data.repositories.layer0 import homogenization, modifiers, records, tables
 from app.lib.storage import enums, postgres
 
 
@@ -12,7 +12,7 @@ def __init__(self, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLo
         super().__init__(storage)
 
         self.table_repo = tables.Layer0TableRepository(storage)
-        self.objects_repo = objects.Layer0ObjectRepository(storage)
+        self.records_repo = records.Layer0RecordRepository(storage)
         self.homogenization_repo = homogenization.Layer0HomogenizationRepository(storage)
         self.modifier_repo = modifiers.Layer0ModifiersRepository(storage)
 
@@ -54,10 +54,10 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu
         return self.table_repo.update_column_metadata(table_name, column_description)
 
     def register_records(self, table_id: int, record_ids: list[str]) -> None:
-        return self.objects_repo.register_records(table_id, record_ids)
+        return self.records_repo.register_records(table_id, record_ids)
 
     def get_table_statistics(self, table_name: str) -> model.TableStatistics:
-        return self.objects_repo.get_table_statistics(table_name)
+        return self.records_repo.get_table_statistics(table_name)
 
     def get_processed_records(
         self,
@@ -67,13 +67,13 @@ def get_processed_records(
         status: enums.RecordCrossmatchStatus | None = None,
         record_id: str | None = None,
     ) -> list[model.RecordCrossmatch]:
-        return self.objects_repo.get_processed_records(limit, offset, table_name, status, record_id)
+        return self.records_repo.get_processed_records(limit, offset, table_name, status, record_id)
 
     def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
-        return self.objects_repo.add_crossmatch_result(data)
+        return self.records_repo.add_crossmatch_result(data)
 
     def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
-        return self.objects_repo.upsert_pgc(pgcs)
+        return self.records_repo.upsert_pgc(pgcs)
 
     def get_homogenization_rules(self) -> list[model.HomogenizationRule]:
         return self.homogenization_repo.get_homogenization_rules()
diff --git a/app/data/repositories/layer0/tables.py b/app/data/repositories/layer0/tables.py
index 1541198f..5e53ce25 100644
--- a/app/data/repositories/layer0/tables.py
+++ b/app/data/repositories/layer0/tables.py
@@ -120,7 +120,7 @@ def fetch_table(
         :param columns: select only given columns
         :param order_column: orders result by a provided column
         :param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
-        :param offset: allows to retrieve rows starting from the `offset` object_id
+        :param offset: allows to retrieve rows starting from the `offset` record_id
         :param limit: allows to retrieve no more than `limit` rows
         """
 
@@ -180,7 +180,7 @@ def fetch_raw_data(
         :param columns: select only given columns
         :param order_column: orders result by a provided column
         :param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
- :param offset: allows to retrieve rows starting from the `offset` object_id + :param offset: allows to retrieve rows starting from the `offset` record_id :param limit: allows to retrieve no more than `limit` rows :return: Layer0RawData """ diff --git a/app/data/repositories/layer1.py b/app/data/repositories/layer1.py index d2ed1079..486efc93 100644 --- a/app/data/repositories/layer1.py +++ b/app/data/repositories/layer1.py @@ -25,14 +25,14 @@ def save_data(self, records: list[model.Record]) -> None: if not table_items: continue - columns = ["object_id"] + columns = ["record_id"] columns.extend(table_items[0][1].layer1_keys()) params = [] values = [] for record_id, catalog_object in table_items: data = catalog_object.layer1_data() - data["object_id"] = record_id + data["record_id"] = record_id params.extend([data[column] for column in columns]) values.append(",".join(["%s"] * len(columns))) @@ -42,7 +42,7 @@ def save_data(self, records: list[model.Record]) -> None: query = f""" INSERT INTO {table} ({", ".join(columns)}) VALUES {", ".join([f"({value})" for value in values])} - ON CONFLICT (object_id) DO UPDATE SET {on_conflict_update_statement} + ON CONFLICT (record_id) DO UPDATE SET {on_conflict_update_statement} """ self._storage.exec(query, params=params) @@ -70,11 +70,11 @@ def get_new_observations( query = f"""SELECT * FROM {object_cls.layer1_table()} AS l1 - JOIN layer0.objects AS o ON l1.object_id = o.id + JOIN layer0.records AS o ON l1.record_id = o.id WHERE o.pgc IN ( SELECT DISTINCT o.pgc FROM {object_cls.layer1_table()} AS l1 - JOIN layer0.objects AS o ON l1.object_id = o.id + JOIN layer0.records AS o ON l1.record_id = o.id WHERE o.modification_time > %s AND o.pgc > %s ORDER BY o.pgc LIMIT %s @@ -86,21 +86,21 @@ def get_new_observations( record_data: dict[tuple[int, str], list[model.CatalogObject]] = {} for row in rows: - object_id = row.pop("object_id") + record_id = row.pop("record_id") pgc = int(row.pop("pgc")) catalog_object = object_cls.from_layer1(row) - key = (pgc, object_id) + key = (pgc, record_id) if key not in record_data: record_data[key] = [] record_data[key].append(catalog_object) - objects: list[model.RecordWithPGC] = [] + records: list[model.RecordWithPGC] = [] for (pgc, record_id), catalog_objects in record_data.items(): record_info = model.Record(id=record_id, data=catalog_objects) - objects.append(model.RecordWithPGC(pgc, record_info)) + records.append(model.RecordWithPGC(pgc, record_info)) - return objects + return records def query_records( self, @@ -130,13 +130,13 @@ def query_records( cte_query = f""" {alias} AS ( - SELECT object_id, {", ".join(catalog_columns)} + SELECT record_id, {", ".join(catalog_columns)} FROM {table_name_layer1} """ cte_where_conditions = [] if record_ids: - cte_where_conditions.append("object_id = ANY(%s)") + cte_where_conditions.append("record_id = ANY(%s)") params.append(record_ids) if cte_where_conditions: @@ -147,56 +147,56 @@ def query_records( select_parts.extend([f'{alias}."{catalog.value}|{column}"' for column in object_cls.layer1_keys()]) select_parts.append( - f'CASE WHEN {alias}.object_id IS NOT NULL THEN true ELSE false END AS "{catalog.value}|_present"' + f'CASE WHEN {alias}.record_id IS NOT NULL THEN true ELSE false END AS "{catalog.value}|_present"' ) if i == 0: join_parts.append(f"FROM {alias}") else: - join_parts.append(f"FULL OUTER JOIN {alias} USING (object_id)") + join_parts.append(f"FULL OUTER JOIN {alias} USING (record_id)") if table_name: - where_conditions.append("layer0.objects.table_id = layer0.tables.id") 
+            where_conditions.append("layer0.records.table_id = layer0.tables.id")
             where_conditions.append("layer0.tables.table_name = %s")
             params.append(table_name)
 
         if offset:
-            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")"
+            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.record_id" for i in range(len(catalogs))]) + ")"
             where_conditions.append(f"{coalesce_expr} > %s")
             params.append(offset)
 
         query = f"""
             WITH {", ".join(cte_parts)}
-            SELECT COALESCE({", ".join([f"t{i}.object_id" for i in range(len(catalogs))])}) AS object_id,
+            SELECT COALESCE({", ".join([f"t{i}.record_id" for i in range(len(catalogs))])}) AS record_id,
             {", ".join(select_parts)}
             {" ".join(join_parts)}
         """
 
         if table_name:
-            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")"
+            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.record_id" for i in range(len(catalogs))]) + ")"
             query += f"""
-                JOIN layer0.objects ON {coalesce_expr} = layer0.objects.id
-                JOIN layer0.tables ON layer0.objects.table_id = layer0.tables.id
+                JOIN layer0.records ON {coalesce_expr} = layer0.records.id
+                JOIN layer0.tables ON layer0.records.table_id = layer0.tables.id
             """
 
         if where_conditions:
             query += f" WHERE {' AND '.join(where_conditions)}"
 
-        query += " ORDER BY object_id"
+        query += " ORDER BY record_id"
 
         if limit:
             query += " LIMIT %s"
             params.append(limit)
 
-        objects = self._storage.query(query, params=params)
+        records = self._storage.query(query, params=params)
 
-        return self._group_by_record_id(objects, catalogs)
+        return self._group_by_record_id(records, catalogs)
 
-    def _group_by_record_id(self, objects: list[dict], catalogs: list[model.RawCatalog]) -> list[model.Record]:
+    def _group_by_record_id(self, records: list[dict], catalogs: list[model.RawCatalog]) -> list[model.Record]:
         record_data: dict[str, list[model.CatalogObject]] = {}
 
-        for row in objects:
-            record_id = row["object_id"]
+        for row in records:
+            record_id = row["record_id"]
             if record_id not in record_data:
                 record_data[record_id] = []
diff --git a/app/data/repositories/layer2/repository.py b/app/data/repositories/layer2/repository.py
index 6382f432..06cae2cf 100644
--- a/app/data/repositories/layer2/repository.py
+++ b/app/data/repositories/layer2/repository.py
@@ -74,16 +74,16 @@ def _construct_batch_query(
     ) -> tuple[str, list[Any]]:
         if not search_params:
             # If no search parameters, return empty result
-            return "SELECT NULL as object_id, NULL as pgc WHERE FALSE", []
+            return "SELECT NULL as record_id, NULL as pgc WHERE FALSE", []
 
         query = """
         WITH search_params AS (
             SELECT * FROM (
                 VALUES {values}
-            ) AS t(object_id, search_type, params)
+            ) AS t(record_id, search_type, params)
         )
-        SELECT sp.object_id, pgc, {columns}
+        SELECT sp.record_id, pgc, {columns}
         FROM search_params sp
         CROSS JOIN {joined_tables}
         WHERE {conditions}
@@ -93,9 +93,9 @@ def _construct_batch_query(
         values_lines = []
         params = []
 
-        for object_id, sparams in search_params.items():
+        for record_id, sparams in search_params.items():
             values_lines.append("(%s, %s, %s::jsonb)")
-            params.extend([object_id, sparams.name(), json.dumps(sparams.get_params())])
+            params.extend([record_id, sparams.name(), json.dumps(sparams.get_params())])
 
         columns = []
         table_names = []
@@ -104,12 +104,10 @@ def _construct_batch_query(
             object_cls = model.get_catalog_object_type(catalog)
             table_names.append(object_cls.layer2_table())
 
-            columns.extend(
-                [
-                    f'{object_cls.layer2_table()}.{column} AS "{catalog.value}|{column}"'
-                    for column in object_cls.layer2_keys()
-                ]
-            )
+            columns.extend([
+                f'{object_cls.layer2_table()}.{column} AS "{catalog.value}|{column}"'
+                for column in object_cls.layer2_keys()
+            ])
             columns.append(
                 f"CASE WHEN {object_cls.layer2_table()}.pgc IS NOT NULL "
                 f'THEN true ELSE false END AS "{catalog.value}|_present"'
@@ -144,17 +142,17 @@ def query_batch(
     ) -> dict[str, list[model.Layer2Object]]:
         query, params = self._construct_batch_query(catalogs, search_types, search_params, limit, offset)
 
-        objects = self._storage.query(query, params=params)
+        records = self._storage.query(query, params=params)
 
-        objects_by_id = containers.group_by(objects, key_func=lambda obj: str(obj["object_id"]))
+        records_by_id = containers.group_by(records, key_func=lambda obj: str(obj["record_id"]))
 
         result: dict[str, list[model.Layer2Object]] = {}
 
-        for object_id, objects in objects_by_id.items():
-            if object_id not in result:
-                result[object_id] = []
+        for record_id, records in records_by_id.items():
+            if record_id not in result:
+                result[record_id] = []
 
-            result[object_id].extend(self._group_by_pgc(objects))
+            result[record_id].extend(self._group_by_pgc(records))
 
         return result
 
@@ -168,8 +166,8 @@ def _group_by_pgc(self, objects: list[rows.DictRow]) -> list[model.Layer2Object]
             # TODO: what if for each pgc there are multiple rows? For example, if
             # the catalog does not have a UNIQUE constraint on pgc.
             obj = pgc_objects[0]
-            if "object_id" in obj:
-                obj.pop("object_id")
+            if "record_id" in obj:
+                obj.pop("record_id")
             if "pgc" in obj:
                 obj.pop("pgc")
 
diff --git a/app/domain/homogenization/apply.py b/app/domain/homogenization/apply.py
index a9ae20c7..87122cec 100644
--- a/app/domain/homogenization/apply.py
+++ b/app/domain/homogenization/apply.py
@@ -77,7 +77,7 @@ def apply(self, data: table.Table) -> list[data_model.Record]:
                 if not self.ignore_errors:
                     raise e
 
-                logger.warn("Error creating catalog object", object_id=record_id, error=e, data_dict=data_dict)
+                logger.warn("Error creating catalog object", record_id=record_id, error=e, data_dict=data_dict)
                 continue
 
             records[record_id].data.append(catalog_obj)
diff --git a/postgres/migrations/V016__rename_objects_to_records.sql b/postgres/migrations/V016__rename_objects_to_records.sql
new file mode 100644
index 00000000..4de6fba1
--- /dev/null
+++ b/postgres/migrations/V016__rename_objects_to_records.sql
@@ -0,0 +1,41 @@
+/* pgmigrate-encoding: utf-8 */
+
+-- Drop, by name, the foreign keys and the trigger that reference layer0.objects,
+-- so that the table and its object_id columns can be renamed.
+ALTER TABLE layer0.crossmatch DROP CONSTRAINT IF EXISTS crossmatch_object_id_fkey;
+ALTER TABLE icrs.data DROP CONSTRAINT IF EXISTS data_object_id_fkey;
+ALTER TABLE designation.data DROP CONSTRAINT IF EXISTS designation_data_object_id_fkey;
+ALTER TABLE cz.data DROP CONSTRAINT IF EXISTS data_object_id_fkey;
+ALTER TABLE layer0.objects DROP CONSTRAINT IF EXISTS objects_table_id_fkey;
+
+DROP TRIGGER IF EXISTS set_modification_time_on_pgc_update ON layer0.objects;
+
+ALTER TABLE layer0.objects RENAME TO records;
+
+ALTER TABLE layer0.crossmatch RENAME COLUMN object_id TO record_id;
+ALTER TABLE icrs.data RENAME COLUMN object_id TO record_id;
+ALTER TABLE designation.data RENAME COLUMN object_id TO record_id;
+ALTER TABLE cz.data RENAME COLUMN object_id TO record_id;
+
+-- Re-create the dropped foreign keys against the renamed table.
+ALTER TABLE layer0.crossmatch
+ADD FOREIGN KEY (record_id) REFERENCES layer0.records(id);
+
+ALTER TABLE icrs.data
+ADD FOREIGN KEY (record_id) REFERENCES layer0.records(id);
+
+ALTER TABLE designation.data
+ADD FOREIGN KEY (record_id) REFERENCES layer0.records(id);
+
+ALTER TABLE cz.data
+ADD FOREIGN KEY (record_id) REFERENCES layer0.records(id) ON DELETE RESTRICT ON UPDATE CASCADE;
+
+ALTER TABLE layer0.records
+ADD FOREIGN KEY (table_id) REFERENCES layer0.tables(id);
+
+-- Re-create the modification-time trigger on the renamed table.
+CREATE TRIGGER set_modification_time_on_pgc_update
+BEFORE UPDATE OF pgc ON layer0.records
+FOR EACH ROW
+EXECUTE FUNCTION rawdata_set_modification_time();
+

From 77c829790eea433a2ba86128054eae0db59bf323 Mon Sep 17 00:00:00 2001
From: kraysent
Date: Thu, 13 Nov 2025 10:28:38 +0000
Subject: [PATCH 2/2] #329: revert incidental style change in layer2 repository

---
 app/data/repositories/layer2/repository.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/app/data/repositories/layer2/repository.py b/app/data/repositories/layer2/repository.py
index 06cae2cf..b99edf89 100644
--- a/app/data/repositories/layer2/repository.py
+++ b/app/data/repositories/layer2/repository.py
@@ -104,10 +104,12 @@ def _construct_batch_query(
             object_cls = model.get_catalog_object_type(catalog)
             table_names.append(object_cls.layer2_table())
 
-            columns.extend([
-                f'{object_cls.layer2_table()}.{column} AS "{catalog.value}|{column}"'
-                for column in object_cls.layer2_keys()
-            ])
+            columns.extend(
+                [
+                    f'{object_cls.layer2_table()}.{column} AS "{catalog.value}|{column}"'
+                    for column in object_cls.layer2_keys()
+                ]
+            )
             columns.append(
                 f"CASE WHEN {object_cls.layer2_table()}.pgc IS NOT NULL "
                 f'THEN true ELSE false END AS "{catalog.value}|_present"'
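
A quick way to sanity-check V016 after pgmigrate applies it. This is a
sketch, not part of the patch series; it assumes psql access to the target
database and uses only standard PostgreSQL system catalogs:

    -- Every re-created foreign key should now point at layer0.records.
    SELECT conname, conrelid::regclass AS referencing_table
    FROM pg_constraint
    WHERE confrelid = 'layer0.records'::regclass;

    -- The old table name should no longer resolve.
    SELECT to_regclass('layer0.objects');  -- expected: NULL

    -- The renamed table should carry the re-created pgc-update trigger.
    SELECT tgname FROM pg_trigger
    WHERE tgrelid = 'layer0.records'::regclass AND NOT tgisinternal;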