diff --git a/.github/workflows/dependabot-auto-approve.yml b/.github/workflows/dependabot-auto-approve.yml
index 193b5af7..2ddba355 100644
--- a/.github/workflows/dependabot-auto-approve.yml
+++ b/.github/workflows/dependabot-auto-approve.yml
@@ -11,7 +11,7 @@ jobs:
     steps:
       - name: Dependabot metadata
         id: metadata
-        uses: dependabot/fetch-metadata@4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d
+        uses: dependabot/fetch-metadata@v2
         with:
           github-token: "${{ secrets.GITHUB_TOKEN }}"
       - name: Approve a PR
diff --git a/.github/workflows/deploy-main.yml b/.github/workflows/deploy-main.yml
index 8f9cd2de..475cc6f1 100644
--- a/.github/workflows/deploy-main.yml
+++ b/.github/workflows/deploy-main.yml
@@ -74,3 +74,12 @@ jobs:
           SCW_ZONE: ${{ secrets.SCW_ZONE }}
         with:
           args: jobs definition update ${{ secrets.SCALEWAY_JOB_IMPORT_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/mediatree_import:${{ env.PROJECT_VERSION }}
+      - name: update scaleway job definition with version s3
+        uses: jawher/action-scw@v2.34.0
+        env:
+          SCW_ACCESS_KEY: ${{ secrets.SCW_ACCESS_KEY }}
+          SCW_SECRET_KEY: ${{ secrets.SCW_SECRET_KEY }}
+          SCW_ORGANIZATION_ID: ${{ secrets.SCW_ORGANIZATION_ID }}
+          SCW_ZONE: ${{ secrets.SCW_ZONE }}
+        with:
+          args: jobs definition update ${{ secrets.SCALEWAY_JOB_S3_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/s3:${{ env.PROJECT_VERSION }}
diff --git a/.gitignore b/.gitignore
index 8504936e..23b6ad51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 secrets/pwd_api.txt
 secrets/username_api.txt
 secrets/*
+s3/*
 documents-experts/
 cc-bio.json
 *.xlsx
diff --git a/docker-compose.yml b/docker-compose.yml
index c864bff5..af005a4e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -211,14 +211,14 @@ services:
       MEDIATREE_AUTH_URL: https://keywords.mediatree.fr/api/auth/token/
       KEYWORDS_URL: https://keywords.mediatree.fr/api/subtitle/ # https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
       MODIN_ENGINE: ray
-      MODIN_CPUS: 4 # "https://modin.readthedocs.io/en/0.11.0/using_modin.html#reducing-or-limiting-the-resources-modin-can-use"
-      MODIN_MEMORY: 1000000000 # 1Gb
-      RAY_memory_usage_threshold: 1
-    mem_limit: "1G"
+      MODIN_CPUS: 6 # "https://modin.readthedocs.io/en/0.11.0/using_modin.html#reducing-or-limiting-the-resources-modin-can-use"
+      MODIN_MEMORY: 12000000000 # 12Gb
+      RAY_memory_usage_threshold: 0.95
     volumes:
       - ./quotaclimat/:/app/quotaclimat/
       - ./postgres/:/app/postgres/
       - ./test/:/app/test/
+      - ./s3:/app/s3/
     secrets:
       - pwd_api
       - username_api
diff --git a/quotaclimat/data_processing/mediatree/api_import.py b/quotaclimat/data_processing/mediatree/api_import.py
index 770e9cc9..0d01aba0 100644
--- a/quotaclimat/data_processing/mediatree/api_import.py
+++ b/quotaclimat/data_processing/mediatree/api_import.py
@@ -236,10 +236,17 @@ def parse_reponse_subtitle(response_sub, channel = None, channel_program = "", c
     logging.getLogger("modin.logging.default").setLevel(logging.WARNING)
     if(total_results > 0):
         logging.info(f"{total_results} 'total_results' field")
-        new_df : pd.DataFrame = json_normalize(response_sub.get('data')) # TODO UserWarning: json_normalize is not currently supported by PandasOnRay, defaulting to pandas implementation.
+
+        # Flatten manually to avoid "UserWarning: json_normalize is not currently supported by PandasOnRay, defaulting to pandas implementation."
+        flattened_data = response_sub.get("data", [])
+        new_df : pd.DataFrame = pd.DataFrame(flattened_data)
+        new_df["channel.name"] = new_df["channel"].apply(lambda x: x["name"])
+        new_df["channel.title"] = new_df["channel"].apply(lambda x: x["title"])
+        new_df["channel.radio"] = new_df["channel"].apply(lambda x: x["radio"])
+        new_df.drop("channel", axis=1, inplace=True)
+
         logging.debug("Schema from API before formatting :\n%s", new_df.dtypes)
         pd.set_option('display.max_columns', None)
-        logging.debug("setting timestamp")
         new_df['timestamp'] = new_df.apply(lambda x: pd.to_datetime(x['start'], unit='s', utc=True), axis=1)
         logging.debug("timestamp was set")
diff --git a/quotaclimat/data_processing/mediatree/s3/api_to_s3.py b/quotaclimat/data_processing/mediatree/s3/api_to_s3.py
index 37caa6af..5ac48856 100644
--- a/quotaclimat/data_processing/mediatree/s3/api_to_s3.py
+++ b/quotaclimat/data_processing/mediatree/s3/api_to_s3.py
@@ -19,7 +19,7 @@ from postgres.insert_data import save_to_pg
 from postgres.schemas.models import create_tables, connect_to_db, get_db_session
 from postgres.schemas.models import keywords_table
-
+import shutil
 from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
 from typing import List, Optional
 from tenacity import *
@@ -60,9 +60,31 @@ endpoint_url=ENDPOINT_URL,
 )

-def get_bucket_key(date, channel):
+def get_bucket_key(date, channel, filename:str="*", suffix:str="parquet"):
+    (year, month, day) = (date.year, date.month, date.day)
+    return f'year={year}/month={month:1}/day={day:1}/channel={channel}/{filename}.{suffix}'
+
+def get_bucket_key_folder(date, channel):
     (year, month, day) = (date.year, date.month, date.day)
-    return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json.gz'
+    return f'year={year}/month={month:1}/day={day:1}/channel={channel}/'
+
+# Function to upload folder to S3
+def upload_folder_to_s3(local_folder, bucket_name, base_s3_path):
+    logging.info(f"Reading local folder {local_folder} and uploading to S3")
+    for root, _, files in os.walk(local_folder):
+        logging.info(f"Reading files {len(files)}")
+        for file in files:
+            logging.info(f"Reading {file}")
+            local_file_path = os.path.join(root, file)
+            relative_path = os.path.relpath(local_file_path, local_folder)
+            s3_key = os.path.join(base_s3_path, relative_path).replace("\\", "/") # Replace backslashes for S3 compatibility
+
+            # Upload file
+            s3_client.upload_file(local_file_path, bucket_name, s3_key)
+            logging.info(f"Uploaded: {s3_key}")
+    # Delete the local folder after successful upload
+    shutil.rmtree(local_folder)
+    logging.info(f"Deleted local folder: {local_folder}")

 def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp):
     logging.info(f"Saving DF with {len(df)} elements to S3 for {date} and channel {channel}")
@@ -70,30 +92,43 @@ def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp):
     # to create partitions
     object_key = get_bucket_key(date, channel)
     logging.debug(f"Uploading partition: {object_key}")
-    # json_to_save = df.to_json(None, orient='records', lines=False)
-    logging.info(f"s3://{BUCKET_NAME}/{object_key}")
     try:
-        json_buffer = BytesIO()
-        with gzip.GzipFile(fileobj=json_buffer, mode='w') as gz:
-            df.to_json(gz, orient='records', lines=False)
-
-        s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer.getvalue())
-
-        logging.info(f"Uploaded partition: {object_key}")
+        # add partition columns year, month, day to dataframe
+        df['year'] = date.year
+        df['month'] = date.month
+        df['day'] = date.day
+        df['channel'] = channel
+
+        df = df._to_pandas() # collect data across ray workers to avoid multiple subfolders
+        based_path = "s3/parquet"
+        df.to_parquet(based_path,
+            compression='gzip'
+            ,partition_cols=['year', 'month', 'day', 'channel'])
+
+        # save the local parquet partition folder to s3
+        s3_path = f"{get_bucket_key_folder(date, channel)}"
+        local_folder = f"{based_path}/{s3_path}"
+        upload_folder_to_s3(local_folder, BUCKET_NAME, s3_path)
+
     except Exception as e:
         logging.error(Exception)
         exit()

 def check_if_object_exists_in_s3(day, channel):
-    object_key = get_bucket_key(day, channel)
-    logging.info(f"Checking if object exists: {object_key}")
+    folder_prefix = get_bucket_key_folder(day, channel) # partition folder prefix
+
+    logging.debug(f"Checking if folder exists: {folder_prefix}")
     try:
-        s3_client.head_object(Bucket=BUCKET_NAME, Key=object_key)
-        logging.debug(f"Object already exists, skipping")
-        return True
+        response = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=folder_prefix, MaxKeys=1)
+        if "Contents" in response:
+            logging.info(f"Folder exists in S3: {folder_prefix}")
+            return True
+        else:
+            logging.debug(f"Folder does not exist in S3: {folder_prefix}")
+            return False
     except Exception as e:
-        logging.info(f"Object does not exist in s3, continuing \n{e}")
+        logging.error(f"Error while checking folder in S3: {folder_prefix}\n{e}")
         return False

 async def get_and_save_api_data(exit_event):
diff --git a/s3/.empty b/s3/.empty
new file mode 100644
index 00000000..e69de29b
diff --git a/test/s3/test_s3.py b/test/s3/test_s3.py
index 8cf0eca5..4496f806 100644
--- a/test/s3/test_s3.py
+++ b/test/s3/test_s3.py
@@ -1,17 +1,23 @@
 import pytest
 import pandas as pd

-from quotaclimat.data_processing.mediatree.s3.api_to_s3 import get_bucket_key, save_to_s3
+from quotaclimat.data_processing.mediatree.s3.api_to_s3 import get_bucket_key, save_to_s3, get_bucket_key_folder

 def test_get_bucket_key():
     friday_6h26 = 1726719981
     date = pd.to_datetime(friday_6h26, unit='s', utc=True)
     channel = "tf1"
-    assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json.gz"
+    assert get_bucket_key(date, channel) == "year=2024/month=9/day=19/channel=tf1/*.parquet"

 def test_get_bucket_key_first_of_the_month():
     first_december = 1733040125
     date = pd.to_datetime(first_december, unit='s', utc=True)
     channel = "tf1"
-    assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json.gz"
\ No newline at end of file
+    assert get_bucket_key(date, channel) == "year=2024/month=12/day=1/channel=tf1/*.parquet"
+
+def test_get_bucket_key_folder_first_of_the_month():
+    first_december = 1733040125
+    date = pd.to_datetime(first_december, unit='s', utc=True)
+    channel = "tf1"
+    assert get_bucket_key_folder(date, channel) == "year=2024/month=12/day=1/channel=tf1/"
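
Note, not part of the patch: a minimal local sketch of how the new parquet partition layout and the prefix-based existence check fit together. It assumes pyarrow and boto3 are installed; the sample DataFrame columns, the bucket name, and the client credentials are placeholders, while the real code goes through upload_folder_to_s3 with the endpoint configured in api_to_s3.py.

    import boto3
    import pandas as pd

    # Placeholder rows carrying the partition columns that save_to_s3 adds before writing
    df = pd.DataFrame({
        "plaintext": ["bonjour", "climat"],   # hypothetical payload column
        "year": [2024, 2024],
        "month": [9, 9],
        "day": [19, 19],
        "channel": ["tf1", "tf1"],
    })

    # Same call as in save_to_s3: hive-style folders under s3/parquet,
    # e.g. s3/parquet/year=2024/month=9/day=19/channel=tf1/<part>.parquet
    df.to_parquet("s3/parquet", compression="gzip",
                  partition_cols=["year", "month", "day", "channel"])

    # get_bucket_key_folder builds the matching prefix, so a single
    # list_objects_v2 call with MaxKeys=1 is enough to detect an existing partition.
    s3_client = boto3.client("s3")  # endpoint and credentials omitted in this sketch
    response = s3_client.list_objects_v2(
        Bucket="placeholder-bucket",
        Prefix="year=2024/month=9/day=19/channel=tf1/",
        MaxKeys=1,
    )
    print("Contents" in response)  # True once at least one object sits under the prefix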