Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/data/model/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@dataclass
class Layer0RawData:
table_id: int
table_name: str
data: pandas.DataFrame


Expand Down
27 changes: 23 additions & 4 deletions app/data/repositories/layer0/modifiers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import json

from app.data import model
from app.data import model, template
from app.lib.storage import postgres
from app.lib.web.errors import DatabaseError


class Layer0ModifiersRepository(postgres.TransactionalPGRepository):
def get_modifiers(self, table_id: int) -> list[model.Modifier]:
def get_modifiers(self, table_name: str) -> list[model.Modifier]:
table_id_row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
if table_id_row is None:
raise DatabaseError(f"unable to fetch table with name {table_name}")

table_id = table_id_row["id"]

rows = self._storage.query(
"""
SELECT column_name, modifier_name, params
Expand All @@ -21,15 +28,27 @@ def get_modifiers(self, table_id: int) -> list[model.Modifier]:
for row in rows
]

def add_modifiers(self, table_id: int, modifiers: list[model.Modifier]) -> None:
def add_modifiers(self, table_name: str, modifiers: list[model.Modifier]) -> None:
table_id_row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
if table_id_row is None:
raise DatabaseError(f"unable to fetch table with name {table_name}")

table_id = table_id_row["id"]

query = """
INSERT INTO layer0.column_modifiers (table_id, column_name, modifier_name, params, sequence) VALUES"""

params = []
values = []
for sequence, modifier in enumerate(modifiers):
params.extend(
[table_id, modifier.column_name, modifier.modifier_name, json.dumps(modifier.params), sequence]
[
table_id,
modifier.column_name,
modifier.modifier_name,
json.dumps(modifier.params),
sequence,
]
)
values.append("(%s, %s, %s, %s, %s)")

Expand Down
8 changes: 7 additions & 1 deletion app/data/repositories/layer0/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@


class Layer0RecordRepository(postgres.TransactionalPGRepository):
def register_records(self, table_id: int, record_ids: list[str]) -> None:
def register_records(self, table_name: str, record_ids: list[str]) -> None:
if len(record_ids) == 0:
raise RuntimeError("no records to upsert")

table_id_row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
if table_id_row is None:
raise DatabaseError(f"unable to fetch table with name {table_name}")

table_id = table_id_row["id"]

query = "INSERT INTO layer0.records (id, table_id) VALUES "
params = []
values = []
Expand Down
24 changes: 8 additions & 16 deletions app/data/repositories/layer0/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,26 @@ def fetch_table(

def fetch_raw_data(
self,
table_id: int,
table_name: str,
offset: str | None = None,
columns: list[str] | None = None,
order_column: str | None = None,
order_direction: str = "asc",
limit: int | None = None,
) -> model.Layer0RawData:
return self.table_repo.fetch_raw_data(table_id, offset, columns, order_column, order_direction, limit)
return self.table_repo.fetch_raw_data(table_name, offset, columns, order_column, order_direction, limit)

def fetch_metadata(self, table_id: int) -> model.Layer0TableMeta:
return self.table_repo.fetch_metadata(table_id)
def fetch_metadata(self, table_name: str) -> model.Layer0TableMeta:
return self.table_repo.fetch_metadata(table_name)

def fetch_metadata_by_name(self, table_name: str) -> model.Layer0TableMeta:
return self.table_repo.fetch_metadata_by_name(table_name)

def update_column_metadata(self, table_name: str, column_description: model.ColumnDescription) -> None:
return self.table_repo.update_column_metadata(table_name, column_description)

def register_records(self, table_id: int, record_ids: list[str]) -> None:
return self.records_repo.register_records(table_id, record_ids)
def register_records(self, table_name: str, record_ids: list[str]) -> None:
return self.records_repo.register_records(table_name, record_ids)

def get_table_statistics(self, table_name: str) -> model.TableStatistics:
return self.records_repo.get_table_statistics(table_name)
Expand Down Expand Up @@ -88,15 +88,7 @@ def add_homogenization_params(self, params: list[model.HomogenizationParams]) ->
return self.homogenization_repo.add_homogenization_params(params)

def get_modifiers(self, table_name: str) -> list[model.Modifier]:
meta = self.fetch_metadata_by_name(table_name)
if meta.table_id is None:
raise RuntimeError(f"{table_name} has no table_id")

return self.modifier_repo.get_modifiers(meta.table_id)
return self.modifier_repo.get_modifiers(table_name)

def add_modifier(self, table_name: str, modifiers: list[model.Modifier]) -> None:
meta = self.fetch_metadata_by_name(table_name)
if meta.table_id is None:
raise RuntimeError(f"{table_name} has no table_id")

return self.modifier_repo.add_modifiers(meta.table_id, modifiers)
return self.modifier_repo.add_modifiers(table_name, modifiers)
33 changes: 7 additions & 26 deletions app/data/repositories/layer0/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,9 @@ def insert_raw_data(self, data: model.Layer0RawData) -> None:
"""

if len(data.data) == 0:
log.warn("trying to insert 0 rows into the table", table_id=data.table_id)
log.warn("trying to insert 0 rows into the table", table_name=data.table_name)
return

row = self._storage.query_one(template.GET_RAWDATA_TABLE, params=[data.table_id])
table_name = row.get("table_name")

if table_name is None:
raise DatabaseError(f"unable to fetch table with id {data.table_id}")

fields = data.data.columns

values = []
Expand All @@ -99,7 +93,7 @@ def insert_raw_data(self, data: model.Layer0RawData) -> None:
fields = [f'"{field}"' for field in fields]

query = f"""
INSERT INTO rawdata."{table_name}" ({",".join(fields)})
INSERT INTO rawdata."{data.table_name}" ({",".join(fields)})
VALUES {",".join(values)}
ON CONFLICT DO NOTHING
"""
Expand Down Expand Up @@ -168,28 +162,22 @@ def fetch_table(

def fetch_raw_data(
self,
table_id: int,
table_name: str,
offset: str | None = None,
columns: list[str] | None = None,
order_column: str | None = None,
order_direction: str = "asc",
limit: int | None = None,
) -> model.Layer0RawData:
"""
:param table_id: ID of the raw table
:param table_name: Name of the raw table
:param columns: select only given columns
:param order_column: orders result by a provided column
:param order_direction: if `order_column` is specified, sets order direction. Either `asc` or `desc`.
:param offset: allows to retrieve rows starting from the `offset` record_id
:param limit: allows to retrieve no more than `limit` rows
:return: Layer0RawData
"""
row = self._storage.query_one(template.GET_RAWDATA_TABLE, params=[table_id])
table_name = row.get("table_name")

if table_name is None:
raise DatabaseError(f"unable to fetch table with id {table_id}")

columns_str = ",".join(columns or ["*"])

params = []
Expand All @@ -209,17 +197,10 @@ def fetch_raw_data(
params.append(limit)

rows = self._storage.query(query, params=params)
return model.Layer0RawData(table_id, pandas.DataFrame(rows))

def fetch_metadata(self, table_id: int) -> model.Layer0TableMeta:
row = self._storage.query_one(template.GET_RAWDATA_TABLE, params=[table_id])
table_name = row.get("table_name")
modification_dt: datetime.datetime | None = row.get("modification_dt")
return model.Layer0RawData(table_name, pandas.DataFrame(rows))

if table_name is None:
raise DatabaseError(f"unable to fetch table with id {table_id}")

return self._fetch_metadata_by_name(table_name, modification_dt)
def fetch_metadata(self, table_name: str) -> model.Layer0TableMeta:
return self.fetch_metadata_by_name(table_name)

def fetch_metadata_by_name(self, table_name: str) -> model.Layer0TableMeta:
row = self._storage.query_one(template.FETCH_RAWDATA_REGISTRY, params=[table_name])
Expand Down
10 changes: 5 additions & 5 deletions app/domain/adminapi/table_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,21 @@ def patch_table(self, r: adminapi.PatchTableRequest) -> adminapi.PatchTableRespo

def add_data(self, r: adminapi.AddDataRequest) -> adminapi.AddDataResponse:
data_df = pandas.DataFrame.from_records(r.data)
data_df[repositories.INTERNAL_ID_COLUMN_NAME] = data_df.apply(_get_hash_func(r.table_id), axis=1)
data_df[repositories.INTERNAL_ID_COLUMN_NAME] = data_df.apply(_get_hash_func(r.table_name), axis=1)
data_df = data_df.drop_duplicates(subset=repositories.INTERNAL_ID_COLUMN_NAME, keep="last")

with self.layer0_repo.with_tx():
errgr = concurrency.ErrorGroup()
errgr.run(
self.layer0_repo.insert_raw_data,
model.Layer0RawData(
table_id=r.table_id,
table_name=r.table_name,
data=data_df,
),
)
errgr.run(
self.layer0_repo.register_records,
r.table_id,
r.table_name,
record_ids=data_df[repositories.INTERNAL_ID_COLUMN_NAME].tolist(),
)

Expand Down Expand Up @@ -242,7 +242,7 @@ def _column_description_to_presentation(columns: list[model.ColumnDescription])
return res


def _get_hash_func(table_id: int) -> Callable[[pandas.Series], str]:
def _get_hash_func(table_name: str) -> Callable[[pandas.Series], str]:
def _compute_hash(row: pandas.Series) -> str:
"""
This function applies special algorithm to an iterable to compute stable hash.
Expand All @@ -256,7 +256,7 @@ def _compute_hash(row: pandas.Series) -> str:
data = sorted(data, key=lambda t: t[0])
data_string = json.dumps(data, separators=(",", ":"))

return _hashfunc(f"{table_id}_{data_string}")
return _hashfunc(f"{table_name}_{data_string}")

return _compute_hash

Expand Down
2 changes: 0 additions & 2 deletions app/domain/unification/marking.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ def mark_objects(
ignore_homogenization_errors: bool = True,
) -> None:
meta = layer0_repo.fetch_metadata_by_name(table_name)
if meta.table_id is None:
raise RuntimeError(f"Table {table_name} has no table_id")

h = get_homogenization(layer0_repo, meta, ignore_errors=ignore_homogenization_errors)
modificator = get_modificator(layer0_repo, meta.table_name)
Expand Down
2 changes: 1 addition & 1 deletion app/presentation/adminapi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class CreateTableResponse(pydantic.BaseModel):


class AddDataRequest(pydantic.BaseModel):
table_id: int
table_name: str
data: list[dict[str, Any]] = pydantic.Field(
description="""Actual data to append.
Keys in this dictionary must be a subset of the columns in the table. If not specified, column will be set to NULL.
Expand Down
4 changes: 0 additions & 4 deletions app/tasks/crossmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ def prepare(self, config: interface.Config):
self.solver = crossmatch.create_solver(test_solver_config, solvers)

def run(self):
table_meta = self.layer0_repo.fetch_metadata_by_name(self.table_name)
if table_meta.table_id is None:
raise RuntimeError(f"Table {self.table_name} has no table_id")

ctx = {"table_name": self.table_name}

offset = None
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/layer0_tables_repository_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def test_write_and_fetch_table(self):
self.bib_id,
)

table_resp = self.layer0_repo.create_table(table_meta)
_ = self.layer0_repo.create_table(table_meta)
test_data = pd.DataFrame({"ra": [12.1, 11.1], "dec": [1.0, 2.0]})
raw_data = model.Layer0RawData(table_resp.table_id, test_data)
raw_data = model.Layer0RawData(table_meta.table_name, test_data)

self.layer0_repo.insert_raw_data(raw_data)

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/layer1_repository_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_icrs(self):
]

bib_id = self.common_repo.create_bibliography("123456", 2000, ["test"], "test")
table_resp = self.layer0_repo.create_table(
_ = self.layer0_repo.create_table(
model.Layer0TableMeta(
"test_table",
[
Expand All @@ -40,7 +40,7 @@ def test_icrs(self):
enums.DataType.REGULAR,
)
)
self.layer0_repo.register_records(table_resp.table_id, ["111", "112"])
self.layer0_repo.register_records("test_table", ["111", "112"])
self.layer1_repo.save_data(objects)

result = self.pg_storage.storage.query("SELECT ra FROM icrs.data ORDER BY ra")
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/layer2_import_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def _get_table(self, table_name: str) -> int:
return table_resp.table_id

def test_import_two_catalogs(self):
table_id = self._get_table("test_import_two_catalogs")
_ = self._get_table("test_import_two_catalogs")
self.layer0_repo.register_records(
table_id,
"test_import_two_catalogs",
["123", "124"],
)

Expand Down Expand Up @@ -80,9 +80,9 @@ def test_import_two_catalogs(self):

def test_updated_objects(self):
self.test_import_two_catalogs()
table_id = self._get_table("test_updated_objects")
_ = self._get_table("test_updated_objects")
self.layer0_repo.register_records(
table_id,
"test_updated_objects",
["125", "126"],
)
self.layer0_repo.upsert_pgc({"125": 1234, "126": 1234})
Expand Down
18 changes: 9 additions & 9 deletions tests/integration/rawdata_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_create_table_happy_case(self):

self.manager.add_data(
presentation.AddDataRequest(
table_id=table_resp.id,
table_name="test_table",
data=[
{"ra": 5.5, "dec": 88},
{"ra": 5.0, "dec": -50},
Expand Down Expand Up @@ -111,7 +111,7 @@ def test_create_table_with_nulls(self):

self.manager.add_data(
presentation.AddDataRequest(
table_id=table_resp.id,
table_name="test_table",
data=[{"ra": 5.5}, {"ra": 5.0}],
),
)
Expand Down Expand Up @@ -193,28 +193,28 @@ def test_add_data_to_unknown_column(self):
with self.assertRaises(psycopg.errors.UndefinedColumn):
self.manager.add_data(
presentation.AddDataRequest(
table_id=table_resp.id,
table_name="test_table",
data=[{"totally_nonexistent_column": 5.5}],
),
)

def test_fetch_raw_table(self):
data = DataFrame({"col0": [1, 2, 3, 4], "col1": ["ad", "ad", "a", "he"]})
bib_id = self.manager.common_repo.create_bibliography("2024arXiv240411942F", 1999, ["ade"], "title")
table_resp = self.manager.layer0_repo.create_table(
_ = self.manager.layer0_repo.create_table(
model.Layer0TableMeta(
"test_table",
[model.ColumnDescription("col0", TYPE_INTEGER), model.ColumnDescription("col1", TYPE_TEXT)],
bib_id,
enums.DataType.REGULAR,
),
)
self.manager.layer0_repo.insert_raw_data(model.Layer0RawData(table_resp.table_id, data))
expected = self.manager.layer0_repo.fetch_raw_data(table_resp.table_id)
self.manager.layer0_repo.insert_raw_data(model.Layer0RawData("test_table", data))
expected = self.manager.layer0_repo.fetch_raw_data("test_table")

self.assertTrue(expected.data.equals(data))

expected = self.manager.layer0_repo.fetch_raw_data(table_resp.table_id, columns=["col1"])
expected = self.manager.layer0_repo.fetch_raw_data("test_table", columns=["col1"])
self.assertTrue(expected.data.equals(data.drop(["col0"], axis=1)))

def test_fetch_metadata(self):
Expand All @@ -226,9 +226,9 @@ def test_fetch_metadata(self):
bib_id,
enums.DataType.REGULAR,
)
table_resp = self.manager.layer0_repo.create_table(expected)
_ = self.manager.layer0_repo.create_table(expected)

actual = self.manager.layer0_repo.fetch_metadata(table_resp.table_id)
actual = self.manager.layer0_repo.fetch_metadata("test_table")

self.assertEqual(expected.table_name, actual.table_name)
self.assertEqual(expected.column_descriptions, actual.column_descriptions)
Expand Down
Loading
Loading