
Commit

Optimize processing time when reading the CSV files
ChronoBoot committed Jan 17, 2024
1 parent b99da33 commit 2e1805a
Showing 10 changed files with 125 additions and 17 deletions.
9 changes: 7 additions & 2 deletions backend/src/data_processing/simple_load_data.py
@@ -4,8 +4,8 @@
from dotenv import load_dotenv
import requests
from backend.src.data_processing.load_data_abc import LoadData
from azure.storage.blob import BlobServiceClient
import os
from backend.utils.profiling_utils import conditional_profile

class SimpleLoadData(LoadData):
"""
@@ -27,14 +27,19 @@ class SimpleLoadData(LoadData):
'https://loanscoringgit.blob.core.windows.net/blob-csv-files/previous_application.csv',
]

def __init__(self) -> None:
logging.basicConfig(level=logging.DEBUG)
logging.debug("SimpleLoadData initialized")

@conditional_profile
def download_file(self, url: str, filepath: str) -> None:
with requests.get(url, stream=True) as r:
r.raise_for_status() # This will check for any HTTP errors
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)


@conditional_profile
def load(self, file_urls: list, download_path: str) -> None:
"""
Load data from Azure Blob Storage and save it to a local directory.
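For orientation, a minimal usage sketch of the loader shown above, assuming a local "data" directory as the download target; SimpleLoadData.FILE_URLS, load(), and the 8192-byte streaming in download_file() come from this diff, everything else is illustrative.

# Hypothetical usage sketch; the local "data" directory is an assumption.
import os
from backend.src.data_processing.simple_load_data import SimpleLoadData

download_path = "data"
os.makedirs(download_path, exist_ok=True)

loader = SimpleLoadData()
# load() is expected to fetch each URL and save it under download_path,
# with download_file() streaming each response in 8192-byte chunks.
loader.load(SimpleLoadData.FILE_URLS, download_path)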
31 changes: 28 additions & 3 deletions backend/src/data_processing/simple_read_data.py
@@ -3,6 +3,7 @@
import numpy as np
import pandas as pd
from backend.src.data_processing.read_data_abc import ReadDataABC
from backend.utils.profiling_utils import conditional_profile

class SimpleReadData(ReadDataABC):
"""
@@ -11,6 +12,13 @@ class SimpleReadData(ReadDataABC):
This class provides a simple way to read data from CSV files.
"""

CHUNK_SIZE = 10000

def __init__(self) -> None:
logging.basicConfig(level=logging.DEBUG)
logging.debug("SimpleReadData initialized")

@conditional_profile
def one_hot_encode(self, data: pd.DataFrame, columns_names: list) -> pd.DataFrame:
"""
One-hot encode the categorical columns in the data.
@@ -30,6 +38,7 @@ def one_hot_encode(self, data: pd.DataFrame, columns_names: list) -> pd.DataFram

return one_hot_encoded_data

@conditional_profile
def update_aggregation_dict(self, data: pd.DataFrame, aggregation_dict: dict, prefixes: list) -> None:
"""
Update the aggregation dictionary with a count for each column starting with the prefixes.
@@ -46,6 +55,7 @@ def update_aggregation_dict(self, data: pd.DataFrame, aggregation_dict: dict, pr

logging.debug("Aggregation dictionary updated")

@conditional_profile
def flatten_and_reset_index(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Flatten the MultiIndex columns and reset the index.
@@ -61,6 +71,7 @@ def flatten_and_reset_index(self, data: pd.DataFrame) -> pd.DataFrame:

logging.debug("Data flattened and index reset")

@conditional_profile
def aggregate_data(self, data: pd.DataFrame, aggregation_dict: dict, prefixes: list, groupby_col: str) -> pd.DataFrame:
"""
Aggregate the data.
@@ -84,6 +95,7 @@ def aggregate_data(self, data: pd.DataFrame, aggregation_dict: dict, prefixes: l

return aggregated_data

@conditional_profile
def get_aggregated_bureau_data(self, bureau_data: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate the data from bureau_balance.
@@ -141,6 +153,7 @@ def get_aggregated_bureau_data(self, bureau_data: pd.DataFrame) -> pd.DataFrame:

return aggregated_bureau_data

@conditional_profile
def get_aggregated_credit_card_balance_data(self, credit_card_balance_data: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate the data from credit_card_balance.
@@ -184,6 +197,7 @@ def get_aggregated_credit_card_balance_data(self, credit_card_balance_data: pd.D

return aggregated_credit_card_balance_data

@conditional_profile
def get_aggregated_installments_payments_data(self, installments_payments_data: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate the data from installments_payments.
@@ -210,6 +224,7 @@ def get_aggregated_installments_payments_data(self, installments_payments_data:

return aggregated_installments_payments_data

@conditional_profile
def get_aggregated_previous_application_data(self, previous_application_data: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate the data from previous_application.
@@ -272,6 +287,7 @@ def get_aggregated_previous_application_data(self, previous_application_data: pd

return aggregated_previous_application_data

@conditional_profile
def get_aggregated_pos_cash_balance_data(self, pos_cash_balance_data: pd.DataFrame) -> pd.DataFrame:
"""
Aggregate the data from pos_cash_balance.
@@ -300,6 +316,7 @@ def get_aggregated_pos_cash_balance_data(self, pos_cash_balance_data: pd.DataFra

return aggregated_pos_cash_balance_data

@conditional_profile
def retrieve_data(self, files_path: str, sampling_frequency: int, training : bool = True) -> pd.DataFrame:
"""
Read data from a list of CSV files.
@@ -332,17 +349,23 @@ def retrieve_data(self, files_path: str, sampling_frequency: int, training : boo
ReadDataABC.POS_CASH_BALANCE_NAME: self.get_aggregated_pos_cash_balance_data
}


for file_name, aggregation_method in data_files.items():
temp_data = pd.read_csv(f"{files_path}/{file_name}")
temp_data = temp_data[temp_data['SK_ID_CURR'].isin(data['SK_ID_CURR'])]
aggregated_data = aggregation_method(temp_data)
filtered_data = pd.DataFrame()

for chunk in pd.read_csv(f"{files_path}/{file_name}", chunksize=SimpleReadData.CHUNK_SIZE):
filtered_chunk = chunk[chunk['SK_ID_CURR'].isin(data['SK_ID_CURR'])]
filtered_data = pd.concat([filtered_data, filtered_chunk])

aggregated_data = aggregation_method(filtered_data)
data = pd.merge(data, aggregated_data, on="SK_ID_CURR", how="outer")

logging.debug("Data retrieved")

# Concatenate all the data into a single DataFrame
return data

@conditional_profile
def write_data(self, files_path : str, filename: str, sampling_frequency: int = 1, training : bool = True):
"""
Write the data for the model.
@@ -360,6 +383,7 @@ def write_data(self, files_path : str, filename: str, sampling_frequency: int =

logging.debug("Data written")

@conditional_profile
def read_data(self, file_path: str, filename: str):
"""
Read data from a CSV file.
@@ -382,6 +406,7 @@ def read_data(self, file_path: str, filename: str):
# Return the data
return data

@conditional_profile
def write_data_structure_json(self, data: pd.DataFrame, file_path: str, filename: str):
"""
Write the data structure to a JSON file.
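Taken in isolation, the chunked read-and-filter pattern added to retrieve_data looks roughly like the sketch below; CHUNK_SIZE, pd.read_csv(..., chunksize=...), and the SK_ID_CURR filter come from this diff, while the standalone helper function is only an illustration.

# Illustrative sketch of the chunked filtering introduced above, not the exact method.
import pandas as pd

CHUNK_SIZE = 10000  # mirrors SimpleReadData.CHUNK_SIZE

def filter_csv_by_ids(csv_path: str, wanted_ids: pd.Series, chunk_size: int = CHUNK_SIZE) -> pd.DataFrame:
    """Read csv_path in chunks, keeping only rows whose SK_ID_CURR is in wanted_ids."""
    parts = []
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        parts.append(chunk[chunk['SK_ID_CURR'].isin(wanted_ids)])
    # A single concat at the end avoids re-copying the growing frame on every chunk.
    return pd.concat(parts, ignore_index=True) if parts else pd.DataFrame()

The committed loop concatenates inside the iteration; collecting the filtered chunks in a list and concatenating once, as sketched here, is a common variant that trades a little memory for fewer copies.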
26 changes: 25 additions & 1 deletion backend/src/main.py
@@ -1,14 +1,25 @@
import os
from dotenv import load_dotenv
from flask import Flask, request, jsonify
from backend.src.data_processing.simple_load_data import SimpleLoadData
from backend.src.data_processing.simple_read_data import SimpleReadData
from backend.src.models.random_forest_loan_predictor import RandomForestLoanPredictor
import backend.utils.profiling_utils as profiling_utils
import pandas as pd
import logging

load_dotenv()

app = Flask(__name__)
predictor = RandomForestLoanPredictor()
loader = SimpleLoadData()
reader = SimpleReadData()
logging.basicConfig(level=logging.DEBUG)
app.logger = logging.getLogger(__name__)
app.logger.addHandler(logging.StreamHandler())
app.logger.setLevel(logging.DEBUG)

DEBUG_MODE = bool(os.getenv('DEBUG_MODE', False))

FILES_FOLDER = 'data'
DATA_FILE_MODEL = 'data_for_model.csv'
@@ -18,6 +29,7 @@

@app.route('/test', methods=['GET'])
def test():
app.logger.info('Hello World!')
return jsonify({'message': 'Hello World!'}), 200

@app.route('/train', methods=['POST'])
@@ -31,46 +43,58 @@ def train():

loans = reader.read_data(FILES_FOLDER, DATA_FILE_MODEL)
predictor.train(loans, target_variable)

app.logger.info('Model trained successfully')
return jsonify({'message': 'Model trained successfully'}), 200

@app.route('/predict', methods=['POST'])
def predict():
data = request.get_json()
loan = pd.DataFrame(data['loan'], index=[0])
prediction = predictor.predict(loan)

app.logger.info(f'Predicted outcome for loan: {prediction}')
return jsonify({'prediction': prediction}), 200

@app.route('/evaluate', methods=['GET'])
def evaluate():
accuracy = predictor.evaluate()
app.logger.info(f'Model evaluated with accuracy: {accuracy}')
return jsonify({'accuracy': accuracy}), 200

@app.route('/most_important_features', methods=['POST'])
def most_important_features():
data = request.get_json()
nb_features = data['nb_features']
features = predictor.get_most_important_features(nb_features)
app.logger.info(f'Most important features: {features}')
return jsonify({'features': features.to_dict()}), 200

@app.route('/write_model_data/<int:frequency>', methods=['GET'])
def write_model_data(frequency: int):
reader.write_data(FILES_FOLDER, DATA_FILE_MODEL, frequency)
app.logger.info('Model data written successfully')
return jsonify({'message': 'Model data written successfully'}), 200

@app.route('/generate_structure', methods=['GET'])
def generate_structure():
data = reader.read_data(FILES_FOLDER, DATA_FILE_MODEL)
reader.write_data_structure_json(data, COMMON_STRUCTURE_PATH, JSON_FILE_STRUCTURE)
app.logger.info('Structure generated successfully')
return jsonify({'message': 'Structure generated successfully'}), 200

@app.route('/get_loan_example', methods=['GET'])
def get_loan_example():
reader.write_data(FILES_FOLDER, DATA_FILE_TESTING, 10000, False)
loan_example = reader.read_data(FILES_FOLDER, DATA_FILE_TESTING)
loan_json = loan_example.to_json(orient='records')
app.logger.info('Loan example retrieved successfully')
return jsonify({'loan_example': loan_json}), 200

if __name__ == '__main__':
if profiling_utils.is_profiling_enabled():
app.logger.info('Profiling enabled')

port = int(os.environ.get("PORT", 10000))
host = os.getenv("HOST", '0.0.0.0')
app.run(debug=True, host=host, port=port)
app.run(debug=DEBUG_MODE, host=host, port=port)
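As a rough client-side illustration of the routes wired up above, assuming the default PORT of 10000 from __main__; the loan fields in the payload are placeholders, not the real feature schema.

# Hypothetical client sketch; the loan payload fields are illustrative only.
import requests

BASE_URL = "http://localhost:10000"  # PORT defaults to 10000 in main.py

loan = {"AMT_CREDIT": 250000, "AMT_INCOME_TOTAL": 90000}  # placeholder features
response = requests.post(f"{BASE_URL}/predict", json={"loan": loan})
response.raise_for_status()
print(response.json()["prediction"])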
7 changes: 7 additions & 0 deletions backend/src/models/random_forest_loan_predictor.py
@@ -3,6 +3,7 @@
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from backend.utils.profiling_utils import conditional_profile

import pandas as pd

@@ -25,8 +26,10 @@ def __init__(self) -> None:
self.y_test = None
self.random_state = 42
self.test_size = 0.2
logging.basicConfig(level=logging.DEBUG)
logging.debug("RandomForestLoanPredictor initialized")

@conditional_profile
def preprocess_data(self, X: pd.DataFrame) -> pd.DataFrame:
"""
Preprocess the data by encoding categorical variables and filling NaN values.
@@ -55,6 +58,7 @@ def preprocess_data(self, X: pd.DataFrame) -> pd.DataFrame:

return new_data

@conditional_profile
def train(self, loans: pd.DataFrame, target_variable: str) -> None:
"""
Train the predictor on a DataFrame of loans.
@@ -80,6 +84,7 @@ def train(self, loans: pd.DataFrame, target_variable: str) -> None:
except Exception as e:
logging.error(f"Failed to train the model: {e}")

@conditional_profile
def evaluate(self) -> float:
"""
Evaluate the performance of the predictor.
@@ -95,6 +100,7 @@ def evaluate(self) -> float:
except Exception as e:
logging.error(f"Failed to evaluate the model: {e}")

@conditional_profile
def predict(self, loan: pd.DataFrame) -> int:
"""
Predict the outcome for a loan.
@@ -120,6 +126,7 @@ def predict(self, loan: pd.DataFrame) -> int:
except Exception as e:
logging.error(f"Failed to predict the outcome for the loan: {e}")

@conditional_profile
def get_most_important_features(self, nb_features : int) -> pd.DataFrame:
"""
Get the most important features from the model.
25 changes: 24 additions & 1 deletion backend/tests/data_processing/test_simple_read_data.py
@@ -2,6 +2,7 @@
from unittest.mock import patch
import pandas as pd
import numpy as np
from backend.src.data_processing.read_data_abc import ReadDataABC
from backend.src.data_processing.simple_read_data import SimpleReadData

class TestSimpleReadData(unittest.TestCase):
@@ -475,19 +476,41 @@ def test_get_aggregated_pos_cash_balance_data(self):
})
pd.testing.assert_frame_equal(result, expected_result)



@patch('backend.src.data_processing.simple_read_data.SimpleReadData.CHUNK_SIZE', 1)
@patch('pandas.read_csv')
@patch('backend.src.data_processing.simple_read_data.SimpleReadData.get_aggregated_bureau_data')
@patch('backend.src.data_processing.simple_read_data.SimpleReadData.get_aggregated_credit_card_balance_data')
@patch('backend.src.data_processing.simple_read_data.SimpleReadData.get_aggregated_installments_payments_data')
@patch('backend.src.data_processing.simple_read_data.SimpleReadData.get_aggregated_previous_application_data')
@patch('backend.src.data_processing.simple_read_data.SimpleReadData.get_aggregated_pos_cash_balance_data')
def test_retrieve_data_concat(self, mock_pos, mock_previous, mock_installments, mock_credit, mock_bureau, mock_read_csv):

# Create either a mock DataFrame or a mock iterator of chunks to return from pd.read_csv
def read_csv_side_effect(filepath, *args, **kwargs):
filepaths = [f"mock_path/{file_name}" for file_name in ReadDataABC.FILES_NAMES if file_name != ReadDataABC.APPLICATION_TRAIN_NAME]

if filepath in filepaths:
mock_chunk = pd.DataFrame({
'SK_ID_CURR': [1, 2, 3],
'DATA': ['A', 'B', 'C']
})
return iter([mock_chunk])
            else:
# Create a mock DataFrame to return from pd.read_csv
mock_df = pd.DataFrame({
'SK_ID_CURR': [1, 2, 3],
'DATA': ['A', 'B', 'C']
})
return mock_df

# Create a mock DataFrame to return from pd.read_csv
mock_df = pd.DataFrame({
'SK_ID_CURR': [1, 2, 3],
'DATA': ['A', 'B', 'C']
})
mock_read_csv.return_value = mock_df
mock_read_csv.side_effect = read_csv_side_effect

# Create a mock DataFrame to return from the get_aggregated_* methods
mock_aggregated_bureau = pd.DataFrame({
Empty file added backend/utils/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions backend/utils/profiling_utils.py
@@ -0,0 +1,15 @@
import os
from dotenv import load_dotenv
import memory_profiler

load_dotenv()


def is_profiling_enabled():
return bool(os.getenv('ENABLE_PROFILING', False))

def conditional_profile(func):
if is_profiling_enabled():
return memory_profiler.profile(func)
else:
return func
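A minimal sketch of how the new decorator is meant to be used, with a hypothetical function; note that bool() on any non-empty string is True, so setting ENABLE_PROFILING to any value (even "False") enables memory_profiler, while leaving it unset keeps functions undecorated.

# Usage sketch; build_report is a hypothetical example function.
from backend.utils.profiling_utils import conditional_profile

@conditional_profile
def build_report(rows: list) -> int:
    # With ENABLE_PROFILING set, memory_profiler.profile wraps this function and
    # prints a line-by-line memory report; otherwise it runs undecorated.
    return sum(len(str(row)) for row in rows)

if __name__ == "__main__":
    print(build_report([{"id": i} for i in range(1000)]))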
2 changes: 1 addition & 1 deletion frontend/src/main.py
@@ -156,7 +156,7 @@ def _main(FREQUENCY : int):
loan_example = json.loads(loan_example)[0]

# Display the user interface
user_interface = DashUserInterface(categorical_columns, numerical_columns, loan_example, field_descriptions)
user_interface = DashUserInterface(categorical_columns, numerical_columns, loan_example, field_descriptions, PREDICT_URL)
logging.info("User interface displayed")
user_interface.display()

