diff --git a/cumulus_lambda_functions/cumulus_es_setup/es_setup.py b/cumulus_lambda_functions/cumulus_es_setup/es_setup.py index 8f595442..f72755bf 100644 --- a/cumulus_lambda_functions/cumulus_es_setup/es_setup.py +++ b/cumulus_lambda_functions/cumulus_es_setup/es_setup.py @@ -1,5 +1,8 @@ import os +import requests + +from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator from mdps_ds_lib.lib.aws.es_abstract import ESAbstract @@ -23,6 +26,36 @@ def __init__(self): port=int(os.getenv('ES_PORT', '443')) ) + def setup_maap_daac_index(self): + stac_fast_version = '6.0.0' + url = f"https://raw.githubusercontent.com/stac-utils/stac-fastapi-elasticsearch-opensearch/refs/tags/v{stac_fast_version}/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/mappings.py" + resp = requests.get(url) + resp.raise_for_status() + + code = resp.text + namespace = {} + exec(code, namespace) + es_items_mappings = namespace["ES_ITEMS_MAPPINGS"] + LOGGER.debug(f'stac fast API es_items_mappings: {es_items_mappings}') + es_items_mappings['properties'] = { + **GranulesIndexMapping.percolator_mappings, + **es_items_mappings['properties'], + } + index_mapping = { + "settings": { + "number_of_shards": 3, + "number_of_replicas": 2 + }, + "mappings": es_items_mappings + } + index_name = f'{GranulesIndexMapping.daac_percolator_name}--{stac_fast_version.replace(".", "-")}' + try: + self.__es.create_index(index_name, index_mapping) + self.__es.create_alias(index_name, GranulesIndexMapping.daac_percolator_name) + except: + LOGGER.exception(f'failed to create index / alias for: {GranulesIndexMapping.daac_percolator_name}') + return self + def get_index_mapping(self, index_name: str): if not hasattr(es_mappings, index_name): raise ValueError(f'missing index_name: {index_name}') diff --git a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py 
b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py index ab1a5c2f..5fcc8ab5 100644 --- a/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py +++ b/cumulus_lambda_functions/daac_archiver/daac_archiver_logic.py @@ -3,12 +3,14 @@ from time import sleep import requests +from mdps_ds_lib.lib.cumulus_stac.item_transformer import ItemTransformer from mdps_ds_lib.lib.utils.file_utils import FileUtils from mdps_ds_lib.lib.aws.aws_s3 import AwsS3 from mdps_ds_lib.lib.aws.aws_message_transformers import AwsMessageTransformers from mdps_ds_lib.lib.utils.json_validator import JsonValidator +from pystac import Item from mdps_ds_lib.stac_fast_api_client.sfa_client_factory import SFAClientFactory from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex @@ -148,6 +150,85 @@ def send_to_daac(self, event: dict): self.send_to_daac_internal(uds_cnm_json) return + def __extract_files_maap(self, asset_dict, daac_config): + result_files = [] + # https://github.com/podaac/cloud-notification-message-schema + for k, v in asset_dict.items(): + # if v.roles[0]['type'] not in archiving_types: + # continue + temp = { + 'type': v.roles[0], + 'uri': self.revert_to_s3_url(v.href), + 'name': os.path.basename(v.href), + 'checksumType': 'md5', + 'checksum': v.extra_fields['file:checksum'], + 'size': v.extra_fields['file:size'] + } + result_files.append(temp) # TODO remove missing md5? + + if 'archiving_types' not in daac_config or len(daac_config['archiving_types']) < 1: + return result_files # TODO remove missing md5? + archiving_types = {k['data_type']: [] if 'file_extension' not in k else k['file_extension'] for k in daac_config['archiving_types']} + result_files1 = [] + for each_file in result_files: + if each_file['type'] not in archiving_types: + continue + file_extensions = archiving_types[each_file['type']] + if len(file_extensions) < 1: + result_files1.append(each_file) # TODO remove missing md5? 
+ continue + temp_filename = each_file['name'].upper().strip() + if any([temp_filename.endswith(k.upper()) for k in file_extensions]): + result_files1.append(each_file) # TODO remove missing md5? + return result_files1 + + def send_to_daac_maap(self, granules_json): + daac_configs = self.__archive_index_logic.percolate_maap_document(granules_json) + if daac_configs is None or len(daac_configs) < 1: + LOGGER.debug(f'this granule is not configured for archival: {granules_json}') + return + granules_item = Item.from_dict(granules_json) + errors = [] + for each_daac_config in daac_configs: + LOGGER.debug(f'working on {each_daac_config}') + result = JsonValidator(UdsArchiveConfigIndex.db_record_schema).validate(each_daac_config) + if result is not None: + errors.append(f'each_daac_config does not have valid schema. Pls re-add the daac config: {result} for {each_daac_config}') + continue + try: + self.__sns.set_topic_arn(each_daac_config['daac_sns_topic_arn']) + daac_cnm_message = { + "collection": { + 'name': each_daac_config['daac_collection_name'], + 'version': each_daac_config['daac_data_version'], + }, + "identifier": granules_item.id, + "submissionTime": f'{TimeUtils.get_current_time()}Z', + "provider": each_daac_config['daac_provider'], + "version": "1.6.0", # TODO this is hardcoded? 
+ "product": { + "name": granules_item.id, + # "dataVersion": daac_config['daac_data_version'], + 'files': self.__extract_files_maap(granules_item.assets, each_daac_config), + } + } + LOGGER.debug(f'daac_cnm_message: {daac_cnm_message}') + self.__sns.set_external_role(each_daac_config['daac_role_arn'], + each_daac_config['daac_role_session_name']).publish_message( + json.dumps(daac_cnm_message), True) + return { + 'archive_status': 'cnm_s_success', + 'archive_error_message': '', + 'archive_error_code': '', + } + except Exception as e: + LOGGER.exception(f'failed during archival process') + return { + 'archive_status': 'cnm_s_failed', + 'archive_error_message': str(e), + } + return + def update_stac(self, cnm_notification_msg): update_type = os.getenv('ARCHIVAL_STATUS_MECHANISM', '') if not any([k for k in ['UDS', 'FAST_STAC'] if k == update_type]): diff --git a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py index 2fdc5021..2e0c9a99 100644 --- a/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py +++ b/cumulus_lambda_functions/granules_to_es/granules_index_mapping.py @@ -1,4 +1,5 @@ class GranulesIndexMapping: + daac_percolator_name = 'uds_maap_percolator' archiving_keys = [ 'archive_status', 'archive_error_message', 'archive_error_code' ] diff --git a/cumulus_lambda_functions/lib/uds_db/archive_index.py b/cumulus_lambda_functions/lib/uds_db/archive_index.py index fcf74f9d..da80beb5 100644 --- a/cumulus_lambda_functions/lib/uds_db/archive_index.py +++ b/cumulus_lambda_functions/lib/uds_db/archive_index.py @@ -1,5 +1,6 @@ from copy import deepcopy +from cumulus_lambda_functions.granules_to_es.granules_index_mapping import GranulesIndexMapping from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator from mdps_ds_lib.lib.utils.json_validator import JsonValidator @@ -56,6 +57,29 @@ def __init__(self, es_url, es_port=443, es_type='AWS', 
use_ssl=True): port=es_port) self.__tenant, self.__venue = '', '' + def percolate_maap_document(self, document): + dsl = { + 'size': 9999, + # '_source': ['ss_name', 'ss_type', 'ss_username'], + 'query': { + 'percolate': { + 'field': 'ss_query', + 'document': document, + } + }, + # 'sort': [{'ss_name': {'order': 'asc'}}] + } + try: + percolated_result = self.__es.query(dsl, querying_index=GranulesIndexMapping.daac_percolator_name) + except Exception as e: + if e.error == 'resource_not_found_exception': + LOGGER.debug(f'unable to find document: {document} on index: {GranulesIndexMapping.daac_percolator_name}') + return None + LOGGER.exception(f'error while percolating') + raise e + percolated_result = [k['_source'] for k in percolated_result['hits']['hits']] + return percolated_result + def percolate_document(self, document_id): write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{self.__tenant}_{self.__venue}'.lower().strip() current_alias = self.__es.get_alias(write_alias_name) diff --git a/cumulus_lambda_functions/uds_api/granules_archive_api.py b/cumulus_lambda_functions/uds_api/granules_archive_api.py index 39fc302e..e3a5c3b3 100644 --- a/cumulus_lambda_functions/uds_api/granules_archive_api.py +++ b/cumulus_lambda_functions/uds_api/granules_archive_api.py @@ -1,6 +1,7 @@ import json import os +from cumulus_lambda_functions.daac_archiver.daac_archiver_logic import DaacArchiverLogic from cumulus_lambda_functions.uds_api.dapa.granules_dapa_query_es import GranulesDapaQueryEs from cumulus_lambda_functions.uds_api.dapa.pagination_links_generator import PaginationLinksGenerator @@ -21,6 +22,7 @@ from fastapi import APIRouter, HTTPException, Request +from cumulus_lambda_functions.uds_api.granules_api import StacGranuleModel from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env()) @@ -143,6 +145,30 @@ async def 
@router.post("/{collection_id}/archive/{granule_id}")
@router.post("/{collection_id}/archive/{granule_id}/")
async def archive_single_granule_dapa(request: Request, collection_id: str, granule_id: str, granule: StacGranuleModel):
    """Archive a single granule to its configured DAAC via CNM (MAAP flow).

    NOTE(review): this POST handler shares its Python name with the PUT handler
    defined below; FastAPI registers the route at decoration time so both work,
    but the later `def` shadows this one at module level — consider renaming.
    The `granule_id` path parameter is currently unused; the granule body's own
    id is what gets archived — TODO confirm they should match.
    """
    authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \
        .get_instance(UDSAuthorizerFactory.cognito,
                      es_url=os.getenv('ES_URL'),
                      es_port=int(os.getenv('ES_PORT', '443'))
                      )
    auth_info = FastApiUtils.get_authorization_info(request)
    collection_identifier = UdsCollections.decode_identifier(collection_id)
    if not authorizer.is_authorized_for_collection(DBConstants.read, collection_id,
                                                   auth_info['ldap_groups'],
                                                   collection_identifier.tenant,
                                                   collection_identifier.venue):
        LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}')
        raise HTTPException(status_code=403, detail=json.dumps({
            'message': 'not authorized to execute this action'
        }))
    new_granule = granule.model_dump()
    update_result = DaacArchiverLogic().send_to_daac_maap(new_granule)
    # fix: archive status was computed but discarded; the endpoint always
    # returned null, leaving callers blind to cnm_s_success / cnm_s_failed
    return update_result
@router.put("/maap_daac_config_setup")
@router.put("/maap_daac_config_setup/")
async def maap_daac_config_setup(request: Request, tenant: Union[str, None]=None, venue: Union[str, None]=None, group_names: Union[str, None]=None):
    """Admin endpoint: create the MAAP DAAC percolator index and alias.

    Requires an admin token; tenant/venue/group_names scope the admin check.
    group_names is a comma-separated string, split into a list for AuthCrud.
    """
    LOGGER.debug(f'started maap_daac_config_setup')
    auth_info = FastApiUtils.get_authorization_info(request)
    query_body = {
        'tenant': tenant,
        'venue': venue,
        'ldap_group_names': group_names if group_names is None else [k.strip() for k in group_names.split(',')],
    }
    auth_crud = AuthCrud(auth_info, query_body)
    is_admin_result = auth_crud.is_admin()
    if is_admin_result['statusCode'] != 200:
        raise HTTPException(status_code=is_admin_result['statusCode'], detail=is_admin_result['body'])
    try:
        SetupESIndexAlias().setup_maap_daac_index()
    except Exception as e:
        # fix: was LOGGER.exception(f'') which logged an empty message
        LOGGER.exception(f'failed to setup maap daac percolator index')
        raise HTTPException(status_code=500, detail=str(e))
    return {'message': 'successful'}
+ +# BACKEND_URL = 'http://localhost:8080/' # TODO make sure it ends with '/' +# @app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) +# async def proxy(full_path: str, request: Request): +# # Construct full target URL +# fast_api_path = full_path.replace(f'{api_base_prefix}/', '') +# target_url = f"{BACKEND_URL}{fast_api_path}" +# print(f'full_path = {full_path}') +# print(f'target_url = {target_url}') +# # Prepare the request +# method = request.method +# headers = dict(request.headers) +# body = await request.body() +# +# async with httpx.AsyncClient() as client: +# backend_response = await client.request( +# method, +# target_url, +# content=body, +# headers=headers, +# params=request.query_params +# ) +# +# # Return the response from the backend +# return Response( +# content=backend_response.content, +# status_code=backend_response.status_code, +# headers=dict(backend_response.headers), +# ) +# + + # to make it work with Amazon Lambda, we create a handler object handler = Mangum(app=app)