diff --git a/app/data/repositories/layer0/objects.py b/app/data/repositories/layer0/objects.py index ae561770..2475b90d 100644 --- a/app/data/repositories/layer0/objects.py +++ b/app/data/repositories/layer0/objects.py @@ -12,7 +12,7 @@ def register_records(self, table_id: int, record_ids: list[str]) -> None: if len(record_ids) == 0: raise RuntimeError("no records to upsert") - query = "INSERT INTO rawdata.objects (id, table_id) VALUES " + query = "INSERT INTO layer0.objects (id, table_id) VALUES " params = [] values = [] @@ -37,12 +37,12 @@ def get_processed_records( where_stmnt = [] join_tables = """ - FROM rawdata.objects AS o - JOIN rawdata.crossmatch AS c ON o.id = c.object_id + FROM layer0.objects AS o + JOIN layer0.crossmatch AS c ON o.id = c.object_id """ if table_name is not None: - join_tables += " JOIN rawdata.tables AS t ON o.table_id = t.id" + join_tables += " JOIN layer0.tables AS t ON o.table_id = t.id" where_stmnt.append("t.table_name = %s") params.append(table_name) @@ -108,8 +108,8 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics: self._storage.query, """ SELECT COALESCE(status, 'unprocessed') AS status, COUNT(1) - FROM rawdata.crossmatch AS p - RIGHT JOIN rawdata.objects AS o ON p.object_id = o.id + FROM layer0.crossmatch AS p + RIGHT JOIN layer0.objects AS o ON p.object_id = o.id WHERE o.table_id = %s GROUP BY status""", params=[table_id], @@ -118,8 +118,8 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics: self._storage.query_one, """ SELECT COUNT(1) AS cnt, MAX(t.modification_dt) AS modification_dt - FROM rawdata.objects AS o - JOIN rawdata.tables AS t ON o.table_id = t.id + FROM layer0.objects AS o + JOIN layer0.tables AS t ON o.table_id = t.id WHERE table_id = %s""", params=[table_id], ) @@ -143,7 +143,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics: ) def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None: - query = "INSERT INTO rawdata.crossmatch (object_id, status, metadata) VALUES " + query = "INSERT INTO layer0.crossmatch (object_id, status, metadata) VALUES " params = [] values = [] @@ -194,7 +194,7 @@ def upsert_pgc(self, pgcs: dict[str, int | None]) -> None: pgcs_to_insert[object_id] = pgc if pgcs_to_insert: - update_query = "UPDATE rawdata.objects SET pgc = v.pgc FROM (VALUES " + update_query = "UPDATE layer0.objects SET pgc = v.pgc FROM (VALUES " params = [] values = [] @@ -203,6 +203,6 @@ def upsert_pgc(self, pgcs: dict[str, int | None]) -> None: params.extend([object_id, pgc_id]) update_query += ",".join(values) - update_query += ") AS v(object_id, pgc) WHERE rawdata.objects.id = v.object_id" + update_query += ") AS v(object_id, pgc) WHERE layer0.objects.id = v.object_id" self._storage.exec(update_query, params=params) diff --git a/app/data/repositories/layer0/tables.py b/app/data/repositories/layer0/tables.py index b6ff8a52..1541198f 100644 --- a/app/data/repositories/layer0/tables.py +++ b/app/data/repositories/layer0/tables.py @@ -286,7 +286,7 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu if column_description.ucd is not None: column_params["ucd"] = column_description.ucd - modification_query = "UPDATE rawdata.tables SET modification_dt = now() WHERE id = %s" + modification_query = "UPDATE layer0.tables SET modification_dt = now() WHERE id = %s" with self.with_tx(): self._storage.exec( @@ -298,7 +298,7 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu def _get_table_id(self, table_name: str) -> tuple[int, bool]: try: row = self._storage.query_one( - "SELECT id FROM rawdata.tables WHERE table_name = %s", + "SELECT id FROM layer0.tables WHERE table_name = %s", params=[table_name], ) except RuntimeError: diff --git a/app/data/repositories/layer1.py b/app/data/repositories/layer1.py index 115ff304..d2ed1079 100644 --- a/app/data/repositories/layer1.py +++ b/app/data/repositories/layer1.py @@ -70,11 +70,11 @@ def get_new_observations( query = f"""SELECT * FROM {object_cls.layer1_table()} AS l1 - JOIN rawdata.objects AS o ON l1.object_id = o.id + JOIN layer0.objects AS o ON l1.object_id = o.id WHERE o.pgc IN ( SELECT DISTINCT o.pgc FROM {object_cls.layer1_table()} AS l1 - JOIN rawdata.objects AS o ON l1.object_id = o.id + JOIN layer0.objects AS o ON l1.object_id = o.id WHERE o.modification_time > %s AND o.pgc > %s ORDER BY o.pgc LIMIT %s @@ -156,8 +156,8 @@ def query_records( join_parts.append(f"FULL OUTER JOIN {alias} USING (object_id)") if table_name: - where_conditions.append("rawdata.objects.table_id = rawdata.tables.id") - where_conditions.append("rawdata.tables.table_name = %s") + where_conditions.append("layer0.objects.table_id = layer0.tables.id") + where_conditions.append("layer0.tables.table_name = %s") params.append(table_name) if offset: @@ -175,8 +175,8 @@ def query_records( if table_name: coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")" query += f""" - JOIN rawdata.objects ON {coalesce_expr} = rawdata.objects.id - JOIN rawdata.tables ON rawdata.objects.table_id = rawdata.tables.id + JOIN layer0.objects ON {coalesce_expr} = layer0.objects.id + JOIN layer0.tables ON layer0.objects.table_id = layer0.tables.id """ if where_conditions: diff --git a/app/data/template.py b/app/data/template.py index 5c4e59ab..dbcb1239 100644 --- a/app/data/template.py +++ b/app/data/template.py @@ -28,13 +28,13 @@ def render_query(query_string: str, **kwargs) -> str: """ INSERT_TABLE_REGISTRY_ITEM = """ -INSERT INTO rawdata.tables (bib, table_name, datatype) +INSERT INTO layer0.tables (bib, table_name, datatype) VALUES (%s, %s, %s) RETURNING id """ GET_RAWDATA_TABLE = """ -SELECT table_name, modification_dt FROM rawdata.tables +SELECT table_name, modification_dt FROM layer0.tables WHERE id = %s """ @@ -48,6 +48,6 @@ def render_query(query_string: str, **kwargs) -> str: FETCH_RAWDATA_REGISTRY = """ SELECT * -FROM rawdata.tables +FROM layer0.tables WHERE table_name=%s """ diff --git a/app/lib/storage/postgres/postgres_storage.py b/app/lib/storage/postgres/postgres_storage.py index 1740ba4b..582ea111 100644 --- a/app/lib/storage/postgres/postgres_storage.py +++ b/app/lib/storage/postgres/postgres_storage.py @@ -34,7 +34,7 @@ def dump(self, obj: Any) -> bytes | bytearray | memoryview: DEFAULT_ENUMS = [ (enums.DataType, "common.datatype"), - (enums.RecordCrossmatchStatus, "rawdata.crossmatch_status"), + (enums.RecordCrossmatchStatus, "layer0.crossmatch_status"), ] diff --git a/postgres/migrations/V015__move_rawdata_tables_to_layer0.sql b/postgres/migrations/V015__move_rawdata_tables_to_layer0.sql new file mode 100644 index 00000000..fad5dd3e --- /dev/null +++ b/postgres/migrations/V015__move_rawdata_tables_to_layer0.sql @@ -0,0 +1,40 @@ +/* pgmigrate-encoding: utf-8 */ + +-- Drop foreign key constraints that reference tables to be moved +ALTER TABLE rawdata.crossmatch DROP CONSTRAINT crossmatch_object_id_fkey; +ALTER TABLE icrs.data DROP CONSTRAINT data_object_id_fkey; +ALTER TABLE designation.data DROP CONSTRAINT data_object_id_fkey; +ALTER TABLE cz.data DROP CONSTRAINT data_object_id_fkey; +ALTER TABLE layer0.column_modifiers DROP CONSTRAINT column_modifiers_table_id_fkey; +ALTER TABLE rawdata.objects DROP CONSTRAINT objects_table_id_fkey; +DROP TRIGGER IF EXISTS set_modification_time_on_pgc_update ON rawdata.objects; + +ALTER TABLE rawdata.tables SET SCHEMA layer0; +ALTER TABLE rawdata.objects SET SCHEMA layer0; +ALTER TYPE rawdata.crossmatch_status SET SCHEMA layer0; +ALTER TABLE rawdata.crossmatch SET SCHEMA layer0; + +-- Recreate constraints +ALTER TABLE layer0.crossmatch +ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id); + +ALTER TABLE icrs.data +ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id); + +ALTER TABLE designation.data +ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id); + +ALTER TABLE cz.data +ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id) ON DELETE restrict ON UPDATE cascade; + +ALTER TABLE layer0.column_modifiers +ADD FOREIGN KEY (table_id) REFERENCES layer0.tables(id); + +ALTER TABLE layer0.objects +ADD CONSTRAINT objects_table_id_fkey FOREIGN KEY (table_id) REFERENCES layer0.tables(id); + +CREATE TRIGGER set_modification_time_on_pgc_update +BEFORE UPDATE OF pgc ON layer0.objects +FOR EACH ROW +EXECUTE FUNCTION rawdata_set_modification_time(); + diff --git a/tests/lib/postgres.py b/tests/lib/postgres.py index 35bcdf14..2d2aa8c5 100644 --- a/tests/lib/postgres.py +++ b/tests/lib/postgres.py @@ -122,11 +122,11 @@ def clear(self): return self.storage.exec("TRUNCATE common.bib CASCADE") - self.storage.exec("TRUNCATE rawdata.tables CASCADE") + self.storage.exec("TRUNCATE layer0.tables CASCADE") tables = self.storage.query(""" SELECT table_name FROM information_schema.tables - WHERE table_schema = 'rawdata' AND table_name NOT IN ('tables', 'pgc', 'objects', 'crossmatch') + WHERE table_schema = 'rawdata' """) for table in tables: self.storage.exec(f"DROP TABLE rawdata.{table['table_name']} CASCADE")