diff --git a/src/pds/ingress/pds-nucleus-product-completion-checker.py b/src/pds/ingress/pds-nucleus-product-completion-checker.py index b2b6b6d..13e7153 100644 --- a/src/pds/ingress/pds-nucleus-product-completion-checker.py +++ b/src/pds/ingress/pds-nucleus-product-completion-checker.py @@ -1,7 +1,7 @@ """ -============================================== -pds-nucleus-product-completion-checker-batch.py -============================================== +============================================================ +pds-nucleus-product-completion-checker.py (batch processing) +============================================================ = Lambda function to check if the staging S3 bucket has received a complete product with all required files. This lambda function is triggered periodically. @@ -18,6 +18,7 @@ import http.client import base64 import ast +import uuid from xml.dom import minidom @@ -29,10 +30,10 @@ rds_data = boto3.client('rds-data') mwaa_env_name = 'PDS-Nucleus-Airflow-Env' -dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs' mwaa_cli_command = 'dags trigger' # Read environment variables from lambda configurations +dag_name = os.environ.get('AIRFLOW_DAG_NAME') node_name = os.environ.get('NODE_NAME') es_url = os.environ.get('ES_URL') replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH') @@ -47,7 +48,7 @@ def lambda_handler(event, context): """ Main lambda handler """ - logger.setLevel(logging.INFO) + logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) logger.info(f"Lambda Request ID: {context.aws_request_id}") @@ -65,11 +66,13 @@ def process_completed_products(): logger.debug("Checking completed products...") - sql = """ - select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label - NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping - where s3_url_of_data_file - NOT IN (select s3_url_of_data_file from data_file)); + sql = """ + SELECT DISTINCT s3_url_of_product_label from product + WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label + NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping + where s3_url_of_data_file + NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label + IN (SELECT s3_url_of_product_label from product_data_file_mapping); """ response = rds_data.execute_statement( @@ -136,7 +139,7 @@ def submit_data_to_nucleus(list_of_product_labels_to_process): def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process): -""" Creates harvest manifest file and harvest config file and trigger Nucleus workflow """ + """ Creates harvest manifest file and harvest config file and trigger Nucleus workflow """ logger.debug('List of product labels to process:' + str(list_of_product_labels_to_process)) @@ -144,7 +147,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc harvest_config_dir = efs_mount_path + '/harvest-configs' - file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1)) + file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) ) harvest_manifest_content = "" list_of_product_labels_to_process_with_file_paths = [] @@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n' list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location) + # Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names + random_suffix = uuid.uuid4().hex + try: os.makedirs(harvest_config_dir, exist_ok=True) - harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg' - harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt' + harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg' + harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt' logger.debug(f"Manifest content: {str(harvest_manifest_content)}") @@ -189,6 +195,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc logger.info(f"Created harvest config XML file: {harvest_config_file_path}") except Exception as e: logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}") + return trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path, list_of_product_labels_to_process_with_file_paths) @@ -197,7 +204,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc def trigger_nucleus_workflow(harvest_manifest_file_path, pds_harvest_config_file, list_of_product_labels_to_process): -""" Triggers Nucleus workflow with parameters """ + """ Triggers Nucleus workflow with parameters """ # Convert list to comma seperated list delim = ","