Skip to content

Commit

Permalink
UPDATE the SQL statement used in Nucleus pds-nucleus-datasync-complet…
Browse files Browse the repository at this point in the history
…ion lambda code to make sure both the product table and the product_data_file_mapping table are updated in a consistent way (make sure both tables are updated).

Refer to task #54
  • Loading branch information
ramesh-maddegoda committed Feb 7, 2024
1 parent 11f0bc1 commit 04eb56e
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions src/pds/ingress/pds-nucleus-product-completion-checker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
==============================================
pds-nucleus-product-completion-checker-batch.py
==============================================
============================================================
pds-nucleus-product-completion-checker.py (batch processing)
============================================================ =
Lambda function to check if the staging S3 bucket has received a complete product
with all required files. This lambda function is triggered periodically.
Expand All @@ -18,6 +18,7 @@
import http.client
import base64
import ast
import uuid

from xml.dom import minidom

Expand All @@ -29,10 +30,10 @@
rds_data = boto3.client('rds-data')

mwaa_env_name = 'PDS-Nucleus-Airflow-Env'
dag_name = 'PDS_Registry_Use_Case_61_Messenger_Batch-logs'
mwaa_cli_command = 'dags trigger'

# Read environment variables from lambda configurations
dag_name = os.environ.get('AIRFLOW_DAG_NAME')
node_name = os.environ.get('NODE_NAME')
es_url = os.environ.get('ES_URL')
replace_prefix_with = os.environ.get('REPLACE_PREFIX_WITH')
Expand All @@ -47,7 +48,7 @@
def lambda_handler(event, context):
""" Main lambda handler """

logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

logger.info(f"Lambda Request ID: {context.aws_request_id}")
Expand All @@ -65,11 +66,13 @@ def process_completed_products():

logger.debug("Checking completed products...")

sql = """
select distinct s3_url_of_product_label from product where processing_status = 'INCOMPLETE' and s3_url_of_product_label
NOT IN (select distinct s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (select s3_url_of_data_file from data_file));
sql = """
SELECT DISTINCT s3_url_of_product_label from product
WHERE processing_status = 'INCOMPLETE' and s3_url_of_product_label
NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
IN (SELECT s3_url_of_product_label from product_data_file_mapping);
"""

response = rds_data.execute_statement(
Expand Down Expand Up @@ -136,15 +139,15 @@ def submit_data_to_nucleus(list_of_product_labels_to_process):


def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_process):
""" Creates harvest manifest file and harvest config file and trigger Nucleus workflow """
""" Creates harvest manifest file and harvest config file and trigger Nucleus workflow """

logger.debug('List of product labels to process:' + str(list_of_product_labels_to_process))

efs_mount_path = os.environ.get('EFS_MOUNT_PATH')

harvest_config_dir = efs_mount_path + '/harvest-configs'

file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1))
file_name = os.path.basename(list_of_product_labels_to_process[0].replace("s3:/", efs_mount_path, 1) )

harvest_manifest_content = ""
list_of_product_labels_to_process_with_file_paths = []
Expand All @@ -154,10 +157,13 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
harvest_manifest_content = harvest_manifest_content + efs_product_label_file_location + '\n'
list_of_product_labels_to_process_with_file_paths.append(efs_product_label_file_location)

# Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
random_suffix = uuid.uuid4().hex

try:
os.makedirs(harvest_config_dir, exist_ok=True)
harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '.cfg'
harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '.txt'
harvest_config_file_path = harvest_config_dir + '/harvest_' + file_name + '_' + random_suffix + '.cfg'
harvest_manifest_file_path = harvest_config_dir + '/harvest_manifest_' + file_name + '_' + random_suffix + '.txt'

logger.debug(f"Manifest content: {str(harvest_manifest_content)}")

Expand Down Expand Up @@ -189,6 +195,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc
logger.info(f"Created harvest config XML file: {harvest_config_file_path}")
except Exception as e:
logger.error(f"Error creating harvest config files in : {harvest_config_dir}. Exception: {str(e)}")
return

trigger_nucleus_workflow(harvest_manifest_file_path, harvest_config_file_path,
list_of_product_labels_to_process_with_file_paths)
Expand All @@ -197,7 +204,7 @@ def create_harvest_config_xml_and_trigger_nucleus(list_of_product_labels_to_proc


def trigger_nucleus_workflow(harvest_manifest_file_path, pds_harvest_config_file, list_of_product_labels_to_process):
""" Triggers Nucleus workflow with parameters """
""" Triggers Nucleus workflow with parameters """

# Convert list to comma-separated list
delim = ","
Expand Down

0 comments on commit 04eb56e

Please sign in to comment.