diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 43823690..0c43b3c6 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -34,6 +34,7 @@ def with_collections(self, collection_ids: list): collection_names = [k.split('___')[0] for k in collection_ids] self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}') return self + def get_size(self, private_api_prefix: str): query_params = {'field': 'status', 'type': 'collections'} main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]} @@ -191,6 +192,75 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str): return {'server_error': f'error while invoking:{str(e)}'} return {'status': query_result['message']} + def delete_executions(self, new_collection: dict, private_api_prefix: str): + # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' + request_body = { + "collectionId": f'{new_collection["name"]}___{new_collection["version"]}', + "esBatchSize": 10000, + "dbBatchSize": 50000 + } + payload = { + 'httpMethod': 'POST', + 'resource': '/{proxy+}', + 'path': f'/executions/bulk-delete-by-collection', + 'headers': { + 'Content-Type': 'application/json', + }, + 'body': json.dumps(request_body) + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'id' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'status': query_result} + + def list_executions(self, new_collection: dict, private_api_prefix: str): + # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' + payload = { + 'httpMethod': 'GET', + 'resource': '/{proxy+}', + 'path': f'/executions', + 'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'}, + 'headers': { + 'Content-Type': 'application/json', + } + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'results' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'results': query_result['results']} + def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{ diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 42a8611a..6b111a9d 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -1,5 +1,6 @@ import json import os +from time import sleep from typing import Optional import pystac @@ -81,68 +82,17 @@ def __init__(self, request_body): self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True) self.__cumulus_collection_query = CollectionsQuery('', '') - def __delete_collection_cumulus(self, cumulus_collection_doc): - delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) - if 'status' not in delete_result: - LOGGER.error(f'status not in creation_result: {delete_result}') + def analyze_cumulus_result(self, cumulus_request_result): + if 'status' not in cumulus_request_result: + LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}') return { 'statusCode': 500, 'body': { - 'message': delete_result + 'message': cumulus_request_result } }, None - return None, delete_result + return None, cumulus_request_result - def __create_collection_cumulus(self, cumulus_collection_doc): - creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix) - if 'status' not in creation_result: - LOGGER.error(f'status not in creation_result: {creation_result}') - return { - 'statusCode': 500, - 'body': { - 'message': creation_result - } - }, None - return None, creation_result - - def __create_rules_cumulus(self, cumulus_collection_doc): - rule_creation_result = self.__cumulus_collection_query.create_sqs_rules( - cumulus_collection_doc, - self.__cumulus_lambda_prefix, - self.__ingest_sqs_url, - self.__provider_id, - self.__workflow_name, - ) - if 'status' not in rule_creation_result: - LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_creation_result}') - delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, - cumulus_collection_doc['name'], - cumulus_collection_doc['version']) - self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id()) - return { - 'statusCode': 500, - 'body': { - 'message': rule_creation_result, - 'details': f'collection deletion result: {delete_collection_result}' - } - } - return None - - def __delete_rules_cumulus(self, cumulus_collection_doc): - rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules( - cumulus_collection_doc, - self.__cumulus_lambda_prefix - ) - if 'status' not in rule_deletion_result: - LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}') - return { - 'statusCode': 500, - 'body': { - 'message': rule_deletion_result, - 'details': f'collection deletion result: {rule_deletion_result}' - } - } - return None def __delete_collection_uds(self): try: @@ -189,17 +139,36 @@ def __create_collection_uds(self, cumulus_collection_doc): def delete(self): deletion_result = {} try: + cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider LOGGER.debug(f'__provider_id: {self.__provider_id}') creation_result = 'NA' if self.__include_cumulus: - rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc) - deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded' - delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc) + result = self.__cumulus_collection_query.list_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + LOGGER.debug(f'execution list result: {result}') + if len(result['results']) > 0: + self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + return { + 'statusCode': 409, + 'body': { + 'message': f'There are cumulus executions for this collection. Deleting them. Pls try again in a few minutes.', + } + } + # self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + self.__delete_collection_rule(cumulus_collection_doc, deletion_result) + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) + delete_err, delete_result = self.analyze_cumulus_result(delete_result) + if delete_err is not None: + LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}') + # self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + self.__delete_collection_rule(cumulus_collection_doc, deletion_result) + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) + delete_err, delete_result = self.analyze_cumulus_result(delete_result) deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result else: + deletion_result['cumulus_executions_deletion'] = 'NA' deletion_result['cumulus_rule_deletion'] = 'NA' deletion_result['cumulus_collection_deletion'] = 'NA' @@ -222,6 +191,20 @@ def delete(self): } } + def __delete_collection_rule(self, cumulus_collection_doc, deletion_result): + if 'cumulus_rule_deletion' in deletion_result and 'statusCode' not in deletion_result['cumulus_rule_deletion']: + return + rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) + rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) + deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result + return + + def __delete_collection_execution(self, cumulus_collection_doc, deletion_result): + executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) + deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result + sleep(10) + return def create(self): try: cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) @@ -229,16 +212,24 @@ def create(self): LOGGER.debug(f'__provider_id: {self.__provider_id}') creation_result = 'NA' if self.__include_cumulus: - creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc) + creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix) + creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result) if creation_err is not None: return creation_err uds_creation_result = self.__create_collection_uds(cumulus_collection_doc) if uds_creation_result is not None: return uds_creation_result if self.__include_cumulus: - create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc) - if create_rule_result is not None: - return create_rule_result + rule_creation_result = self.__cumulus_collection_query.create_sqs_rules( + cumulus_collection_doc, + self.__cumulus_lambda_prefix, + self.__ingest_sqs_url, + self.__provider_id, + self.__workflow_name, + ) + create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result) + if create_rule_err is not None: + return create_rule_err # validation_result = pystac.Collection.from_dict(self.__request_body).validate() # cumulus_collection_query = CollectionsQuery('', '') #