diff --git a/Lambda_Dockerfile b/Lambda_Dockerfile new file mode 100644 index 0000000..c778beb --- /dev/null +++ b/Lambda_Dockerfile @@ -0,0 +1,12 @@ +FROM --platform=linux/amd64 public.ecr.aws/lambda/python:3.12 + +RUN dnf install -y git + +COPY lambda_requirements.txt ${LAMBDA_TASK_ROOT}/requirements.txt + +RUN pip install -r requirements.txt --target "${LAMBDA_TASK_ROOT}" + +COPY src/ ${LAMBDA_TASK_ROOT} + +# Pass the name of the function handler as an argument to the runtime +CMD [ "regtech_data_validator.lambda_wrapper.lambda_handler" ] \ No newline at end of file diff --git a/SQS_Dockerfile b/SQS_Dockerfile new file mode 100644 index 0000000..0fd4ff5 --- /dev/null +++ b/SQS_Dockerfile @@ -0,0 +1,13 @@ +FROM --platform=linux/amd64 python:3.12 + +WORKDIR /usr/app + +COPY sqs_requirements.txt requirements.txt + +RUN pip install -r requirements.txt + +COPY src/ . + +ENV PYTHONPATH "${PYTHONPATH}:/usr/app" + +CMD python regtech_data_validator/sqs_wrapper.py \ No newline at end of file diff --git a/lambda_requirements.txt b/lambda_requirements.txt new file mode 100644 index 0000000..b1dfd7a --- /dev/null +++ b/lambda_requirements.txt @@ -0,0 +1,12 @@ +polars +awslambdaric +pandera +ujson +boto3 +tabulate +fsspec +s3fs +sqlalchemy +pydantic +psycopg2-binary +pyarrow \ No newline at end of file diff --git a/sqs_requirements.txt b/sqs_requirements.txt new file mode 100644 index 0000000..1afca78 --- /dev/null +++ b/sqs_requirements.txt @@ -0,0 +1,11 @@ +polars +pandera +ujson +boto3 +tabulate +fsspec +s3fs +sqlalchemy +pydantic +psycopg2-binary +pyarrow \ No newline at end of file diff --git a/src/regtech_data_validator/data_formatters.py b/src/regtech_data_validator/data_formatters.py index 476dc7f..47147bb 100644 --- a/src/regtech_data_validator/data_formatters.py +++ b/src/regtech_data_validator/data_formatters.py @@ -210,8 +210,10 @@ def df_to_download( def upload(path: str, content: bytes) -> None: + print(f"Uploading to {path}") bucket = path.split("s3://")[1].split("/")[0] opath = path.split("s3://")[1].replace(bucket + "/", "") + print(f"Bucket: {bucket}, OPath: {opath}, Data: {content}") s3 = boto3.client("s3") s3.put_object( Bucket=bucket, diff --git a/src/regtech_data_validator/model.py b/src/regtech_data_validator/model.py index db8742f..da55c54 100644 --- a/src/regtech_data_validator/model.py +++ b/src/regtech_data_validator/model.py @@ -42,40 +42,40 @@ class SubmissionDAO(Base): def __str__(self): return f"Submission ID: {self.id}, State: {self.state}, Ruleset: {self.validation_ruleset_version}, Filing Period: {self.filing}, Submission: {self.submission_time}" - +#model for finding table class FindingDAO(Base): __tablename__ = "findings" - validation_type: Mapped[str], - validation_id: Mapped[str], - row: Mapped[int], - unique_identifier: Mapped[str], - scope: Mapped[str], - phase: Mapped[str], - submission_id: Mapped[int], - field_1: Mapped[str], - field_2: Mapped[str], - field_3: Mapped[str], - field_4: Mapped[str], - field_5: Mapped[str], - field_6: Mapped[str], - field_7: Mapped[str], - field_8: Mapped[str], - field_9: Mapped[str], - field_10: Mapped[str], - field_11: Mapped[str], - field_12: Mapped[str], - field_13: Mapped[str], - value_1: Mapped[str], - value_2: Mapped[str], - value_3: Mapped[str], - value_4: Mapped[str], - value_5: Mapped[str], - value_6: Mapped[str], - value_7: Mapped[str], - value_8: Mapped[str], - value_9: Mapped[str], - value_10: Mapped[str], - value_11: Mapped[str], - value_12: Mapped[str], - value_13: Mapped[str] - + id: Mapped[int] = mapped_column(index=True, primary_key=True, autoincrement=True) + validation_type: Mapped[str] + validation_id: Mapped[str] + row: Mapped[int] + unique_identifier: Mapped[str] + scope: Mapped[str] + phase: Mapped[str] + submission_id: Mapped[int] + field_1: Mapped[str] + field_2: Mapped[str] = mapped_column(nullable=True) + field_3: Mapped[str] = mapped_column(nullable=True) + field_4: Mapped[str] = mapped_column(nullable=True) + field_5: Mapped[str] = mapped_column(nullable=True) + field_6: Mapped[str] = mapped_column(nullable=True) + field_7: Mapped[str] = mapped_column(nullable=True) + field_8: Mapped[str] = mapped_column(nullable=True) + field_9: Mapped[str] = mapped_column(nullable=True) + field_10: Mapped[str] = mapped_column(nullable=True) + field_11: Mapped[str] = mapped_column(nullable=True) + field_12: Mapped[str] = mapped_column(nullable=True) + field_13: Mapped[str] = mapped_column(nullable=True) + value_1: Mapped[str] + value_2: Mapped[str] = mapped_column(nullable=True) + value_3: Mapped[str] = mapped_column(nullable=True) + value_4: Mapped[str] = mapped_column(nullable=True) + value_5: Mapped[str] = mapped_column(nullable=True) + value_6: Mapped[str] = mapped_column(nullable=True) + value_7: Mapped[str] = mapped_column(nullable=True) + value_8: Mapped[str] = mapped_column(nullable=True) + value_9: Mapped[str] = mapped_column(nullable=True) + value_10: Mapped[str] = mapped_column(nullable=True) + value_11: Mapped[str] = mapped_column(nullable=True) + value_12: Mapped[str] = mapped_column(nullable=True) + value_13: Mapped[str] = mapped_column(nullable=True) diff --git a/src/regtech_data_validator/service.py b/src/regtech_data_validator/service.py index 5e7c7b4..c071c8a 100644 --- a/src/regtech_data_validator/service.py +++ b/src/regtech_data_validator/service.py @@ -4,17 +4,18 @@ import logging import polars as pl -from botocore import ClientError +from botocore.exceptions import ClientError from fsspec import AbstractFileSystem, filesystem from pydantic import PostgresDsn -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker +from sqlalchemy import create_engine, func, select +from sqlalchemy.orm import sessionmaker, Session from urllib import parse +from regtech_data_validator.checks import Severity from regtech_data_validator.validator import validate_batch_csv, ValidationSchemaError -from regtech_data_validator.data_formatters import df_to_download -from regtech_data_validator.model import SubmissionDAO, SubmissionState - +from regtech_data_validator.data_formatters import df_to_download, df_to_dicts +from regtech_data_validator.model import SubmissionDAO, SubmissionState, FindingDAO +from regtech_data_validator.validation_results import ValidationPhase logger = logging.getLogger() logger.setLevel("INFO") @@ -36,8 +37,9 @@ def get_secret(secret_name): def service_validate(bucket, file): - lei = file.split("/")[2] - submission_id = file.split("/")[3].split(".csv")[0] + file_paths = file.split("/") + lei = file_paths[2] + submission_id = file_paths[3].split(".csv")[0] filing_conn = None @@ -50,7 +52,7 @@ def service_validate(bucket, file): try: s3_path = f"{bucket}/{file}" - + print(f"Path to file: {s3_path}") fs: AbstractFileSystem = filesystem("filecache", target_protocol='s3', cache_storage='/tmp/files/') with fs.open(s3_path, "r") as f: final_state = SubmissionState.VALIDATION_SUCCESSFUL @@ -67,11 +69,12 @@ def service_validate(bucket, file): all_findings.append(findings) submission.state = final_state + build_validation_results(submission, filing_conn) filing_conn.commit() if all_findings: final_df = pl.concat(all_findings, how="diagonal") - df_to_download(final_df, path=f"{bucket}/{submission_id}_report.csv") + df_to_download(final_df, path=f"s3://{bucket}/{file_paths[0]}/{file_paths[1]}/{file_paths[2]}/{submission_id}_report.csv") except ValidationSchemaError: logger.exception("The file is malformed.") @@ -105,3 +108,82 @@ def get_filing_db_connection(): ) conn_str = str(postgres_dsn) return create_engine(conn_str) + + +def build_validation_results(submission: SubmissionDAO, session: Session): + phase = get_validation_phase(session, submission.id) + + findings = get_findings(session, submission.id, 200) + + data = [] + for row in findings: + finding_dict = row.__dict__.copy() + finding_dict.pop("_sa_instance_state", None) + data.append(finding_dict) + + val_json = df_to_dicts(pl.DataFrame(data), max_group_size=200) + if phase == ValidationPhase.SYNTACTICAL: + single_errors = (get_field_counts(session, submission.id, Severity.ERROR, "single-field"),) + val_res = { + "syntax_errors": { + "single_field_count": single_errors, + "multi_field_count": 0, # this will always be zero for syntax errors + "register_count": 0, # this will always be zero for syntax errors + "total_count": single_errors, + "details": val_json, + } + } + else: + errors_list = [e for e in val_json if e["validation"]["severity"] == Severity.ERROR] + warnings_list = [w for w in val_json if w["validation"]["severity"] == Severity.WARNING] + val_res = { + "syntax_errors": { + "single_field_count": 0, + "multi_field_count": 0, + "register_count": 0, + "total_count": 0, + "details": [], + }, + "logic_errors": { + "single_field_count": get_field_counts(session, submission.id, Severity.ERROR, "single-field"), + "multi_field_count": get_field_counts(session, submission.id, Severity.ERROR, "multi-field"), + "register_count": get_field_counts(session, submission.id, Severity.ERROR, "register"), + "total_count": get_field_counts(session, submission.id, Severity.ERROR), + "details": errors_list, + }, + "logic_warnings": { + "single_field_count": get_field_counts(session, submission.id, Severity.WARNING, "single-field"), + "multi_field_count": get_field_counts(session, submission.id, Severity.WARNING, "multi-field"), + "register_count": 0, + "total_count": get_field_counts(session, submission.id, Severity.WARNING), + "details": warnings_list, + }, + } + + submission.validation_results = val_res + + +def get_validation_phase(session: Session, submission_id: int): + phase = session.execute(select(FindingDAO.phase).filter(FindingDAO.submission_id == submission_id)) + return phase.scalar() + + +def get_field_counts(session: Session, submission_id: int, severity: Severity, scope: str = None): + query = session.query(func.count(FindingDAO.id)).filter( + FindingDAO.submission_id == submission_id, + FindingDAO.validation_type == severity, + ) + if scope: + query.filter(FindingDAO.scope == scope) + + return session.scalar(query) + + +def get_findings(session, submission_id, max_group_size): + row_number = func.row_number().over(partition_by=FindingDAO.validation_id, order_by=FindingDAO.id).label('row_num') + + subquery = (session.query(FindingDAO.id).add_columns(row_number).filter(FindingDAO.submission_id == submission_id).subquery()) + filtered_subquery = (session.query(subquery.c.id).filter(subquery.c.row_num <= max_group_size)) + query = session.query(FindingDAO).filter(FindingDAO.id.in_(filtered_subquery)) + findings = session.scalars(query).all() + return findings diff --git a/src/regtech_data_validator/sqs_wrapper.py b/src/regtech_data_validator/sqs_wrapper.py index 87252f7..ce13262 100644 --- a/src/regtech_data_validator/sqs_wrapper.py +++ b/src/regtech_data_validator/sqs_wrapper.py @@ -22,7 +22,7 @@ def watch_queue(): MessageAttributeNames=['.*'], MaxNumberOfMessages=1, VisibilityTimeout=1200, - # WaitTimeSeconds=10, + WaitTimeSeconds=20, ) if response and 'Messages' in response: