diff --git a/.secrets.baseline b/.secrets.baseline index ba30dfa..9052173 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -280,26 +280,64 @@ "is_secret": false } ], - "terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json": [ + "terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json": [ { "type": "AWS Sensitive Information (Experimental Plugin)", - "filename": "terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json", - "hashed_secret": "55357933a7310d2db90c3fa1ed0970a7bb34ed39", + "filename": "terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json", + "hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a", "is_verified": false, "line_number": 8, "is_secret": false } ], - "terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json": [ + "terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json": [ { "type": "AWS Sensitive Information (Experimental Plugin)", - "filename": "terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json", + "filename": "terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json", "hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a", "is_verified": false, "line_number": 8, "is_secret": false } + ], + "terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json": [ + { + "type": "AWS Sensitive Information (Experimental Plugin)", + "filename": "terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json", + "hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a", + "is_verified": false, + "line_number": 11, + "is_secret": false + } + ], + "terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf": [ + { + "type": "AWS Sensitive Information (Experimental Plugin)", + "filename": "terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf", + "hashed_secret": 
"9ad897024d8c36c541d7fe84084c4e9f4df00b2a", + "is_verified": false, + "line_number": 108, + "is_secret": false + } + ], + "terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json": [ + { + "type": "AWS Sensitive Information (Experimental Plugin)", + "filename": "terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json", + "hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a", + "is_verified": false, + "line_number": 11, + "is_secret": false + }, + { + "type": "AWS Sensitive Information (Experimental Plugin)", + "filename": "terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json", + "hashed_secret": "55357933a7310d2db90c3fa1ed0970a7bb34ed39", + "is_verified": false, + "line_number": 41, + "is_secret": false + } ] }, - "generated_at": "2024-06-20T18:08:52Z" + "generated_at": "2024-07-24T00:38:52Z" } diff --git a/terraform/main.tf b/terraform/main.tf index 0f5c38f..6396614 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -80,6 +80,7 @@ module "product-copy-completion-checker" { database_availability_zones = var.database_availability_zones airflow_env_name = var.airflow_env_name + region = var.region depends_on = [module.common] } diff --git a/terraform/terraform-modules/common/common.tf b/terraform/terraform-modules/common/common.tf index 0c4d09d..24b5f46 100644 --- a/terraform/terraform-modules/common/common.tf +++ b/terraform/terraform-modules/common/common.tf @@ -1,8 +1,8 @@ # Terraform script to create the common resources for PDS Nucleus resource "aws_security_group" "nucleus_security_group" { - name = "nucleus_security_group" - description = "nucleus_security_group" + name = var.nucleus_security_group_name + description = "PDS Nucleus security group" vpc_id = var.vpc_id ingress { diff --git a/terraform/terraform-modules/common/variables.tf b/terraform/terraform-modules/common/variables.tf index 2443f61..ad1ff10 100644 --- 
a/terraform/terraform-modules/common/variables.tf +++ b/terraform/terraform-modules/common/variables.tf @@ -57,3 +57,11 @@ variable "mwaa_dag_s3_bucket_name" { type = string sensitive = true } + +variable "nucleus_security_group_name" { + description = "The name of the PDS Nucleus security group" + default = "pds_nucleus_security_group" + type = string + sensitive = true +} + diff --git a/terraform/terraform-modules/mwaa-env/mwaa_env.tf b/terraform/terraform-modules/mwaa-env/mwaa_env.tf index 0ab7d5e..454ac3d 100644 --- a/terraform/terraform-modules/mwaa-env/mwaa_env.tf +++ b/terraform/terraform-modules/mwaa-env/mwaa_env.tf @@ -15,7 +15,7 @@ data "aws_iam_policy_document" "assume_role" { data "aws_caller_identity" "current" {} data "template_file" "mwaa_inline_policy_template" { - template = file("terraform-modules/mwaa-env/template_mwaa_iam_policy.json") + template = file("terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json") vars = { pds_nucleus_aws_account_id = data.aws_caller_identity.current.account_id pds_nucleus_region = var.region @@ -27,14 +27,14 @@ data "template_file" "mwaa_inline_policy_template" { resource "local_file" "mwaa_inline_policy_file" { content = data.template_file.mwaa_inline_policy_template.rendered - filename = "terraform-modules/mwaa-env/mwaa_iam_policy.json" + filename = "terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json" depends_on = [data.template_file.mwaa_inline_policy_template] } # IAM Policy Document for Inline Policy data "aws_iam_policy_document" "mwaa_inline_policy" { - source_policy_documents = [file("${path.module}/mwaa_iam_policy.json")] + source_policy_documents = [file("${path.module}/mwaa_execution_role_iam_policy.json")] depends_on = [local_file.mwaa_inline_policy_file] } diff --git a/terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json b/terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json similarity index 94% rename from 
terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json rename to terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json index e5f8faa..e96fa4e 100644 --- a/terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json +++ b/terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json @@ -134,6 +134,11 @@ "Action": "iam:PassRole", "Effect": "Allow", "Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds_nucleus_*" + }, + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": "arn:aws:lambda:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:function:pds_nucleus_*" } ] } diff --git a/terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json b/terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json similarity index 68% rename from terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json rename to terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json index cb3f4b9..e96fa4e 100644 --- a/terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json +++ b/terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json @@ -5,8 +5,8 @@ "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": [ - "arn:aws:airflow:*:441083951559:role/*/*", - "arn:aws:airflow:*:441083951559:environment/*" + "arn:aws:airflow:*:${pds_nucleus_aws_account_id}:role/*/*", + "arn:aws:airflow:*:${pds_nucleus_aws_account_id}:environment/*" ] }, { @@ -21,14 +21,14 @@ "ecs:DescribeTasks" ], "Resource": [ - "arn:aws:ecs:*:441083951559:task-definition/pds*:*", - "arn:aws:ecs:*:441083951559:task/pds*/*" + "arn:aws:ecs:*:${pds_nucleus_aws_account_id}:task-definition/pds*:*", + "arn:aws:ecs:*:${pds_nucleus_aws_account_id}:task/pds*/*" ] }, { "Effect": "Allow", "Action": "iam:PassRole", - "Resource": "arn:aws:iam::441083951559:role/pds-nucleus*" + "Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds-nucleus*" }, { "Effect": "Allow", @@ -38,11 
+38,11 @@ "kms:GenerateDataKey*", "kms:Encrypt" ], - "NotResource": "arn:aws:kms:*:441083951559:key/*", + "NotResource": "arn:aws:kms:*:${pds_nucleus_aws_account_id}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ - "sqs.us-west-2.amazonaws.com" + "sqs.${pds_nucleus_region}.amazonaws.com" ] } } @@ -54,7 +54,7 @@ "logs:GetLogEvents", "logs:PutLogEvents" ], - "Resource": "arn:aws:logs:*:441083951559:log-group:*:log-stream:*" + "Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*:log-stream:*" }, { "Effect": "Allow", @@ -69,7 +69,7 @@ "logs:GetLogGroupFields", "logs:CreateLogGroup" ], - "Resource": "arn:aws:logs:*:441083951559:log-group:*" + "Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*" }, { "Effect": "Allow", @@ -81,7 +81,7 @@ "sqs:ReceiveMessage", "sqs:SendMessage" ], - "Resource": "arn:aws:sqs:us-west-2:*:airflow-celery-*" + "Resource": "arn:aws:sqs:${pds_nucleus_region}:*:airflow-celery-*" }, { "Effect": "Deny", @@ -113,7 +113,7 @@ "logs:GetQueryResults" ], "Resource": [ - "arn:aws:logs:us-west-2:441083951559:log-group:airflow-pds-nucleus-airflow-env-*" + "arn:aws:logs:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:log-group:airflow-${airflow_env_name}-*" ] }, { @@ -133,7 +133,12 @@ { "Action": "iam:PassRole", "Effect": "Allow", - "Resource": "arn:aws:iam::441083951559:role/pds_nucleus_*" + "Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds_nucleus_*" + }, + { + "Action": "lambda:InvokeFunction", + "Effect": "Allow", + "Resource": "arn:aws:lambda:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:function:pds_nucleus_*" } ] } diff --git a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-init.py b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-init.py index bdabb8c..e184e9c 100644 --- a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-init.py +++ 
b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-init.py @@ -30,6 +30,7 @@ def lambda_handler(event, context): create_product_table() create_datafile_table() create_product_datafile_mapping_table() + create_product_processing_status_table() return f"Processed lambda request ID: {context.aws_request_id}" except Exception as e: logger.error(f"Error creating database tables. Exception: {str(e)}") @@ -41,9 +42,9 @@ def create_product_table(): CREATE TABLE product ( s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1, - processing_status VARCHAR(10), + completion_status VARCHAR(50), last_updated_epoch_time BIGINT, - pds_node VARCHAR(10), + pds_node VARCHAR(50), PRIMARY KEY (s3_url_of_product_label) ); """ @@ -92,3 +93,24 @@ def create_product_datafile_mapping_table(): database='pds_nucleus', sql=sql) logger.debug(f"Response for create_product_datafile_mapping_table() : {str(response)}") + + +def create_product_processing_status_table(): + """ Created product processing status table """ + sql = """ + CREATE TABLE product_processing_status + ( + s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1, + processing_status VARCHAR(50), + last_updated_epoch_time BIGINT, + pds_node VARCHAR(50), + batch_number VARCHAR(100), + PRIMARY KEY (s3_url_of_product_label) + ); + """ + response = rds_data.execute_statement( + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql) + logger.debug(f"Response for create_product_processing_status_table() : {str(response)}") diff --git a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-completion-checker.py b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-completion-checker.py index 6c5b50b..a485448 100644 --- a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-completion-checker.py +++ 
b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-completion-checker.py @@ -71,13 +71,13 @@ def process_completed_products(): sql = """ SELECT DISTINCT s3_url_of_product_label from product - WHERE processing_status = 'INCOMPLETE' and + WHERE completion_status = 'INCOMPLETE' and pds_node = :pds_node_param and s3_url_of_product_label NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping where s3_url_of_data_file NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label - IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 5; + IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 100; """ pds_node_param = {'name': 'pds_node_param', @@ -104,7 +104,7 @@ def process_completed_products(): for data_dict in record: for data_type, s3_url_of_product_label in data_dict.items(): - update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE') + update_product_completion_status_in_database(s3_url_of_product_label, 'COMPLETE') list_of_product_labels_to_process.append(s3_url_of_product_label) if count == n: @@ -116,22 +116,22 @@ def process_completed_products(): count = 0 list_of_product_labels_to_process = [] -def update_product_processing_status_in_database(s3_url_of_product_label, processing_status): +def update_product_completion_status_in_database(s3_url_of_product_label, completion_status): """ Updates the product processing status of the given s3_url_of_product_label """ sql = """ UPDATE product - SET processing_status = :processing_status_param, + SET completion_status = :completion_status_param, last_updated_epoch_time = :last_updated_epoch_time_param WHERE s3_url_of_product_label = :s3_url_of_product_label_param """ - processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}} + completion_status_param = {'name': 'completion_status_param', 'value': {'stringValue': completion_status}} 
last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', 'value': {'longValue': round(time.time() * 1000)}} s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', 'value': {'stringValue': s3_url_of_product_label}} - param_set = [processing_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param] + param_set = [completion_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param] response = rds_data.execute_statement( resourceArn=db_clust_arn, @@ -140,7 +140,7 @@ def update_product_processing_status_in_database(s3_url_of_product_label, proces sql=sql, parameters=param_set) - logger.debug(f"Response for update_product_processing_status_in_database: {str(response)}") + logger.debug(f"Response for update_product_completion_status_in_database: {str(response)}") def submit_data_to_nucleus(list_of_product_labels_to_process): """ Submits data to Nucleus """ @@ -168,7 +168,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process list_of_s3_urls_to_copy.extend(get_list_of_data_files(s3_url_of_product_label)) # Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names - current_time = datetime.now().strftime("%m-%d-%Y-%H-%M-%S") + current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") random_batch_number = current_time + uuid.uuid4().hex try: @@ -225,7 +225,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process logger.error(f"Error creating harvest config files in s3 bucker: {pds_nucleus_config_bucket_name}. 
Exception: {str(e)}") return - trigger_nucleus_workflow(list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir) + trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir) logger.info(f"Triggered Nucleus workflow: {dag_name} for product labels: {list_of_product_labels_to_process_with_file_paths}") @@ -268,7 +268,7 @@ def get_list_of_data_files(s3_url_of_product_label): return list_of_data_files -def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, efs_config_dir): +def trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process, s3_config_dir, efs_config_dir): """ Triggers Nucleus workflow with parameters """ # Convert list to comma seperated list @@ -290,9 +290,17 @@ def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, e list_of_product_labels_to_process_key = "list_of_product_labels_to_process" list_of_product_labels_to_process_value = str(comma_seperated_list_of_product_labels_to_process) + pds_node_name_key = "pds_node_name" + pds_node_name_value = pds_node_name + + batch_number_key = "batch_number" + batch_number_value = random_batch_number + conf = "{\"" + \ s3_config_dir_key + "\":\"" + s3_config_dir_value + "\",\"" + \ list_of_product_labels_to_process_key + "\":\"" + list_of_product_labels_to_process_value + "\",\"" + \ + pds_node_name_key + "\":\"" + pds_node_name_value + "\",\"" + \ + batch_number_key + "\":\"" + batch_number_value + "\",\"" + \ efs_config_dir_key + "\":\"" + efs_config_dir_value + "\"}" logger.info(f"Triggering Nucleus workflow {dag_name} with parameters : {conf}") diff --git a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-processing-status-tracker.py b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-processing-status-tracker.py new file mode 100644 index 0000000..dd07736 --- /dev/null +++ 
b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-product-processing-status-tracker.py @@ -0,0 +1,82 @@ +# This lambda is used to store the product processing status in database + +import boto3 +import logging +import json +import os +import time +from xml.dom import minidom + +logger = logging.getLogger("pds-nucleus-datasync-completion") +logger.setLevel(logging.DEBUG) +logger.addHandler(logging.StreamHandler()) + +s3 = boto3.resource('s3') +s3_client = boto3.client('s3') + +s3_bucket_name = "pds-nucleus-staging" +db_clust_arn = os.environ.get('DB_CLUSTER_ARN') +db_secret_arn = os.environ.get('DB_SECRET_ARN') +efs_mount_path = os.environ.get('EFS_MOUNT_PATH') +# pds_node = os.environ.get('PDS_NODE_NAME') + +rds_data = boto3.client('rds-data') + +def lambda_handler(event, context): + + print(event) + + s3_url_of_product_label_list = event['productsList'] + processing_status = event['processingStatus'] + pds_node = event['pdsNode'] + batch_number = event['batchNumber'] + + for s3_url_of_product_label in s3_url_of_product_label_list.split(','): + + print(f'Saving the processing status of {s3_url_of_product_label} as {processing_status}') + save_product_processing_status_in_database(s3_url_of_product_label, processing_status, pds_node, batch_number) + +def save_product_processing_status_in_database(s3_url_of_product_label, processing_status, pds_node, batch_number): + """ Save processing status for product """ + + logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database") + + sql = """ + REPLACE INTO product_processing_status + ( + s3_url_of_product_label, + processing_status, + pds_node, + batch_number, + last_updated_epoch_time) + VALUES( + :s3_url_of_product_label_param, + :processing_status_param, + :pds_node_param, + :batch_number_param, + :last_updated_epoch_time_param + ) + """ + + s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', + 'value': {'stringValue': 
s3_url_of_product_label}} + processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}} + last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', + 'value': {'longValue': round(time.time() * 1000)}} + pds_node_param = {'name': 'pds_node_param', 'value': {'stringValue': pds_node}} + batch_number_param = {'name': 'batch_number_param', 'value': {'stringValue': batch_number}} + + param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param, pds_node_param, batch_number_param] + + try: + response = rds_data.execute_statement( + resourceArn=db_clust_arn, + secretArn=db_secret_arn, + database='pds_nucleus', + sql=sql, + parameters=param_set) + logger.debug(str(response)) + + except Exception as e: + logger.error(f"Error writing to product_processing_status table. Exception: {str(e)}") + raise e diff --git a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-s3-file-event-processor.py b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-s3-file-event-processor.py index 8ec1fd8..15d21ef 100644 --- a/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-s3-file-event-processor.py +++ b/terraform/terraform-modules/product-copy-completion-checker/lambda/pds-nucleus-s3-file-event-processor.py @@ -25,11 +25,9 @@ rds_data = boto3.client('rds-data') def lambda_handler(event, context): - - s3_bucket = event['Records'][0].get("s3").get("bucket").get("name") - - s3_key = event['Records'][0].get("s3").get("object").get("key") - + s3_event = json.loads(event['Records'][0].get("body")) + s3_bucket = s3_event['Records'][0].get("s3").get("bucket").get("name") + s3_key = s3_event['Records'][0].get("s3").get("object").get("key") s3_url_of_file = "s3://" + s3_bucket + "/" + s3_key logger.info(f"s3_url_of_file: {s3_url_of_file}") @@ -47,7 +45,7 @@ def handle_file_types(s3_url_of_file, s3_bucket, s3_key): # TODO: 
Product label received (THIS CAN BE LBLX ) if s3_url_of_file.lower().endswith(".xml") and not s3_url_of_file.lower().endswith(".aux.xml"): logger.debug(f"Received product file: {s3_url_of_file}") - save_product_processing_status_in_database(s3_url_of_file, "INCOMPLETE") + save_product_completion_status_in_database(s3_url_of_file, "INCOMPLETE") save_files_for_product_label(s3_url_of_file, s3_bucket, s3_key) # Data file received @@ -115,8 +113,8 @@ def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_o raise e -def save_product_processing_status_in_database(s3_url_of_product_label, processing_status): - """ Creates a record for product """ +def save_product_completion_status_in_database(s3_url_of_product_label, completion_status): + """ Creates a product completion status record for product """ logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database") @@ -124,12 +122,12 @@ def save_product_processing_status_in_database(s3_url_of_product_label, processi REPLACE INTO product ( s3_url_of_product_label, - processing_status, + completion_status, pds_node, last_updated_epoch_time) VALUES( :s3_url_of_product_label_param, - :processing_status_param, + :completion_status_param, :pds_node_param, :last_updated_epoch_time_param ) @@ -137,12 +135,12 @@ def save_product_processing_status_in_database(s3_url_of_product_label, processi s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param', 'value': {'stringValue': s3_url_of_product_label}} - processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}} + completion_status_param = {'name': 'completion_status_param', 'value': {'stringValue': completion_status}} last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param', 'value': {'longValue': round(time.time() * 1000)}} pds_node_param = {'name': 'pds_node_param', 'value': {'stringValue': pds_node}} - param_set = [s3_url_of_product_label_param, 
processing_status_param, last_updated_epoch_time_param, pds_node_param] + param_set = [s3_url_of_product_label_param, completion_status_param, last_updated_epoch_time_param, pds_node_param] try: response = rds_data.execute_statement( diff --git a/terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json b/terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json new file mode 100755 index 0000000..169f2ab --- /dev/null +++ b/terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json @@ -0,0 +1,53 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:CreateLogStream", + "logs:CreateLogGroup", + "logs:PutLogEvents" + ], + "Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*:log-stream:*" + }, + { + "Effect": "Allow", + "Action": "rds-data:ExecuteStatement", + "Resource": "arn:aws:rds:*:${pds_nucleus_aws_account_id}:cluster:${rds_cluster_id}" + }, + { + "Effect": "Allow", + "Action": "secretsmanager:GetSecretValue", + "Resource": "arn:aws:secretsmanager:*:${pds_nucleus_aws_account_id}:secret:pds/nucleus/rds/*" + }, + { + "Action": [ + "s3:GetBucket*", + "s3:GetObject*", + "s3:PutObject*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + "arn:aws:s3:::pds-nucleus*", + "arn:aws:s3:::pds-nucleus*/*", + "arn:aws:s3:::pds-*-staging*", + "arn:aws:s3:::pds-*-staging*/*" + ] + }, + { + "Effect": "Allow", + "Action": "airflow:CreateCliToken", + "Resource": "arn:aws:airflow:*:${pds_nucleus_aws_account_id}:environment/pds*" + }, + { + "Effect": "Allow", + "Action": [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Resource": "arn:aws:sqs:*:${pds_nucleus_aws_account_id}:pds-*" + } + ] +} diff --git a/terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf b/terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf index 
6161141..28caee3 100644 --- a/terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf +++ b/terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf @@ -27,7 +27,7 @@ resource "aws_secretsmanager_secret_version" "pds_nucleus_rds_password" { } resource "aws_rds_cluster" "default" { - cluster_identifier = "pdsnucleus" + cluster_identifier = var.rds_cluster_id engine = "aurora-mysql" engine_version = "5.7.mysql_aurora.2.03.2" availability_zones = var.database_availability_zones @@ -89,41 +89,50 @@ data "aws_iam_policy" "mcp_operator_policy" { name = var.permission_boundary_for_iam_roles } -data "aws_iam_policy_document" "assume_role_lambda_apigw" { +data "aws_iam_policy_document" "assume_role_lambda" { statement { effect = "Allow" principals { type = "Service" - identifiers = ["lambda.amazonaws.com", "apigateway.amazonaws.com"] + identifiers = ["lambda.amazonaws.com", "scheduler.amazonaws.com"] } actions = ["sts:AssumeRole"] } } -data "aws_iam_policy_document" "inline_policy_lambda" { - statement { - actions = [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:PutLogEvents", - "lambda:InvokeFunction", - "rds-data:ExecuteStatement", - "secretsmanager:GetSecretValue", - "s3:GetObject", - "s3:PutObject", - "airflow:CreateCliToken" - ] - resources = ["*"] +data "aws_caller_identity" "current" {} + +data "template_file" "lambda_inline_policy_template" { + template = file("terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json") + vars = { + pds_nucleus_aws_account_id = data.aws_caller_identity.current.account_id + rds_cluster_id = var.rds_cluster_id + region = var.region } + + depends_on = [data.aws_caller_identity.current] +} + +resource "local_file" "lambda_inline_policy_file" { + content = data.template_file.lambda_inline_policy_template.rendered + filename = "terraform-modules/product-copy-completion-checker/lambda_inline_policy.json" + + depends_on = 
[data.template_file.lambda_inline_policy_template] +} + +data "aws_iam_policy_document" "lambda_inline_policy" { + source_policy_documents = [file("${path.module}/lambda_inline_policy.json")] + + depends_on = [local_file.lambda_inline_policy_file] } resource "aws_iam_role" "pds_nucleus_lambda_execution_role" { name = "pds_nucleus_lambda_execution_role" inline_policy { - name = "unity-cs-lambda-auth-inline-policy" - policy = data.aws_iam_policy_document.inline_policy_lambda.json + name = "pds-nucleus-lambda-execution-inline-policy" + policy = data.aws_iam_policy_document.lambda_inline_policy.json } - assume_role_policy = data.aws_iam_policy_document.assume_role_lambda_apigw.json + assume_role_policy = data.aws_iam_policy_document.assume_role_lambda.json permissions_boundary = data.aws_iam_policy.mcp_operator_policy.arn } @@ -134,7 +143,6 @@ data "archive_file" "pds_nucleus_s3_file_file_event_processor_function_zip" { } - data "archive_file" "pds_nucleus_product_completion_checker_zip" { type = "zip" source_file = "${path.module}/lambda/pds-nucleus-product-completion-checker.py" @@ -163,8 +171,8 @@ resource "aws_lambda_function" "pds_nucleus_init_function" { DB_SECRET_ARN = aws_secretsmanager_secret.pds_nucleus_rds_credentials.arn } } - } + resource "aws_s3_bucket" "pds_nucleus_s3_config_bucket" { bucket = var.pds_nucleus_config_bucket_name } @@ -204,6 +212,15 @@ resource "aws_cloudwatch_log_group" "pds_nucleus_s3_file_file_event_processor_fu name = "/aws/lambda/pds_nucleus_s3_file_event_processor-${var.pds_node_names[count.index]}" } +# Create SQS queue event source for pds_nucleus_s3_file_file_event_processor_function for each PDS Node +resource "aws_lambda_event_source_mapping" "event_source_mapping" { + count = length(var.pds_node_names) + event_source_arn = aws_sqs_queue.pds_nucleus_files_to_save_in_database_sqs_queue[count.index].arn + enabled = true + function_name = 
aws_lambda_function.pds_nucleus_s3_file_file_event_processor_function[count.index].function_name + batch_size = 1 +} + # Create pds_nucleus_product_completion_checker_function for each PDS Node resource "aws_lambda_function" "pds_nucleus_product_completion_checker_function" { count = length(var.pds_node_names) @@ -238,6 +255,24 @@ resource "aws_cloudwatch_log_group" "pds_nucleus_product_completion_checker_func name = "/aws/lambda/pds-nucleus-product-completion-checker-${var.pds_node_names[count.index]}" } +# Create aws_scheduler_schedule for pds_nucleus_product_completion_checker_function for each PDS Node +resource "aws_scheduler_schedule" "schedule_for_pds_nucleus_product_completion_checker" { + count = length(var.pds_node_names) + name = "schedule_for_pds_nucleus_product_completion_checker_${var.pds_node_names[count.index]}" + group_name = "default" + + flexible_time_window { + mode = "OFF" + } + + schedule_expression = "rate(1 minute)" + + target { + arn = aws_lambda_function.pds_nucleus_product_completion_checker_function[count.index].arn + role_arn = aws_iam_role.pds_nucleus_lambda_execution_role.arn + } +} + # Apply lambda permissions for each pds_nucleus_s3_file_file_event_processor_function of each Node resource "aws_lambda_permission" "s3-lambda-permission" { count = length(var.pds_node_names) @@ -248,16 +283,57 @@ resource "aws_lambda_permission" "s3-lambda-permission" { source_arn = aws_s3_bucket.pds_nucleus_s3_staging_bucket[count.index].arn } -# Creat aws_s3_bucket_notification for each s3 bucket nof each Node +# Create an SQS queue to receive S3 bucket notifications for each s3 bucket of each Node +resource "aws_sqs_queue" "pds_nucleus_files_to_save_in_database_sqs_queue" { + count = length(var.pds_node_names) + name = "pds-nucleus-files-to-save-in-database-${var.pds_node_names[count.index]}" + delay_seconds = 0 + visibility_timeout_seconds = 30 + message_retention_seconds = 345600 + receive_wait_time_seconds = 0 + sqs_managed_sse_enabled = true +} + 
+# Create an SQS policy document for SQS queue of each Node +data "aws_iam_policy_document" "pds_nucleus_files_to_save_in_database_sqs_queue_policy_document" { + count = length(var.pds_node_names) + + statement { + effect = "Allow" + + principals { + type = "Service" + identifiers = ["s3.amazonaws.com"] + } + + actions = ["sqs:SendMessage"] + resources = [aws_sqs_queue.pds_nucleus_files_to_save_in_database_sqs_queue[count.index].arn] + + condition { + test = "StringEquals" + variable = "aws:SourceArn" + values = [aws_s3_bucket.pds_nucleus_s3_staging_bucket[count.index].arn] + } + } +} + +# Create an SQS policy for SQS queue of each Node +resource "aws_sqs_queue_policy" "pds_nucleus_files_to_save_in_database_sqs_queue_policy" { + count = length(var.pds_node_names) + queue_url = aws_sqs_queue.pds_nucleus_files_to_save_in_database_sqs_queue[count.index].url + policy = data.aws_iam_policy_document.pds_nucleus_files_to_save_in_database_sqs_queue_policy_document[count.index].json +} + +# Create an aws_s3_bucket_notification for each s3 bucket of each Node resource "aws_s3_bucket_notification" "pds_nucleus_s3_staging_bucket_notification" { count = length(var.pds_node_names) # convert PDS node name to S3 bucket name compatible format bucket = "${lower(replace(var.pds_node_names[count.index], "_", "-"))}-${var.pds_nucleus_staging_bucket_name_postfix}" - lambda_function { - lambda_function_arn = aws_lambda_function.pds_nucleus_s3_file_file_event_processor_function[count.index].arn - events = ["s3:ObjectCreated:*"] + queue { + events = ["s3:ObjectCreated:*"] + queue_arn = aws_sqs_queue.pds_nucleus_files_to_save_in_database_sqs_queue[count.index].arn } } @@ -274,3 +350,34 @@ resource "aws_lambda_invocation" "invoke_pds_nucleus_init_function" { depends_on = [aws_lambda_function.pds_nucleus_init_function, aws_rds_cluster.default] } + + +data "archive_file" "pds_nucleus_product_processing_status_tracker_function_zip" { + type = "zip" + source_file = 
"${path.module}/lambda/pds-nucleus-product-processing-status-tracker.py" + output_path = "${path.module}/lambda/pds-nucleus-product-processing-status-tracker.zip" +} + +# Create pds_nucleus_product_processing_status_tracker_function (a single function; this resource has no per-Node count) +resource "aws_lambda_function" "pds_nucleus_product_processing_status_tracker_function" { + function_name = "pds_nucleus_product_processing_status_tracker" + filename = "${path.module}/lambda/pds-nucleus-product-processing-status-tracker.zip" + source_code_hash = data.archive_file.pds_nucleus_product_processing_status_tracker_function_zip.output_base64sha256 + role = aws_iam_role.pds_nucleus_lambda_execution_role.arn + runtime = "python3.9" + handler = "pds-nucleus-product-processing-status-tracker.lambda_handler" + timeout = 10 + depends_on = [data.archive_file.pds_nucleus_product_processing_status_tracker_function_zip] + + environment { + variables = { + DB_CLUSTER_ARN = aws_rds_cluster.default.arn + DB_SECRET_ARN = aws_secretsmanager_secret.pds_nucleus_rds_credentials.arn + } + } +} + +# Create CloudWatch Log Group for pds_nucleus_product_processing_status_tracker_function +resource "aws_cloudwatch_log_group" "pds_nucleus_product_processing_status_tracker_function_log_group" { + name = "/aws/lambda/pds_nucleus_product_processing_status_tracker" +} diff --git a/terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json b/terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json new file mode 100644 index 0000000..74ca197 --- /dev/null +++ b/terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json @@ -0,0 +1,53 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:CreateLogStream", + "logs:CreateLogGroup", + "logs:PutLogEvents" + ], + "Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*:log-stream:*" + }, + { + "Effect": "Allow", + 
"Action": "rds-data:ExecuteStatement", + "Resource": "arn:aws:rds:*:${pds_nucleus_aws_account_id}:cluster:${rds_cluster_id}" + }, + { + "Effect": "Allow", + "Action": "secretsmanager:GetSecretValue", + "Resource": "arn:aws:secretsmanager:*:${pds_nucleus_aws_account_id}:secret:pds/nucleus/rds/*" + }, + { + "Action": [ + "s3:GetBucket*", + "s3:GetObject*", + "s3:PutObject*", + "s3:List*" + ], + "Effect": "Allow", + "Resource": [ + "arn:aws:s3:::pds-nucleus*", + "arn:aws:s3:::pds-nucleus*/*", + "arn:aws:s3:::pds-*-staging*", + "arn:aws:s3:::pds-*-staging*/*" + ] + }, + { + "Effect": "Allow", + "Action": "airflow:CreateCliToken", + "Resource": "arn:aws:airflow:*:${pds_nucleus_aws_account_id}:environment/pds*" + }, + { + "Effect": "Allow", + "Action": [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ], + "Resource": "arn:aws:sqs:*:${pds_nucleus_aws_account_id}:pds-*" + } + ] +} diff --git a/terraform/terraform-modules/product-copy-completion-checker/variables.tf b/terraform/terraform-modules/product-copy-completion-checker/variables.tf index 47189e9..bc98faf 100644 --- a/terraform/terraform-modules/product-copy-completion-checker/variables.tf +++ b/terraform/terraform-modules/product-copy-completion-checker/variables.tf @@ -3,6 +3,12 @@ variable "permission_boundary_for_iam_roles" { sensitive = true } +variable "rds_cluster_id" { + default = "pdsnucleus" + type = string + sensitive = true +} + variable "database_name" { default = "pds_nucleus" type = string @@ -92,3 +98,8 @@ variable "airflow_env_name" { default = "pds-nucleus-airflow-env" type = string } + +variable "region" { + description = "AWS Region" + type = string +} diff --git a/terraform/terraform-modules/test-data/dags/template-pds-basic-registry-load-use-case.py b/terraform/terraform-modules/test-data/dags/template-pds-basic-registry-load-use-case.py index 252cda0..2b2e339 100644 --- a/terraform/terraform-modules/test-data/dags/template-pds-basic-registry-load-use-case.py +++ 
b/terraform/terraform-modules/test-data/dags/template-pds-basic-registry-load-use-case.py @@ -16,6 +16,86 @@ ECS_LAUNCH_TYPE = "FARGATE" ECS_SUBNETS = ${pds_nucleus_ecs_subnets} ECS_SECURITY_GROUPS = ["${pds_nucleus_ecs_security_groups}"] +LAMBDA_FUNCTION_NAME = "pds_nucleus_product_processing_status_tracker" + + +################################################################################## +# Success/Failure monitoring +################################################################################## + +# Save product processing status validate_successful +def save_product_processing_status_validate_successful(context): + client = boto3.client('lambda') + + response = client.invoke( + FunctionName=LAMBDA_FUNCTION_NAME, + InvocationType='Event', + Payload=json.dumps({ + "productsList": context["dag_run"].conf["list_of_product_labels_to_process"], + "pdsNode": context["dag_run"].conf["pds_node_name"], + "processingStatus": "validate_successful", + "batchNumber": context["dag_run"].conf["batch_number"], + }), + ) + + print(response) + +# Save product processing status validate_failed +def save_product_processing_status_validate_failed(context): + client = boto3.client('lambda') + + response = client.invoke( + FunctionName=LAMBDA_FUNCTION_NAME, + InvocationType='Event', + Payload=json.dumps({ + "productsList": context["dag_run"].conf["list_of_product_labels_to_process"], + "pdsNode": context["dag_run"].conf["pds_node_name"], + "processingStatus": "validate_failed", + "batchNumber": context["dag_run"].conf["batch_number"], + }), + ) + + print(response) + +# Save product processing status harvest_successful +def save_product_processing_status_harvest_successful(context): + client = boto3.client('lambda') + + response = client.invoke( + FunctionName=LAMBDA_FUNCTION_NAME, + InvocationType='Event', + Payload=json.dumps({ + "productsList": context["dag_run"].conf["list_of_product_labels_to_process"], + "pdsNode": context["dag_run"].conf["pds_node_name"], + 
"processingStatus": "harvest_successful", + "batchNumber": context["dag_run"].conf["batch_number"], + }), + ) + + print(response) + + +# Save product processing status harvest_failed +def save_product_processing_status_harvest_failed(context): + client = boto3.client('lambda') + + response = client.invoke( + FunctionName=LAMBDA_FUNCTION_NAME, + InvocationType='Event', + Payload=json.dumps({ + "productsList": context["dag_run"].conf["list_of_product_labels_to_process"], + "pdsNode": context["dag_run"].conf["pds_node_name"], + "processingStatus": "harvest_failed", + "batchNumber": context["dag_run"].conf["batch_number"], + }), + ) + + print(response) + + +################################################################################## +# DAG and Tasks Definitions +################################################################################## dag = DAG( dag_id="${pds_nucleus_basic_registry_dag_id}", @@ -59,7 +139,9 @@ awslogs_group="/pds/ecs/validate", awslogs_stream_prefix="ecs/pds-validate-task", awslogs_fetch_interval=timedelta(seconds=1), - number_logs_exception=500 + number_logs_exception=500, + on_success_callback=save_product_processing_status_validate_successful, + on_failure_callback=save_product_processing_status_validate_failed, ) # PDS Harvest Task @@ -87,7 +169,9 @@ awslogs_stream_prefix="ecs/pds-registry-loader-harvest", awslogs_fetch_interval=timedelta(seconds=1), number_logs_exception=500, - trigger_rule=TriggerRule.ALL_DONE + trigger_rule=TriggerRule.ALL_DONE, + on_success_callback=save_product_processing_status_harvest_successful, + on_failure_callback=save_product_processing_status_harvest_failed, ) # PDS Nucleus Config Init Task @@ -169,7 +253,8 @@ awslogs_group="/pds/ecs/pds-nucleus-config-init", awslogs_stream_prefix="ecs/pds-nucleus-config-init", awslogs_fetch_interval=timedelta(seconds=1), - number_logs_exception=500 + number_logs_exception=500, + trigger_rule=TriggerRule.ALL_DONE ) @@ -198,7 +283,8 @@ 
awslogs_group="/pds/ecs/pds-nucleus-s3-to-efs-copy", awslogs_stream_prefix="ecs/pds-nucleus-s3-to-efs-copy", awslogs_fetch_interval=timedelta(seconds=1), - number_logs_exception=500 + number_logs_exception=500, + trigger_rule=TriggerRule.ALL_DONE ) @@ -207,8 +293,8 @@ task_id='Print_End_Time', dag=dag, bash_command='date', - trigger_rule=TriggerRule.ALL_SUCCESS + trigger_rule=TriggerRule.ALL_DONE ) # Workflow -print_start_time >> config_init >> config_s3_to_efs_copy >> validate >> harvest >> config_s3_to_efs_copy_cleanup >> config_init_cleanup >> print_end_time +print_start_time >> config_init >> config_s3_to_efs_copy >> validate >> harvest >> config_s3_to_efs_copy_cleanup >> config_init_cleanup >> print_end_time