Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions app/data/repositories/layer0/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def register_records(self, table_id: int, record_ids: list[str]) -> None:
if len(record_ids) == 0:
raise RuntimeError("no records to upsert")

query = "INSERT INTO rawdata.objects (id, table_id) VALUES "
query = "INSERT INTO layer0.objects (id, table_id) VALUES "
params = []
values = []

Expand All @@ -37,12 +37,12 @@ def get_processed_records(

where_stmnt = []
join_tables = """
FROM rawdata.objects AS o
JOIN rawdata.crossmatch AS c ON o.id = c.object_id
FROM layer0.objects AS o
JOIN layer0.crossmatch AS c ON o.id = c.object_id
"""

if table_name is not None:
join_tables += " JOIN rawdata.tables AS t ON o.table_id = t.id"
join_tables += " JOIN layer0.tables AS t ON o.table_id = t.id"
where_stmnt.append("t.table_name = %s")
params.append(table_name)

Expand Down Expand Up @@ -108,8 +108,8 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
self._storage.query,
"""
SELECT COALESCE(status, 'unprocessed') AS status, COUNT(1)
FROM rawdata.crossmatch AS p
RIGHT JOIN rawdata.objects AS o ON p.object_id = o.id
FROM layer0.crossmatch AS p
RIGHT JOIN layer0.objects AS o ON p.object_id = o.id
WHERE o.table_id = %s
GROUP BY status""",
params=[table_id],
Expand All @@ -118,8 +118,8 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
self._storage.query_one,
"""
SELECT COUNT(1) AS cnt, MAX(t.modification_dt) AS modification_dt
FROM rawdata.objects AS o
JOIN rawdata.tables AS t ON o.table_id = t.id
FROM layer0.objects AS o
JOIN layer0.tables AS t ON o.table_id = t.id
WHERE table_id = %s""",
params=[table_id],
)
Expand All @@ -143,7 +143,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
)

def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
query = "INSERT INTO rawdata.crossmatch (object_id, status, metadata) VALUES "
query = "INSERT INTO layer0.crossmatch (object_id, status, metadata) VALUES "
params = []
values = []

Expand Down Expand Up @@ -194,7 +194,7 @@ def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
pgcs_to_insert[object_id] = pgc

if pgcs_to_insert:
update_query = "UPDATE rawdata.objects SET pgc = v.pgc FROM (VALUES "
update_query = "UPDATE layer0.objects SET pgc = v.pgc FROM (VALUES "
params = []
values = []

Expand All @@ -203,6 +203,6 @@ def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
params.extend([object_id, pgc_id])

update_query += ",".join(values)
update_query += ") AS v(object_id, pgc) WHERE rawdata.objects.id = v.object_id"
update_query += ") AS v(object_id, pgc) WHERE layer0.objects.id = v.object_id"

self._storage.exec(update_query, params=params)
4 changes: 2 additions & 2 deletions app/data/repositories/layer0/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu
if column_description.ucd is not None:
column_params["ucd"] = column_description.ucd

modification_query = "UPDATE rawdata.tables SET modification_dt = now() WHERE id = %s"
modification_query = "UPDATE layer0.tables SET modification_dt = now() WHERE id = %s"

with self.with_tx():
self._storage.exec(
Expand All @@ -298,7 +298,7 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu
def _get_table_id(self, table_name: str) -> tuple[int, bool]:
try:
row = self._storage.query_one(
"SELECT id FROM rawdata.tables WHERE table_name = %s",
"SELECT id FROM layer0.tables WHERE table_name = %s",
params=[table_name],
)
except RuntimeError:
Expand Down
12 changes: 6 additions & 6 deletions app/data/repositories/layer1.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ def get_new_observations(

query = f"""SELECT *
FROM {object_cls.layer1_table()} AS l1
JOIN rawdata.objects AS o ON l1.object_id = o.id
JOIN layer0.objects AS o ON l1.object_id = o.id
WHERE o.pgc IN (
SELECT DISTINCT o.pgc
FROM {object_cls.layer1_table()} AS l1
JOIN rawdata.objects AS o ON l1.object_id = o.id
JOIN layer0.objects AS o ON l1.object_id = o.id
WHERE o.modification_time > %s AND o.pgc > %s
ORDER BY o.pgc
LIMIT %s
Expand Down Expand Up @@ -156,8 +156,8 @@ def query_records(
join_parts.append(f"FULL OUTER JOIN {alias} USING (object_id)")

if table_name:
where_conditions.append("rawdata.objects.table_id = rawdata.tables.id")
where_conditions.append("rawdata.tables.table_name = %s")
where_conditions.append("layer0.objects.table_id = layer0.tables.id")
where_conditions.append("layer0.tables.table_name = %s")
params.append(table_name)

if offset:
Expand All @@ -175,8 +175,8 @@ def query_records(
if table_name:
coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")"
query += f"""
JOIN rawdata.objects ON {coalesce_expr} = rawdata.objects.id
JOIN rawdata.tables ON rawdata.objects.table_id = rawdata.tables.id
JOIN layer0.objects ON {coalesce_expr} = layer0.objects.id
JOIN layer0.tables ON layer0.objects.table_id = layer0.tables.id
"""

if where_conditions:
Expand Down
6 changes: 3 additions & 3 deletions app/data/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ def render_query(query_string: str, **kwargs) -> str:
"""

INSERT_TABLE_REGISTRY_ITEM = """
INSERT INTO rawdata.tables (bib, table_name, datatype)
INSERT INTO layer0.tables (bib, table_name, datatype)
VALUES (%s, %s, %s)
RETURNING id
"""

GET_RAWDATA_TABLE = """
SELECT table_name, modification_dt FROM rawdata.tables
SELECT table_name, modification_dt FROM layer0.tables
WHERE id = %s
"""

Expand All @@ -48,6 +48,6 @@ def render_query(query_string: str, **kwargs) -> str:

FETCH_RAWDATA_REGISTRY = """
SELECT *
FROM rawdata.tables
FROM layer0.tables
WHERE table_name=%s
"""
2 changes: 1 addition & 1 deletion app/lib/storage/postgres/postgres_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def dump(self, obj: Any) -> bytes | bytearray | memoryview:

DEFAULT_ENUMS = [
(enums.DataType, "common.datatype"),
(enums.RecordCrossmatchStatus, "rawdata.crossmatch_status"),
(enums.RecordCrossmatchStatus, "layer0.crossmatch_status"),
]


Expand Down
40 changes: 40 additions & 0 deletions postgres/migrations/V015__move_rawdata_tables_to_layer0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* pgmigrate-encoding: utf-8 */

-- Drop foreign key constraints that reference tables to be moved
ALTER TABLE rawdata.crossmatch DROP CONSTRAINT crossmatch_object_id_fkey;
ALTER TABLE icrs.data DROP CONSTRAINT data_object_id_fkey;
ALTER TABLE designation.data DROP CONSTRAINT data_object_id_fkey;
ALTER TABLE cz.data DROP CONSTRAINT data_object_id_fkey;
ALTER TABLE layer0.column_modifiers DROP CONSTRAINT column_modifiers_table_id_fkey;
ALTER TABLE rawdata.objects DROP CONSTRAINT objects_table_id_fkey;
DROP TRIGGER IF EXISTS set_modification_time_on_pgc_update ON rawdata.objects;

ALTER TABLE rawdata.tables SET SCHEMA layer0;
ALTER TABLE rawdata.objects SET SCHEMA layer0;
ALTER TYPE rawdata.crossmatch_status SET SCHEMA layer0;
ALTER TABLE rawdata.crossmatch SET SCHEMA layer0;

-- Recreate constraints
ALTER TABLE layer0.crossmatch
ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id);

ALTER TABLE icrs.data
ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id);

ALTER TABLE designation.data
ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id);

ALTER TABLE cz.data
ADD FOREIGN KEY (object_id) REFERENCES layer0.objects(id) ON DELETE restrict ON UPDATE cascade;

ALTER TABLE layer0.column_modifiers
ADD FOREIGN KEY (table_id) REFERENCES layer0.tables(id);

ALTER TABLE layer0.objects
ADD CONSTRAINT objects_table_id_fkey FOREIGN KEY (table_id) REFERENCES layer0.tables(id);

CREATE TRIGGER set_modification_time_on_pgc_update
BEFORE UPDATE OF pgc ON layer0.objects
FOR EACH ROW
EXECUTE FUNCTION rawdata_set_modification_time();

4 changes: 2 additions & 2 deletions tests/lib/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ def clear(self):
return

self.storage.exec("TRUNCATE common.bib CASCADE")
self.storage.exec("TRUNCATE rawdata.tables CASCADE")
self.storage.exec("TRUNCATE layer0.tables CASCADE")
tables = self.storage.query("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'rawdata' AND table_name NOT IN ('tables', 'pgc', 'objects', 'crossmatch')
WHERE table_schema = 'rawdata'
""")
for table in tables:
self.storage.exec(f"DROP TABLE rawdata.{table['table_name']} CASCADE")
Expand Down
Loading