Formatting changes
Valswyn-NHS committed Jan 28, 2025
1 parent 2a984b6 commit 8e7b628
Showing 6 changed files with 35 additions and 127 deletions.
32 changes: 8 additions & 24 deletions ack_backend/src/ack_processor.py
@@ -2,10 +2,7 @@

import json
from typing import Union
-from logging_decorators import (
-    ack_lambda_handler_logging_decorator,
-    convert_messsage_to_ack_row_logging_decorator,
-)
+from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator
from update_ack_file import update_ack_file, create_ack_data


@@ -20,9 +17,7 @@ def get_error_message_for_ack_file(message_diagnostics) -> Union[None, str]:
if message_diagnostics.get("statusCode") in (None, 500):
return "An unhandled error occurred during batch processing"

-    return message_diagnostics.get(
-        "error_message", "Unable to determine diagnostics issue"
-    )
+    return message_diagnostics.get("error_message", "Unable to determine diagnostics issue")


@convert_messsage_to_ack_row_logging_decorator
@@ -37,8 +32,7 @@ def convert_message_to_ack_row(message, created_at_formatted_string):
created_at_formatted_string=created_at_formatted_string,
local_id=message.get("local_id"),
row_id=message.get("row_id"),
-        successful_api_response=diagnostics
-        is None,  # Response is successful if and only if there are no diagnostics
+        successful_api_response=diagnostics is None,  # Response is successful if and only if there are no diagnostics
diagnostics=get_error_message_for_ack_file(diagnostics),
imms_id=message.get("imms_id"),
)
@@ -53,9 +47,7 @@ def lambda_handler(event, context):
"""

if not event.get("Records"):
-        raise ValueError(
-            "Error in ack_processor_lambda_handler: No records found in the event"
-        )
+        raise ValueError("Error in ack_processor_lambda_handler: No records found in the event")

file_key = None
created_at_formatted_string = None
@@ -67,9 +59,7 @@ def lambda_handler(event, context):
try:
incoming_message_body = json.loads(record["body"])
except Exception as body_json_error:
-            raise ValueError(
-                "Could not load incoming message body"
-            ) from body_json_error
+            raise ValueError("Could not load incoming message body") from body_json_error

if i == 0:
# IMPORTANT NOTE: An assumption is made here that the file_key and created_at_formatted_string are the same
@@ -79,18 +69,12 @@ def lambda_handler(event, context):
vaccine_type = incoming_message_body[0].get("vaccine_type")
supplier = incoming_message_body[0].get("supplier")
supplier_queue = f"{supplier}_{vaccine_type}"
-            created_at_formatted_string = incoming_message_body[0].get(
-                "created_at_formatted_string"
-            )
+            created_at_formatted_string = incoming_message_body[0].get("created_at_formatted_string")

for message in incoming_message_body:
-            ack_data_rows.append(
-                convert_message_to_ack_row(message, created_at_formatted_string)
-            )
+            ack_data_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))

-    update_ack_file(
-        file_key, message_id, supplier_queue, created_at_formatted_string, ack_data_rows
-    )
+    update_ack_file(file_key, message_id, supplier_queue, created_at_formatted_string, ack_data_rows)

return {
"statusCode": 200,
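The fallback chain in get_error_message_for_ack_file is easiest to verify with concrete inputs. A minimal sketch, not part of the commit — the 400-case error string is a hypothetical example, and it assumes the collapsed lines above the hunk short-circuit when diagnostics is None, since convert_message_to_ack_row treats "no diagnostics" as success:

# Hypothetical inputs exercising the visible branches only
assert get_error_message_for_ack_file({"statusCode": 500}) == "An unhandled error occurred during batch processing"
assert get_error_message_for_ack_file({"statusCode": 400, "error_message": "Invalid NHS number"}) == "Invalid NHS number"
assert get_error_message_for_ack_file({"statusCode": 400}) == "Unable to determine diagnostics issue"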
17 changes: 3 additions & 14 deletions ack_backend/src/audit_table.py
@@ -4,22 +4,15 @@
from boto3.dynamodb.conditions import Key
from clients import dynamodb_client, dynamodb_resource, logger
from errors import UnhandledAuditTableError
-from constants import (
-    AUDIT_TABLE_NAME,
-    AUDIT_TABLE_QUEUE_NAME_GSI,
-    FileStatus,
-    AuditTableKeys,
-)
+from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, FileStatus, AuditTableKeys


def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
"""
Checks for queued files.
Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
"""
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(
-        AUDIT_TABLE_NAME
-    ).query(
+    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
& Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
@@ -28,11 +21,7 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
queued_files_details: list = queued_files_found_in_audit_table["Items"]

# Return the oldest queued file
-    return (
-        sorted(queued_files_details, key=lambda x: x["timestamp"])[0]
-        if queued_files_details
-        else None
-    )
+    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None


def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
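Both audit_table.py hunks reflow the oldest-queued-file selection into one line. A minimal sketch of that selection, not part of the commit, with hypothetical audit-table items (min(..., key=...) would avoid the full sort, but the one-liner deliberately keeps the original sorted(...)[0] behaviour):

queued_files_details = [
    {"message_id": "file-b", "timestamp": "20250128T11000000"},
    {"message_id": "file-a", "timestamp": "20250127T09000000"},
]
# Zero-padded timestamp strings sort lexicographically in chronological order;
# an empty Items list yields None instead of raising IndexError
oldest = sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
assert oldest["message_id"] == "file-a"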
40 changes: 8 additions & 32 deletions ack_backend/src/update_ack_file.py
@@ -4,16 +4,8 @@
from io import StringIO, BytesIO
from typing import Union
from botocore.exceptions import ClientError
-from constants import (
-    ACK_HEADERS,
-    SOURCE_BUCKET_NAME,
-    ACK_BUCKET_NAME,
-    FILE_NAME_PROC_LAMBDA_NAME,
-)
-from audit_table import (
-    change_audit_table_status_to_processed,
-    get_next_queued_file_details,
-)
+from constants import ACK_HEADERS, SOURCE_BUCKET_NAME, ACK_BUCKET_NAME, FILE_NAME_PROC_LAMBDA_NAME
+from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
from clients import s3_client, logger, lambda_client
from utils_for_ack_lambda import get_row_count

@@ -29,13 +21,7 @@ def create_ack_data(
"""Returns a dictionary containing the ack headers as keys, along with the relevant values."""
# Pack multi-line diagnostics down to single line (because Imms API diagnostics may be multi-line)
    diagnostics = (
-        " ".join(
-            diagnostics.replace("\r", " ")
-            .replace("\n", " ")
-            .replace("\t", " ")
-            .replace("\xa0", " ")
-            .split()
-        )
+        " ".join(diagnostics.replace("\r", " ").replace("\n", " ").replace("\t", " ").replace("\xa0", " ").split())
if diagnostics is not None
else None
)
@@ -48,9 +34,7 @@ def create_ack_data(
        "RESPONSE_TYPE": "Business",
        "RESPONSE_CODE": "30001" if successful_api_response else "30002",
        "RESPONSE_DISPLAY": (
-            "Success"
-            if successful_api_response
-            else "Business Level Response Value - Processing Error"
+            "Success" if successful_api_response else "Business Level Response Value - Processing Error"
),
"RECEIVED_TIME": created_at_formatted_string,
"MAILBOX_FROM": "", # TODO: Leave blank for DPS, use mailbox name if picked up from MESH mail box
@@ -65,9 +49,7 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
try:
# If ack file exists in S3 download the contents
-        existing_ack_file = s3_client.get_object(
-            Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key
-        )
+        existing_ack_file = s3_client.get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
existing_content = existing_ack_file["Body"].read().decode("utf-8")
except ClientError as error:
# If ack file does not exist in S3 create a new file containing the headers only
@@ -95,9 +77,7 @@ def upload_ack_file(
"""Adds the data row to the uploaded ack file"""
for row in ack_data_rows:
data_row_str = [str(item) for item in row.values()]
-        cleaned_row = (
-            "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
-        )
+        cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
accumulated_csv_content.write(cleaned_row + "\n")
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
s3_client.upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
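The reflowed cleaned_row line packs three string operations together; a sketch with a hypothetical ack row, not part of the commit:

data_row_str = ["row-1", "OK ", " Success", ""]
# Join with pipes, then strip single spaces touching a delimiter and any outer whitespace
cleaned_row = "|".join(data_row_str).replace(" |", "|").replace("| ", "|").strip()
assert cleaned_row == "row-1|OK|Success|"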
@@ -129,9 +109,7 @@ def update_ack_file(
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
-    ack_filename = (
-        f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
-    )
+    ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
temp_ack_file_key = f"TempAck/{ack_filename}"
archive_ack_file_key = f"forwardedFile/{ack_filename}"
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)
@@ -146,9 +124,7 @@
)


-def move_file(
-    bucket_name: str, source_file_key: str, destination_file_key: str
-) -> None:
+def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
"""Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
s3_client.copy_object(
Bucket=bucket_name,
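The create_ack_data hunk above collapses the multi-line diagnostics flattening into one expression. A minimal sketch, not part of the commit, with a hypothetical multi-line Imms API message:

diagnostics = "Validation failed:\r\n\tNHS number\xa0invalid\n\nsee row 42"
flattened = " ".join(
    diagnostics.replace("\r", " ").replace("\n", " ").replace("\t", " ").replace("\xa0", " ").split()
)
assert flattened == "Validation failed: NHS number invalid see row 42"

The explicit replaces are largely belt-and-braces: str.split() with no arguments already splits on \r, \n, \t and \xa0, so the split/join pair alone would normalise the whitespace.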
4 changes: 1 addition & 3 deletions ack_backend/src/utils_for_ack_lambda.py
@@ -6,9 +6,7 @@

def get_environment() -> str:
"""Returns the current environment"""
-    _env = os.getenv("ENVIRONMENT")
-    # default to internal-dev for pr and user environments
-    return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else _env
+    return os.getenv("ENVIRONMENT")


def get_row_count(bucket_name: str, file_key: str) -> int:
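Worth noting on this hunk: the deleted ternary returned _env in both branches, so the environment allow-list never had any effect, and the commit deletes the dead branch without changing behaviour. For contrast only, a hypothetical guard that actually defaulted as the deleted comment described might look like this (a sketch, not the committed code):

def get_environment_with_default() -> str:
    _env = os.getenv("ENVIRONMENT")
    # Fall back to internal-dev for pr and user environments
    return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"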
17 changes: 3 additions & 14 deletions recordprocessor/src/audit_table.py
@@ -4,12 +4,7 @@
from boto3.dynamodb.conditions import Key
from clients import dynamodb_client, dynamodb_resource, logger
from errors import UnhandledAuditTableError
-from constants import (
-    AUDIT_TABLE_NAME,
-    AUDIT_TABLE_QUEUE_NAME_GSI,
-    AuditTableKeys,
-    FileStatus,
-)
+from constants import AUDIT_TABLE_NAME, AUDIT_TABLE_QUEUE_NAME_GSI, AuditTableKeys, FileStatus


def change_audit_table_status_to_processed(file_key: str, message_id: str) -> None:
@@ -42,9 +37,7 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
Checks for queued files.
Returns a dictionary containing the details of the oldest queued file, or returns None if no queued files are found.
"""
-    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(
-        AUDIT_TABLE_NAME
-    ).query(
+    queued_files_found_in_audit_table: dict = dynamodb_resource.Table(AUDIT_TABLE_NAME).query(
IndexName=AUDIT_TABLE_QUEUE_NAME_GSI,
KeyConditionExpression=Key(AuditTableKeys.QUEUE_NAME).eq(queue_name)
& Key(AuditTableKeys.STATUS).eq(FileStatus.QUEUED),
@@ -53,8 +46,4 @@ def get_next_queued_file_details(queue_name: str) -> Union[dict, None]:
queued_files_details: list = queued_files_found_in_audit_table["Items"]

# Return the oldest queued file
-    return (
-        sorted(queued_files_details, key=lambda x: x["timestamp"])[0]
-        if queued_files_details
-        else None
-    )
+    return sorted(queued_files_details, key=lambda x: x["timestamp"])[0] if queued_files_details else None
52 changes: 12 additions & 40 deletions recordprocessor/src/file_level_validation.py
@@ -7,16 +7,10 @@
from clients import logger, s3_client
from make_and_upload_ack_file import make_and_upload_ack_file
from mappings import Vaccine
-from utils_for_recordprocessor import (
-    get_csv_content_dict_reader,
-    invoke_filename_lambda,
-)
+from utils_for_recordprocessor import get_csv_content_dict_reader, invoke_filename_lambda
from errors import InvalidHeaders, NoOperationPermissions
from logging_decorator import file_level_validation_logging_decorator
-from audit_table import (
-    change_audit_table_status_to_processed,
-    get_next_queued_file_details,
-)
+from audit_table import change_audit_table_status_to_processed, get_next_queued_file_details
from constants import SOURCE_BUCKET_NAME, EXPECTED_CSV_HEADERS


@@ -43,32 +37,23 @@ def validate_action_flag_permissions(

# Convert action flags into the expected operation names
    requested_permissions_set = {
-        f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}"
-        for action in operations_requested
+        f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
    }

# Check if any of the CSV permissions match the allowed permissions
if not requested_permissions_set.intersection(allowed_permissions_list):
-        raise NoOperationPermissions(
-            f"{supplier} does not have permissions to perform any of the requested actions."
-        )
+        raise NoOperationPermissions(f"{supplier} does not have permissions to perform any of the requested actions.")

logger.info(
"%s permissions %s match one of the requested permissions required to %s",
supplier,
allowed_permissions_list,
requested_permissions_set,
)
-    return {
-        perm.split("_")[1].upper()
-        for perm in allowed_permissions_list
-        if perm.startswith(vaccine_type)
-    }
+    return {perm.split("_")[1].upper() for perm in allowed_permissions_list if perm.startswith(vaccine_type)}


-def move_file(
-    bucket_name: str, source_file_key: str, destination_file_key: str
-) -> None:
+def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
"""Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
s3_client.copy_object(
Bucket=bucket_name,
@@ -91,30 +76,22 @@ def file_level_validation(incoming_message_body: dict) -> None:
try:
message_id = incoming_message_body.get("message_id")
vaccine: Vaccine = next( # Convert vaccine_type to Vaccine enum
-            vaccine
-            for vaccine in Vaccine
-            if vaccine.value == incoming_message_body.get("vaccine_type").upper()
+            vaccine for vaccine in Vaccine if vaccine.value == incoming_message_body.get("vaccine_type").upper()
)
supplier = incoming_message_body.get("supplier").upper()
file_key = incoming_message_body.get("filename")
permission = incoming_message_body.get("permission")
-        created_at_formatted_string = incoming_message_body.get(
-            "created_at_formatted_string"
-        )
+        created_at_formatted_string = incoming_message_body.get("created_at_formatted_string")

# Fetch the data
csv_reader, csv_data = get_csv_content_dict_reader(file_key)

validate_content_headers(csv_reader)

# Validate has permission to perform at least one of the requested actions
-        allowed_operations_set = validate_action_flag_permissions(
-            supplier, vaccine.value, permission, csv_data
-        )
+        allowed_operations_set = validate_action_flag_permissions(supplier, vaccine.value, permission, csv_data)

-        make_and_upload_ack_file(
-            message_id, file_key, True, True, created_at_formatted_string
-        )
+        make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string)

move_file(SOURCE_BUCKET_NAME, file_key, f"processing/{file_key}")

@@ -134,13 +111,8 @@ def file_level_validation(incoming_message_body: dict) -> None:
# NOTE: The Exception may occur before the file_id, file_key and created_at_formatted_string are assigned
message_id = message_id or "Unable to ascertain message_id"
file_key = file_key or "Unable to ascertain file_key"
-        created_at_formatted_string = (
-            created_at_formatted_string
-            or "Unable to ascertain created_at_formatted_string"
-        )
-        make_and_upload_ack_file(
-            message_id, file_key, False, False, created_at_formatted_string
-        )
+        created_at_formatted_string = created_at_formatted_string or "Unable to ascertain created_at_formatted_string"
+        make_and_upload_ack_file(message_id, file_key, False, False, created_at_formatted_string)

move_file(SOURCE_BUCKET_NAME, file_key, f"archive/{file_key}")

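The validate_action_flag_permissions reflow above is the densest change in the commit. A minimal sketch of the permission check, not part of the commit, with hypothetical supplier permissions:

vaccine_type = "FLU"
operations_requested = {"NEW", "UPDATE"}  # action flags found in the CSV
allowed_permissions_list = ["FLU_CREATE", "COVID19_UPDATE"]

# NEW maps to CREATE; other action flags are used verbatim
requested_permissions_set = {
    f"{vaccine_type}_{'CREATE' if action == 'NEW' else action}" for action in operations_requested
}
assert requested_permissions_set == {"FLU_CREATE", "FLU_UPDATE"}

# The check passes because FLU_CREATE appears in both collections ...
assert requested_permissions_set.intersection(allowed_permissions_list) == {"FLU_CREATE"}

# ... and the function returns the supplier's allowed operations for this vaccine type
allowed_operations_set = {perm.split("_")[1].upper() for perm in allowed_permissions_list if perm.startswith(vaccine_type)}
assert allowed_operations_set == {"CREATE"}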

