Merge branch 'master' into AMB-2327-Major-pipeline-issue-fix
CLJ2006 authored Jan 27, 2025
2 parents 215c176 + 8f8efd9 commit 07af6ef
Showing 76 changed files with 2,287 additions and 235 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/continuous-integration.yml
@@ -12,10 +12,10 @@ jobs:
with:
fetch-depth: 0 # This causes all history to be fetched, which is required for calculate-version to function

- name: Install Python 3.8
- name: Install Python 3.9
uses: actions/setup-python@v1
with:
python-version: 3.8
python-version: 3.9

- name: Upgrade python pip
run: python -m pip install --upgrade pip
17 changes: 14 additions & 3 deletions ack_backend/src/ack_processor.py
@@ -1,11 +1,16 @@
"""Ack lambda handler"""

import json
from utils_for_ack_lambda import get_environment
from typing import Union
from logging_decorators import ack_lambda_handler_logging_decorator, convert_messsage_to_ack_row_logging_decorator
from update_ack_file import update_ack_file, create_ack_data
from clients import s3_client


ENVIRONMENT = get_environment()
SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"

@convert_messsage_to_ack_row_logging_decorator
def convert_message_to_ack_row(message, created_at_formatted_string):
"""
@@ -50,7 +55,7 @@ def lambda_handler(event, context):
created_at_formatted_string = None

array_of_rows = []

for i, record in enumerate(event["Records"]):

try:
@@ -66,7 +71,13 @@

for message in incoming_message_body:
array_of_rows.append(convert_message_to_ack_row(message, created_at_formatted_string))
row_count = get_row_count_stream(SOURCE_BUCKET_NAME, f"processing/{file_key}")
update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows, row_count=row_count)
return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}

update_ack_file(file_key, created_at_formatted_string=created_at_formatted_string, ack_data_rows=array_of_rows)

return {"statusCode": 200, "body": json.dumps("Lambda function executed successfully!")}
def get_row_count_stream(bucket_name, key):
response = s3_client.get_object(Bucket=bucket_name, Key=key)
count = sum(1 for line in response['Body'].iter_lines() if line.strip())

return count
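The handler now derives the expected row count by streaming the source file from the processing/ prefix and passes it through to update_ack_file, so the ack lambda can tell when the acknowledgement file is complete. Note that this helper skips blank lines (if line.strip()), whereas the otherwise similar get_row_count_stream added to update_ack_file.py below counts every line. A minimal standalone sketch of the same streaming count, using hypothetical bucket and key names:

    import boto3

    def count_non_empty_lines(bucket_name: str, key: str) -> int:
        """Stream an S3 object and count its non-empty lines without loading the whole file into memory."""
        s3 = boto3.client("s3", region_name="eu-west-2")
        response = s3.get_object(Bucket=bucket_name, Key=key)
        return sum(1 for line in response["Body"].iter_lines() if line.strip())

    # Hypothetical usage mirroring the handler's call:
    # count_non_empty_lines("immunisation-batch-internal-dev-data-sources", "processing/example.csv")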
67 changes: 67 additions & 0 deletions ack_backend/src/audit_table.py
@@ -0,0 +1,67 @@
"""Add the filename to the audit table and check for duplicates."""

import os
from typing import Union
from boto3.dynamodb.conditions import Key, Attr
from clients import dynamodb_client, dynamodb_resource, logger
from errors import UnhandledAuditTableError


def update_audit_table_status(file_key: str) -> str:
"""
Update the status in the audit table.
"""
try:
table_name = os.environ["AUDIT_TABLE_NAME"]
file_name_gsi = "filename_index"
file_name_response = dynamodb_resource.Table(table_name).query(
IndexName=file_name_gsi, KeyConditionExpression=Key("filename").eq(file_key)
)
items = file_name_response.get("Items", [])
message_id = items[0].get("message_id")
queue_name = items[0].get("queue_name")
# Add to the audit table
dynamodb_client.update_item(
TableName=table_name,
Key={"message_id": {"S": message_id}},
UpdateExpression="SET #status = :status",
ExpressionAttributeNames={"#status": "status"},
ExpressionAttributeValues={":status": {"S": "Processed"}},
ConditionExpression="attribute_exists(message_id)"
)
logger.info("%s file, with message id %s, and the status successfully updated to audit table", file_key, message_id)
return queue_name
except Exception as error: # pylint: disable = broad-exception-caught
error_message = f"Error updating the status of {file_key} in the audit table"
logger.error(error_message)
raise UnhandledAuditTableError(error_message) from error

def get_queued_file_details(queue_name: str) -> tuple[Union[None, str], Union[None, str]]:
"""
Check for queued files and return the details of the oldest one.
Returns a tuple in the format (file_name, message_id) for the oldest file in 'Queued' status,
or (None, None) if no queued file is found.
"""
table_name = os.environ["AUDIT_TABLE_NAME"]
queue_name_gsi = "queue_name_index"

queue_response = dynamodb_resource.Table(table_name).query(
IndexName=queue_name_gsi,
KeyConditionExpression=Key("queue_name").eq(queue_name)
& Key("status").eq("Queued"),
)
if queue_response["Items"]:
file_name, message_id = get_file_name(queue_response)
return file_name, message_id
else:
return None, None

def get_file_name(queue_response: dict) -> tuple[str,str]:
"""
Returns (file_name, message_id) for the oldest file.
"""
sorted_item = sorted(queue_response["Items"], key=lambda x: x["timestamp"])
first_record = sorted_item[0]
file_name = first_record.get("filename")
message_id = first_record.get("message_id")
return file_name, message_id
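Taken together, these helpers implement a simple "mark the current file done, pick up the oldest queued file" pattern; in this commit the actual hand-off to the next file happens in update_ack_file.py. A hedged sketch of how a caller chains them (the wrapper function name is illustrative, and it assumes AUDIT_TABLE_NAME plus the filename_index and queue_name_index GSIs are configured as the functions above expect):

    from audit_table import update_audit_table_status, get_queued_file_details

    def mark_done_and_find_next(current_file_key: str):
        """Illustrative wrapper: mark the current file as Processed and return the next queued file, if any."""
        queue_name = update_audit_table_status(current_file_key)
        next_file, next_message_id = get_queued_file_details(queue_name)
        if next_file is not None and next_message_id is not None:
            # The production code invokes the filename-processor lambda for this file.
            return next_file, next_message_id
        return None, None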
5 changes: 4 additions & 1 deletion ack_backend/src/clients.py
@@ -1,12 +1,15 @@
"""Initialise clients and logger"""

import logging
from boto3 import client as boto3_client
from boto3 import client as boto3_client, resource as boto3_resource

REGION_NAME = "eu-west-2"

s3_client = boto3_client("s3", region_name=REGION_NAME)
firehose_client = boto3_client("firehose", region_name=REGION_NAME)
lambda_client = boto3_client('lambda', region_name=REGION_NAME)
dynamodb_client = boto3_client("dynamodb", region_name=REGION_NAME)
dynamodb_resource = boto3_resource("dynamodb", region_name=REGION_NAME)


# Logger
29 changes: 29 additions & 0 deletions ack_backend/src/errors.py
@@ -0,0 +1,29 @@
"""Custom exceptions for the Filename Processor."""


class DuplicateFileError(Exception):
"""A custom exception for when it is identified that the file is a duplicate."""


class ProcessingError(Exception):
"""A custom exception for when it is identified that supplier_vaccine file is under processing"""


class UnhandledAuditTableError(Exception):
"""A custom exception for when an unexpected error occurs whilst adding the file to the audit table."""


class VaccineTypePermissionsError(Exception):
"""A custom exception for when the supplier does not have the necessary vaccine type permissions."""


class InvalidFileKeyError(Exception):
"""A custom exception for when the file key is invalid."""


class InvalidSupplierError(Exception):
"""A custom exception for when the supplier has not been correctly identified."""


class UnhandledSqsError(Exception):
"""A custom exception for when an unexpected error occurs whilst sending a message to SQS."""
79 changes: 72 additions & 7 deletions ack_backend/src/update_ack_file.py
@@ -1,12 +1,17 @@
"""Functions for adding a row of data to the ack file"""

import os
import json
from io import StringIO, BytesIO
from typing import Union
from botocore.exceptions import ClientError
from constants import Constants
from clients import s3_client, logger

from audit_table import update_audit_table_status, get_queued_file_details
from clients import s3_client, logger, lambda_client
#TODO move to constants
ENVIRONMENT = os.getenv("ENVIRONMENT")
SOURCE_BUCKET_NAME = f"immunisation-batch-{ENVIRONMENT}-data-sources"
FILE_NAME_PROC_LAMBDA_NAME = os.getenv("FILE_NAME_PROC_LAMBDA_NAME")

def create_ack_data(
created_at_formatted_string: str,
@@ -63,7 +68,8 @@ def obtain_current_ack_content(ack_bucket_name: str, ack_file_key: str) -> StringIO:


def upload_ack_file(
ack_bucket_name: str, ack_file_key: str, accumulated_csv_content: StringIO, ack_data_row: any
ack_bucket_name: str, ack_file_key: str, accumulated_csv_content: StringIO, ack_data_row: any, row_count: int, archive_ack_file_key: str, file_key: str
, created_at_formatted_string: str
) -> None:
"""Adds the data row to the uploaded ack file"""
for row in ack_data_row:
@@ -72,12 +78,71 @@
accumulated_csv_content.write(cleaned_row + "\n")
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))
s3_client.upload_fileobj(csv_file_like_object, ack_bucket_name, ack_file_key)
logger.info("Ack file updated to %s: %s", ack_bucket_name, ack_file_key)
row_count_dest = get_row_count_stream(ack_bucket_name, ack_file_key)
if row_count == row_count_dest:
move_file(ack_bucket_name, ack_file_key, archive_ack_file_key)
source_key = f"processing/{file_key}"
destination_key = f"archive/{file_key}"
move_file(SOURCE_BUCKET_NAME, source_key, destination_key)
queue_name = update_audit_table_status(file_key)
file_key, message_id = get_queued_file_details(queue_name)
if file_key and message_id is not None:
# Directly invoke the Lambda function
invoke_filename_lambda(SOURCE_BUCKET_NAME, file_key, message_id)

logger.info("Ack file updated to %s: %s", ack_bucket_name, archive_ack_file_key)


def update_ack_file(file_key: str, created_at_formatted_string: str, ack_data_rows: any) -> None:
def update_ack_file(
file_key: str,
created_at_formatted_string: str,
ack_data_rows: any,
row_count
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_file_key = f"forwardedFile/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
ack_file_key = f"TempAck/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
archive_ack_file_key = f"forwardedFile/{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
ack_bucket_name = os.getenv("ACK_BUCKET_NAME")
accumulated_csv_content = obtain_current_ack_content(ack_bucket_name, ack_file_key)
upload_ack_file(ack_bucket_name, ack_file_key, accumulated_csv_content, ack_data_rows)
upload_ack_file(ack_bucket_name, ack_file_key, accumulated_csv_content, ack_data_rows, row_count, archive_ack_file_key, file_key, created_at_formatted_string)

def get_row_count_stream(bucket_name, key):
response = s3_client.get_object(Bucket=bucket_name, Key=key)
count = sum(1 for _ in response['Body'].iter_lines())
return count

def move_file(bucket_name: str, source_key: str, destination_key: str) -> None:

""" Moves a file from one location to another in S3 by copying and then deleting it.
Args: bucket_name (str): Name of the S3 bucket.
source_key (str): Source file key.
destination_key (str): Destination file key.
"""
s3_client.copy_object(
Bucket=bucket_name,
CopySource={"Bucket": bucket_name, "Key": source_key},
Key=destination_key
)
s3_client.delete_object(Bucket=bucket_name, Key=source_key)
logger.info("File moved from %s to %s", source_key, destination_key)


def invoke_filename_lambda(source_bucket_name, file_key, message_id):
lambda_payload = {"Records":[
{
"s3": {
"bucket": {
"name": source_bucket_name
},
"object": {
"key": file_key
}
},
"message_id": message_id
}
]
}
lambda_client.invoke(
FunctionName=FILE_NAME_PROC_LAMBDA_NAME,
InvocationType="Event",
Payload=json.dumps(lambda_payload))
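The completion logic added to upload_ack_file is easiest to read as a single branch: once the ack file holds as many rows as the source file, the ack file is promoted from TempAck/ to forwardedFile/, the source file is moved from processing/ to archive/, the audit record is marked Processed, and the oldest queued file (if any) is handed to the filename-processor lambda. A condensed, illustrative restatement of that branch, not the production code:

    def on_all_rows_acknowledged(row_count, ack_bucket_name, ack_file_key, archive_ack_file_key, file_key):
        """Sketch of the completion branch inside upload_ack_file."""
        if get_row_count_stream(ack_bucket_name, ack_file_key) == row_count:
            move_file(ack_bucket_name, ack_file_key, archive_ack_file_key)  # TempAck/ -> forwardedFile/
            move_file(SOURCE_BUCKET_NAME, f"processing/{file_key}", f"archive/{file_key}")
            queue_name = update_audit_table_status(file_key)  # mark the audit row as Processed
            next_file, message_id = get_queued_file_details(queue_name)
            if next_file and message_id:
                invoke_filename_lambda(SOURCE_BUCKET_NAME, next_file, message_id)  # kick off the next queued file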
10 changes: 10 additions & 0 deletions ack_backend/src/utils_for_ack_lambda.py
@@ -0,0 +1,10 @@
"""Utils for ack lambda"""

import os


def get_environment() -> str:
"""Returns the current environment. Defaults to internal-dev for pr and user environments"""
_env = os.getenv("ENVIRONMENT")
# default to internal-dev for pr and user environments
return _env if _env in ["internal-dev", "int", "ref", "sandbox", "prod"] else "internal-dev"
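A quick illustration of the fallback behaviour (the environment values are examples only):

    import os
    from utils_for_ack_lambda import get_environment

    os.environ["ENVIRONMENT"] = "prod"
    assert get_environment() == "prod"          # recognised environments are returned unchanged

    os.environ["ENVIRONMENT"] = "pr-42"         # PR/user environments are not in the allow-list
    assert get_environment() == "internal-dev"  # so the default is used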
2 changes: 1 addition & 1 deletion azure/azure-release-pipeline.yml
@@ -76,5 +76,5 @@ extends:
- int
- ref
jinja_templates:
DOMAIN_ENDPOINT: https://prod.imms.prod.vds.platform.nhs.uk
DOMAIN_ENDPOINT: https://blue.imms.prod.vds.platform.nhs.uk
