Tracking product label processing status in an RDS database and using SQS to control the flow of S3 events to lambda functions #117

Merged · 9 commits · Jul 24, 2024
1 change: 1 addition & 0 deletions terraform/main.tf
@@ -80,6 +80,7 @@ module "product-copy-completion-checker" {

database_availability_zones = var.database_availability_zones
airflow_env_name = var.airflow_env_name
region = var.region

depends_on = [module.common]
}
4 changes: 2 additions & 2 deletions terraform/terraform-modules/common/common.tf
@@ -1,8 +1,8 @@
# Terraform script to create the common resources for PDS Nucleus

resource "aws_security_group" "nucleus_security_group" {
name = "nucleus_security_group"
description = "nucleus_security_group"
name = var.nucleus_security_group_name
description = "PDS Nucleus security group"
vpc_id = var.vpc_id

ingress {
8 changes: 8 additions & 0 deletions terraform/terraform-modules/common/variables.tf
@@ -57,3 +57,11 @@ variable "mwaa_dag_s3_bucket_name" {
type = string
sensitive = true
}

variable "nucleus_security_group_name" {
description = "The name of the PDS Nucleus security group"
default = "pds_nucleus_security_group"
type = string
sensitive = true
}

@@ -134,6 +134,11 @@
"Action": "iam:PassRole",
"Effect": "Allow",
"Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds_nucleus_*"
},
{
"Action": "lambda:InvokeFunction",
"Effect": "Allow",
"Resource": "arn:aws:lambda:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:function:pds_nucleus_*"
}
]
}
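The new lambda:InvokeFunction statement allows components assuming this role to call other PDS Nucleus functions directly. A minimal boto3 sketch of such a call, assuming a hypothetical function named pds_nucleus_example and an illustrative payload (neither is part of this change):

import json
import boto3

lambda_client = boto3.client('lambda')

# Asynchronously invoke a function covered by the pds_nucleus_* resource pattern.
# The function name and payload are illustrative assumptions only.
response = lambda_client.invoke(
    FunctionName='pds_nucleus_example',
    InvocationType='Event',
    Payload=json.dumps({'example_key': 'example_value'})
)
print(response['StatusCode'])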
@@ -30,6 +30,7 @@ def lambda_handler(event, context):
create_product_table()
create_datafile_table()
create_product_datafile_mapping_table()
create_product_processing_status_table()
return f"Processed lambda request ID: {context.aws_request_id}"
except Exception as e:
logger.error(f"Error creating database tables. Exception: {str(e)}")
@@ -41,9 +42,9 @@ def create_product_table():
CREATE TABLE product
(
s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1,
processing_status VARCHAR(10),
completion_status VARCHAR(50),
last_updated_epoch_time BIGINT,
pds_node VARCHAR(10),
pds_node VARCHAR(50),
PRIMARY KEY (s3_url_of_product_label)
);
"""
@@ -92,3 +93,24 @@ def create_product_datafile_mapping_table():
database='pds_nucleus',
sql=sql)
logger.debug(f"Response for create_product_datafile_mapping_table() : {str(response)}")


def create_product_processing_status_table():
""" Created product processing status table """
sql = """
CREATE TABLE product_processing_status
(
s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1,
processing_status VARCHAR(50),
last_updated_epoch_time BIGINT,
pds_node VARCHAR(50),
batch_number VARCHAR(100),
PRIMARY KEY (s3_url_of_product_label)
);
"""
response = rds_data.execute_statement(
resourceArn=db_clust_arn,
secretArn=db_secret_arn,
database='pds_nucleus',
sql=sql)
logger.debug(f"Response for create_product_processing_status_table() : {str(response)}")
@@ -71,13 +71,13 @@ def process_completed_products():

sql = """
SELECT DISTINCT s3_url_of_product_label from product
WHERE processing_status = 'INCOMPLETE' and
WHERE completion_status = 'INCOMPLETE' and
pds_node = :pds_node_param and
s3_url_of_product_label
NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 5;
IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 100;
"""

pds_node_param = {'name': 'pds_node_param',
@@ -104,7 +104,7 @@ def process_completed_products():

for data_dict in record:
for data_type, s3_url_of_product_label in data_dict.items():
update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE')
update_product_completion_status_in_database(s3_url_of_product_label, 'COMPLETE')
list_of_product_labels_to_process.append(s3_url_of_product_label)

if count == n:
@@ -116,22 +116,22 @@ def process_completed_products():
count = 0
list_of_product_labels_to_process = []

def update_product_processing_status_in_database(s3_url_of_product_label, processing_status):
def update_product_completion_status_in_database(s3_url_of_product_label, completion_status):
""" Updates the product processing status of the given s3_url_of_product_label """
sql = """
UPDATE product
SET processing_status = :processing_status_param,
SET completion_status = :completion_status_param,
last_updated_epoch_time = :last_updated_epoch_time_param
WHERE s3_url_of_product_label = :s3_url_of_product_label_param
"""

processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
completion_status_param = {'name': 'completion_status_param', 'value': {'stringValue': completion_status}}
last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
'value': {'longValue': round(time.time() * 1000)}}
s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
'value': {'stringValue': s3_url_of_product_label}}

param_set = [processing_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]
param_set = [completion_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]

response = rds_data.execute_statement(
resourceArn=db_clust_arn,
@@ -140,7 +140,7 @@ def update_product_processing_status_in_database(s3_url_of_product_label, proces
sql=sql,
parameters=param_set)

logger.debug(f"Response for update_product_processing_status_in_database: {str(response)}")
logger.debug(f"Response for update_product_completion_status_in_database: {str(response)}")

def submit_data_to_nucleus(list_of_product_labels_to_process):
""" Submits data to Nucleus """
@@ -168,7 +168,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process
list_of_s3_urls_to_copy.extend(get_list_of_data_files(s3_url_of_product_label))

# Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
current_time = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
random_batch_number = current_time + uuid.uuid4().hex

try:
@@ -225,7 +225,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process
logger.error(f"Error creating harvest config files in s3 bucker: {pds_nucleus_config_bucket_name}. Exception: {str(e)}")
return

trigger_nucleus_workflow(list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir)
trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir)

logger.info(f"Triggered Nucleus workflow: {dag_name} for product labels: {list_of_product_labels_to_process_with_file_paths}")

@@ -268,7 +268,7 @@ def get_list_of_data_files(s3_url_of_product_label):
return list_of_data_files


def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, efs_config_dir):
def trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process, s3_config_dir, efs_config_dir):
""" Triggers Nucleus workflow with parameters """

# Convert list to comma separated list
@@ -290,9 +290,17 @@ def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, e
list_of_product_labels_to_process_key = "list_of_product_labels_to_process"
list_of_product_labels_to_process_value = str(comma_seperated_list_of_product_labels_to_process)

pds_node_name_key = "pds_node_name"
pds_node_name_value = pds_node_name

batch_number_key = "batch_number"
batch_number_value = random_batch_number

conf = "{\"" + \
s3_config_dir_key + "\":\"" + s3_config_dir_value + "\",\"" + \
list_of_product_labels_to_process_key + "\":\"" + list_of_product_labels_to_process_value + "\",\"" + \
pds_node_name_key + "\":\"" + pds_node_name_value + "\",\"" + \
batch_number_key + "\":\"" + batch_number_value + "\",\"" + \
efs_config_dir_key + "\":\"" + efs_config_dir_value + "\"}"

logger.info(f"Triggering Nucleus workflow {dag_name} with parameters : {conf}")
@@ -0,0 +1,82 @@
# This lambda is used to store the product processing status in the database

import boto3
import logging
import json
import os
import time
from xml.dom import minidom

logger = logging.getLogger("pds-nucleus-datasync-completion")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

s3_bucket_name = "pds-nucleus-staging"
db_clust_arn = os.environ.get('DB_CLUSTER_ARN')
db_secret_arn = os.environ.get('DB_SECRET_ARN')
efs_mount_path = os.environ.get('EFS_MOUNT_PATH')
# pds_node = os.environ.get('PDS_NODE_NAME')

rds_data = boto3.client('rds-data')

def lambda_handler(event, context):

print(event)

s3_url_of_product_label_list = event['productsList']
processing_status = event['processingStatus']
pds_node = event['pdsNode']
batch_number = event['batchNumber']

for s3_url_of_product_label in s3_url_of_product_label_list.split(','):

print(f'Saving the processing status of {s3_url_of_product_label} as {processing_status}')
save_product_processing_status_in_database(s3_url_of_product_label, processing_status, pds_node, batch_number)

def save_product_processing_status_in_database(s3_url_of_product_label, processing_status, pds_node, batch_number):
""" Save processing status for product """

logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database")

sql = """
REPLACE INTO product_processing_status
(
s3_url_of_product_label,
processing_status,
pds_node,
batch_number,
last_updated_epoch_time)
VALUES(
:s3_url_of_product_label_param,
:processing_status_param,
:pds_node_param,
:batch_number_param,
:last_updated_epoch_time_param
)
"""

s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
'value': {'stringValue': s3_url_of_product_label}}
processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
'value': {'longValue': round(time.time() * 1000)}}
pds_node_param = {'name': 'pds_node_param', 'value': {'stringValue': pds_node}}
batch_number_param = {'name': 'batch_number_param', 'value': {'stringValue': batch_number}}

param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param, pds_node_param, batch_number_param]

try:
response = rds_data.execute_statement(
resourceArn=db_clust_arn,
secretArn=db_secret_arn,
database='pds_nucleus',
sql=sql,
parameters=param_set)
logger.debug(str(response))

except Exception as e:
logger.error(f"Error writing to product_processing_status table. Exception: {str(e)}")
raise e
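For local testing, the handler above expects a flat event with a comma-separated product list. A minimal sketch of such an invocation; the S3 URLs, status value, node name, and batch number below are placeholder assumptions for illustration:

if __name__ == "__main__":
    # Illustrative test event; the handler does not use the context argument.
    test_event = {
        "productsList": "s3://pds-nucleus-staging/example/product1.xml,s3://pds-nucleus-staging/example/product2.xml",
        "processingStatus": "IN_PROGRESS",        # assumed status value
        "pdsNode": "PDS_SBN",                     # assumed node name
        "batchNumber": "2024-07-24-00-00-00abcdef",  # assumed batch number
    }
    lambda_handler(test_event, None)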
@@ -25,11 +25,9 @@
rds_data = boto3.client('rds-data')

def lambda_handler(event, context):

s3_bucket = event['Records'][0].get("s3").get("bucket").get("name")

s3_key = event['Records'][0].get("s3").get("object").get("key")

s3_event = json.loads(event['Records'][0].get("body"))
s3_bucket = s3_event['Records'][0].get("s3").get("bucket").get("name")
s3_key = s3_event['Records'][0].get("s3").get("object").get("key")
s3_url_of_file = "s3://" + s3_bucket + "/" + s3_key

logger.info(f"s3_url_of_file: {s3_url_of_file}")
@@ -47,7 +45,7 @@ def handle_file_types(s3_url_of_file, s3_bucket, s3_key):
# TODO: Product label received (THIS CAN BE LBLX )
if s3_url_of_file.lower().endswith(".xml") and not s3_url_of_file.lower().endswith(".aux.xml"):
logger.debug(f"Received product file: {s3_url_of_file}")
save_product_processing_status_in_database(s3_url_of_file, "INCOMPLETE")
save_product_completion_status_in_database(s3_url_of_file, "INCOMPLETE")
save_files_for_product_label(s3_url_of_file, s3_bucket, s3_key)

# Data file received
@@ -115,34 +113,34 @@ def save_product_data_file_mapping_in_database(s3_url_of_product_label, s3_url_o
raise e


def save_product_processing_status_in_database(s3_url_of_product_label, processing_status):
""" Creates a record for product """
def save_product_completion_status_in_database(s3_url_of_product_label, completion_status):
""" Creates a product completion status record for product """

logger.debug(f"Saving product processing status for: {s3_url_of_product_label} in database")

sql = """
REPLACE INTO product
(
s3_url_of_product_label,
processing_status,
completion_status,
pds_node,
last_updated_epoch_time)
VALUES(
:s3_url_of_product_label_param,
:processing_status_param,
:completion_status_param,
:pds_node_param,
:last_updated_epoch_time_param
)
"""

s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
'value': {'stringValue': s3_url_of_product_label}}
processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
completion_status_param = {'name': 'completion_status_param', 'value': {'stringValue': completion_status}}
last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
'value': {'longValue': round(time.time() * 1000)}}
pds_node_param = {'name': 'pds_node_param', 'value': {'stringValue': pds_node}}

param_set = [s3_url_of_product_label_param, processing_status_param, last_updated_epoch_time_param, pds_node_param]
param_set = [s3_url_of_product_label_param, completion_status_param, last_updated_epoch_time_param, pds_node_param]

try:
response = rds_data.execute_statement(
@@ -0,0 +1,53 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*:log-stream:*"
},
{
"Effect": "Allow",
"Action": "rds-data:ExecuteStatement",
"Resource": "arn:aws:rds:*:${pds_nucleus_aws_account_id}:cluster:${rds_cluster_id}"
},
{
"Effect": "Allow",
"Action": "secretsmanager:GetSecretValue",
"Resource": "arn:aws:secretsmanager:*:${pds_nucleus_aws_account_id}:secret:pds/nucleus/rds/*"
},
{
"Action": [
"s3:GetBucket*",
"s3:GetObject*",
"s3:PutObject*",
"s3:List*"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::pds-nucleus*",
"arn:aws:s3:::pds-nucleus*/*",
"arn:aws:s3:::pds-*-staging*",
"arn:aws:s3:::pds-*-staging*/*"
]
},
{
"Effect": "Allow",
"Action": "airflow:CreateCliToken",
"Resource": "arn:aws:airflow:*:${pds_nucleus_aws_account_id}:environment/pds*"
},
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:*:${pds_nucleus_aws_account_id}:pds-*"
}
]
}
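The sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueAttributes permissions above are what an SQS event source mapping needs in order to feed messages to the lambda. A sketch of wiring such a mapping with boto3, assuming a hypothetical queue ARN, function name, and batch size; the actual wiring in this PR is presumably done in Terraform and is not shown here:

import boto3

lambda_client = boto3.client('lambda')

# Illustrative event source mapping; queue ARN, function name, and batch size are assumptions.
response = lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-west-2:123456789012:pds-example-queue',
    FunctionName='pds_nucleus_example_event_processor',
    BatchSize=1,
)
print(response['UUID'])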