
Commit 1c59a10

Few changes and formatting

Valswyn-NHS committed Jan 27, 2025
1 parent 742d27f commit 1c59a10

Showing 6 changed files with 182 additions and 72 deletions.
41 changes: 32 additions & 9 deletions ack_backend/src/ack_processor.py
@@ -2,7 +2,10 @@

 import json
 from typing import Union
-from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator
+from logging_decorators import (
+    ack_lambda_handler_logging_decorator,
+    convert_messsage_to_ack_row_logging_decorator,
+)
 from update_ack_file import update_ack_file, create_ack_data

@@ -17,7 +20,9 @@ def get_error_message_for_ack_file(message_diagnostics) -> Union[None, str]:
     if message_diagnostics.get("statusCode") in (None, 500):
         return "An unhandled error occurred during batch processing"

-    return message_diagnostics.get("error_message", "Unable to determine diagnostics issue")
+    return message_diagnostics.get(
+        "error_message", "Unable to determine diagnostics issue"
+    )


 @convert_messsage_to_ack_row_logging_decorator
@@ -32,7 +37,8 @@ def convert_message_to_ack_row(message, created_at_formatted_string):
         created_at_formatted_string=created_at_formatted_string,
         local_id=message.get("local_id"),
         row_id=message.get("row_id"),
-        successful_api_response=diagnostics is None,  # Response is successful if and only if there are no diagnostics
+        successful_api_response=diagnostics
+        is None,  # Response is successful if and only if there are no diagnostics
         diagnostics=get_error_message_for_ack_file(diagnostics),
         imms_id=message.get("imms_id"),
     )
@@ -47,7 +53,9 @@ def lambda_handler(event, context):
     """

     if not event.get("Records"):
-        raise ValueError("Error in ack_processor_lambda_handler: No records found in the event")
+        raise ValueError(
+            "Error in ack_processor_lambda_handler: No records found in the event"
+        )

     file_key = None
     created_at_formatted_string = None
@@ -59,17 +67,32 @@ def lambda_handler(event, context):
         try:
             incoming_message_body = json.loads(record["body"])
         except Exception as body_json_error:
-            raise ValueError("Could not load incoming message body") from body_json_error
+            raise ValueError(
+                "Could not load incoming message body"
+            ) from body_json_error

         if i == 0:
             # IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
             # for all messages in the event. The use of FIFO SQS queues ensures that this is the case.
             file_key = incoming_message_body[0].get("file_key")
-            created_at_formatted_string = incoming_message_body[0].get("created_at_formatted_string")
+            message_id = (incoming_message_body[0].get("row_id")).split("^")[0]
+            vaccine_type = incoming_message_body[0].get("vaccine_type")
+            supplier = incoming_message_body[0].get("supplier")
+            supplier_queue = f"{supplier}_{vaccine_type}"
+            created_at_formatted_string = incoming_message_body[0].get(
+                "created_at_formatted_string"
+            )

         for message in incoming_message_body:
-            ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
+            ack_data_rows.append(
+                convert_message_to_ack_row(message, created_at_formatted_string)
+            )

-    update_ack_file(file_key, created_at_formatted_string, ack_data_rows)
+    update_ack_file(
+        file_key, message_id, supplier_queue, created_at_formatted_string, ack_data_rows
+    )

-    return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}
+    return {
+        "statusCode": 200,
+        "body": json.dumps("Lambda function executed successfully!"),
+    }
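
Note: a minimal sketch of the message shape the reworked handler assumes when deriving message_id and supplier_queue from the first message in the batch. Every field value below is an illustrative assumption, not data from the repository:

import json

# Hypothetical SQS record body: a JSON array of messages, as the handler expects.
record_body = json.dumps(
    [
        {
            "file_key": "Flu_Vaccinations_v5_YGM41_20250127T12000000.csv",  # assumed name
            "row_id": "4d2f8a1e^1",  # assumed format: "<message_id>^<row number>"
            "vaccine_type": "FLU",
            "supplier": "EMIS",
            "created_at_formatted_string": "20250127T12000000",
            "diagnostics": None,
        }
    ]
)

first_message = json.loads(record_body)[0]
message_id = first_message.get("row_id").split("^")[0]  # "4d2f8a1e"
supplier_queue = f"{first_message.get('supplier')}_{first_message.get('vaccine_type')}"  # "EMIS_FLU"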
30 changes: 15 additions & 15 deletions ack_backend/src/audit_table.py
@@ -4,15 +4,22 @@
 from boto3.dynamodb.conditions import Key
 from clients import dynamodb_client, dynamodb_resource, logger
 from errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_FILENAME_GSI, AUDIT_TABLE_QUEUE_NAME_GSI, FileStatus, AuditTableKeys
+from constants import (
+    AUDIT_TABLE_NAME,
+    AUDIT_TABLE_QUEUE_NAME_GSI,
+    FileStatus,
+    AuditTableKeys,
+)


 def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
     """
     Checks for queued files.
     Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
     """
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
+    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(
+        AUDIT_TABLE_NAME
+    ).query(
         IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
         KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
         & Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
@@ -21,21 +28,16 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
     queued_files_details: list = queued_files_found_in_audit_table["Items"]

     # Return the oldest queued file
-    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
+    return (
+        sorted(queued_files_details, key=lambda x: x["timestamp"])[0]
+        if queued_files_details
+        else None
+    )


-def change_audit_table_status_to_processed(file_key: str) -> str:
-    """Updates the status in the audit table to 'Processed' and returns the queue name."""
+def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
+    """Updates the status in the audit table to 'Processed'."""
     try:
-        # TODO: This could create problems if there are duplicate filenames in the audit table
-        file_details_in_audit_table: dict = (
-            dynamodb_resource.Table(AUDIT_TABLE_NAME)
-            .query(IndexName=AUDIT_TABLE_FILENAME_GSI, KeyConditionExpression=Key(AuditTableKeys.FILENAME).eq(file_key))
-            .get("Items", [])[0]
-        )
-
-        message_id = file_details_in_audit_table.get("message_id")
-
         # Update the status in the audit table to "Processed"
         dynamodb_client.update_item(
             TableName=AUDIT_TABLE_NAME,
@@ -53,8 +55,6 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
             FileStatus.PROCESSED,
         )

-        return file_details_in_audit_table.get("queue_name")
-
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
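
Note: the net effect is a changed contract — the function no longer queries the filename GSI (so the duplicate-filename risk called out in the removed TODO disappears) and no longer returns the queue name; the caller supplies both identifiers. A usage sketch with illustrative values:

from audit_table import (
    change_audit_table_status_to_processed,
    get_next_queued_file_details,
)

# The caller now passes message_id in directly instead of having the function
# recover it from the audit table by filename.
change_audit_table_status_to_processed(
    file_key="Flu_Vaccinations_v5_YGM41_20250127T12000000.csv",  # assumed name
    message_id="4d2f8a1e",  # illustrative id
)

# The queue to poll next likewise comes from the caller (the supplier queue
# derived upstream), not from the function's return value.
next_file = get_next_queued_file_details(queue_name="EMIS_FLU")
if next_file is not None:
    print(next_file["filename"], next_file["message_id"])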
85 changes: 69 additions & 16 deletions ack_backend/src/update_ack_file.py
@@ -4,8 +4,16 @@
 from io import StringIO, BytesIO
 from typing import Union
 from botocore.exceptions import ClientError
-from constants import ACK_HEADERS, SOURCE_BUCKET_NAME, ACK_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
-from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
+from constants import (
+    ACK_HEADERS,
+    SOURCE_BUCKET_NAME,
+    ACK_BUCKET_NAME,
+    FILE_NAME_PROC_LAMBDA_NAME,
+)
+from audit_table import (
+    change_audit_table_status_to_processed,
+    get_next_queued_file_details,
+)
 from clients import s3_client, logger, lambda_client
 from utils_for_ack_lambda import get_row_count

@@ -21,7 +29,13 @@ def create_ack_data(
     """Returns a dictionary containing the ack headers as keys, along with the relevant values."""
     # Pack multi-line diagnostics down to single line (because Imms API diagnostics may be multi-line)
     diagnostics = (
-        " ".join(diagnostics.replace("\r", " ").replace("\n", " ").replace("\t", " ").replace("\xa0", " ").split())
+        " ".join(
+            diagnostics.replace("\r", " ")
+            .replace("\n", " ")
+            .replace("\t", " ")
+            .replace("\xa0", " ")
+            .split()
+        )
         if diagnostics is not None
         else None
     )
@@ -34,7 +48,9 @@ def create_ack_data(
         "RESPONSE_TYPE": "Business",
         "RESPONSE_CODE": "30001" if successful_api_response else "30002",
         "RESPONSE_DISPLAY": (
-            "Success" if successful_api_response else "Business Level Response Value - Processing Error"
+            "Success"
+            if successful_api_response
+            else "Business Level Response Value - Processing Error"
         ),
         "RECEIVED_TIME": created_at_formatted_string,
         "MAILBOX_FROM": "",  # TODO: Leave blank for DPS, use mailbox name if picked up from MESH mail box
@@ -49,7 +65,9 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
     """Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
     try:
         # If ack file exists in S3 download the contents
-        existing_ack_file = s3_client.get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
+        existing_ack_file = s3_client.get_object(
+            Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key
+        )
         existing_content = existing_ack_file["Body"].read().decode("utf-8")
     except ClientError as error:
         # If ack file does not exist in S3 create a new file containing the headers only
@@ -67,6 +85,8 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:

 def upload_ack_file(
     temp_ack_file_key: str,
+    message_id: str,
+    supplier_queue: str,
     accumulated_csv_content: StringIO,
     ack_data_rows: list,
     archive_ack_file_key: str,
@@ -75,7 +95,9 @@ def upload_ack_file(
     """Adds the data row to the uploaded ack file"""
     for row in ack_data_rows:
         data_row_str = [str(item) for item in row.values()]
-        cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
+        cleaned_row = (
+            "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
+        )
         accumulated_csv_content.write(cleaned_row + "\n")
     csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
     s3_client.upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
@@ -88,27 +110,50 @@ def upload_ack_file(
         move_file(SOURCE_BUCKET_NAME, f"processing/{file_key}", f"archive/{file_key}")

     # Update the audit table and invoke the filename lambda with next file in the queue (if one exists)
-    queue_name = change_audit_table_status_to_processed(file_key)
-    next_queued_file_details = get_next_queued_file_details(queue_name)
+    change_audit_table_status_to_processed(file_key, message_id)
+    next_queued_file_details = get_next_queued_file_details(supplier_queue)
     if next_queued_file_details:
-        invoke_filename_lambda(next_queued_file_details["filename"], next_queued_file_details["message_id"])
+        invoke_filename_lambda(
+            next_queued_file_details["filename"],
+            next_queued_file_details["message_id"],
+        )

     logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, archive_ack_file_key)


-def update_ack_file(file_key: str, created_at_formatted_string: str, ack_data_rows: list) -> None:
+def update_ack_file(
+    file_key: str,
+    message_id: str,
+    supplier_queue: str,
+    created_at_formatted_string: str,
+    ack_data_rows: list,
+) -> None:
     """Updates the ack file with the new data row based on the given arguments"""
-    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+    ack_filename = (
+        f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
+    )
     temp_ack_file_key = f"TempAck/{ack_filename}"
     archive_ack_file_key = f"forwardedFile/{ack_filename}"
     accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
-    upload_ack_file(temp_ack_file_key, accumulated_csv_content, ack_data_rows, archive_ack_file_key, file_key)
+    upload_ack_file(
+        temp_ack_file_key,
+        message_id,
+        supplier_queue,
+        accumulated_csv_content,
+        ack_data_rows,
+        archive_ack_file_key,
+        file_key,
+    )


-def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
+def move_file(
+    bucket_name: str, source_file_key: str, destination_file_key: str
+) -> None:
     """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
     s3_client.copy_object(
-        Bucket=bucket_name, CopySource={"Bucket": bucket_name, "Key": source_file_key}, Key=destination_file_key
+        Bucket=bucket_name,
+        CopySource={"Bucket": bucket_name, "Key": source_file_key},
+        Key=destination_file_key,
     )
     s3_client.delete_object(Bucket=bucket_name, Key=source_file_key)
     logger.info("File moved from %s to %s", source_file_key, destination_file_key)
@@ -119,11 +164,19 @@ def invoke_filename_lambda(file_key: str, message_id: str) -> None:
     try:
         lambda_payload = {
             "Records": [
-                {"s3": {"bucket": {"name": SOURCE_BUCKET_NAME}, "object": {"key": file_key}}, "message_id": message_id}
+                {
+                    "s3": {
+                        "bucket": {"name": SOURCE_BUCKET_NAME},
+                        "object": {"key": file_key},
+                    },
+                    "message_id": message_id,
+                }
             ]
         }
         lambda_client.invoke(
-            FunctionName=FILE_NAME_PROC_LAMBDA_NAME, InvocationType="Event", Payload=json.dumps(lambda_payload)
+            FunctionName=FILE_NAME_PROC_LAMBDA_NAME,
+            InvocationType="Event",
+            Payload=json.dumps(lambda_payload),
         )
     except Exception as error:
         logger.error("Error invoking filename lambda: %s", error)
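
Note: taken together, these changes thread message_id and supplier_queue from the ack processor down through update_ack_file and upload_ack_file to the audit-table update. A sketch of the revised entry point and the S3 keys it derives — all values are illustrative:

from update_ack_file import update_ack_file

file_key = "Flu_Vaccinations_v5_YGM41_20250127T12000000.csv"  # assumed name
created_at = "20250127T12000000"

# Keys derived inside update_ack_file:
#   temp:    TempAck/Flu_Vaccinations_v5_YGM41_20250127T12000000_BusAck_20250127T12000000.csv
#   archive: forwardedFile/Flu_Vaccinations_v5_YGM41_20250127T12000000_BusAck_20250127T12000000.csv

update_ack_file(
    file_key=file_key,
    message_id="4d2f8a1e",      # illustrative id
    supplier_queue="EMIS_FLU",  # derived upstream as f"{supplier}_{vaccine_type}"
    created_at_formatted_string=created_at,
    ack_data_rows=[],           # rows built by convert_message_to_ack_row
)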
4 changes: 2 additions & 2 deletions ack_backend/src/utils_for_ack_lambda.py
@@ -5,10 +5,10 @@


 def get_environment() -> str:
-    """Returns the current environment. Defaults to internal-dev for pr and user environments"""
+    """Returns the current environment"""
     _env = os.getenv("ENVIRONMENT")
-    # default to internal-dev for pr and user environments
-    return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
+    # unrecognised environments (e.g. pr and user environments) are returned as-is
+    return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else _env


 def get_row_count(bucket_name: str, file_key: str) -> int:
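
Note: behaviourally, unrecognised environment names are no longer coerced to "internal-dev". A quick illustration with hypothetical values:

import os
from utils_for_ack_lambda import get_environment

os.environ["ENVIRONMENT"] = "int"
assert get_environment() == "int"  # recognised values behave as before

os.environ["ENVIRONMENT"] = "pr-42"
assert get_environment() == "pr-42"  # previously this fell back to "internal-dev"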
30 changes: 15 additions & 15 deletions recordprocessor/src/audit_table.py
@@ -4,21 +4,17 @@
 from boto3.dynamodb.conditions import Key
 from clients import dynamodb_client, dynamodb_resource, logger
 from errors import UnhandledAuditTableError
-from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_FILENAME_GSI, AUDIT_TABLE_QUEUE_NAME_GSI, AuditTableKeys, FileStatus
+from constants import (
+    AUDIT_TABLE_NAME,
+    AUDIT_TABLE_QUEUE_NAME_GSI,
+    AuditTableKeys,
+    FileStatus,
+)


-def change_audit_table_status_to_processed(file_key: str) -> str:
-    """Updates the status in the audit table to 'Processed' and returns the queue name."""
+def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
+    """Updates the status in the audit table to 'Processed'."""
     try:
-        # TODO: This could create problems if there are duplicate filenames in the audit table
-        file_details_in_audit_table: dict = (
-            dynamodb_resource.Table(AUDIT_TABLE_NAME)
-            .query(IndexName=AUDIT_TABLE_FILENAME_GSI, KeyConditionExpression=Key(AuditTableKeys.FILENAME).eq(file_key))
-            .get("Items", [])[0]
-        )
-
-        message_id = file_details_in_audit_table.get("message_id")
-
         # Update the status in the audit table to "Processed"
         dynamodb_client.update_item(
             TableName=AUDIT_TABLE_NAME,
@@ -36,8 +32,6 @@ def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
             FileStatus.PROCESSED,
         )

-        return file_details_in_audit_table.get("queue_name")
-
     except Exception as error:  # pylint: disable = broad-exception-caught
         logger.error(error)
         raise UnhandledAuditTableError(error) from error
@@ -48,7 +42,9 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
     Checks for queued files.
     Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
     """
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
+    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(
+        AUDIT_TABLE_NAME
+    ).query(
         IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
         KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
         & Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
@@ -57,4 +53,8 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
     queued_files_details: list = queued_files_found_in_audit_table["Items"]

     # Return the oldest queued file
-    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
+    return (
+        sorted(queued_files_details, key=lambda x: x["timestamp"])[0]
+        if queued_files_details
+        else None
+    )