Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

S3 to lambda tests #252

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
888 changes: 705 additions & 183 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pandera = "^0.20.3"
requests = "^2.32.3"
tabulate = "^0.9.0"
ujson = "^5.9.0"
polars = "^0.20.31"
psutil = "^5.9.8"
matplotlib = "^3.9.0"

[tool.poetry.group.dev.dependencies]
pytest = "8.3.2"
Expand Down
627 changes: 259 additions & 368 deletions src/regtech_data_validator/check_functions.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/regtech_data_validator/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from pandera import Check
from pandera.backends.base import BaseCheckBackend
from pandera.backends.pandas.checks import PandasCheckBackend
from pandera.backends.polars.checks import PolarsCheckBackend


class Severity(StrEnum):
Expand Down Expand Up @@ -56,4 +56,4 @@ def __init__(
@classmethod
def get_backend(cls, check_obj: Any) -> Type[BaseCheckBackend]:
"""Always return PolarsCheckBackend (check_obj is assumed to be a Polars DataFrame)"""
return PandasCheckBackend
return PolarsCheckBackend
80 changes: 41 additions & 39 deletions src/regtech_data_validator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
from regtech_data_validator.data_formatters import df_to_csv, df_to_str, df_to_json, df_to_table, df_to_download
from typing import Annotated, Optional

import pandas as pd
import polars as pl
import typer
import typer.core

from regtech_data_validator.create_schemas import validate_phases
from regtech_data_validator.validator import validate_batch_csv
from regtech_data_validator.validation_results import ValidationPhase

# Need to do this because the latest version of typer, if the rich package exists
# will create a Panel with borders in the error output. This causes stderr during
Expand Down Expand Up @@ -37,11 +38,11 @@ def parse_key_value(kv_str: str) -> KeyValueOpt:


class OutputFormat(StrEnum):
CSV = 'csv'
JSON = 'json'
PANDAS = 'pandas'
POLARS = 'polars'
TABLE = 'table'
DOWNLOAD = 'download'
CSV = 'csv'


@app.command()
Expand Down Expand Up @@ -76,50 +77,51 @@ def validate(
),
] = None,
output: Annotated[Optional[OutputFormat], typer.Option()] = OutputFormat.TABLE,
) -> tuple[bool, pd.DataFrame]:
) -> tuple[bool, pl.DataFrame]:
"""
Validate CFPB data submission
"""
context_dict = {x.key: x.value for x in context} if context else {}
input_df = None
try:
input_df = pd.read_csv(path, dtype=str, na_filter=False)
except Exception as e:
raise RuntimeError(e)
validation_results = validate_phases(input_df, context_dict)

status = 'SUCCESS'
no_of_findings = 0
total_errors = 0
findings_df = pd.DataFrame()
if not validation_results.is_valid:
status = 'FAILURE'
findings_df = validation_results.findings
no_of_findings = len(findings_df.index.unique())
warning_count = validation_results.warning_counts.total_count
error_count = validation_results.error_counts.total_count

match output:
case OutputFormat.PANDAS:
print(df_to_str(findings_df))
case OutputFormat.CSV:
print(df_to_csv(findings_df))
case OutputFormat.JSON:
print(df_to_json(findings_df))
case OutputFormat.TABLE:
print(df_to_table(findings_df))
case OutputFormat.DOWNLOAD:
print(df_to_download(findings_df, warning_count, error_count))
case _:
raise ValueError(f'output format "{output}" not supported')

total_findings = 0
final_phase = ValidationPhase.LOGICAL
all_findings = []
final_df = pl.DataFrame()

for findings, phase in validate_batch_csv(path, context_dict, batch_size=50000, batch_count=5):
total_findings += findings.height
final_phase = phase
with pl.Config(tbl_width_chars=0, tbl_rows=-1, tbl_cols=-1):
print(f"Findings: {findings}")
# persist findings to datastore
all_findings.append(findings)

if all_findings:
final_df = pl.concat(all_findings, how="diagonal")
status = "SUCCESS" if total_findings == 0 else "FAILURE"

match output:
case OutputFormat.CSV:
print(df_to_csv(final_df))
case OutputFormat.POLARS:
print(df_to_str(final_df))
case OutputFormat.JSON:
print(df_to_json(final_df))
case OutputFormat.TABLE:
print(df_to_table(final_df))
case OutputFormat.DOWNLOAD:
# uses streaming sink_csv, which does not build the output as a
# string (to save memory), so there is nothing to print here
df_to_download(final_df)
case _:
raise ValueError(f'output format "{output}" not supported')

typer.echo(
f"status: {status}, total errors: {total_errors}, findings: {no_of_findings}, validation phase: {validation_results.phase}",
f"Status: {status}, Total Errors: {total_findings}, Validation Phase: {final_phase}",
err=True,
)

# returned values are only used in unit tests
return validation_results.is_valid, findings_df
return (status, final_df)


if __name__ == '__main__':
Expand Down
255 changes: 0 additions & 255 deletions src/regtech_data_validator/create_schemas.py

This file was deleted.

Loading
Loading