Merge branch 'master' into AMB-2315-recordforwarder-tests-part-1
Valswyn-NHS authored Jan 27, 2025
2 parents 8057992 + 8f8efd9 commit f10b1e4
Showing 20 changed files with 279 additions and 122 deletions.
17 changes: 9 additions & 8 deletions ack_backend/src/ack_processor.py
@@ -1,13 +1,15 @@
"""Ack lambda handler"""

import json
import os
from utils_for_ack_lambda import get_environment
from typing import Union
from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator
from update_ack_file import update_ack_file, create_ack_data
from boto3 import client as boto3_client
from clients import s3_client

s3_client = boto3_client("s3", region_name="eu-west-2")

ENVIRONMENT = get_environment()
SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"

@convert_messsage_to_ack_row_logging_decorator
def convert_message_to_ack_row(message, created_at_formatted_string):
@@ -53,9 +55,7 @@ def lambda_handler(event, context):
    created_at_formatted_string = None

    array_of_rows = []
-    environment = os.getenv("ENVIRONMENT")
-    source_bucket_name = f"immunisation-batch-{environment}-data-sources"

    for i, record in enumerate(event["Records"]):

        try:
@@ -71,12 +71,13 @@

        for message in incoming_message_body:
            array_of_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
-    row_count = get_row_count_stream(source_bucket_name, f"processing/{file_key}")
+    row_count = get_row_count_stream(SOURCE_BUCKET_NAME, f"processing/{file_key}")
    update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows, row_count=row_count)
    return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}


def get_row_count_stream(bucket_name, key):
    response = s3_client.get_object(Bucket=bucket_name, Key=key)
-    count = sum(1 for _ in response['Body'].iter_lines())
+    count = sum(1 for line in response['Body'].iter_lines() if line.strip())

    return count
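
A quick sketch (not part of the commit) of why the generator now filters on line.strip(): a trailing newline in the source file would otherwise inflate the row count by one. Here io.BytesIO stands in for the S3 StreamingBody:

import io

body = b"HEADER|A|B\nrow_1|a|b\nrow_2|a|b\n\n"  # note the trailing blank line

old_count = sum(1 for _ in io.BytesIO(body))                     # 4 (blank line counted)
new_count = sum(1 for line in io.BytesIO(body) if line.strip())  # 3 (blank line skipped)
print(old_count, new_count)
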
22 changes: 14 additions & 8 deletions ack_backend/src/audit_table.py
@@ -1,27 +1,26 @@
"""Add the filename to the audit table and check for duplicates."""

import os
from typing import Union
from boto3.dynamodb.conditions import Key,Attr
from clients import dynamodb_client, dynamodb_resource, logger
from errors import UnhandledAuditTableError


def add_to_audit_table(file_key: str) -> None:
def update_audit_table_status(file_key: str) -> str:
"""
Adds the filename to the audit table.
Raises an error if the file is a duplicate (after adding it to the audit table).
Update the status in the audit table.
"""
try:
table_name = os.environ["AUDIT_TABLE_NAME"]
file_name_gsi = "filename_index"
# Check for duplicates before adding to the table (if the query returns any items, then the file is a duplicate)
file_name_response = dynamodb_resource.Table(table_name).query(
IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key)
)
items = file_name_response.get("Items", [])
message_id = items[0].get("message_id")
queue_name = items[0].get("queue_name")
# Add to the audit table (regardless of whether it is a duplicate)
# Add to the audit table
dynamodb_client.update_item(
TableName=table_name,
Key={"message_id": {"S": message_id}},
@@ -37,8 +36,12 @@ def add_to_audit_table(file_key: str) -> None:
        logger.error(error_message)
        raise UnhandledAuditTableError(error_message) from error


-def check_queue(queue_name: str):
+def get_queued_file_details(queue_name: str) -> tuple[Union[None, str], Union[None, str]]:
+    """
+    Check the audit table for queued files and return the details of the oldest one.
+    Returns a tuple in the format (file_name, message_id) for the oldest queued file.
+    Defaults to (None, None) if no file is found in queued status.
+    """
    table_name = os.environ["AUDIT_TABLE_NAME"]
    queue_name_gsi = "queue_name_index"

@@ -53,7 +56,10 @@
    else:
        return None, None


-def get_file_name(queue_response: dict):
+def get_file_name(queue_response: dict) -> tuple[str, str]:
+    """
+    Returns (file_name, message_id) for the oldest file.
+    """
    sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"])
    first_record = sorted_item[0]
    file_name = first_record.get("filename")
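
For illustration only (not from the commit): the oldest-file selection reduces to a sort on the stored timestamp string, so it can be exercised with a hypothetical query response (field values here are made up, including the timestamp format):

queue_response = {
    "Items": [
        {"filename": "b.csv", "message_id": "m-2", "timestamp": "20250127T110000"},
        {"filename": "a.csv", "message_id": "m-1", "timestamp": "20250127T100000"},
    ]
}
sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"])
first_record = sorted_item[0]
print(first_record.get("filename"), first_record.get("message_id"))  # -> a.csv m-1
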
26 changes: 16 additions & 10 deletions ack_backend/src/update_ack_file.py
@@ -6,11 +6,13 @@
from typing import Union
from botocore.exceptions import ClientError
from constants import Constants
-from audit_table import add_to_audit_table, check_queue
-environment = os.getenv("ENVIRONMENT")
-source_bucket_name = f"immunisation-batch-{environment}-data-sources"
+from audit_table import update_audit_table_status, get_queued_file_details
from clients import s3_client, logger, lambda_client
+# TODO: move to constants
+ENVIRONMENT = os.getenv("ENVIRONMENT")
+SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"
+FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

def create_ack_data(
    created_at_formatted_string: str,
    local_id: str,
@@ -81,14 +83,14 @@ def upload_ack_file(
    move_file(ack_bucket_name, ack_file_key, archive_ack_file_key)
    source_key = f"processing/{file_key}"
    destination_key = f"archive/{file_key}"
-    move_file(source_bucket_name, source_key, destination_key)
-    queue_name = add_to_audit_table(file_key)
-    file_key, message_id = check_queue(queue_name)
+    move_file(SOURCE_BUCKET_NAME, source_key, destination_key)
+    queue_name = update_audit_table_status(file_key)
+    file_key, message_id = get_queued_file_details(queue_name)
    if file_key and message_id is not None:
        # Directly invoke the Lambda function
-        invoke_lambda(source_bucket_name, file_key, message_id)
+        invoke_filename_lambda(SOURCE_BUCKET_NAME, file_key, message_id)

-    logger.info("Ack file updated to %s: %s", ack_bucket_name, ack_file_key)
+    logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)


def update_ack_file(
@@ -111,7 +113,11 @@ def get_row_count_stream(bucket_name, key):

def move_file(bucket_name: str, source_key: str, destination_key: str) -> None:
-    """ Moves a file from one location to another in S3 by copying and then deleting it. Args: bucket_name (str): Name of the S3 bucket. source_key (str): Source file key. destination_key (str): Destination file key. """
+    """Moves a file from one location to another in S3 by copying and then deleting it.
+    Args: bucket_name (str): Name of the S3 bucket.
+          source_key (str): Source file key.
+          destination_key (str): Destination file key.
+    """
    s3_client.copy_object(
        Bucket=bucket_name,
        CopySource={"Bucket": bucket_name, "Key": source_key},
@@ -121,7 +127,7 @@ def move_file(bucket_name: str, source_key: str, destination_key: str) -> None:
    logger.info("File moved from %s to %s", source_key, destination_key)


-def invoke_lambda(source_bucket_name, file_key, message_id):
+def invoke_filename_lambda(source_bucket_name, file_key, message_id):
    lambda_payload = {"Records": [
        {
            "s3": {
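
The payload body is truncated in the diff view, so the following is only a hedged sketch of the direct invocation, modelled on the standard S3 event shape; the exact field layout and the message_id placement are assumptions:

import json
import os
import boto3

lambda_client = boto3.client("lambda", region_name="eu-west-2")

def invoke_filename_lambda_sketch(source_bucket_name: str, file_key: str, message_id: str) -> None:
    lambda_payload = {
        "Records": [
            {
                "s3": {
                    "bucket": {"name": source_bucket_name},
                    "object": {"key": file_key},
                },
                "message_id": message_id,  # assumed field name
            }
        ]
    }
    lambda_client.invoke(
        FunctionName=os.environ["FILE_NAME_PROC_LAMBDA_NAME"],  # env var added in this commit
        InvocationType="Event",  # async, so the ack lambda does not wait
        Payload=json.dumps(lambda_payload),
    )
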
10 changes: 10 additions & 0 deletions ack_backend/src/utils_for_ack_lambda.py
@@ -0,0 +1,10 @@
"""Utils for ack lambda"""

import os


def get_environment() -> str:
"""Returns the current environment. Defaults to internal-dev for pr and user environments"""
_env = os.getenv("ENVIRONMENT")
# default to internal-dev for pr and user environments
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
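
Example behaviour of the new helper (illustrative assertions; the allow-list is taken from the code above):

import os
from utils_for_ack_lambda import get_environment

os.environ["ENVIRONMENT"] = "prod"
assert get_environment() == "prod"

os.environ["ENVIRONMENT"] = "pr-123"  # a pr environment: not in the allow-list
assert get_environment() == "internal-dev"
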
20 changes: 20 additions & 0 deletions backend/tests/test_fhir_controller.py
@@ -918,6 +918,7 @@ def test_create_immunization_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "create"
},
"body": imms.json(),
}
@@ -936,6 +937,7 @@ def test_create_immunization_for_batch(self, mock_send_message):
"row_id": aws_event["headers"]["row_id"],
"created_at_formatted_string": aws_event["headers"]["created_at_formatted_string"],
"local_id": aws_event["headers"]["local_id"],
"operation_requested": "create"
}
)

@@ -961,6 +963,7 @@ def test_create_immunization_for_unauthorized(self):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "create"
},
"body": imms.json(),
}
@@ -1067,6 +1070,7 @@ def test_duplicate_record_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "create"
},
"body": imms.json(),
}
@@ -1108,6 +1112,7 @@ def test_pds_unhandled_error_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "create"
},
"body": imms.json(),
}
@@ -1165,6 +1170,7 @@ def test_update_immunization_etag_missing(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": imms_id},
@@ -1192,6 +1198,7 @@ def test_update_immunization_duplicate(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": imms_id},
@@ -1222,6 +1229,7 @@ def test_update_immunization_UnauthorizedVaxError(self):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": imms_id},
@@ -1277,6 +1285,7 @@ def test_update_immunization_for_batch_existing_record_is_none(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": imms_id},
@@ -1314,6 +1323,7 @@ def test_update_immunization_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": imms_id},
@@ -1336,6 +1346,7 @@ def test_update_immunization_for_batch(self, mock_send_message):
"row_id": aws_event["headers"]["row_id"],
"created_at_formatted_string": aws_event["headers"]["created_at_formatted_string"],
"local_id": aws_event["headers"]["local_id"],
"operation_requested": "update"
}
)

@@ -1473,6 +1484,7 @@ def test_validation_error_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": imms,
"pathParameters": {"id": "valid-id"},
@@ -1592,6 +1604,7 @@ def test_inconsistent_imms_id_for_batch(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"body": bad_json,
"pathParameters": {"id": "an-id"},
@@ -1663,6 +1676,7 @@ def test_validate_imms_id_for_batch(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "update"
},
"pathParameters": {"id": "invalid %$ id"},
"body": valid_json,
@@ -1706,6 +1720,7 @@ def test_validate_imms_id_for_batch(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "delete"
},
}

@@ -1765,6 +1780,7 @@ def test_delete_immunization_unauthorised_vax(self, mock_sqs_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "delete"
},
"pathParameters": {"id": imms_id},
}
@@ -1790,6 +1806,7 @@ def test_delete_immunization_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "delete"
},
"pathParameters": {"id": imms_id},
}
@@ -1807,6 +1824,7 @@ def test_delete_immunization_for_batch(self, mock_send_message):
"row_id": lambda_event["headers"]["row_id"],
"created_at_formatted_string": lambda_event["headers"]["created_at_formatted_string"],
"local_id": lambda_event["headers"]["local_id"],
"operation_requested": "delete"
}
)

@@ -1850,6 +1868,7 @@ def test_immunization_exception_not_found_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "delete"
},
"pathParameters": {"id": "a-non-existing-id"},
}
@@ -1897,6 +1916,7 @@ def test_immunization_unhandled_error_for_batch(self, mock_send_message):
"row_id": "123",
"created_at_formatted_string": "2020-01-01",
"local_id": ValidValues.test_local_id,
"operation_requested": "delete"
},
"pathParameters": {"id": "a-non-existing-id"},
}
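
Each batch-path test event above gains the same operation_requested header, and the matching assertions check that it is echoed onto the outgoing message. A representative headers fixture (the local_id value is a placeholder for ValidValues.test_local_id):

headers = {
    "row_id": "123",
    "created_at_formatted_string": "2020-01-01",
    "local_id": "a-test-local-id",    # placeholder for ValidValues.test_local_id
    "operation_requested": "create",  # "update" / "delete" in the respective tests
}
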
2 changes: 1 addition & 1 deletion delta_backend/src/delta.py
@@ -65,7 +65,7 @@ def handler(event, context):
imms_id = new_image["PK"]["S"].split("#")[1]
vaccine_type = get_vaccine_type(new_image["PatientSK"]["S"])
supplier_system = new_image["SupplierSystem"]["S"]
if supplier_system not in ("DPS_FULL", "DPS_REDUCED"):
if supplier_system not in ("DPSFULL", "DPSREDUCED"):
operation = new_image["Operation"]["S"]
if operation == "CREATE":
operation = "NEW"
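
The fix drops the underscores from the DPS supplier constants, which implies the stored SupplierSystem values are "DPSFULL"/"DPSREDUCED"; before the change those records never matched the tuple and slipped past the guard. A minimal check under that assumption (not from the commit):

OLD = ("DPS_FULL", "DPS_REDUCED")
NEW = ("DPSFULL", "DPSREDUCED")
record_value = "DPSFULL"  # assumed stored form

print(record_value not in OLD)  # True  -> pre-fix: DPS records were (wrongly) processed
print(record_value not in NEW)  # False -> post-fix: DPS records are skipped
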
