diff --git a/app/commands/runserver/start.py b/app/commands/runserver/start.py index 163fd9be..4ee3bc06 100644 --- a/app/commands/runserver/start.py +++ b/app/commands/runserver/start.py @@ -27,9 +27,10 @@ def start(config_path: str): redis_storage.connect() authenticator = auth.PostgresAuthenticator(pg_storage) + common_repo = repositories.CommonRepository(pg_storage, logger) depot = commands.Depot( - common_repo=repositories.CommonRepository(pg_storage, logger), - layer0_repo=repositories.Layer0Repository(pg_storage, logger), + common_repo=common_repo, + layer0_repo=repositories.Layer0Repository(common_repo, pg_storage, logger), layer1_repo=repositories.Layer1Repository(pg_storage, logger), layer2_repo=repositories.Layer2Repository(pg_storage, logger), tmp_data_repo=repositories.TmpDataRepositoryImpl(pg_storage), diff --git a/app/data/repositories/layer0_repository.py b/app/data/repositories/layer0_repository.py index 6c968b67..32a55776 100644 --- a/app/data/repositories/layer0_repository.py +++ b/app/data/repositories/layer0_repository.py @@ -6,6 +6,12 @@ from app import entities from app.data import template +from app.data.mappers import domain_to_data +from app.data.mappers.data_to_domain import layer_0_mapper +from app.data.repositories import CommonRepository +from app.domain import repositories +from app.domain.model import Layer0Model +from app.domain.model.params.layer_0_query_param import Layer0QueryParam from app.entities import ColumnDescription, CoordinatePart, Layer0Creation from app.lib.storage import enums, postgres from app.lib.web.errors import DatabaseError @@ -14,8 +20,11 @@ INTERNAL_ID_COLUMN_NAME = "hyperleda_internal_id" -class Layer0Repository(postgres.TransactionalPGRepository): - def __init__(self, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLogger) -> None: +class Layer0Repository(postgres.TransactionalPGRepository, repositories.Layer0Repository): + def __init__( + self, common_repository: CommonRepository, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLogger + ) -> None: + self._common_repository: CommonRepository = common_repository self._logger = logger super().__init__(storage) @@ -305,3 +314,35 @@ def get_objects(self, batch_size: int, offset: int) -> list[entities.ObjectProce ) for row in rows ] + + def create_update_instances(self, instances: list[Layer0Model]): + pass + + def create_instances(self, instances: list[Layer0Model]): + with self.with_tx(): + for instance in instances: + bibliography = domain_to_data.layer_0_bibliography_mapper(instance) + bibliography_id = self._common_repository.create_bibliography( + bibliography.code, + bibliography.year, + bibliography.author, + bibliography.title, + ) + creation = domain_to_data.layer_0_creation_mapper(instance, bibliography_id) + table_resp = self.create_table(creation) + raw = domain_to_data.layer_0_raw_mapper(instance, table_resp.table_id) + self.insert_raw_data(raw) + + def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: + with self.with_tx(): + # TODO use some selection params to filter unneeded tables + ids = self.get_all_table_ids() + + to_domain = [] + for table_id in ids: + meta = self.fetch_metadata(table_id) + raw = self.fetch_raw_data(table_id) + bib = self._common_repository.get_source_by_id(meta.bibliography_id) + to_domain.append(layer_0_mapper(meta, raw, bib)) + + return to_domain diff --git a/app/data/repositories/layer_0_repository_impl.py b/app/data/repositories/layer_0_repository_impl.py deleted file mode 100644 index 2cdf757f..00000000 --- a/app/data/repositories/layer_0_repository_impl.py +++ /dev/null @@ -1,45 +0,0 @@ -from app.data.mappers import domain_to_data -from app.data.mappers.data_to_domain import layer_0_mapper -from app.data.repositories import CommonRepository -from app.data.repositories import Layer0Repository as DataRepository -from app.domain.model import Layer0Model -from app.domain.model.params.layer_0_query_param import Layer0QueryParam -from app.domain.repositories.layer_0_repository import Layer0Repository - - -class Layer0RepositoryImpl(Layer0Repository): - def __init__(self, data_repository: DataRepository, common_repository: CommonRepository): - self._data_repository: DataRepository = data_repository - self._common_repository: CommonRepository = common_repository - - async def create_update_instances(self, instances: list[Layer0Model]): - pass - - async def create_instances(self, instances: list[Layer0Model]): - with self._data_repository.with_tx(): - for instance in instances: - bibliography = domain_to_data.layer_0_bibliography_mapper(instance) - bibliography_id = self._common_repository.create_bibliography( - bibliography.code, - bibliography.year, - bibliography.author, - bibliography.title, - ) - creation = domain_to_data.layer_0_creation_mapper(instance, bibliography_id) - table_resp = self._data_repository.create_table(creation) - raw = domain_to_data.layer_0_raw_mapper(instance, table_resp.table_id) - self._data_repository.insert_raw_data(raw) - - async def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: - with self._data_repository.with_tx(): - # TODO use some selection params to filter unneeded tables - ids = self._data_repository.get_all_table_ids() - - to_domain = [] - for table_id in ids: - meta = self._data_repository.fetch_metadata(table_id) - raw = self._data_repository.fetch_raw_data(table_id) - bib = self._common_repository.get_source_by_id(meta.bibliography_id) - to_domain.append(layer_0_mapper(meta, raw, bib)) - - return to_domain diff --git a/app/domain/actions/logic_units/__init__.py b/app/domain/actions/logic_units/__init__.py new file mode 100644 index 00000000..75c3db17 --- /dev/null +++ b/app/domain/actions/logic_units/__init__.py @@ -0,0 +1,4 @@ +from app.domain.actions.logic_units.transaction_0_1 import Transaction01Depot, transaction_0_1 +from app.domain.actions.logic_units.transformation_0_1 import TransformationO1Depot, transformation_0_1 + +__all__ = ["transformation_0_1", "TransformationO1Depot", "transaction_0_1", "Transaction01Depot"] diff --git a/app/domain/actions/logic_units/transaction_0_1.py b/app/domain/actions/logic_units/transaction_0_1.py new file mode 100644 index 00000000..d528bb9b --- /dev/null +++ b/app/domain/actions/logic_units/transaction_0_1.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass, replace +from typing import Callable, Optional + +from app.domain.actions.logic_units.transformation_0_1 import TransformationO1Depot, transformation_0_1 +from app.domain.model import Layer0Model, Layer1Model +from app.domain.model.layer0 import Transformation01Fail +from app.domain.model.params.transaction_0_1_stages import ( + AwaitingQueue, + TransactionO1Sage, + TransformingData, +) + + +@dataclass +class Transaction01Depot: + transformation_depot: TransformationO1Depot + + +def _perform_transaction( + transformation_depot: TransformationO1Depot, + data: Layer0Model, + on_progress: Optional[Callable[[TransactionO1Sage], None]], +) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]: + def fcn(*args, **kwargs): + return transformation_0_1(transformation_depot, *args, **kwargs) + + if on_progress is not None: + models, fails = fcn(data, lambda stage: on_progress(TransformingData(stage))) + else: + models, fails = fcn(data) + + updated_source, updated_models = _update_update_time(data, models) + + return updated_source, updated_models, fails + + +def _update_update_time(source: Layer0Model, models: list[Layer1Model]) -> tuple[Layer0Model, list[Layer1Model]]: + """ + Update info about data frame last processing + :param source: Source of data, being transformed + :param models: Successfully processed models + :return: updated source and models + """ + updated_source = replace(source, processed=True) + return updated_source, models + + +def transaction_0_1( + depot: Transaction01Depot, data: Layer0Model, on_progress: Optional[Callable[[TransactionO1Sage], None]] = None +) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]: + """ + Performs data transaction from layer 0 to layer 1 + :param depot: Dependencies + :param data: Layer 0 data to be transformed + :param on_progress: Function, called on pogress + :return: + updated_source: Updated Layer 0 data (according to transformation result) + updated_models: Successfully transformed Layer 1 data + fails: Fails during transformation + """ + if on_progress is not None: + on_progress(AwaitingQueue()) + + return _perform_transaction(depot.transformation_depot, data, on_progress) diff --git a/app/domain/actions/logic_units/transformation_0_1.py b/app/domain/actions/logic_units/transformation_0_1.py new file mode 100644 index 00000000..78325e4d --- /dev/null +++ b/app/domain/actions/logic_units/transformation_0_1.py @@ -0,0 +1,137 @@ +from dataclasses import dataclass +from typing import Callable, Optional + +from app import entities +from app.domain.cross_id_simultaneous_data_provider import CrossIdSimultaneousDataProvider +from app.domain.model import Layer0Model, Layer1Model +from app.domain.model.layer0 import Transformation01Fail +from app.domain.model.layer1.layer_1_value import Layer1Value +from app.domain.model.params.cross_identification_result import CrossIdentifyResult +from app.domain.model.params.cross_identification_user_param import CrossIdentificationUserParam +from app.domain.model.params.transformation_0_1_stages import ( + CrossIdentification, + ParseCoordinates, + ParseValues, + Transformation01Stage, +) +from app.domain.repositories.layer_2_repository import Layer2Repository + + +@dataclass +class TransformationO1Depot: + """ + Args: + layer2_repo: Layer2Repository + cross_identification_function: Cross identification logic + simultaneous_data_provider: Function, used to obtain simultaneous data provider for cross identification + """ + + layer2_repo: Layer2Repository + cross_identification_function: Callable[ + [Layer2Repository, entities.ObjectInfo, CrossIdSimultaneousDataProvider, CrossIdentificationUserParam], + CrossIdentifyResult, + ] + simultaneous_data_provider: Callable[[list[entities.ObjectInfo]], CrossIdSimultaneousDataProvider] + + +def transformation_0_1( + depot: TransformationO1Depot, + data: Layer0Model, + user_param: CrossIdentificationUserParam | None = None, + on_progress: Optional[Callable[[Transformation01Stage], None]] = None, +) -> tuple[list[Layer1Model], list[Transformation01Fail]]: + """ + Performs data transformation from layer 0 [Layer0Model] to layer 1 [Layer1Model] + :param depot: Dependencies + :param data: Layer 0 data to be transformed + :param user_param: User defined parameters for cross identification + :param on_progress: Optional callable to call on progress (from 0.0 to 1.0) + :return: + success: list[Layer1Model] - transformed models + fail: list[Transformation01Fail] - Fails during transformation + """ + if user_param is None: + user_param = CrossIdentificationUserParam(None, None) + n_rows = data.data.shape[0] + + # parse coordinates + if on_progress is not None: + on_progress(ParseCoordinates()) + cd = data.meta.coordinate_descr + if cd is not None: + coordinates = cd.parse_coordinates(data.data) + else: + coordinates = n_rows * [None] + + # parse values + if on_progress is not None: + on_progress(ParseValues()) + values = [vd.parse_values(data.data) for vd in data.meta.value_descriptions] + + # extract names + if data.meta.names_descr is not None: + names = data.meta.names_descr.parse_name(data.data) + else: + names = n_rows * [None] + + # cross identification + identification_params = [] + for coordinate, name in zip(coordinates, names): + if isinstance(coordinate, BaseException): + identification_params.append(coordinate) + elif isinstance(name, BaseException): + identification_params.append(name) + elif name is None: + identification_params.append(entities.ObjectInfo(None, None, coordinate)) + else: + primary_name, all_names = name + identification_params.append(entities.ObjectInfo(all_names, primary_name, coordinate)) + # Simultaneous data provider needs only valid CrossIdentificationParams + simultaneous_data_provider = depot.simultaneous_data_provider( + [it for it in identification_params if not isinstance(it, BaseException)] + ) + identification_results = [] + for i, param in enumerate(identification_params): + if isinstance(param, BaseException): + identification_results.append(param) + else: + identification_results.append( + depot.cross_identification_function( + depot.layer2_repo, + param, + simultaneous_data_provider, + user_param, + ) + ) + if on_progress is not None: + on_progress(CrossIdentification(n_rows, i + 1)) + simultaneous_data_provider.clear() + + # compile objects + models = [] + fails = [] + for i in range(n_rows): + coordinate, identification_result = (coordinates[i], identification_results[i]) + if isinstance(coordinate, BaseException): + fails.append(Transformation01Fail(coordinate, i)) + continue + + if identification_result.fail is not None: + fails.append(Transformation01Fail(identification_result.fail, i)) + continue + + pgc = identification_result.result.pgc if identification_result.result is not None else None + model = Layer1Model( + pgc=pgc, + source_id=data.id, + processed=False, + coordinates=coordinate, + name=names[i], + measurements=[ + Layer1Value(value[i], data.meta.value_descriptions[i_descr].ucd) for i_descr, value in enumerate(values) + ], + dataset=data.meta.dataset, + ) + models.append(model) + + return models, fails diff --git a/app/domain/repositories/__init__.py b/app/domain/repositories/__init__.py index e69de29b..5fdf7327 100644 --- a/app/domain/repositories/__init__.py +++ b/app/domain/repositories/__init__.py @@ -0,0 +1,5 @@ +from app.domain.repositories.layer_0_repository import Layer0Repository +from app.domain.repositories.layer_1_repository import Layer1Repository +from app.domain.repositories.layer_2_repository import Layer2Repository + +__all__ = ["Layer0Repository", "Layer1Repository", "Layer2Repository"] diff --git a/app/domain/repositories/layer_0_repository.py b/app/domain/repositories/layer_0_repository.py index aaa64f95..d936a93e 100644 --- a/app/domain/repositories/layer_0_repository.py +++ b/app/domain/repositories/layer_0_repository.py @@ -10,21 +10,21 @@ class Layer0Repository(ABC): """ @abstractmethod - async def create_update_instances(self, instances: list[Layer0Model]): + def create_update_instances(self, instances: list[Layer0Model]): """ Save or update given instances in local DB :param instances: Instances to update """ @abstractmethod - async def create_instances(self, instances: list[Layer0Model]): + def create_instances(self, instances: list[Layer0Model]): """ Used to create instances, fails on conflict :param instances: """ @abstractmethod - async def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: + def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: """ Fetches Layer0Model from DB :param param: Used to specify needed Layer0 instances diff --git a/app/domain/repositories/layer_1_repository.py b/app/domain/repositories/layer_1_repository.py index 9e2c3954..a9b42064 100644 --- a/app/domain/repositories/layer_1_repository.py +++ b/app/domain/repositories/layer_1_repository.py @@ -10,7 +10,7 @@ class Layer1Repository(ABC): """ @abstractmethod - async def query_data(self, param: Layer1QueryParam) -> list[Layer1Model]: + def query_data(self, param: Layer1QueryParam) -> list[Layer1Model]: """ Get all objects around given point :param param: Parameter, used to make specific query @@ -18,7 +18,7 @@ async def query_data(self, param: Layer1QueryParam) -> list[Layer1Model]: """ @abstractmethod - async def save_update_instances(self, instances: list[Layer1Model]): + def save_update_instances(self, instances: list[Layer1Model]): """ Create or update provided objects :param instances: Objects to be processed diff --git a/app/domain/tasks/download_vizier_table.py b/app/domain/tasks/download_vizier_table.py index 6bc76bb5..1b187b70 100644 --- a/app/domain/tasks/download_vizier_table.py +++ b/app/domain/tasks/download_vizier_table.py @@ -60,7 +60,7 @@ def download_vizier_table( ): common = repositories.CommonRepository(storage, logger) - layer0 = repositories.Layer0Repository(storage, logger) + layer0 = repositories.Layer0Repository(common, storage, logger) table_name = construct_table_name(params.table_id) table_id, ok = layer0.get_table_id(table_name) diff --git a/app/domain/usecases/__init__.py b/app/domain/usecases/__init__.py deleted file mode 100644 index ef880c1f..00000000 --- a/app/domain/usecases/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from app.domain.usecases.store_l0_use_case import StoreL0UseCase -from app.domain.usecases.transaction_0_1_use_case import Transaction01UseCase -from app.domain.usecases.transformation_0_1_use_case import TransformationO1UseCase - -__all__ = ["TransformationO1UseCase", "StoreL0UseCase", "Transaction01UseCase"] diff --git a/app/domain/usecases/store_l0_use_case.py b/app/domain/usecases/store_l0_use_case.py deleted file mode 100644 index 8b93fde8..00000000 --- a/app/domain/usecases/store_l0_use_case.py +++ /dev/null @@ -1,16 +0,0 @@ -from app.domain.model import Layer0Model -from app.domain.repositories.layer_0_repository import Layer0Repository -from app.domain.util import GlobalDBLock - - -class StoreL0UseCase: - """ - Stores data of layer 0. Throws error if repository fails to perform operation - """ - - def __init__(self, repository: Layer0Repository): - self._repository: Layer0Repository = repository - - async def invoke(self, instances: list[Layer0Model]): - async with GlobalDBLock.get(): - await self._repository.create_instances(instances) diff --git a/app/domain/usecases/transaction_0_1_use_case.py b/app/domain/usecases/transaction_0_1_use_case.py deleted file mode 100644 index d36da2df..00000000 --- a/app/domain/usecases/transaction_0_1_use_case.py +++ /dev/null @@ -1,67 +0,0 @@ -from dataclasses import replace -from typing import Callable, Optional - -from app.domain.model import Layer0Model, Layer1Model -from app.domain.model.layer0 import Transformation01Fail -from app.domain.model.params.transaction_0_1_stages import ( - AwaitingQueue, - TransactionO1Sage, - TransformingData, -) -from app.domain.usecases.transformation_0_1_use_case import TransformationO1UseCase -from app.domain.util import GlobalDBLock - - -class Transaction01UseCase: - """ - Performs data transaction from layer 0 to layer 1 - """ - - def __init__( - self, - transformation_use_case: TransformationO1UseCase, - ): - self._transformation_use_case: TransformationO1UseCase = transformation_use_case - - async def invoke( - self, data: Layer0Model, on_progress: Optional[Callable[[TransactionO1Sage], None]] = None - ) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]: - """ - :param data: Layer 0 data to be transformed - :param on_progress: Function, called on pogress - :return: - updated_source: Updated Layer 0 data (according to transformation result) - updated_models: Successfully transformed Layer 1 data - fails: Fails during transformation - """ - if on_progress is not None: - on_progress(AwaitingQueue()) - - async with GlobalDBLock.get(): - return await self._perform_transaction(data, on_progress) - - async def _perform_transaction( - self, data: Layer0Model, on_progress: Optional[Callable[[TransactionO1Sage], None]] - ) -> tuple[Layer0Model, list[Layer1Model], list[Transformation01Fail]]: - if on_progress is not None: - models, fails = await self._transformation_use_case.invoke( - data, lambda stage: on_progress(TransformingData(stage)) - ) - else: - models, fails = await self._transformation_use_case.invoke(data) - - updated_source, updated_models = self._update_update_time(data, models) - - return updated_source, updated_models, fails - - def _update_update_time( - self, source: Layer0Model, models: list[Layer1Model] - ) -> tuple[Layer0Model, list[Layer1Model]]: - """ - Update info about data frame last processing - :param source: Source of data, being transformed - :param models: Successfully processed models - :return: updated source and models - """ - updated_source = replace(source, processed=True) - return updated_source, models diff --git a/app/domain/usecases/transformation_0_1_use_case.py b/app/domain/usecases/transformation_0_1_use_case.py deleted file mode 100644 index a87e841b..00000000 --- a/app/domain/usecases/transformation_0_1_use_case.py +++ /dev/null @@ -1,150 +0,0 @@ -from typing import Callable, Optional - -from app import entities -from app.domain.cross_id_simultaneous_data_provider import CrossIdSimultaneousDataProvider -from app.domain.model import Layer0Model, Layer1Model -from app.domain.model.layer0 import Transformation01Fail -from app.domain.model.layer1.layer_1_value import Layer1Value -from app.domain.model.params.cross_identification_result import CrossIdentifyResult -from app.domain.model.params.cross_identification_user_param import CrossIdentificationUserParam -from app.domain.model.params.transformation_0_1_stages import ( - CrossIdentification, - ParseCoordinates, - ParseValues, - Transformation01Stage, -) -from app.domain.repositories.layer_2_repository import Layer2Repository - - -class TransformationO1UseCase: - """ - Performs data transformation from layer 0 [Layer0Model] to layer 1 [Layer1Model] - """ - - def __init__( - self, - layer2_repo: Layer2Repository, - cross_identification_function: Callable[ - [Layer2Repository, entities.ObjectInfo, CrossIdSimultaneousDataProvider, CrossIdentificationUserParam], - CrossIdentifyResult, - ], - simultaneous_data_provider: Callable[[list[entities.ObjectInfo]], CrossIdSimultaneousDataProvider], - ): - """ - :param cross_identification_function: Cross identification logic - :param simultaneous_data_provider: Function, used to obtain simultaneous data provider for cross identification - """ - self._layer2_repo = layer2_repo - self._cross_identification_function = cross_identification_function - self._simultaneous_data_provider = simultaneous_data_provider - - def with_mocked_cross_identification( - self, - layer2_repo: Layer2Repository, - cross_identification_function: Callable[ - [Layer2Repository, entities.ObjectInfo, CrossIdSimultaneousDataProvider, CrossIdentificationUserParam], - CrossIdentifyResult, - ], - ): - return TransformationO1UseCase(layer2_repo, cross_identification_function, self._simultaneous_data_provider) - - async def invoke( - self, - data: Layer0Model, - user_param: CrossIdentificationUserParam | None = None, - on_progress: Optional[Callable[[Transformation01Stage], None]] = None, - ) -> tuple[list[Layer1Model], list[Transformation01Fail]]: - """ - :param data: Layer 0 data to be transformed - :param user_param: User defined parameters for cross identification - :param on_progress: Optional callable to call on progress (from 0.0 to 1.0) - :return: - success: list[Layer1Model] - transformed models - fail: list[Transformation01Fail] - Fails during transformation - """ - if user_param is None: - user_param = CrossIdentificationUserParam(None, None) - n_rows = data.data.shape[0] - - # parse coordinates - if on_progress is not None: - on_progress(ParseCoordinates()) - cd = data.meta.coordinate_descr - if cd is not None: - coordinates = cd.parse_coordinates(data.data) - else: - coordinates = n_rows * [None] - - # parse values - if on_progress is not None: - on_progress(ParseValues()) - values = [vd.parse_values(data.data) for vd in data.meta.value_descriptions] - - # extract names - if data.meta.names_descr is not None: - names = data.meta.names_descr.parse_name(data.data) - else: - names = n_rows * [None] - - # cross identification - identification_params = [] - for coordinate, name in zip(coordinates, names): - if isinstance(coordinate, BaseException): - identification_params.append(coordinate) - elif isinstance(name, BaseException): - identification_params.append(name) - elif name is None: - identification_params.append(entities.ObjectInfo(None, None, coordinate)) - else: - primary_name, all_names = name - identification_params.append(entities.ObjectInfo(all_names, primary_name, coordinate)) - # Simultaneous data provider needs only valid entities.ObjectInfos - simultaneous_data_provider = self._simultaneous_data_provider( - [it for it in identification_params if not isinstance(it, BaseException)] - ) - identification_results = [] - for i, param in enumerate(identification_params): - if isinstance(param, BaseException): - identification_results.append(param) - else: - identification_results.append( - self._cross_identification_function( - self._layer2_repo, - param, - simultaneous_data_provider, - user_param, - ) - ) - if on_progress is not None: - on_progress(CrossIdentification(n_rows, i + 1)) - simultaneous_data_provider.clear() - - # compile objects - models = [] - fails = [] - for i in range(n_rows): - coordinate, identification_result = (coordinates[i], identification_results[i]) - if isinstance(coordinate, BaseException): - fails.append(Transformation01Fail(coordinate, i)) - continue - - if identification_result.fail is not None: - fails.append(Transformation01Fail(identification_result.fail, i)) - continue - - pgc = identification_result.result.pgc if identification_result.result is not None else None - model = Layer1Model( - pgc=pgc, - source_id=data.id, - processed=False, - coordinates=coordinate, - name=names[i], - measurements=[ - Layer1Value(value[i], data.meta.value_descriptions[i_descr].ucd) - for i_descr, value in enumerate(values) - ], - dataset=data.meta.dataset, - ) - models.append(model) - - return models, fails diff --git a/app/domain/user_interaction/interaction.py b/app/domain/user_interaction/interaction.py index a93f6dc8..7cfcb4bb 100644 --- a/app/domain/user_interaction/interaction.py +++ b/app/domain/user_interaction/interaction.py @@ -21,13 +21,13 @@ class AbstractInteraction(ABC): """ @abstractmethod - async def eval(self, arg: AbstractArgument) -> InteractionResult: + def eval(self, arg: AbstractArgument) -> InteractionResult: pass class ResolveCrossIdentificationCollisionInteraction(AbstractInteraction): @abstractmethod - async def eval( + def eval( self, arg: ResolveCrossIdentificationCollisionInteractionArg ) -> ResolveCrossIdentificationCollisionInteractionRes: pass @@ -35,11 +35,11 @@ async def eval( class ResolveCoordinateParseFail(AbstractInteraction): @abstractmethod - async def eval(self, arg: ResolveCoordinateParseFailArg) -> ResolveCoordinateParseFailRes: + def eval(self, arg: ResolveCoordinateParseFailArg) -> ResolveCoordinateParseFailRes: pass class Confirm01Transaction(AbstractInteraction): @abstractmethod - async def eval(self, arg: Confirm01TransactionArg) -> Confirm01TransactionRes: + def eval(self, arg: Confirm01TransactionArg) -> Confirm01TransactionRes: pass diff --git a/tests/integration/layer1_repository_test.py b/tests/integration/layer1_repository_test.py index 2b5e1d6a..b0f04ae1 100644 --- a/tests/integration/layer1_repository_test.py +++ b/tests/integration/layer1_repository_test.py @@ -16,7 +16,9 @@ def setUpClass(cls) -> None: cls.depot = commands.get_mock_depot() cls.depot.common_repo = repositories.CommonRepository(cls.pg_storage.get_storage(), structlog.get_logger()) - cls.depot.layer0_repo = repositories.Layer0Repository(cls.pg_storage.get_storage(), structlog.get_logger()) + cls.depot.layer0_repo = repositories.Layer0Repository( + cls.depot.common_repo, cls.pg_storage.get_storage(), structlog.get_logger() + ) cls.depot.layer1_repo = repositories.Layer1Repository(cls.pg_storage.get_storage(), structlog.get_logger()) def tearDown(self): diff --git a/tests/integration/layer_0_repository_test.py b/tests/integration/layer_0_repository_test.py index 51993d4a..37e21cda 100644 --- a/tests/integration/layer_0_repository_test.py +++ b/tests/integration/layer_0_repository_test.py @@ -5,7 +5,6 @@ from app import entities from app.data import repositories -from app.data.repositories.layer_0_repository_impl import Layer0RepositoryImpl from app.domain.model import Layer0Model from app.domain.model.layer0.biblio import Biblio from app.domain.model.layer0.coordinates import ICRSDescrStr @@ -17,22 +16,21 @@ from app.lib.storage.mapping import TYPE_INTEGER, TYPE_TEXT -class Layer0RepositoryTest(unittest.IsolatedAsyncioTestCase): +class Layer0RepositoryTest(unittest.TestCase): @classmethod def setUpClass(cls) -> None: cls.pg_storage = testing.get_test_postgres_storage() common_repo = repositories.CommonRepository(cls.pg_storage.get_storage(), structlog.get_logger()) - layer0_repo = repositories.Layer0Repository(cls.pg_storage.get_storage(), structlog.get_logger()) + layer0_repo = repositories.Layer0Repository(common_repo, cls.pg_storage.get_storage(), structlog.get_logger()) cls._layer0_repo: repositories.Layer0Repository = layer0_repo cls._common_repo: repositories.CommonRepository = common_repo - cls._layer0_repo_impl: Layer0RepositoryImpl = Layer0RepositoryImpl(layer0_repo, common_repo) def tearDown(self): self.pg_storage.clear() - async def test_retrieve(self): + def test_retrieve(self): data = DataFrame({"col0": [1, 2, 3, 4], "col1": ["ad", "ad", "a", "he"]}) bib_id = self._common_repo.create_bibliography("2024arXiv240411942F", 1999, ["ade"], "title") creation = entities.Layer0Creation( @@ -44,10 +42,10 @@ async def test_retrieve(self): resp = self._layer0_repo.create_table(creation) self._layer0_repo.insert_raw_data(entities.Layer0RawData(resp.table_id, data)) - from_db = await self._layer0_repo_impl.fetch_data(Layer0QueryParam()) + from_db = self._layer0_repo.fetch_data(Layer0QueryParam()) self.assertTrue(data.equals(from_db[0].data)) - async def test_store_retrieve(self): + def test_store_retrieve(self): expected = Layer0Model( id="test_table_store_retrieve", processed=False, @@ -71,8 +69,8 @@ async def test_store_retrieve(self): } ), ) - await self._layer0_repo_impl.create_instances([expected]) - from_db = await self._layer0_repo_impl.fetch_data(Layer0QueryParam()) + self._layer0_repo.create_instances([expected]) + from_db = self._layer0_repo.fetch_data(Layer0QueryParam()) got = next(it for it in from_db if it.id == "test_table_store_retrieve") diff --git a/tests/integration/queue_test.py b/tests/integration/queue_test.py index 15d22d83..daa5a4af 100644 --- a/tests/integration/queue_test.py +++ b/tests/integration/queue_test.py @@ -24,7 +24,9 @@ def setUpClass(cls): cls.depot = commands.get_mock_depot() cls.depot.common_repo = repositories.CommonRepository(cls.pg_storage.get_storage(), logger) - cls.depot.layer0_repo = repositories.Layer0Repository(cls.pg_storage.get_storage(), logger) + cls.depot.layer0_repo = repositories.Layer0Repository( + cls.depot.common_repo, cls.pg_storage.get_storage(), logger + ) cls.depot.queue_repo = repositories.QueueRepository( cls.redis_queue.get_storage(), cls.pg_storage.config, logger ) diff --git a/tests/integration/rawdata_table_test.py b/tests/integration/rawdata_table_test.py index 5542fa5d..15409607 100644 --- a/tests/integration/rawdata_table_test.py +++ b/tests/integration/rawdata_table_test.py @@ -21,7 +21,9 @@ def setUpClass(cls) -> None: cls.depot = commands.get_mock_depot() cls.depot.common_repo = repositories.CommonRepository(cls.pg_storage.get_storage(), structlog.get_logger()) - cls.depot.layer0_repo = repositories.Layer0Repository(cls.pg_storage.get_storage(), structlog.get_logger()) + cls.depot.layer0_repo = repositories.Layer0Repository( + cls.depot.common_repo, cls.pg_storage.get_storage(), structlog.get_logger() + ) def tearDown(self): self.pg_storage.clear() diff --git a/tests/integration/user_scenario_test.py b/tests/integration/user_scenario_test.py index b03b8b02..e759b0cb 100644 --- a/tests/integration/user_scenario_test.py +++ b/tests/integration/user_scenario_test.py @@ -5,8 +5,7 @@ from pandas import DataFrame from app.data import repositories -from app.data.repositories.layer_0_repository_impl import Layer0RepositoryImpl -from app.domain import usecases +from app.domain.actions import logic_units from app.domain.cross_id_simultaneous_data_provider import PostgreSimultaneousDataProvider from app.domain.model import Layer0Model from app.domain.model.layer0.biblio import Biblio @@ -18,32 +17,27 @@ from tests.unit.domain.util import noop_cross_identify_function -class SaveAndTransform01(unittest.IsolatedAsyncioTestCase): +class SaveAndTransform01(unittest.TestCase): @classmethod def setUpClass(cls) -> None: cls.pg_storage = testing.get_test_postgres_storage() common_repo = repositories.CommonRepository(cls.pg_storage.get_storage(), structlog.get_logger()) - layer0_repo = repositories.Layer0Repository(cls.pg_storage.get_storage(), structlog.get_logger()) + layer0_repo = repositories.Layer0Repository(common_repo, cls.pg_storage.get_storage(), structlog.get_logger()) tmp_repo = repositories.TmpDataRepositoryImpl(cls.pg_storage.get_storage()) - layer0_repo_impl = Layer0RepositoryImpl(layer0_repo, common_repo) - transformation_use_case = usecases.TransformationO1UseCase( + cls._transformation_depot = logic_units.TransformationO1Depot( None, noop_cross_identify_function, lambda it: PostgreSimultaneousDataProvider(it, tmp_repo), ) - cls._transaction_use_case: usecases.Transaction01UseCase = usecases.Transaction01UseCase( - transformation_use_case - ) - cls._store_l0_use_case: usecases.StoreL0UseCase = usecases.StoreL0UseCase(layer0_repo_impl) + cls._transaction_depot = logic_units.Transaction01Depot(cls._transformation_depot) cls._layer0_repo: repositories.Layer0Repository = layer0_repo - cls._layer0_repo_impl: Layer0RepositoryImpl = layer0_repo_impl def tearDown(self): self.pg_storage.clear() - async def test_save_and_transform(self): + def test_save_and_transform(self): data_from_user = Layer0Model( id="test_table_save_and_transform", processed=False, @@ -69,13 +63,13 @@ async def test_save_and_transform(self): ) # store data - await self._store_l0_use_case.invoke([data_from_user]) + self._layer0_repo.create_instances([data_from_user]) # get data from database TODO move to usecase - from_db = await self._layer0_repo_impl.fetch_data(Layer0QueryParam()) + from_db = self._layer0_repo.fetch_data(Layer0QueryParam()) got = next(it for it in from_db if it.id == "test_table_save_and_transform") - raw, transformed, fails = await self._transaction_use_case.invoke(got) + raw, transformed, fails = logic_units.transaction_0_1(self._transaction_depot, got) self.assertEqual(0, len(fails)) self.assertEqual(3, len(transformed)) diff --git a/tests/unit/data/layer0_repository_test.py b/tests/unit/data/layer0_repository_test.py index 23cbaa39..8b951a8a 100644 --- a/tests/unit/data/layer0_repository_test.py +++ b/tests/unit/data/layer0_repository_test.py @@ -4,14 +4,15 @@ import structlog from parameterized import param, parameterized -from app.data.repositories import Layer0Repository +from app.data.repositories import CommonRepository, Layer0Repository from app.lib import testing class Layer0RepositoryTest(unittest.TestCase): def setUp(self) -> None: self.storage_mock = mock.MagicMock() - self.repo = Layer0Repository(self.storage_mock, structlog.get_logger()) + common = CommonRepository(self.storage_mock, structlog.get_logger()) + self.repo = Layer0Repository(common, self.storage_mock, structlog.get_logger()) @parameterized.expand( [ diff --git a/tests/unit/domain/transaction_01_test.py b/tests/unit/domain/transaction_01_test.py index 7ca2f5a1..882de81e 100644 --- a/tests/unit/domain/transaction_01_test.py +++ b/tests/unit/domain/transaction_01_test.py @@ -3,6 +3,8 @@ from pandas import DataFrame +from app.domain.actions.logic_units import TransformationO1Depot, transaction_0_1 +from app.domain.actions.logic_units.transaction_0_1 import Transaction01Depot from app.domain.cross_id_simultaneous_data_provider import SimpleSimultaneousDataProvider from app.domain.model import Layer0Model, Layer1Model from app.domain.model.layer0.coordinates import ICRSDescrStr @@ -11,41 +13,21 @@ from app.domain.model.params.layer_0_query_param import Layer0QueryParam from app.domain.repositories.layer_0_repository import Layer0Repository from app.domain.repositories.layer_1_repository import Layer1Repository -from app.domain.usecases import TransformationO1UseCase -from app.domain.usecases.transaction_0_1_use_case import Transaction01UseCase -from app.domain.user_interaction.interaction import ResolveCoordinateParseFail -from app.domain.user_interaction.interaction_argument import AbstractArgument -from app.domain.user_interaction.interaction_result import ( - InteractionResult, - ResolveCoordinateParseFailRes, -) from tests.unit.domain.util import noop_cross_identify_function -class MockedCoordinateParseFailResolver(ResolveCoordinateParseFail): - async def eval(self, arg: AbstractArgument) -> InteractionResult: - # user always сancels the data row - return ResolveCoordinateParseFailRes(True) - - -class MockedCoordinateParseFailResolverFail(ResolveCoordinateParseFail): - async def eval(self, arg: AbstractArgument) -> InteractionResult: - # user always сancels the data row - return ResolveCoordinateParseFailRes(False) - - class MockedCachingLayer0Repo(Layer0Repository): - async def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: + def fetch_data(self, param: Layer0QueryParam) -> list[Layer0Model]: return [] def __init__(self): self.last_saved_instances = None - async def create_update_instances(self, instances: list[Layer0Model]) -> bool: + def create_update_instances(self, instances: list[Layer0Model]) -> bool: self.last_saved_instances = instances return True - async def create_instances(self, instances: list[Layer0Model]): + def create_instances(self, instances: list[Layer0Model]): """ Used to create instances, fails on conflict :param instances: @@ -56,21 +38,19 @@ class MockedCachingLayer1Repo(Layer1Repository): def __init__(self): self.last_saved_instances = None - async def get_by_name(self, name: str) -> Optional[Layer1Model]: + def get_by_name(self, name: str) -> Optional[Layer1Model]: return None - async def get_inside_square( - self, min_ra: float, max_ra: float, min_dec: float, max_dec: float - ) -> list[Layer1Model]: + def get_inside_square(self, min_ra: float, max_ra: float, min_dec: float, max_dec: float) -> list[Layer1Model]: return [] - async def save_update_instances(self, instances: list[Layer1Model]) -> bool: + def save_update_instances(self, instances: list[Layer1Model]) -> bool: self.last_saved_instances = instances return True -class Transaction01Test(unittest.IsolatedAsyncioTestCase): - async def test_transaction_coordinate_fail_0(self): +class Transaction01Test(unittest.TestCase): + def test_transaction_coordinate_fail_0(self): """ Layer 0 data has three rows, one has corrupted coordinates """ @@ -98,11 +78,11 @@ async def test_transaction_coordinate_fail_0(self): ), ) - transformation_use_case = TransformationO1UseCase( + transformation_depot = TransformationO1Depot( None, noop_cross_identify_function, lambda it: SimpleSimultaneousDataProvider(it) ) - transaction_use_case = Transaction01UseCase(transformation_use_case) - source, models, fails = await transaction_use_case.invoke(data) + transaction_depot = Transaction01Depot(transformation_depot) + source, models, fails = transaction_0_1(transaction_depot, data) self.assertEqual(2, len(models)) self.assertEqual(1, len(fails)) self.assertEqual(2, fails[0].original_row) diff --git a/tests/unit/domain/transform_01_test.py b/tests/unit/domain/transform_01_test.py index a1a59656..b5791987 100644 --- a/tests/unit/domain/transform_01_test.py +++ b/tests/unit/domain/transform_01_test.py @@ -4,6 +4,7 @@ from pandas import DataFrame from app import entities +from app.domain.actions.logic_units import TransformationO1Depot, transformation_0_1 from app.domain.cross_id_simultaneous_data_provider import ( CrossIdSimultaneousDataProvider, SimpleSimultaneousDataProvider, @@ -25,7 +26,6 @@ ParseValues, ) from app.domain.repositories.layer_2_repository import Layer2Repository -from app.domain.usecases import TransformationO1UseCase from tests.unit.domain.util import noop_cross_identify_function @@ -45,14 +45,14 @@ def func( return func -class Transform01Test(unittest.IsolatedAsyncioTestCase): +class Transform01Test(unittest.TestCase): def setUp(self): super().setUp() - self.transformation_use_case = TransformationO1UseCase( + self.transformation_O1_depot = TransformationO1Depot( None, noop_cross_identify_function, lambda it: SimpleSimultaneousDataProvider(it) ) - async def test_transform_general(self): + def test_transform_general(self): data = Layer0Model( id="1", processed=False, @@ -79,7 +79,7 @@ async def test_transform_general(self): stages = [] - models, fails = await self.transformation_use_case.invoke(data, None, stages.append) + models, fails = transformation_0_1(self.transformation_O1_depot, data, None, stages.append) self.assertEqual( [ @@ -94,7 +94,7 @@ async def test_transform_general(self): self.assertEqual(3, len(models)) self.assertEqual(0, len(fails)) - async def test_transform_fails(self): + def test_transform_fails(self): data = Layer0Model( id="1", processed=False, @@ -135,12 +135,12 @@ async def test_transform_fails(self): ), ) - _, fails = await self.transformation_use_case.invoke(data) + _, fails = transformation_0_1(self.transformation_O1_depot, data) self.assertEqual(len(fails), 2) self.assertEqual(5, fails[1].original_row) self.assertIsInstance(fails[0].cause, ValueError) - async def test_cross_identification_fails(self): + def test_cross_identification_fails(self): data = Layer0Model( id="1", processed=False, @@ -182,17 +182,17 @@ async def test_cross_identification_fails(self): ), ) - transformation_use_case = TransformationO1UseCase( + depot = TransformationO1Depot( None, get_purposefully_failing_cross_identification_function(lambda el: el.primary_name in {"fail", "fail2"}), lambda it: SimpleSimultaneousDataProvider(it), ) - res, fails = await transformation_use_case.invoke(data) + res, fails = transformation_0_1(depot, data) self.assertEqual(len(fails), 2) self.assertIsInstance(fails[0].cause, CrossIdentificationException) self.assertIsInstance(fails[1].cause, CrossIdentificationException) - async def test_name_wrong_column_fail(self): + def test_name_wrong_column_fail(self): data = Layer0Model( id="1", processed=False, @@ -235,6 +235,6 @@ async def test_name_wrong_column_fail(self): ) with self.assertRaises(KeyError) as scope: - await self.transformation_use_case.invoke(data) + transformation_0_1(self.transformation_O1_depot, data) self.assertEqual(("wrong_col",), scope.exception.args)