@@ -7,12 +7,12 @@
 from app.lib.web.errors import DatabaseError
 
 
-class Layer0ObjectRepository(postgres.TransactionalPGRepository):
+class Layer0RecordRepository(postgres.TransactionalPGRepository):
     def register_records(self, table_id: int, record_ids: list[str]) -> None:
         if len(record_ids) == 0:
             raise RuntimeError("no records to upsert")
 
-        query = "INSERT INTO layer0.objects (id, table_id) VALUES "
+        query = "INSERT INTO layer0.records (id, table_id) VALUES "
         params = []
         values = []
 
@@ -37,8 +37,8 @@ def get_processed_records(
 
         where_stmnt = []
         join_tables = """
-            FROM layer0.objects AS o
-            JOIN layer0.crossmatch AS c ON o.id = c.object_id
+            FROM layer0.records AS o
+            JOIN layer0.crossmatch AS c ON o.id = c.record_id
         """
 
         if table_name is not None:
@@ -70,7 +70,7 @@ def get_processed_records(
 
         rows = self._storage.query(query, params=params)
 
-        objects = []
+        records = []
 
         for row in rows:
             status = row["status"]
@@ -84,7 +84,7 @@
             else:
                 ci_result = model.CIResultObjectCollision(pgcs=metadata["possible_matches"])
 
-            objects.append(
+            records.append(
                 model.RecordCrossmatch(
                     model.Record(
                         row["id"],
@@ -94,7 +94,7 @@
                 )
             )
 
-        return objects
+        return records
 
     def get_table_statistics(self, table_name: str) -> model.TableStatistics:
         table_id_row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
@@ -109,7 +109,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
             """
             SELECT COALESCE(status, 'unprocessed') AS status, COUNT(1)
             FROM layer0.crossmatch AS p
-            RIGHT JOIN layer0.objects AS o ON p.object_id = o.id
+            RIGHT JOIN layer0.records AS o ON p.record_id = o.id
             WHERE o.table_id = %s
             GROUP BY status""",
             params=[table_id],
@@ -118,7 +118,7 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
             self._storage.query_one,
             """
             SELECT COUNT(1) AS cnt, MAX(t.modification_dt) AS modification_dt
-            FROM layer0.objects AS o
+            FROM layer0.records AS o
             JOIN layer0.tables AS t ON o.table_id = t.id
             WHERE table_id = %s""",
             params=[table_id],
@@ -143,11 +143,11 @@ def get_table_statistics(self, table_name: str) -> model.TableStatistics:
         )
 
     def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
-        query = "INSERT INTO layer0.crossmatch (object_id, status, metadata) VALUES "
+        query = "INSERT INTO layer0.crossmatch (record_id, status, metadata) VALUES "
         params = []
         values = []
 
-        for object_id, result in data.items():
+        for record_id, result in data.items():
             values.append("(%s, %s, %s)")
 
             status = None
@@ -165,44 +165,44 @@ def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
 
             meta = {"possible_matches": possible_pgcs}
 
-            params.extend([object_id, status, json.dumps(meta)])
+            params.extend([record_id, status, json.dumps(meta)])
 
         query += ",".join(values)
-        query += " ON CONFLICT (object_id) DO UPDATE SET status = EXCLUDED.status, metadata = EXCLUDED.metadata"
+        query += " ON CONFLICT (record_id) DO UPDATE SET status = EXCLUDED.status, metadata = EXCLUDED.metadata"
 
         self._storage.exec(query, params=params)
 
     def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
         pgcs_to_insert: dict[str, int] = {}
 
-        new_objects = [object_id for object_id, pgc in pgcs.items() if pgc is None]
+        new_records = [record_id for record_id, pgc in pgcs.items() if pgc is None]
 
-        if new_objects:
+        if new_records:
             result = self._storage.query(
                 f"""INSERT INTO common.pgc
-                VALUES {",".join(["(DEFAULT)"] * len(new_objects))}
+                VALUES {",".join(["(DEFAULT)"] * len(new_records))}
                 RETURNING id""",
             )
 
             ids = [row["id"] for row in result]
 
-            for object_id, pgc_id in zip(new_objects, ids, strict=False):
-                pgcs_to_insert[object_id] = pgc_id
+            for record_id, pgc_id in zip(new_records, ids, strict=False):
+                pgcs_to_insert[record_id] = pgc_id
 
-        for object_id, pgc in pgcs.items():
+        for record_id, pgc in pgcs.items():
             if pgc is not None:
-                pgcs_to_insert[object_id] = pgc
+                pgcs_to_insert[record_id] = pgc
 
         if pgcs_to_insert:
-            update_query = "UPDATE layer0.objects SET pgc = v.pgc FROM (VALUES "
+            update_query = "UPDATE layer0.records SET pgc = v.pgc FROM (VALUES "
             params = []
             values = []
 
-            for object_id, pgc_id in pgcs_to_insert.items():
+            for record_id, pgc_id in pgcs_to_insert.items():
                 values.append("(%s, %s)")
-                params.extend([object_id, pgc_id])
+                params.extend([record_id, pgc_id])
 
             update_query += ",".join(values)
-            update_query += ") AS v(object_id, pgc) WHERE layer0.objects.id = v.object_id"
+            update_query += ") AS v(record_id, pgc) WHERE layer0.records.id = v.record_id"
 
             self._storage.exec(update_query, params=params)
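
Reviewer note: both write paths in this file use the same batched-`VALUES` upsert, now keyed on `record_id`. A minimal standalone sketch of the pattern, assuming `execute` forwards a query plus a flat parameter list to a psycopg-style driver the way `self._storage.exec` does (the plain-`dict` payload here is a simplification of `model.CIResult`):

```python
import json
from typing import Any, Callable


def upsert_crossmatch(execute: Callable[[str, list[Any]], None], data: dict[str, dict]) -> None:
    """Sketch of the batched VALUES + ON CONFLICT upsert used above."""
    if not data:
        raise RuntimeError("no records to upsert")

    values: list[str] = []
    params: list[Any] = []
    for record_id, meta in data.items():
        values.append("(%s, %s, %s)")  # one placeholder group per record
        params.extend([record_id, meta["status"], json.dumps(meta)])

    query = (
        "INSERT INTO layer0.crossmatch (record_id, status, metadata) VALUES "
        + ",".join(values)
        + " ON CONFLICT (record_id) DO UPDATE SET"
        + " status = EXCLUDED.status, metadata = EXCLUDED.metadata"
    )
    execute(query, params)
```

One statement per batch keeps round trips constant, and `EXCLUDED` reuses the incoming row on conflict instead of requiring a separate `UPDATE`.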
14 changes: 7 additions & 7 deletions app/data/repositories/layer0/repository.py
@@ -2,7 +2,7 @@
 from astropy import table
 
 from app.data import model
-from app.data.repositories.layer0 import homogenization, modifiers, objects, tables
+from app.data.repositories.layer0 import homogenization, modifiers, records, tables
 from app.lib.storage import enums, postgres
 
 
@@ -12,7 +12,7 @@ def __init__(self, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLo
         super().__init__(storage)
 
         self.table_repo = tables.Layer0TableRepository(storage)
-        self.objects_repo = objects.Layer0ObjectRepository(storage)
+        self.records_repo = records.Layer0RecordRepository(storage)
         self.homogenization_repo = homogenization.Layer0HomogenizationRepository(storage)
         self.modifier_repo = modifiers.Layer0ModifiersRepository(storage)
 
@@ -54,10 +54,10 @@ def update_column_metadata(self, table_name: str, column_description: model.Colu
         return self.table_repo.update_column_metadata(table_name, column_description)
 
     def register_records(self, table_id: int, record_ids: list[str]) -> None:
-        return self.objects_repo.register_records(table_id, record_ids)
+        return self.records_repo.register_records(table_id, record_ids)
 
     def get_table_statistics(self, table_name: str) -> model.TableStatistics:
-        return self.objects_repo.get_table_statistics(table_name)
+        return self.records_repo.get_table_statistics(table_name)
 
     def get_processed_records(
         self,
@@ -67,13 +67,13 @@
         status: enums.RecordCrossmatchStatus | None = None,
         record_id: str | None = None,
     ) -> list[model.RecordCrossmatch]:
-        return self.objects_repo.get_processed_records(limit, offset, table_name, status, record_id)
+        return self.records_repo.get_processed_records(limit, offset, table_name, status, record_id)
 
     def add_crossmatch_result(self, data: dict[str, model.CIResult]) -> None:
-        return self.objects_repo.add_crossmatch_result(data)
+        return self.records_repo.add_crossmatch_result(data)
 
     def upsert_pgc(self, pgcs: dict[str, int | None]) -> None:
-        return self.objects_repo.upsert_pgc(pgcs)
+        return self.records_repo.upsert_pgc(pgcs)
 
     def get_homogenization_rules(self) -> list[model.HomogenizationRule]:
         return self.homogenization_repo.get_homogenization_rules()
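
Reviewer note: `Layer0Repository` remains a thin facade, so call sites are untouched by the rename; only the internal delegate moved from `objects_repo` to `records_repo`. A hypothetical call site, assuming an already-configured `PgStorage` and `structlog` logger (the table id and record ids are illustrative):

```python
# Hypothetical usage; `storage` and `logger` are assumed to exist.
repo = Layer0Repository(storage, logger)

# Both calls now delegate to Layer0RecordRepository internally.
repo.register_records(table_id=42, record_ids=["rec-1", "rec-2"])
stats = repo.get_table_statistics("my_table")
```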
4 changes: 2 additions & 2 deletions app/data/repositories/layer0/tables.py
@@ -120,7 +120,7 @@ def fetch_table(
         :param columns: select only given columns
         :param order_column: orders result by a provided column
         :param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
-        :param offset: allows to retrieve rows starting from the `offset` object_id
+        :param offset: allows to retrieve rows starting from the `offset` record_id
         :param limit: allows to retrieve no more than `limit` rows
         """
 
@@ -180,7 +180,7 @@ def fetch_raw_data(
         :param columns: select only given columns
         :param order_column: orders result by a provided column
         :param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
-        :param offset: allows to retrieve rows starting from the `offset` object_id
+        :param offset: allows to retrieve rows starting from the `offset` record_id
         :param limit: allows to retrieve no more than `limit` rows
         :return: Layer0RawData
         """
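
Reviewer note: these docstrings describe keyset-style paging, where `offset` names the last-seen record id rather than a row count. A minimal sketch of that access pattern (the table name and the `query` callable are illustrative stand-ins for the repository internals):

```python
def fetch_page(query, offset_record_id: str | None, limit: int) -> list[dict]:
    """Keyset pagination sketch: resume strictly after a known record id."""
    sql = "SELECT * FROM rawdata.my_table"
    params: list = []
    if offset_record_id is not None:
        sql += " WHERE record_id > %s"  # skip rows up to and including the offset id
        params.append(offset_record_id)
    sql += " ORDER BY record_id LIMIT %s"
    params.append(limit)
    return query(sql, params=params)
```

Compared with `OFFSET n`, this stays cheap on large tables because each page seeks directly into the index on the id column.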
52 changes: 26 additions & 26 deletions app/data/repositories/layer1.py
@@ -25,14 +25,14 @@ def save_data(self, records: list[model.Record]) -> None:
             if not table_items:
                 continue
 
-            columns = ["object_id"]
+            columns = ["record_id"]
             columns.extend(table_items[0][1].layer1_keys())
 
             params = []
             values = []
             for record_id, catalog_object in table_items:
                 data = catalog_object.layer1_data()
-                data["object_id"] = record_id
+                data["record_id"] = record_id
 
                 params.extend([data[column] for column in columns])
                 values.append(",".join(["%s"] * len(columns)))
@@ -42,7 +42,7 @@ def save_data(self, records: list[model.Record]) -> None:
             query = f"""
                 INSERT INTO {table} ({", ".join(columns)})
                 VALUES {", ".join([f"({value})" for value in values])}
-                ON CONFLICT (object_id) DO UPDATE SET {on_conflict_update_statement}
+                ON CONFLICT (record_id) DO UPDATE SET {on_conflict_update_statement}
             """
 
             self._storage.exec(query, params=params)
@@ -70,11 +70,11 @@ def get_new_observations(
 
         query = f"""SELECT *
             FROM {object_cls.layer1_table()} AS l1
-            JOIN layer0.objects AS o ON l1.object_id = o.id
+            JOIN layer0.records AS o ON l1.record_id = o.id
             WHERE o.pgc IN (
                 SELECT DISTINCT o.pgc
                 FROM {object_cls.layer1_table()} AS l1
-                JOIN layer0.objects AS o ON l1.object_id = o.id
+                JOIN layer0.records AS o ON l1.record_id = o.id
                 WHERE o.modification_time > %s AND o.pgc > %s
                 ORDER BY o.pgc
                 LIMIT %s
@@ -86,21 +86,21 @@
         record_data: dict[tuple[int, str], list[model.CatalogObject]] = {}
 
         for row in rows:
-            object_id = row.pop("object_id")
+            record_id = row.pop("record_id")
             pgc = int(row.pop("pgc"))
             catalog_object = object_cls.from_layer1(row)
 
-            key = (pgc, object_id)
+            key = (pgc, record_id)
             if key not in record_data:
                 record_data[key] = []
             record_data[key].append(catalog_object)
 
-        objects: list[model.RecordWithPGC] = []
+        records: list[model.RecordWithPGC] = []
         for (pgc, record_id), catalog_objects in record_data.items():
             record_info = model.Record(id=record_id, data=catalog_objects)
-            objects.append(model.RecordWithPGC(pgc, record_info))
+            records.append(model.RecordWithPGC(pgc, record_info))
 
-        return objects
+        return records
 
     def query_records(
         self,
@@ -130,13 +130,13 @@ def query_records(
 
             cte_query = f"""
             {alias} AS (
-                SELECT object_id, {", ".join(catalog_columns)}
+                SELECT record_id, {", ".join(catalog_columns)}
                 FROM {table_name_layer1}
             """
 
             cte_where_conditions = []
             if record_ids:
-                cte_where_conditions.append("object_id = ANY(%s)")
+                cte_where_conditions.append("record_id = ANY(%s)")
                 params.append(record_ids)
 
             if cte_where_conditions:
@@ -147,56 +147,56 @@
 
             select_parts.extend([f'{alias}."{catalog.value}|{column}"' for column in object_cls.layer1_keys()])
             select_parts.append(
-                f'CASE WHEN {alias}.object_id IS NOT NULL THEN true ELSE false END AS "{catalog.value}|_present"'
+                f'CASE WHEN {alias}.record_id IS NOT NULL THEN true ELSE false END AS "{catalog.value}|_present"'
             )
 
             if i == 0:
                 join_parts.append(f"FROM {alias}")
             else:
-                join_parts.append(f"FULL OUTER JOIN {alias} USING (object_id)")
+                join_parts.append(f"FULL OUTER JOIN {alias} USING (record_id)")
 
         if table_name:
-            where_conditions.append("layer0.objects.table_id = layer0.tables.id")
+            where_conditions.append("layer0.records.table_id = layer0.tables.id")
             where_conditions.append("layer0.tables.table_name = %s")
             params.append(table_name)
 
         if offset:
-            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")"
+            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.record_id" for i in range(len(catalogs))]) + ")"
             where_conditions.append(f"{coalesce_expr} > %s")
             params.append(offset)
 
         query = f"""
         WITH {", ".join(cte_parts)}
-        SELECT COALESCE({", ".join([f"t{i}.object_id" for i in range(len(catalogs))])}) AS object_id,
+        SELECT COALESCE({", ".join([f"t{i}.record_id" for i in range(len(catalogs))])}) AS record_id,
         {", ".join(select_parts)}
         {" ".join(join_parts)}
         """
 
         if table_name:
-            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.object_id" for i in range(len(catalogs))]) + ")"
+            coalesce_expr = "COALESCE(" + ", ".join([f"t{i}.record_id" for i in range(len(catalogs))]) + ")"
             query += f"""
-            JOIN layer0.objects ON {coalesce_expr} = layer0.objects.id
-            JOIN layer0.tables ON layer0.objects.table_id = layer0.tables.id
+            JOIN layer0.records ON {coalesce_expr} = layer0.records.id
+            JOIN layer0.tables ON layer0.records.table_id = layer0.tables.id
             """
 
         if where_conditions:
             query += f" WHERE {' AND '.join(where_conditions)}"
 
-        query += " ORDER BY object_id"
+        query += " ORDER BY record_id"
 
         if limit:
             query += " LIMIT %s"
             params.append(limit)
 
-        objects = self._storage.query(query, params=params)
+        records = self._storage.query(query, params=params)
 
-        return self._group_by_record_id(objects, catalogs)
+        return self._group_by_record_id(records, catalogs)
 
-    def _group_by_record_id(self, objects: list[dict], catalogs: list[model.RawCatalog]) -> list[model.Record]:
+    def _group_by_record_id(self, records: list[dict], catalogs: list[model.RawCatalog]) -> list[model.Record]:
         record_data: dict[str, list[model.CatalogObject]] = {}
 
-        for row in objects:
-            record_id = row["object_id"]
+        for row in records:
+            record_id = row["record_id"]
             if record_id not in record_data:
                 record_data[record_id] = []
 
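
Reviewer note: to make the rename in `query_records` concrete, this is roughly the SQL the method now assembles for two catalogs. The catalog names, CTE column lists, and `layer1` table names are illustrative assumptions; the real ones come from `model.RawCatalog` and each class's `layer1_keys()`:

```python
# Illustrative shape of the generated query for two hypothetical catalogs.
EXAMPLE_QUERY = """
WITH t0 AS (
    SELECT record_id, "icrs|ra", "icrs|dec"
    FROM layer1.icrs
), t1 AS (
    SELECT record_id, "designation|design"
    FROM layer1.designation
)
SELECT COALESCE(t0.record_id, t1.record_id) AS record_id,
       t0."icrs|ra", t0."icrs|dec",
       CASE WHEN t0.record_id IS NOT NULL THEN true ELSE false END AS "icrs|_present",
       t1."designation|design",
       CASE WHEN t1.record_id IS NOT NULL THEN true ELSE false END AS "designation|_present"
FROM t0
FULL OUTER JOIN t1 USING (record_id)
ORDER BY record_id
"""
```

The `FULL OUTER JOIN ... USING (record_id)` keeps records that appear in only one catalog, and the `|_present` flags let the caller tell a missing catalog entry apart from one whose columns are all NULL.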