From e42598b4381fbdcd05fa972cee76e6726fb048f7 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Wed, 28 May 2025 17:06:28 -0700 Subject: [PATCH 01/11] feat: collection deletion --- .../cumulus_wrapper/query_collections.py | 33 ++++++++ .../lib/uds_db/granules_db_index.py | 11 +++ .../uds_api/collections_api.py | 61 +++++++++++++- .../uds_api/dapa/collections_dapa_creation.py | 79 +++++++++++++++++++ 4 files changed, 183 insertions(+), 1 deletion(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 449d487c..43823690 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -158,6 +158,39 @@ def query_rules(self, private_api_prefix: str): return {'server_error': f'error while invoking:{str(e)}'} return {'results': query_result} + def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str): + # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' + underscore_collection_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"]) # replace any character that's not alphanumeric or underscore with 3 underscores + rule_name = f'{underscore_collection_name}___{new_collection["version"]}___rules_sqs' + payload = { + 'httpMethod': 'DELETE', + 'resource': '/{proxy+}', + 'path': f'/{self.__rules_key}/{rule_name}', + 'headers': { + 'Content-Type': 'application/json', + }, + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. 
details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'message' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'status': query_result['message']} + def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{ diff --git a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py index 2fae3f9d..103c8d3b 100644 --- a/cumulus_lambda_functions/lib/uds_db/granules_db_index.py +++ b/cumulus_lambda_functions/lib/uds_db/granules_db_index.py @@ -305,6 +305,17 @@ def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str # TODO validate custom metadata vs the latest index to filter extra items return + def get_size(self, tenant: str, tenant_venue: str, collection_id: str): + read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip() + search_dsl = { + 'query': {'bool': {'must': [{ + 'term': {'collection': collection_id} + }]}}, + 'size': 0 + } + search_result = self.__es.query(search_dsl, querying_index=read_alias_name) + return self.__es.get_result_size(search_result) + def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict): read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip() if 'sort' not in search_dsl: # We cannot paginate w/o sort. 
So, max is 10k items: diff --git a/cumulus_lambda_functions/uds_api/collections_api.py b/cumulus_lambda_functions/uds_api/collections_api.py index 3551a7b5..d6d24ade 100644 --- a/cumulus_lambda_functions/uds_api/collections_api.py +++ b/cumulus_lambda_functions/uds_api/collections_api.py @@ -1,10 +1,12 @@ import json import os +from datetime import datetime from typing import Union -from pystac import Catalog, Link +from pystac import Catalog, Link, Collection, Extent, SpatialExtent, TemporalExtent, Summaries, Provider from cumulus_lambda_functions.lib.uds_db.db_constants import DBConstants +from cumulus_lambda_functions.lib.uds_db.granules_db_index import GranulesDbIndex from cumulus_lambda_functions.lib.uds_db.uds_collections import UdsCollections @@ -276,3 +278,60 @@ async def query_collections(request: Request, collection_id: Union[str, None] = if collections_result['statusCode'] == 200: return collections_result['body'] raise HTTPException(status_code=collections_result['statusCode'], detail=collections_result['body']) + +@router.delete("/{collection_id}") +@router.delete("/{collection_id}/") +async def delete_single_collection(request: Request, collection_id: str): + LOGGER.debug(f'starting delete_single_collection: {collection_id}') + LOGGER.debug(f'starting delete_single_collection request: {request}') + + authorizer: UDSAuthorizorAbstract = UDSAuthorizerFactory() \ + .get_instance(UDSAuthorizerFactory.cognito, + es_url=os.getenv('ES_URL'), + es_port=int(os.getenv('ES_PORT', '443')) + ) + auth_info = FastApiUtils.get_authorization_info(request) + uds_collections = UdsCollections(es_url=os.getenv('ES_URL'), + es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS')) + if collection_id is None or collection_id == '': + raise HTTPException(status_code=500, detail=f'missing or invalid collection_id: {collection_id}') + collection_identifier = uds_collections.decode_identifier(collection_id) + if not authorizer.is_authorized_for_collection(DBConstants.delete, collection_id, + auth_info['ldap_groups'], + collection_identifier.tenant, + collection_identifier.venue): + LOGGER.debug(f'user: {auth_info["username"]} is not authorized for {collection_id}') + raise HTTPException(status_code=403, detail=json.dumps({ + 'message': 'not authorized to execute this action' + })) + + granules_count = GranulesDbIndex().get_size(collection_identifier.tenant, collection_identifier.venue, + collection_id) + LOGGER.debug(f'granules_count: {granules_count} for {collection_id}') + if granules_count > 0: + LOGGER.debug(f'NOT deleting {collection_id} as it is not empty') + raise HTTPException(status_code=409, detail=f'NOT deleting {collection_id} as it is not empty') + + try: + new_collection = Collection( + id=collection_id, + description='TODO', + extent = Extent( + SpatialExtent([[0.0, 0.0, 0.0, 0.0]]), + TemporalExtent([[datetime.utcnow(), datetime.utcnow()]]) + ), + license = "proprietary", + providers = [], + # title=input_collection['LongName'], + # keywords=[input_collection['SpatialKeywords']['Keyword']], + summaries = Summaries({ + "totalGranules": [-1], + }), + ) + creation_result = CollectionDapaCreation(new_collection).delete() + except Exception as e: + LOGGER.exception('failed during ingest_cnm_dapa') + raise HTTPException(status_code=500, detail=str(e)) + if creation_result['statusCode'] < 300: + return creation_result['body'], creation_result['statusCode'] + raise HTTPException(status_code=creation_result['statusCode'], detail=creation_result['body']) diff --git 
a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 27fe645d..42a8611a 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -81,6 +81,18 @@ def __init__(self, request_body): self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True) self.__cumulus_collection_query = CollectionsQuery('', '') + def __delete_collection_cumulus(self, cumulus_collection_doc): + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) + if 'status' not in delete_result: + LOGGER.error(f'status not in creation_result: {delete_result}') + return { + 'statusCode': 500, + 'body': { + 'message': delete_result + } + }, None + return None, delete_result + def __create_collection_cumulus(self, cumulus_collection_doc): creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix) if 'status' not in creation_result: @@ -116,6 +128,37 @@ def __create_rules_cumulus(self, cumulus_collection_doc): } return None + def __delete_rules_cumulus(self, cumulus_collection_doc): + rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules( + cumulus_collection_doc, + self.__cumulus_lambda_prefix + ) + if 'status' not in rule_deletion_result: + LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}') + return { + 'statusCode': 500, + 'body': { + 'message': rule_deletion_result, + 'details': f'collection deletion result: {rule_deletion_result}' + } + } + return None + + def __delete_collection_uds(self): + try: + delete_collection_result = self.__uds_collection.delete_collection( + collection_id=self.__collection_transformer.get_collection_id() + ) + except Exception as e: + LOGGER.exception(f'failed to add collection to Elasticsearch') + return { + 'statusCode': 500, + 'body': { + 'message': f'unable to delete collection to Elasticsearch: {str(e)}', + } + } + return None + def __create_collection_uds(self, cumulus_collection_doc): try: @@ -143,6 +186,42 @@ def __create_collection_uds(self, cumulus_collection_doc): } return None + def delete(self): + deletion_result = {} + try: + cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) + self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider + LOGGER.debug(f'__provider_id: {self.__provider_id}') + creation_result = 'NA' + + if self.__include_cumulus: + rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc) + deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded' + delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc) + deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result + else: + deletion_result['cumulus_rule_deletion'] = 'NA' + deletion_result['cumulus_collection_deletion'] = 'NA' + + uds_deletion_result = self.__delete_collection_uds() + deletion_result['uds_collection_deletion'] = uds_deletion_result if uds_deletion_result is not None else 'succeeded' 
+ except Exception as e: + LOGGER.exception('error while creating new collection in Cumulus') + return { + 'statusCode': 500, + 'body': { + 'message': f'error while creating new collection in Cumulus. check details', + 'details': str(e) + } + } + LOGGER.info(f'creation_result: {creation_result}') + return { + 'statusCode': 200, + 'body': { + 'message': deletion_result + } + } + def create(self): try: cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) From a573582fe8f00d2b038a72599de258586dfd0457 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Mon, 2 Jun 2025 15:53:40 -0700 Subject: [PATCH 02/11] fix: update collection to pass existing code validation --- cumulus_lambda_functions/uds_api/collections_api.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cumulus_lambda_functions/uds_api/collections_api.py b/cumulus_lambda_functions/uds_api/collections_api.py index d6d24ade..3b299961 100644 --- a/cumulus_lambda_functions/uds_api/collections_api.py +++ b/cumulus_lambda_functions/uds_api/collections_api.py @@ -315,6 +315,7 @@ async def delete_single_collection(request: Request, collection_id: str): try: new_collection = Collection( id=collection_id, + title=collection_id, description='TODO', extent = Extent( SpatialExtent([[0.0, 0.0, 0.0, 0.0]]), @@ -328,6 +329,14 @@ async def delete_single_collection(request: Request, collection_id: str): "totalGranules": [-1], }), ) + new_collection.links = [ + Link(rel='root', + target=f'./collection.json', + media_type='application/json', title=f"{new_collection.id}"), + Link(rel='item', + target='./collection.json', + media_type='application/json', title=f"{new_collection.id} Granules") + ] creation_result = CollectionDapaCreation(new_collection).delete() except Exception as e: LOGGER.exception('failed during ingest_cnm_dapa') From 5f7c4b0f7933c53e99fe5c1e794cfc56686a8c5f Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Mon, 2 Jun 2025 16:22:40 -0700 Subject: [PATCH 03/11] fix: expecting json dict --- cumulus_lambda_functions/uds_api/collections_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus_lambda_functions/uds_api/collections_api.py b/cumulus_lambda_functions/uds_api/collections_api.py index 3b299961..6a7c2d2e 100644 --- a/cumulus_lambda_functions/uds_api/collections_api.py +++ b/cumulus_lambda_functions/uds_api/collections_api.py @@ -337,7 +337,7 @@ async def delete_single_collection(request: Request, collection_id: str): target='./collection.json', media_type='application/json', title=f"{new_collection.id} Granules") ] - creation_result = CollectionDapaCreation(new_collection).delete() + creation_result = CollectionDapaCreation(new_collection.to_dict(False, False)).delete() except Exception as e: LOGGER.exception('failed during ingest_cnm_dapa') raise HTTPException(status_code=500, detail=str(e)) From 6cb657bc7922377fb7add27764edcfd96edf4bb8 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Mon, 9 Jun 2025 17:27:32 -0700 Subject: [PATCH 04/11] fix: deleting executions and wait for 10 seconds --- .../cumulus_wrapper/query_collections.py | 39 ++++++++ .../uds_api/dapa/collections_dapa_creation.py | 92 ++++++------------- 2 files changed, 67 insertions(+), 64 deletions(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 43823690..e48ae651 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -34,6 
+34,7 @@ def with_collections(self, collection_ids: list): collection_names = [k.split('___')[0] for k in collection_ids] self._conditions.append(f'{self.__collection_name}__in={",".join(collection_names)}') return self + def get_size(self, private_api_prefix: str): query_params = {'field': 'status', 'type': 'collections'} main_conditions = {k[0]: k[1] for k in [k1.split('=') for k1 in self._conditions]} @@ -191,6 +192,44 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str): return {'server_error': f'error while invoking:{str(e)}'} return {'status': query_result['message']} + def delete_executions(self, new_collection: dict, private_api_prefix: str): + # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' + underscore_collection_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"]) # replace any character that's not alphanumeric or underscore with 3 underscores + request_body = { + "collectionId": f'{underscore_collection_name}___{new_collection["version"]}', + "esBatchSize": 100000, + "dbBatchSize": 50000 + } + payload = { + 'httpMethod': 'POST', + 'resource': '/{proxy+}', + 'path': f'/executions/bulk-delete-by-collection', + 'headers': { + 'Content-Type': 'application/json', + }, + 'body': json.dumps(request_body) + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. 
details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'message' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'status': query_result['message']} + def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{ diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 42a8611a..f282d902 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -1,5 +1,6 @@ import json import os +from time import sleep from typing import Optional import pystac @@ -81,68 +82,16 @@ def __init__(self, request_body): self.__uds_collection = UdsCollections(es_url=os.getenv('ES_URL'), es_port=int(os.getenv('ES_PORT', '443')), es_type=os.getenv('ES_TYPE', 'AWS'), use_ssl=os.getenv('ES_USE_SSL', 'TRUE').strip() is True) self.__cumulus_collection_query = CollectionsQuery('', '') - def __delete_collection_cumulus(self, cumulus_collection_doc): - delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) - if 'status' not in delete_result: - LOGGER.error(f'status not in creation_result: {delete_result}') + def analyze_cumulus_result(self, cumulus_request_result): + if 'status' not in cumulus_request_result: + LOGGER.error(f'status not in cumulus_request_result: {cumulus_request_result}') return { 'statusCode': 500, 'body': { - 'message': delete_result + 'message': cumulus_request_result } }, None - return None, delete_result - - def __create_collection_cumulus(self, cumulus_collection_doc): - creation_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix) - if 'status' not in creation_result: - LOGGER.error(f'status not in creation_result: {creation_result}') - return { - 'statusCode': 500, - 'body': { - 'message': creation_result - } - }, None - return None, creation_result - - def __create_rules_cumulus(self, cumulus_collection_doc): - rule_creation_result = self.__cumulus_collection_query.create_sqs_rules( - cumulus_collection_doc, - self.__cumulus_lambda_prefix, - self.__ingest_sqs_url, - self.__provider_id, - self.__workflow_name, - ) - if 'status' not in rule_creation_result: - LOGGER.error(f'status not in rule_creation_result. 
deleting collection: {rule_creation_result}') - delete_collection_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, - cumulus_collection_doc['name'], - cumulus_collection_doc['version']) - self.__uds_collection.delete_collection(self.__collection_transformer.get_collection_id()) - return { - 'statusCode': 500, - 'body': { - 'message': rule_creation_result, - 'details': f'collection deletion result: {delete_collection_result}' - } - } - return None - - def __delete_rules_cumulus(self, cumulus_collection_doc): - rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules( - cumulus_collection_doc, - self.__cumulus_lambda_prefix - ) - if 'status' not in rule_deletion_result: - LOGGER.error(f'status not in rule_creation_result. deleting collection: {rule_deletion_result}') - return { - 'statusCode': 500, - 'body': { - 'message': rule_deletion_result, - 'details': f'collection deletion result: {rule_deletion_result}' - } - } - return None + return None, cumulus_request_result def __delete_collection_uds(self): try: @@ -195,11 +144,18 @@ def delete(self): creation_result = 'NA' if self.__include_cumulus: - rules_deletion_result = self.__delete_rules_cumulus(cumulus_collection_doc) - deletion_result['cumulus_rule_deletion'] = rules_deletion_result if rules_deletion_result is not None else 'succeeded' - delete_err, delete_result = self.__delete_collection_cumulus(cumulus_collection_doc) + executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) + deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result + sleep(10) + rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) + rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) + deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) + delete_err, delete_result = self.analyze_cumulus_result(delete_result) deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result else: + deletion_result['cumulus_executions_deletion'] = 'NA' deletion_result['cumulus_rule_deletion'] = 'NA' deletion_result['cumulus_collection_deletion'] = 'NA' @@ -229,16 +185,24 @@ def create(self): LOGGER.debug(f'__provider_id: {self.__provider_id}') creation_result = 'NA' if self.__include_cumulus: - creation_err, creation_result = self.__create_collection_cumulus(cumulus_collection_doc) + creation_cumulus_result = self.__cumulus_collection_query.create_collection(cumulus_collection_doc, self.__cumulus_lambda_prefix) + creation_err, creation_result = self.analyze_cumulus_result(creation_cumulus_result) if creation_err is not None: return creation_err uds_creation_result = self.__create_collection_uds(cumulus_collection_doc) if uds_creation_result is not None: return uds_creation_result if self.__include_cumulus: - create_rule_result = self.__create_rules_cumulus(cumulus_collection_doc) - if create_rule_result is not None: - return create_rule_result + rule_creation_result = self.__cumulus_collection_query.create_sqs_rules( + 
cumulus_collection_doc, + self.__cumulus_lambda_prefix, + self.__ingest_sqs_url, + self.__provider_id, + self.__workflow_name, + ) + create_rule_err, create_rule_result = self.analyze_cumulus_result(rule_creation_result) + if create_rule_err is not None: + return create_rule_err # validation_result = pystac.Collection.from_dict(self.__request_body).validate() # cumulus_collection_query = CollectionsQuery('', '') # From 63e555b67b099dd1d46b24e06299e8e47aec3d85 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 10 Jun 2025 15:43:34 -0700 Subject: [PATCH 05/11] fix: normal name to delete executions --- cumulus_lambda_functions/cumulus_wrapper/query_collections.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index e48ae651..674b2e04 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -194,9 +194,8 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str): def delete_executions(self, new_collection: dict, private_api_prefix: str): # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' - underscore_collection_name = re.sub(r'[^a-zA-Z0-9_]', '___', new_collection["name"]) # replace any character that's not alphanumeric or underscore with 3 underscores request_body = { - "collectionId": f'{underscore_collection_name}___{new_collection["version"]}', + "collectionId": new_collection["name"], "esBatchSize": 100000, "dbBatchSize": 50000 } From 499de8b234663351dceb063a31ab1fb192b55a82 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 10 Jun 2025 16:16:57 -0700 Subject: [PATCH 06/11] fix: need name___version --- cumulus_lambda_functions/cumulus_wrapper/query_collections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 674b2e04..dfa322b2 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -195,7 +195,7 @@ def delete_sqs_rules(self, new_collection: dict, private_api_prefix: str): def delete_executions(self, new_collection: dict, private_api_prefix: str): # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' request_body = { - "collectionId": new_collection["name"], + "collectionId": f'{new_collection["name"]}___{new_collection["version"]}', "esBatchSize": 100000, "dbBatchSize": 50000 } From 854da5bb7965daa20ee0f0b103b04167c1fff1e6 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 10 Jun 2025 17:32:59 -0700 Subject: [PATCH 07/11] fix: reduce batch size --- cumulus_lambda_functions/cumulus_wrapper/query_collections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index dfa322b2..d9d000a3 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -196,7 +196,7 @@ def delete_executions(self, new_collection: dict, private_api_prefix: str): # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' 
request_body = { "collectionId": f'{new_collection["name"]}___{new_collection["version"]}', - "esBatchSize": 100000, + "esBatchSize": 10000, "dbBatchSize": 50000 } payload = { From 93bc64521b3c4cf79a2cc14abe2854259c042a7d Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 24 Jun 2025 09:08:52 -0700 Subject: [PATCH 08/11] fix: brute force delete retry --- .../cumulus_wrapper/query_collections.py | 4 ++-- .../uds_api/dapa/collections_dapa_creation.py | 17 ++++++++++++++--- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index d9d000a3..2217ae1d 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -222,12 +222,12 @@ def delete_executions(self, new_collection: dict, private_api_prefix: str): return {'client_error': query_result} query_result = json.loads(query_result['body']) LOGGER.debug(f'json query_result: {query_result}') - if 'message' not in query_result: + if 'id' not in query_result: return {'server_error': f'invalid response: {query_result}'} except Exception as e: LOGGER.exception('error while invoking') return {'server_error': f'error while invoking:{str(e)}'} - return {'status': query_result['message']} + return {'status': query_result} def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index f9b68258..63fdbcc8 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -179,12 +179,23 @@ def delete(self): exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result sleep(10) - rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) - rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) - deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) delete_err, delete_result = self.analyze_cumulus_result(delete_result) + if delete_err is not None: + executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, + self.__cumulus_lambda_prefix) + exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) + deletion_result[ + 'cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result + sleep(10) + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, + cumulus_collection_doc['name'], + cumulus_collection_doc['version']) + delete_err, delete_result = self.analyze_cumulus_result(delete_result) deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result + rule_deletion_result = 
self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) + rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) + deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result else: deletion_result['cumulus_executions_deletion'] = 'NA' deletion_result['cumulus_rule_deletion'] = 'NA' From 45c5b06d069c2b0a8b97a82e5dcd7bd2bdef9c60 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 24 Jun 2025 09:40:36 -0700 Subject: [PATCH 09/11] fix: delete rule first then collection --- .../uds_api/dapa/collections_dapa_creation.py | 66 ++++++------------- 1 file changed, 20 insertions(+), 46 deletions(-) diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 63fdbcc8..832182ee 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -93,36 +93,6 @@ def analyze_cumulus_result(self, cumulus_request_result): }, None return None, cumulus_request_result - def __delete_collection_uds(self): - try: - delete_collection_result = self.__uds_collection.delete_collection( - collection_id=self.__collection_transformer.get_collection_id() - ) - except Exception as e: - LOGGER.exception(f'failed to add collection to Elasticsearch') - return { - 'statusCode': 500, - 'body': { - 'message': f'unable to delete collection to Elasticsearch: {str(e)}', - } - } - return None - - def __delete_rules_cumulus(self, cumulus_collection_doc): - rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules( - cumulus_collection_doc, - self.__cumulus_lambda_prefix - ) - if 'status' not in rule_deletion_result: - LOGGER.error(f'status not in rule_creation_result. 
deleting collection: {rule_deletion_result}') - return { - 'statusCode': 500, - 'body': { - 'message': rule_deletion_result, - 'details': f'collection deletion result: {rule_deletion_result}' - } - } - return None def __delete_collection_uds(self): try: @@ -175,27 +145,17 @@ def delete(self): creation_result = 'NA' if self.__include_cumulus: - executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) - exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) - deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result - sleep(10) + self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + self.__delete_collection_rule(cumulus_collection_doc, deletion_result) delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) delete_err, delete_result = self.analyze_cumulus_result(delete_result) if delete_err is not None: - executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, - self.__cumulus_lambda_prefix) - exec_delete_err, exec_delete_result = self.analyze_cumulus_result(executions_delete_result) - deletion_result[ - 'cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result - sleep(10) - delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, - cumulus_collection_doc['name'], - cumulus_collection_doc['version']) + LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}') + self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + self.__delete_collection_rule(cumulus_collection_doc, deletion_result) + delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) delete_err, delete_result = self.analyze_cumulus_result(delete_result) deletion_result['cumulus_collection_deletion'] = delete_err if delete_err is not None else delete_result - rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) - rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) - deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result else: deletion_result['cumulus_executions_deletion'] = 'NA' deletion_result['cumulus_rule_deletion'] = 'NA' @@ -220,6 +180,20 @@ def delete(self): } } + def __delete_collection_rule(self, cumulus_collection_doc, deletion_result): + if 'cumulus_rule_deletion' in deletion_result and 'statusCode' not in deletion_result['cumulus_rule_deletion']: + return + rule_deletion_result = self.__cumulus_collection_query.delete_sqs_rules(cumulus_collection_doc, self.__cumulus_lambda_prefix) + rule_delete_err, rule_delete_result = self.analyze_cumulus_result(rule_deletion_result) + deletion_result['cumulus_rule_deletion'] = rule_delete_err if rule_delete_err is not None else rule_delete_result + return + + def __delete_collection_execution(self, cumulus_collection_doc, deletion_result): + executions_delete_result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + exec_delete_err, exec_delete_result = 
self.analyze_cumulus_result(executions_delete_result) + deletion_result['cumulus_executions_deletion'] = exec_delete_err if exec_delete_err is not None else exec_delete_result + sleep(10) + return def create(self): try: cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) From 8f706be3d5e072f26b668921cd3a41d76e37d8b0 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 24 Jun 2025 15:48:18 -0700 Subject: [PATCH 10/11] fix: get executions for collection --- .../cumulus_wrapper/query_collections.py | 32 +++++++++++++++++++ .../uds_api/dapa/collections_dapa_creation.py | 3 ++ 2 files changed, 35 insertions(+) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 2217ae1d..7f623cd3 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -229,6 +229,38 @@ def delete_executions(self, new_collection: dict, private_api_prefix: str): return {'server_error': f'error while invoking:{str(e)}'} return {'status': query_result} + def list_executions(self, new_collection: dict, private_api_prefix: str): + # $ curl --request DELETE https://example.com/rules/repeat_test --header 'Authorization: Bearer ReplaceWithTheToken' + payload = { + 'httpMethod': 'GET', + 'resource': '/{proxy+}', + 'path': f'/executions', + 'queryStringParameters': {'limit': '100', 'collectionId': f'{new_collection["name"]}___{new_collection["version"]}'}, + 'headers': { + 'Content-Type': 'application/json', + } + } + LOGGER.debug(f'payload: {payload}') + try: + query_result = self._invoke_api(payload, private_api_prefix) + """ + {'statusCode': 500, 'body': '', 'headers': {}} + """ + if query_result['statusCode'] >= 500: + LOGGER.error(f'server error status code: {query_result["statusCode"]}. details: {query_result}') + return {'server_error': query_result} + if query_result['statusCode'] >= 400: + LOGGER.error(f'client error status code: {query_result["statusCode"]}. 
details: {query_result}') + return {'client_error': query_result} + query_result = json.loads(query_result['body']) + LOGGER.debug(f'json query_result: {query_result}') + if 'id' not in query_result: + return {'server_error': f'invalid response: {query_result}'} + except Exception as e: + LOGGER.exception('error while invoking') + return {'server_error': f'error while invoking:{str(e)}'} + return {'status': query_result} + def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ curl --request POST "$CUMULUS_BASEURL/rules" --header "Authorization: Bearer $cumulus_token" --header 'Content-Type: application/json' --data '{ diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 832182ee..09009aa9 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -139,12 +139,15 @@ def __create_collection_uds(self, cumulus_collection_doc): def delete(self): deletion_result = {} try: + cumulus_collection_doc = self.__collection_transformer.from_stac(self.__request_body) self.__provider_id = self.__provider_id if self.__collection_transformer.output_provider is None else self.__collection_transformer.output_provider LOGGER.debug(f'__provider_id: {self.__provider_id}') creation_result = 'NA' if self.__include_cumulus: + result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + print(f'execution list result: {result}') self.__delete_collection_execution(cumulus_collection_doc, deletion_result) self.__delete_collection_rule(cumulus_collection_doc, deletion_result) delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) From fbf5656cca4346820ffcad9be7c4a44ffb6fbc69 Mon Sep 17 00:00:00 2001 From: Wai Phyo Date: Tue, 24 Jun 2025 16:45:09 -0700 Subject: [PATCH 11/11] fix: delete only executions if existed --- .../cumulus_wrapper/query_collections.py | 4 ++-- .../uds_api/dapa/collections_dapa_creation.py | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py index 7f623cd3..0c43b3c6 100644 --- a/cumulus_lambda_functions/cumulus_wrapper/query_collections.py +++ b/cumulus_lambda_functions/cumulus_wrapper/query_collections.py @@ -254,12 +254,12 @@ def list_executions(self, new_collection: dict, private_api_prefix: str): return {'client_error': query_result} query_result = json.loads(query_result['body']) LOGGER.debug(f'json query_result: {query_result}') - if 'id' not in query_result: + if 'results' not in query_result: return {'server_error': f'invalid response: {query_result}'} except Exception as e: LOGGER.exception('error while invoking') return {'server_error': f'error while invoking:{str(e)}'} - return {'status': query_result} + return {'results': query_result['results']} def create_sqs_rules(self, new_collection: dict, private_api_prefix: str, sqs_url: str, provider_name: str = '', workflow_name: str = 'CatalogGranule', visibility_timeout: int = 1800): """ diff --git a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py 
b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py index 09009aa9..6b111a9d 100644 --- a/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py +++ b/cumulus_lambda_functions/uds_api/dapa/collections_dapa_creation.py @@ -146,15 +146,23 @@ def delete(self): creation_result = 'NA' if self.__include_cumulus: - result = self.__cumulus_collection_query.delete_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) - print(f'execution list result: {result}') - self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + result = self.__cumulus_collection_query.list_executions(cumulus_collection_doc, self.__cumulus_lambda_prefix) + LOGGER.debug(f'execution list result: {result}') + if len(result['results']) > 0: + self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + return { + 'statusCode': 409, + 'body': { + 'message': f'There are cumulus executions for this collection. Deleting them. Pls try again in a few minutes.', + } + } + # self.__delete_collection_execution(cumulus_collection_doc, deletion_result) self.__delete_collection_rule(cumulus_collection_doc, deletion_result) delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) delete_err, delete_result = self.analyze_cumulus_result(delete_result) if delete_err is not None: LOGGER.error(f'deleting collection ends in error. Trying again. {delete_err}') - self.__delete_collection_execution(cumulus_collection_doc, deletion_result) + # self.__delete_collection_execution(cumulus_collection_doc, deletion_result) self.__delete_collection_rule(cumulus_collection_doc, deletion_result) delete_result = self.__cumulus_collection_query.delete_collection(self.__cumulus_lambda_prefix, cumulus_collection_doc['name'], cumulus_collection_doc['version']) delete_err, delete_result = self.analyze_cumulus_result(delete_result)
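
Every new Cumulus wrapper in this series (`delete_sqs_rules` in PATCH 01, `delete_executions` in PATCH 04, `list_executions` in PATCH 10) builds the same Lambda-proxy style payload and hands it to `_invoke_api`, which is not part of the diff. The sketch below shows one common way such a call is made; boto3 and the `<prefix>-PrivateApiLambda` function name are assumptions here, not something taken from these patches.

```python
import json
import boto3

def invoke_private_api(payload: dict, lambda_prefix: str) -> dict:
    """Hypothetical stand-in for CollectionsQuery._invoke_api (not shown in the diff)."""
    client = boto3.client('lambda')
    # The private Cumulus API is fronted by a Lambda; '-PrivateApiLambda' is an assumed name.
    response = client.invoke(
        FunctionName=f'{lambda_prefix}-PrivateApiLambda',
        Payload=json.dumps(payload).encode(),
    )
    # The reply mirrors an API Gateway envelope:
    # {'statusCode': ..., 'body': '<json string>', 'headers': {...}}
    return json.loads(response['Payload'].read())

# The DELETE payload built by delete_sqs_rules in PATCH 01:
example_payload = {
    'httpMethod': 'DELETE',
    'resource': '/{proxy+}',
    'path': '/rules/my___collection___001___rules_sqs',
    'headers': {'Content-Type': 'application/json'},
}
# result = invoke_private_api(example_payload, 'my-cumulus-prefix')
```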
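
Two naming conventions recur across those wrappers: SQS rule names replace every character that is not alphanumeric or an underscore with three underscores (PATCH 01), while the bulk execution deletion settled on the plain `<name>___<version>` collection id after PATCH 05 and PATCH 06. A small illustration:

```python
import re

def sqs_rule_name(collection_name: str, version: str) -> str:
    # PATCH 01: any character that's not alphanumeric or underscore becomes three underscores
    safe_name = re.sub(r'[^a-zA-Z0-9_]', '___', collection_name)
    return f'{safe_name}___{version}___rules_sqs'

def execution_collection_id(collection_name: str, version: str) -> str:
    # PATCH 05 dropped the underscore normalization; PATCH 06 restored the '___<version>' suffix
    return f'{collection_name}___{version}'

assert sqs_rule_name('my-collection.v2', '001') == 'my___collection___v2___001___rules_sqs'
assert execution_collection_id('my-collection.v2', '001') == 'my-collection.v2___001'
```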
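
The `GranulesDbIndex.get_size` helper added in PATCH 01 is what lets the DELETE endpoint refuse to remove a non-empty collection: it runs a `size: 0` term query against the tenant/venue read alias and reads only the hit total. A sketch against the low-level client, with the alias prefix taken as a parameter because the patch builds it from `DBConstants.granules_read_alias_prefix`, whose value is not shown here:

```python
from elasticsearch import Elasticsearch  # opensearch-py exposes the same search() call

def granule_count(es: Elasticsearch, read_alias_prefix: str,
                  tenant: str, tenant_venue: str, collection_id: str) -> int:
    read_alias = f'{read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
    resp = es.search(index=read_alias, body={
        'query': {'bool': {'must': [{'term': {'collection': collection_id}}]}},
        'size': 0,  # no documents needed, only the total
    })
    total = resp['hits']['total']
    # ES >= 7 returns {'value': N, 'relation': 'eq'}; older versions return a bare int
    return total['value'] if isinstance(total, dict) else total
```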
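
From a client's point of view, the endpoint added in PATCH 01 behaves as sketched below; the host, path prefix, token handling, and the exact collection identifier are illustrative assumptions, while the route and status codes come from the patches.

```python
import requests

# Hypothetical base URL and collection identifier, shown only to illustrate the call shape;
# the deployed path prefix depends on the API gateway setup.
url = 'https://uds.example.com/collections/URN:NASA:UNITY:tenant:venue:my_collection___001'
resp = requests.delete(url, headers={'Authorization': 'Bearer <token>'})

if resp.status_code == 409:
    # Either the collection still has granules (PATCH 01), or Cumulus executions were
    # found and are being bulk-deleted (PATCH 11) -- retry after a few minutes.
    print(resp.json())
elif resp.status_code == 403:
    print('not authorized to delete this collection')
else:
    resp.raise_for_status()
    print(resp.json())  # deletion results reported by the API
```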
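
Because the orchestration in `CollectionDapaCreation.delete()` changes in almost every patch (executions handling in PATCH 04, a brute-force retry in PATCH 08, rules-before-collection ordering in PATCH 09, the 409 guard in PATCH 11), the final ordering is easiest to see pulled out into a standalone sketch with the Cumulus and UDS calls injected as callables; the structure mirrors the patched method, the helper bodies do not.

```python
from typing import Any, Callable, Dict

def delete_collection_flow(list_executions: Callable[[], dict],
                           delete_executions: Callable[[], dict],
                           delete_rules: Callable[[], dict],
                           delete_collection: Callable[[], dict],
                           delete_uds_entry: Callable[[], None]) -> Dict[str, Any]:
    deletion_result = {}
    # 1. Refuse to proceed while Cumulus still holds executions for the collection:
    #    kick off the async bulk delete and report 409 so the caller retries later (PATCH 11).
    if len(list_executions().get('results', [])) > 0:
        delete_executions()
        return {'statusCode': 409,
                'body': {'message': 'executions are being deleted; retry in a few minutes'}}
    # 2. Delete the SQS rule first, then the collection itself (PATCH 09 ordering).
    deletion_result['cumulus_rule_deletion'] = delete_rules()
    collection_result = delete_collection()
    if 'status' not in collection_result:
        # Brute-force retry introduced in PATCH 08: try the rule + collection pair once more.
        deletion_result['cumulus_rule_deletion'] = delete_rules()
        collection_result = delete_collection()
    deletion_result['cumulus_collection_deletion'] = collection_result
    # 3. Finally remove the collection document from the UDS Elasticsearch index.
    delete_uds_entry()
    deletion_result['uds_collection_deletion'] = 'succeeded'
    return {'statusCode': 200, 'body': {'message': deletion_result}}
```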