Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app/commands/runserver/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ def start(config_path: str):
redis_storage.connect()

authenticator = auth.PostgresAuthenticator(pg_storage)
common_repo = repositories.CommonRepository(pg_storage, logger)
depot = commands.Depot(
common_repo=repositories.CommonRepository(pg_storage, logger),
layer0_repo=repositories.Layer0Repository(pg_storage, logger),
common_repo=common_repo,
layer0_repo=repositories.Layer0Repository(common_repo, pg_storage, logger),
layer1_repo=repositories.Layer1Repository(pg_storage, logger),
layer2_repo=repositories.Layer2Repository(pg_storage, logger),
tmp_data_repo=repositories.TmpDataRepositoryImpl(pg_storage),
Expand Down
45 changes: 43 additions & 2 deletions app/data/repositories/layer0_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@

from app import entities
from app.data import template
from app.data.mappers import domain_to_data
from app.data.mappers.data_to_domain import layer_0_mapper
from app.data.repositories import CommonRepository
from app.domain import repositories
from app.domain.model import Layer0Model
from app.domain.model.params.layer_0_query_param import Layer0QueryParam
from app.entities import ColumnDescription, CoordinatePart, Layer0Creation
from app.lib.storage import enums, postgres
from app.lib.web.errors import DatabaseError
Expand All @@ -14,8 +20,11 @@
INTERNAL_ID_COLUMN_NAME = "hyperleda_internal_id"


class Layer0Repository(postgres.TransactionalPGRepository):
def __init__(self, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLogger) -> None:
class Layer0Repository(postgres.TransactionalPGRepository, repositories.Layer0Repository):
def __init__(
    self,
    common_repository: CommonRepository,
    storage: postgres.PgStorage,
    logger: structlog.stdlib.BoundLogger,
) -> None:
    """
    Wire the repository to its collaborators.
    :param common_repository: Common repository kept on the instance for later use by this repository
    :param storage: PostgreSQL storage, handed over to the base class initializer
    :param logger: Bound logger kept on the instance
    """
    # Own attributes are stored first, then the base class is initialized with the storage.
    self._logger = logger
    self._common_repository: CommonRepository = common_repository
    super().__init__(storage)

Expand Down Expand Up @@ -305,3 +314,35 @@ def get_objects(self, batch_size: int, offset: int) -> list[entities.ObjectProce
)
for row in rows
]

def create_update_instances(self, instances: list[Layer0Model]):
    """
    Save or update the given instances.

    NOTE(review): this is a stub - the call is accepted and nothing is written.
    :param instances: Instances to save or update
    """
    return None

def create_instances(self, instances: list[Layer0Model]):
    """
    Create the given Layer 0 instances inside one `with_tx()` block.

    For every instance the bibliography entry is created first, then the table
    (referencing the new bibliography id) and finally the raw data (referencing
    the id of the created table).
    :param instances: Instances to create
    """
    with self.with_tx():
        for model in instances:
            bib = domain_to_data.layer_0_bibliography_mapper(model)
            bib_id = self._common_repository.create_bibliography(bib.code, bib.year, bib.author, bib.title)

            table_creation = domain_to_data.layer_0_creation_mapper(model, bib_id)
            created_table = self.create_table(table_creation)

            raw_data = domain_to_data.layer_0_raw_mapper(model, created_table.table_id)
            self.insert_raw_data(raw_data)

def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]:
    """
    Load every stored Layer 0 table and convert it to the domain model.

    For each table id the metadata, the raw data and the bibliography source
    (looked up by the metadata's bibliography id) are fetched and mapped.
    :param param: Query parameters; currently not used for filtering (see TODO below)
    :return: One Layer0Model per table id returned by `get_all_table_ids`
    """
    models: list[Layer0Model] = []
    with self.with_tx():
        # TODO use some selection params to filter unneeded tables
        for table_id in self.get_all_table_ids():
            metadata = self.fetch_metadata(table_id)
            raw_data = self.fetch_raw_data(table_id)
            source = self._common_repository.get_source_by_id(metadata.bibliography_id)
            models.append(layer_0_mapper(metadata, raw_data, source))

    return models
45 changes: 0 additions & 45 deletions app/data/repositories/layer_0_repository_impl.py

This file was deleted.

4 changes: 4 additions & 0 deletions app/domain/actions/logic_units/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from app.domain.actions.logic_units.transaction_0_1 import Transaction01Depot, transaction_0_1
from app.domain.actions.logic_units.transformation_0_1 import TransformationO1Depot, transformation_0_1

__all__ = ["transformation_0_1", "TransformationO1Depot", "transaction_0_1", "Transaction01Depot"]
64 changes: 64 additions & 0 deletions app/domain/actions/logic_units/transaction_0_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from dataclasses import dataclass, replace
from typing import Callable, Optional

from app.domain.actions.logic_units.transformation_0_1 import TransformationO1Depot, transformation_0_1
from app.domain.model import Layer0Model, Layer1Model
from app.domain.model.layer0 import Transformation01Fail
from app.domain.model.params.transaction_0_1_stages import (
AwaitingQueue,
TransactionO1Sage,
TransformingData,
)


@dataclass
class Transaction01Depot:
    """
    Dependencies of the layer 0 -> layer 1 transaction.

    Args:
        transformation_depot: Dependencies forwarded to the 0 -> 1 transformation
    """

    transformation_depot: TransformationO1Depot


def _perform_transaction(
    transformation_depot: TransformationO1Depot,
    data: Layer0Model,
    on_progress: Optional[Callable[[TransactionO1Sage], None]],
) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]:
    """
    Run the 0 -> 1 transformation and mark the source as processed.
    :param transformation_depot: Dependencies of the transformation
    :param data: Layer 0 data to be transformed
    :param on_progress: Optional callback; transformation stages are reported to it wrapped in TransformingData
    :return:
        updated_source: Source with its processed flag set
        updated_models: Successfully transformed Layer 1 data
        fails: Fails during transformation
    """
    if on_progress is None:
        stage_callback = None
    else:
        # Wrap every transformation stage into the transaction level stage.
        def stage_callback(stage):
            on_progress(TransformingData(stage))

    # The callback has to be passed by keyword: the third positional parameter of
    # transformation_0_1 is user_param, so a positional callback would be taken for
    # the cross identification parameters and progress would never be reported.
    models, fails = transformation_0_1(transformation_depot, data, on_progress=stage_callback)

    updated_source, updated_models = _update_update_time(data, models)

    return updated_source, updated_models, fails


def _update_update_time(source: Layer0Model, models: list[Layer1Model]) -> tuple[Layer0Model, list[Layer1Model]]:
    """
    Record that the data frame has been processed.
    :param source: Source of data, being transformed
    :param models: Successfully processed models
    :return: a copy of the source with `processed` set to True, and the models as given
    """
    return replace(source, processed=True), models


def transaction_0_1(
    depot: Transaction01Depot, data: Layer0Model, on_progress: Optional[Callable[[TransactionO1Sage], None]] = None
) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]:
    """
    Performs data transaction from layer 0 to layer 1
    :param depot: Dependencies
    :param data: Layer 0 data to be transformed
    :param on_progress: Function, called on progress; receives AwaitingQueue first
    :return:
        updated_source: Updated Layer 0 data (according to transformation result)
        updated_models: Successfully transformed Layer 1 data
        fails: Fails during transformation
    """
    report = on_progress
    if report is not None:
        # Announce the initial stage before any work starts.
        report(AwaitingQueue())

    transformation_depot = depot.transformation_depot
    return _perform_transaction(transformation_depot, data, on_progress)
137 changes: 137 additions & 0 deletions app/domain/actions/logic_units/transformation_0_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from dataclasses import dataclass
from typing import Callable, Optional

from app import entities
from app.domain.cross_id_simultaneous_data_provider import CrossIdSimultaneousDataProvider
from app.domain.model import Layer0Model, Layer1Model
from app.domain.model.layer0 import Transformation01Fail
from app.domain.model.layer1.layer_1_value import Layer1Value
from app.domain.model.params.cross_identification_result import CrossIdentifyResult
from app.domain.model.params.cross_identification_user_param import CrossIdentificationUserParam
from app.domain.model.params.transformation_0_1_stages import (
CrossIdentification,
ParseCoordinates,
ParseValues,
Transformation01Stage,
)
from app.domain.repositories.layer_2_repository import Layer2Repository


@dataclass
class TransformationO1Depot:
    """
    Dependencies of the layer 0 -> layer 1 transformation.

    Args:
        layer2_repo: Layer 2 repository, passed as the first argument to the cross identification function
        cross_identification_function: Cross identification logic; called with the repository, one object,
            the simultaneous data provider and the user parameters
        simultaneous_data_provider: Factory building the simultaneous data provider from a list of objects
    """

    layer2_repo: Layer2Repository
    cross_identification_function: Callable[
        [
            Layer2Repository,
            entities.ObjectInfo,
            CrossIdSimultaneousDataProvider,
            CrossIdentificationUserParam,
        ],
        CrossIdentifyResult,
    ]
    simultaneous_data_provider: Callable[
        [list[entities.ObjectInfo]],
        CrossIdSimultaneousDataProvider,
    ]


def transformation_0_1(
    depot: TransformationO1Depot,
    data: Layer0Model,
    user_param: CrossIdentificationUserParam | None = None,
    on_progress: Optional[Callable[[Transformation01Stage], None]] = None,
) -> tuple[list[Layer1Model], list[Transformation01Fail]]:
    """
    Performs data transformation from layer 0 [Layer0Model] to layer 1 [Layer1Model]
    :param depot: Dependencies
    :param data: Layer 0 data to be transformed
    :param user_param: User defined parameters for cross identification;
        CrossIdentificationUserParam(None, None) is used when omitted
    :param on_progress: Optional callable, called with the current stage
        (ParseCoordinates, ParseValues, then CrossIdentification after every row)
    :return:
        success: list[Layer1Model] - transformed models
        fail: list[Transformation01Fail] - rows whose coordinates or names could not be parsed,
            or whose cross identification reported a fail
    """
    if user_param is None:
        user_param = CrossIdentificationUserParam(None, None)
    n_rows = data.data.shape[0]

    # parse coordinates
    if on_progress is not None:
        on_progress(ParseCoordinates())
    cd = data.meta.coordinate_descr
    coordinates = cd.parse_coordinates(data.data) if cd is not None else n_rows * [None]

    # parse values
    if on_progress is not None:
        on_progress(ParseValues())
    value_descriptions = data.meta.value_descriptions
    values = [vd.parse_values(data.data) for vd in value_descriptions]

    # extract names
    names_descr = data.meta.names_descr
    names = names_descr.parse_name(data.data) if names_descr is not None else n_rows * [None]

    # cross identification
    # A row whose coordinate or name failed to parse is represented by the exception itself.
    identification_params = []
    for coordinate, name in zip(coordinates, names):
        if isinstance(coordinate, BaseException):
            identification_params.append(coordinate)
        elif isinstance(name, BaseException):
            identification_params.append(name)
        elif name is None:
            identification_params.append(entities.ObjectInfo(None, None, coordinate))
        else:
            primary_name, all_names = name
            identification_params.append(entities.ObjectInfo(all_names, primary_name, coordinate))

    # Simultaneous data provider needs only valid CrossIdentificationParams
    simultaneous_data_provider = depot.simultaneous_data_provider(
        [it for it in identification_params if not isinstance(it, BaseException)]
    )
    identification_results = []
    for i, param in enumerate(identification_params):
        if isinstance(param, BaseException):
            # carry the parsing error through, so it stays aligned with its row
            identification_results.append(param)
        else:
            identification_results.append(
                depot.cross_identification_function(
                    depot.layer2_repo,
                    param,
                    simultaneous_data_provider,
                    user_param,
                )
            )
        if on_progress is not None:
            on_progress(CrossIdentification(n_rows, i + 1))
    simultaneous_data_provider.clear()

    # compile objects
    models = []
    fails = []
    for i in range(n_rows):
        coordinate, identification_result = (coordinates[i], identification_results[i])
        if isinstance(coordinate, BaseException):
            fails.append(Transformation01Fail(coordinate, i))
            continue

        if isinstance(identification_result, BaseException):
            # The name failed to parse: the entry is the exception itself, not a
            # cross identification result, so it has no `fail`/`result` attributes.
            fails.append(Transformation01Fail(identification_result, i))
            continue

        if identification_result.fail is not None:
            fails.append(Transformation01Fail(identification_result.fail, i))
            continue

        pgc = identification_result.result.pgc if identification_result.result is not None else None
        model = Layer1Model(
            pgc=pgc,
            source_id=data.id,
            processed=False,
            coordinates=coordinate,
            name=names[i],
            # values[k] was parsed by value_descriptions[k], so the two are paired directly
            measurements=[Layer1Value(value[i], descr.ucd) for value, descr in zip(values, value_descriptions)],
            dataset=data.meta.dataset,
        )
        models.append(model)

    return models, fails
5 changes: 5 additions & 0 deletions app/domain/repositories/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from app.domain.repositories.layer_0_repository import Layer0Repository
from app.domain.repositories.layer_1_repository import Layer1Repository
from app.domain.repositories.layer_2_repository import Layer2Repository

__all__ = ["Layer0Repository", "Layer1Repository", "Layer2Repository"]
6 changes: 3 additions & 3 deletions app/domain/repositories/layer_0_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ class Layer0Repository(ABC):
"""

@abstractmethod
async def create_update_instances(self, instances: list[Layer0Model]):
def create_update_instances(self, instances: list[Layer0Model]):
"""
Save or update given instances in local DB
:param instances: Instances to update
"""

@abstractmethod
async def create_instances(self, instances: list[Layer0Model]):
def create_instances(self, instances: list[Layer0Model]):
"""
Used to create instances, fails on conflict
:param instances:
"""

@abstractmethod
async def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]:
def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]:
"""
Fetches Layer0Model from DB
:param param: Used to specify needed Layer0 instances
Expand Down
4 changes: 2 additions & 2 deletions app/domain/repositories/layer_1_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ class Layer1Repository(ABC):
"""

@abstractmethod
async def query_data(self, param: Layer1QueryParam) -> list[Layer1Model]:
def query_data(self, param: Layer1QueryParam) -> list[Layer1Model]:
"""
Get all objects around given point
:param param: Parameter, used to make specific query
:return: all Layer1Model, that meet criteria in param
"""

@abstractmethod
async def save_update_instances(self, instances: list[Layer1Model]):
def save_update_instances(self, instances: list[Layer1Model]):
"""
Create or update provided objects
:param instances: Objects to be processed
Expand Down
2 changes: 1 addition & 1 deletion app/domain/tasks/download_vizier_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def download_vizier_table(
):
common = repositories.CommonRepository(storage, logger)

layer0 = repositories.Layer0Repository(storage, logger)
layer0 = repositories.Layer0Repository(common, storage, logger)
table_name = construct_table_name(params.table_id)

table_id, ok = layer0.get_table_id(table_name)
Expand Down
5 changes: 0 additions & 5 deletions app/domain/usecases/__init__.py

This file was deleted.

Loading