Skip to content

Commit

Permalink
linter
Browse files Browse the repository at this point in the history
  • Loading branch information
lchen-2101 committed Oct 22, 2024
1 parent a763a99 commit d8b8690
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
1 change: 0 additions & 1 deletion src/regtech_data_validator/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dataclasses import dataclass
from enum import StrEnum
from pathlib import Path
from regtech_data_validator.data_formatters import df_to_csv, df_to_str, df_to_json, df_to_table, df_to_download
from typing import Annotated, Optional

Expand Down
21 changes: 16 additions & 5 deletions src/regtech_data_validator/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def _add_validation_metadata(failed_check_fields_df: pl.DataFrame, check: SBLChe
return validation_fields_df


def validate(schema: pa.DataFrameSchema, submission_df: pl.DataFrame, row_start: int, process_errors: bool) -> ValidationResults:
def validate(
schema: pa.DataFrameSchema, submission_df: pl.DataFrame, row_start: int, process_errors: bool
) -> ValidationResults:
"""
validate received dataframe with schema and return list of
schema errors
Expand Down Expand Up @@ -167,6 +169,7 @@ def validate_batch(
for validation_results in validate_func(path, context, batch_size, batch_count, max_errors):
yield validation_results


def validate_batch_parquet(
path: Path | str,
context: dict[str, str] | None = None,
Expand All @@ -191,7 +194,7 @@ def validate_batch_parquet(
'aws_access_key_id': creds.access_key,
'aws_secret_access_key': creds.secret_key,
'session_token': creds.token,
'aws_region': 'us-east-1'
'aws_region': 'us-east-1',
}

lf = pl.scan_parquet(path, allow_missing_columns=True, storage_options=storage_options)
Expand Down Expand Up @@ -226,6 +229,7 @@ def validate_batch_csv(
):
from datetime import datetime
import psutil

start = datetime.now()
has_syntax_errors = False
real_path = get_real_file_path(path)
Expand Down Expand Up @@ -257,7 +261,7 @@ def validate_batch_csv(

if os.path.isdir("/tmp/s3"):
shutil.rmtree("/tmp/s3")

print(f"Total time: {(datetime.now() - start).total_seconds()} seconds")
print(f"Total Memory: {psutil.Process(os.getpid()).memory_info().rss / (1024*1024)}MB")

Expand Down Expand Up @@ -287,22 +291,28 @@ def validate_chunks(schema, path, batch_size, batch_count, max_errors, checks):
row_start = 0
while batches:
df = pl.concat(batches)
validation_results, total_count, process_errors = validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks)
validation_results, total_count, process_errors = validate_chunk(
schema, df, total_count, row_start, max_errors, process_errors, checks
)
row_start += df.height
batches = reader.next_batches(batch_count)
yield validation_results, df["uid"].to_list()


def validate_lazy_chunks(schema, lf: pl.LazyFrame, batch_size: int, batch_count, max_errors, checks):
process_errors = True
total_count = 0
row_start = 0
df = lf.slice(row_start, batch_size).collect()
while df.height:
validation_results, total_count, process_errors = validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks)
validation_results, total_count, process_errors = validate_chunk(
schema, df, total_count, row_start, max_errors, process_errors, checks
)
row_start += df.height
yield validation_results, df["uid"].to_list()
df = lf.slice(row_start, batch_size).collect()


def validate_chunk(schema, df, total_count, row_start, max_errors, process_errors, checks):
validation_results = validate(schema, df, row_start, process_errors)
if not validation_results.findings.is_empty():
Expand All @@ -316,6 +326,7 @@ def validate_chunk(schema, df, total_count, row_start, max_errors, process_error
validation_results.findings = validation_results.findings.head(head_count)
return validation_results, total_count, process_errors


def get_real_file_path(path):
path = str(path)
if path.startswith("s3://"):
Expand Down

0 comments on commit d8b8690

Please sign in to comment.