Commit
refactor: use capepy for decreasing boilerplate
Showing 2 changed files with 45 additions and 182 deletions.
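The diff below replaces the hand-rolled Glue setup (argument parsing via getResolvedOptions, a boto3 S3 client, and get/put calls with manual status checks) with capepy's EtlJob wrapper. A minimal sketch of the resulting pattern, using only the EtlJob members that appear in this commit (`parameters`, `get_src_file`, `write_sink_file`, `logger`); the transform step is a placeholder:

```python
import io

from capepy.aws.glue import EtlJob

etl_job = EtlJob()

# job parameters are resolved by EtlJob; derive the clean key from the raw one
clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")

# fetch the raw source object's bytes and wrap them for the pdf libraries
f = io.BytesIO(etl_job.get_src_file())

# ... transform the pdf into a CSV payload (placeholder) ...
csv_payload = "col_a,col_b\n1,2\n"

# write the result to the clean bucket under the new key
etl_job.write_sink_file(csv_payload, clean_obj_key)
```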
```diff
@@ -1,205 +1,68 @@
 """ETL script for raw Epi/HAI sequencing report pdf."""
 
 import io
-import re
-import sys
-from pathlib import Path
+from datetime import datetime
 
-import boto3 as boto3
 import pandas as pd
-import pymupdf
-from awsglue.context import GlueContext
-from awsglue.utils import getResolvedOptions
-from pyspark.sql import SparkSession
+from capepy.aws.glue import EtlJob
+from pypdf import PdfReader
+from tabula.io import read_pdf
 
-# for our purposes here, the spark and glue context are only (currently) needed
-# to get the logger.
-spark_ctx = SparkSession.builder.getOrCreate()  # pyright: ignore
-glue_ctx = GlueContext(spark_ctx)
-logger = glue_ctx.get_logger()
-
-# TODO:
-#   - add error handling for the format of the document being incorrect
-#   - figure out how we want to name and namespace clean files (e.g. will we
-#     take the object key we're given, strip the extension and replace it with
-#     one for the new format, or will we do something else)
-#   - see what we can extract out of here to be useful for other ETLs. imagine
-#     we'd have a few different things that could be made into a reusable
-#     package
-
-parameters = getResolvedOptions(
-    sys.argv,
-    [
-        "RAW_BUCKET_NAME",
-        "ALERT_OBJ_KEY",
-        "CLEAN_BUCKET_NAME",
-    ],
-)
-
-raw_bucket_name = parameters["RAW_BUCKET_NAME"]
-alert_obj_key = parameters["ALERT_OBJ_KEY"]
-clean_bucket_name = parameters["CLEAN_BUCKET_NAME"]
+etl_job = EtlJob()
 
 # NOTE: for now we'll take the alert object key and change out the file
 #       extension for the clean data (leaving all namespacing and such). this
 #       will probably need to change
-clean_obj_key = str(Path(alert_obj_key).with_suffix(".csv"))
-
-# NOTE: May need some creds here
-s3_client = boto3.client("s3")
-
-# try to get the pdf object from S3 and handle any error that would keep us
-# from continuing.
-response = s3_client.get_object(Bucket=raw_bucket_name, Key=alert_obj_key)
-
-status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-if status != 200:
-    err = (
-        f"ERROR - Could not get object {alert_obj_key} from bucket "
-        f"{raw_bucket_name}. ETL Cannot continue."
-    )
-
-    logger.error(err)
-
-    # NOTE: need to properly handle exception stuff here, and we probably want
-    #       this going somewhere very visible (e.g. SNS topic or a perpetual log
-    #       as someone will need to be made aware)
-    raise Exception(err)
-
-logger.info(f"Obtained object {alert_obj_key} from bucket {raw_bucket_name}.")
-
-# handle the document itself...
+clean_obj_key = etl_job.parameters["OBJECT_KEY"].replace(".pdf", ".csv")
 
 # the response should contain a StreamingBody object that needs to be converted
 # to a file like object to make the pdf libraries happy
-f = io.BytesIO(response.get("Body").read())
-
-doc = pymupdf.open(stream=f)  # open a document
-# extract all words from the document pages
-pages = [page.get_textpage().extractWORDS() for page in doc]
-
-
-# pdf reader splits words on same line into different lines
-# because their vertical position is slightly off (font is different)
-# data is also split into columns
-#
-# therefore:
-# define boxes of pdf we're interested in based on columns. For each bbox:
-#   group words by lines (y positions that are close together)
-#   reorder words based on x position
-def inside(word, bbox):
-    if word[0] > bbox[0] and word[0] < bbox[2]:
-        if word[1] > bbox[1] and word[1] < bbox[3]:
-            return True
-    return False
-
-
-def process_lines(bbox, lines, page):
-    box_words = [word for word in page if inside(word, bbox)]
-    y_pos = [word[1] for word in box_words]
-    y_pos = list(set(y_pos))
-    y_pos.sort()  # ordered list of vertical positions of every word
-    while y_pos:
-        # find words in same line (positions are close)
-        same_line = [pos for pos in y_pos if pos - y_pos[0] < 4.0]
-        words = [word for word in box_words if word[1] in same_line]
-        # sort by horizontal position (left to right)
-        words.sort(key=lambda x: x[0])
-        line = " ".join([word[4] for word in words])
-        lines.append(line)
-        # remove words in line from working list
-        y_pos = [pos for pos in y_pos if pos not in same_line]
-
-
-lines = []
-bbox_bounds = [
-    (23.0, 130.0, 200.0, 320.0),  # col 1
-    (205.0, 145.0, 410.0, 320.0),  # col 2
-    (415.0, 145.0, 600.0, 320.0),  # col 3
-    (23.0, 365.0, 600.0, 375.0),  # result
-]
+f = io.BytesIO(etl_job.get_src_file())
 
-# get lines from each bbox
-for bbox in bbox_bounds:
-    process_lines(bbox, lines, pages[0])
-
-# get data from last page
-bbox = (23.0, 210.0, 330.0, 330.0)
-last_page_lines = []
-process_lines(bbox, last_page_lines, pages[2])
-
-# adjust - add key to result, colons
-last_page_lines[0] = f"Result: {last_page_lines[0]}"
-last_page_lines[1:] = [
-    ": ".join(line.split(" ", 1)) for line in last_page_lines[1:]
-]
-
-lines = lines + last_page_lines
-
-# combine multi-line facility and patient address into one line
 try:
-    facility_index = [
-        i for i, item in enumerate(lines) if re.search("^Facility:", item)
-    ][0]
-    ordering_provider_index = [
-        i
-        for i, item in enumerate(lines)
-        if re.search("^Ordering Provider:", item)
-    ][0]
-    patient_address_index = [
-        i
-        for i, item in enumerate(lines)
-        if re.search("^Patient Address:", item)
-    ][0]
-    event_id_index = [
-        i for i, item in enumerate(lines) if re.search("^Event ID:", item)
-    ][0]
-except KeyError as err:
+    # get the report date from the 4th line of the pdf
+    reader = PdfReader(f)
+    page = reader.pages[0]
+    date_reported = page.extract_text().split("\n")[3].strip()
+    datetime.strptime(date_reported, "%m/%d/%Y")
+except ValueError as err:
     err_message = (
-        f"ERROR - Could not properly read facility report date. "
+        f"ERROR - Could not properly read sequencing report date. "
         f"ETL will continue."
         f"{err}"
     )
-    # logger.error(err_message)
-    raise Exception(err_message)
-
-lines = (
-    lines[0:facility_index]
-    + [" ".join(lines[facility_index:ordering_provider_index])]
-    + lines[ordering_provider_index:patient_address_index]
-    + [" ".join(lines[patient_address_index:event_id_index])]
-    + lines[event_id_index:]
-)
+    etl_job.logger.error(err_message)
+
+    date_reported = ""
 
-# convert to dataframe
-dict = {k: [v] for k, v in [line.split(":") for line in lines]}
-interim = pd.DataFrame.from_dict(dict)
-
-# write out the transformed data
-with io.StringIO() as csv_buff:
-    interim.to_csv(csv_buff, index=False)
-
-    response = s3_client.put_object(
-        Bucket=clean_bucket_name, Key=clean_obj_key, Body=csv_buff.getvalue()
-    )
+try:
+    # get two tables from the pdf
+    tables = read_pdf(f, multiple_tables=True, pages=2)
+    assert isinstance(tables, list)
+    mlst_st = tables[0]
+    genes = tables[1]
+except (IndexError, KeyError) as err:
+    err_message = (
+        f"ERROR - Could not properly read sequencing PDF tables. "
+        f"ETL Cannot continue."
+        f"{err}"
+    )
 
-    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
+    etl_job.logger.error(err_message)
 
-    if status != 200:
-        err = (
-            f"ERROR - Could not write transformed data object {clean_obj_key} "
-            f"to bucket {clean_bucket_name}. ETL Cannot continue."
-        )
-
-        logger.error(err)
-
-        # NOTE: need to properly handle exception stuff here, and we probably
-        #       want this going somewhere very visible (e.g. SNS topic or a
-        #       perpetual log as someone will need to be made aware)
-        raise Exception(err)
+    # NOTE: need to properly handle exception stuff here, and we probably
+    #       want this going somewhere very visible (e.g. SNS topic or a
+    #       perpetual log as someone will need to be made aware)
+    raise Exception(err_message)
 
-logger.info(
-    f"Transformed {raw_bucket_name}/{alert_obj_key} and wrote result "
-    f"to {clean_bucket_name}/{clean_obj_key}"
-)
+# filter the columns we need and join the tables together
+interim = mlst_st[["Accession_ID", "WGS_ID", "MLST_ST"]]
+genes_inter = genes.set_index("Unnamed: 0").T
+genes_interim = genes_inter.filter(regex="(NDM|KPC|IMP|OXA|VIM|CMY)", axis=1)
+interim = interim.join(genes_interim, on="WGS_ID")
+interim["Date Reported"] = date_reported
+
+# write out the transformed data
+with io.StringIO() as csv_buff:
+    interim.to_csv(csv_buff, index=False)
+    etl_job.write_sink_file(csv_buff.getvalue(), clean_obj_key)
```
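The new date handling is deliberately tolerant: it takes the fourth line of the first page's text, validates it against `%m/%d/%Y` with strptime, and falls back to an empty string rather than aborting the job. A standalone sketch of that check (the sample page text below is hypothetical):

```python
from datetime import datetime

# hypothetical first-page text, as pypdf's extract_text() might return it
page_text = "Sequencing Report\nLab Name\nAddress Line\n01/31/2024\n..."

date_reported = page_text.split("\n")[3].strip()
try:
    # strptime is used only to validate the format; the string is kept as-is
    datetime.strptime(date_reported, "%m/%d/%Y")
except ValueError:
    # mirror the ETL's fallback: keep going with an empty report date
    date_reported = ""
```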
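The table join is the densest step in the new script. A toy illustration of the reshape, assuming (as the new code implies) that tabula returns the gene table with gene names in an "Unnamed: 0" column and one column per WGS sample; the data values are made up:

```python
import pandas as pd

# stand-ins for the two tables read_pdf() extracts from page 2
mlst_st = pd.DataFrame(
    {"Accession_ID": ["A1"], "WGS_ID": ["W1"], "MLST_ST": ["ST258"]}
)
genes = pd.DataFrame(
    {"Unnamed: 0": ["KPC", "NDM"], "W1": ["detected", "not detected"]}
)

# transpose so WGS samples become the index and genes become columns
genes_interim = genes.set_index("Unnamed: 0").T

# keep only the resistance-gene columns of interest
genes_interim = genes_interim.filter(regex="(NDM|KPC|IMP|OXA|VIM|CMY)", axis=1)

# join on WGS_ID so each accession row picks up its gene calls
interim = mlst_st[["Accession_ID", "WGS_ID", "MLST_ST"]].join(
    genes_interim, on="WGS_ID"
)
print(interim)  # one row: A1, W1, ST258, detected, not detected
```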
```diff
@@ -1,5 +1,5 @@
 aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8
-boto3==1.34.103
 pandas==2.2.2
-pyspark==3.5.1
-pymupdf==1.24.10
+capepy>=2.0.0,<3.0.0
+tabula-py>=2.9.0,<3.0.0
+pypdf>=4.3.0,<5.0.0
```