Initial Lambda and SQS data validator
jcadam14 committed Oct 9, 2024
1 parent 18a453d commit 20346dc
Showing 8 changed files with 178 additions and 46 deletions.
12 changes: 12 additions & 0 deletions Lambda_Dockerfile
@@ -0,0 +1,12 @@
FROM --platform=linux/amd64 public.ecr.aws/lambda/python:3.12

RUN dnf install -y git

COPY lambda_requirements.txt ${LAMBDA_TASK_ROOT}/requirements.txt

RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

COPY src/ ${LAMBDA_TASK_ROOT}

# Pass the name of the function handler as an argument to the runtime
CMD [ "regtech_data_validator.lambda_wrapper.lambda_handler" ]
13 changes: 13 additions & 0 deletions SQS_Dockerfile
@@ -0,0 +1,13 @@
FROM --platform=linux/amd64 python:3.12

WORKDIR /usr/app

COPY sqs_requirements.txt requirements.txt

RUN pip install -r requirements.txt

COPY src/ .

ENV PYTHONPATH "${PYTHONPATH}:/usr/app"

CMD python regtech_data_validator/sqs_wrapper.py
12 changes: 12 additions & 0 deletions lambda_requirements.txt
@@ -0,0 +1,12 @@
polars
awslambdaric
pandera
ujson
boto3
tabulate
fsspec
s3fs
sqlalchemy
pydantic
psycopg2-binary
pyarrow
11 changes: 11 additions & 0 deletions sqs_requirements.txt
@@ -0,0 +1,11 @@
polars
pandera
ujson
boto3
tabulate
fsspec
s3fs
sqlalchemy
pydantic
psycopg2-binary
pyarrow
2 changes: 2 additions & 0 deletions src/regtech_data_validator/data_formatters.py
@@ -210,8 +210,10 @@ def df_to_download(


 def upload(path: str, content: bytes) -> None:
+    print(f"Uploading to {path}")
     bucket = path.split("s3://")[1].split("/")[0]
     opath = path.split("s3://")[1].replace(bucket + "/", "")
+    print(f"Bucket: {bucket}, OPath: {opath}, Data: {content}")
     s3 = boto3.client("s3")
     s3.put_object(
         Bucket=bucket,
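For reference, upload() derives the bucket and object key by string-splitting the s3:// URL. A quick illustration with a made-up path (the path and names below are invented, not values from this commit):

# Illustration only; the path is invented.
path = "s3://filing-bucket/upload/2024/LEI123/45.csv"
bucket = path.split("s3://")[1].split("/")[0]             # "filing-bucket"
opath = path.split("s3://")[1].replace(bucket + "/", "")  # "upload/2024/LEI123/45.csv"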
70 changes: 35 additions & 35 deletions src/regtech_data_validator/model.py
@@ -42,40 +42,40 @@ class SubmissionDAO(Base):
     def __str__(self):
         return f"Submission ID: {self.id}, State: {self.state}, Ruleset: {self.validation_ruleset_version}, Filing Period: {self.filing}, Submission: {self.submission_time}"


 #model for finding table
 class FindingDAO(Base):
     __tablename__ = "findings"
-    validation_type: Mapped[str],
-    validation_id: Mapped[str],
-    row: Mapped[int],
-    unique_identifier: Mapped[str],
-    scope: Mapped[str],
-    phase: Mapped[str],
-    submission_id: Mapped[int],
-    field_1: Mapped[str],
-    field_2: Mapped[str],
-    field_3: Mapped[str],
-    field_4: Mapped[str],
-    field_5: Mapped[str],
-    field_6: Mapped[str],
-    field_7: Mapped[str],
-    field_8: Mapped[str],
-    field_9: Mapped[str],
-    field_10: Mapped[str],
-    field_11: Mapped[str],
-    field_12: Mapped[str],
-    field_13: Mapped[str],
-    value_1: Mapped[str],
-    value_2: Mapped[str],
-    value_3: Mapped[str],
-    value_4: Mapped[str],
-    value_5: Mapped[str],
-    value_6: Mapped[str],
-    value_7: Mapped[str],
-    value_8: Mapped[str],
-    value_9: Mapped[str],
-    value_10: Mapped[str],
-    value_11: Mapped[str],
-    value_12: Mapped[str],
-    value_13: Mapped[str]
+    id: Mapped[int] = mapped_column(index=True, primary_key=True, autoincrement=True)
+    validation_type: Mapped[str]
+    validation_id: Mapped[str]
+    row: Mapped[int]
+    unique_identifier: Mapped[str]
+    scope: Mapped[str]
+    phase: Mapped[str]
+    submission_id: Mapped[int]
+    field_1: Mapped[str]
+    field_2: Mapped[str] = mapped_column(nullable=True)
+    field_3: Mapped[str] = mapped_column(nullable=True)
+    field_4: Mapped[str] = mapped_column(nullable=True)
+    field_5: Mapped[str] = mapped_column(nullable=True)
+    field_6: Mapped[str] = mapped_column(nullable=True)
+    field_7: Mapped[str] = mapped_column(nullable=True)
+    field_8: Mapped[str] = mapped_column(nullable=True)
+    field_9: Mapped[str] = mapped_column(nullable=True)
+    field_10: Mapped[str] = mapped_column(nullable=True)
+    field_11: Mapped[str] = mapped_column(nullable=True)
+    field_12: Mapped[str] = mapped_column(nullable=True)
+    field_13: Mapped[str] = mapped_column(nullable=True)
+    value_1: Mapped[str]
+    value_2: Mapped[str] = mapped_column(nullable=True)
+    value_3: Mapped[str] = mapped_column(nullable=True)
+    value_4: Mapped[str] = mapped_column(nullable=True)
+    value_5: Mapped[str] = mapped_column(nullable=True)
+    value_6: Mapped[str] = mapped_column(nullable=True)
+    value_7: Mapped[str] = mapped_column(nullable=True)
+    value_8: Mapped[str] = mapped_column(nullable=True)
+    value_9: Mapped[str] = mapped_column(nullable=True)
+    value_10: Mapped[str] = mapped_column(nullable=True)
+    value_11: Mapped[str] = mapped_column(nullable=True)
+    value_12: Mapped[str] = mapped_column(nullable=True)
+    value_13: Mapped[str] = mapped_column(nullable=True)
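In SQLAlchemy 2.0 typed mappings, a bare Mapped[str] column is NOT NULL by default, so the explicit nullable=True on field_2 through value_13 is what allows sparse findings to persist. A small sketch of a single-field finding (all values invented) that relies on those nullable columns:

# Sketch with invented values; only field_1/value_1 are set, so the
# remaining field_*/value_* columns must be nullable.
finding = FindingDAO(
    validation_type="Error",
    validation_id="E0001",
    row=2,
    unique_identifier="ROW2UID",
    scope="single-field",
    phase="Syntactical",
    submission_id=45,
    field_1="uid",
    value_1="BADUID",
)
session.add(finding)  # assumes an open SQLAlchemy Session named `session`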
102 changes: 92 additions & 10 deletions src/regtech_data_validator/service.py
@@ -4,17 +4,18 @@
 import logging
 import polars as pl

-from botocore import ClientError
+from botocore.exceptions import ClientError
 from fsspec import AbstractFileSystem, filesystem
 from pydantic import PostgresDsn
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy import create_engine, func, select
+from sqlalchemy.orm import sessionmaker, Session
 from urllib import parse

 from regtech_data_validator.checks import Severity
 from regtech_data_validator.validator import validate_batch_csv, ValidationSchemaError
-from regtech_data_validator.data_formatters import df_to_download
-from regtech_data_validator.model import SubmissionDAO, SubmissionState
+from regtech_data_validator.data_formatters import df_to_download, df_to_dicts
+from regtech_data_validator.model import SubmissionDAO, SubmissionState, FindingDAO
+from regtech_data_validator.validation_results import ValidationPhase

 logger = logging.getLogger()
 logger.setLevel("INFO")
@@ -36,8 +36,9 @@ def get_secret(secret_name):


 def service_validate(bucket, file):
-    lei = file.split("/")[2]
-    submission_id = file.split("/")[3].split(".csv")[0]
+    file_paths = file.split("/")
+    lei = file_paths[2]
+    submission_id = file_paths[3].split(".csv")[0]

     filing_conn = None

@@ -50,7 +52,7 @@ def service_validate(bucket, file):

     try:
         s3_path = f"{bucket}/{file}"
-
+        print(f"Path to file: {s3_path}")
         fs: AbstractFileSystem = filesystem("filecache", target_protocol='s3', cache_storage='/tmp/files/')
         with fs.open(s3_path, "r") as f:
             final_state = SubmissionState.VALIDATION_SUCCESSFUL
@@ -67,11 +69,12 @@
                 all_findings.append(findings)

         submission.state = final_state
+        build_validation_results(submission, filing_conn)
         filing_conn.commit()

         if all_findings:
             final_df = pl.concat(all_findings, how="diagonal")
-            df_to_download(final_df, path=f"{bucket}/{submission_id}_report.csv")
+            df_to_download(final_df, path=f"s3://{bucket}/{file_paths[0]}/{file_paths[1]}/{file_paths[2]}/{submission_id}_report.csv")

     except ValidationSchemaError:
         logger.exception("The file is malformed.")
@@ -105,3 +108,82 @@ def get_filing_db_connection():
     )
     conn_str = str(postgres_dsn)
     return create_engine(conn_str)
+
+
+def build_validation_results(submission: SubmissionDAO, session: Session):
+    phase = get_validation_phase(session, submission.id)
+
+    findings = get_findings(session, submission.id, 200)
+
+    data = []
+    for row in findings:
+        finding_dict = row.__dict__.copy()
+        finding_dict.pop("_sa_instance_state", None)
+        data.append(finding_dict)
+
+    val_json = df_to_dicts(pl.DataFrame(data), max_group_size=200)
+    if phase == ValidationPhase.SYNTACTICAL:
+        single_errors = get_field_counts(session, submission.id, Severity.ERROR, "single-field")
+        val_res = {
+            "syntax_errors": {
+                "single_field_count": single_errors,
+                "multi_field_count": 0,  # this will always be zero for syntax errors
+                "register_count": 0,  # this will always be zero for syntax errors
+                "total_count": single_errors,
+                "details": val_json,
+            }
+        }
+    else:
+        errors_list = [e for e in val_json if e["validation"]["severity"] == Severity.ERROR]
+        warnings_list = [w for w in val_json if w["validation"]["severity"] == Severity.WARNING]
+        val_res = {
+            "syntax_errors": {
+                "single_field_count": 0,
+                "multi_field_count": 0,
+                "register_count": 0,
+                "total_count": 0,
+                "details": [],
+            },
+            "logic_errors": {
+                "single_field_count": get_field_counts(session, submission.id, Severity.ERROR, "single-field"),
+                "multi_field_count": get_field_counts(session, submission.id, Severity.ERROR, "multi-field"),
+                "register_count": get_field_counts(session, submission.id, Severity.ERROR, "register"),
+                "total_count": get_field_counts(session, submission.id, Severity.ERROR),
+                "details": errors_list,
+            },
+            "logic_warnings": {
+                "single_field_count": get_field_counts(session, submission.id, Severity.WARNING, "single-field"),
+                "multi_field_count": get_field_counts(session, submission.id, Severity.WARNING, "multi-field"),
+                "register_count": 0,
+                "total_count": get_field_counts(session, submission.id, Severity.WARNING),
+                "details": warnings_list,
+            },
+        }
+
+    submission.validation_results = val_res
+
+
+def get_validation_phase(session: Session, submission_id: int):
+    phase = session.execute(select(FindingDAO.phase).filter(FindingDAO.submission_id == submission_id))
+    return phase.scalar()
+
+
+def get_field_counts(session: Session, submission_id: int, severity: Severity, scope: str = None):
+    query = session.query(func.count(FindingDAO.id)).filter(
+        FindingDAO.submission_id == submission_id,
+        FindingDAO.validation_type == severity,
+    )
+    if scope:
+        # Query.filter returns a new Query, so the result must be reassigned
+        query = query.filter(FindingDAO.scope == scope)
+
+    # run via the legacy Query API; Session.scalar() expects a select() statement
+    return query.scalar()
+
+
+def get_findings(session, submission_id, max_group_size):
+    # cap findings per validation_id at max_group_size via a window function
+    row_number = func.row_number().over(partition_by=FindingDAO.validation_id, order_by=FindingDAO.id).label('row_num')
+
+    subquery = session.query(FindingDAO.id).add_columns(row_number).filter(FindingDAO.submission_id == submission_id).subquery()
+    filtered_subquery = session.query(subquery.c.id).filter(subquery.c.row_num <= max_group_size)
+    query = session.query(FindingDAO).filter(FindingDAO.id.in_(filtered_subquery))
+    # run via the legacy Query API; Session.scalars() expects a select() statement
+    findings = query.all()
+    return findings
2 changes: 1 addition & 1 deletion src/regtech_data_validator/sqs_wrapper.py
@@ -22,7 +22,7 @@ def watch_queue():
             MessageAttributeNames=['.*'],
             MaxNumberOfMessages=1,
             VisibilityTimeout=1200,
-            # WaitTimeSeconds=10,
+            WaitTimeSeconds=20,
         )

         if response and 'Messages' in response:
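The remainder of watch_queue() is collapsed in this view. Purely as a hedged sketch of how a poller like this typically consumes the response (the sqs client, queue_url, and the S3-event body shape are assumed names, not the collapsed code):

# Hypothetical handling sketch; `sqs` and `queue_url` are assumed names.
import ujson

for message in response["Messages"]:
    event = ujson.loads(message["Body"])
    for record in event.get("Records", []):
        service_validate(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
    # delete_message(QueueUrl=..., ReceiptHandle=...) is the standard boto3 SQS call
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])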
