Feat/parquet-save to s3 (#300)
* feat: api to s3

* gzip content

* check if already exist

* feat: S3 parquet

* fix: test

* fix: dependabot auto approve
polomarcus authored Dec 19, 2024
1 parent 60504e1 commit 1045a15
Showing 8 changed files with 86 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dependabot-auto-approve.yml
@@ -11,7 +11,7 @@ jobs:
steps:
- name: Dependabot metadata
id: metadata
uses: dependabot/fetch-metadata@4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d
uses: dependabot/fetch-metadata@v2
with:
github-token: "${{ secrets.GITHUB_TOKEN }}"
- name: Approve a PR
9 changes: 9 additions & 0 deletions .github/workflows/deploy-main.yml
@@ -74,3 +74,12 @@ jobs:
SCW_ZONE: ${{ secrets.SCW_ZONE }}
with:
args: jobs definition update ${{ secrets.SCALEWAY_JOB_IMPORT_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/mediatree_import:${{ env.PROJECT_VERSION }}
- name: update scaleway job definition with version s3
uses: jawher/[email protected]
env:
SCW_ACCESS_KEY: ${{ secrets.SCW_ACCESS_KEY }}
SCW_SECRET_KEY: ${{ secrets.SCW_SECRET_KEY }}
SCW_ORGANIZATION_ID: ${{ secrets.SCW_ORGANIZATION_ID }}
SCW_ZONE: ${{ secrets.SCW_ZONE }}
with:
args: jobs definition update ${{ secrets.SCALEWAY_JOB_S3_ID }} image-uri=${{ secrets.CONTAINER_REGISTRY_ENDPOINT }}/s3:${{ env.PROJECT_VERSION }}
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
secrets/pwd_api.txt
secrets/username_api.txt
secrets/*
s3/*
documents-experts/
cc-bio.json
*.xlsx
8 changes: 4 additions & 4 deletions docker-compose.yml
@@ -211,14 +211,14 @@ services:
MEDIATREE_AUTH_URL: https://keywords.mediatree.fr/api/auth/token/
KEYWORDS_URL: https://keywords.mediatree.fr/api/subtitle/ # https://keywords.mediatree.fr/docs/#api-Subtitle-SubtitleList
MODIN_ENGINE: ray
MODIN_CPUS: 4 # "https://modin.readthedocs.io/en/0.11.0/using_modin.html#reducing-or-limiting-the-resources-modin-can-use"
MODIN_MEMORY: 1000000000 # 1Gb
RAY_memory_usage_threshold: 1
mem_limit: "1G"
MODIN_CPUS: 6 # "https://modin.readthedocs.io/en/0.11.0/using_modin.html#reducing-or-limiting-the-resources-modin-can-use"
MODIN_MEMORY: 12000000000 # 12Gb
RAY_memory_usage_threshold: 0.95
volumes:
- ./quotaclimat/:/app/quotaclimat/
- ./postgres/:/app/postgres/
- ./test/:/app/test/
- ./s3:/app/s3/
secrets:
- pwd_api
- username_api
11 changes: 9 additions & 2 deletions quotaclimat/data_processing/mediatree/api_import.py
@@ -236,10 +236,17 @@ def parse_reponse_subtitle(response_sub, channel = None, channel_program = "", c
logging.getLogger("modin.logging.default").setLevel(logging.WARNING)
if(total_results > 0):
logging.info(f"{total_results} 'total_results' field")
new_df : pd.DataFrame = json_normalize(response_sub.get('data')) # TODO UserWarning: json_normalize is not currently supported by PandasOnRay, defaulting to pandas implementation.

# To avoid UserWarning: json_normalize is not currently supported by PandasOnRay, defaulting to pandas implementation.
flattened_data = response_sub.get("data", [])
new_df : pd.DataFrame = pd.DataFrame(flattened_data)
new_df["channel.name"] = new_df["channel"].apply(lambda x: x["name"])
new_df["channel.title"] = new_df["channel"].apply(lambda x: x["title"])
new_df["channel.radio"] = new_df["channel"].apply(lambda x: x["radio"])
new_df.drop("channel", axis=1, inplace=True)

logging.debug("Schema from API before formatting :\n%s", new_df.dtypes)
pd.set_option('display.max_columns', None)

logging.debug("setting timestamp")
new_df['timestamp'] = new_df.apply(lambda x: pd.to_datetime(x['start'], unit='s', utc=True), axis=1)
logging.debug("timestamp was set")
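The new code above sidesteps `json_normalize` (which falls back to the plain-pandas implementation under Modin's PandasOnRay engine) by building the frame directly and flattening the nested `channel` dict column by column. A minimal standalone sketch of the same pattern — the sample records and the plain-pandas import are illustrative assumptions, not data from the API:

```python
import pandas as pd  # the job itself uses modin.pandas; plain pandas behaves the same here

# hypothetical records shaped like the Mediatree "data" payload in the diff
data = [
    {"start": 1726719981, "plaintext": "bonjour", "channel": {"name": "tf1", "title": "TF1", "radio": False}},
    {"start": 1726720041, "plaintext": "bonsoir", "channel": {"name": "fr2", "title": "France 2", "radio": False}},
]

# build the frame without json_normalize, then flatten the nested channel dict
new_df = pd.DataFrame(data)
new_df["channel.name"] = new_df["channel"].apply(lambda x: x["name"])
new_df["channel.title"] = new_df["channel"].apply(lambda x: x["title"])
new_df["channel.radio"] = new_df["channel"].apply(lambda x: x["radio"])
new_df.drop("channel", axis=1, inplace=True)

# same timestamp conversion as the import job
new_df["timestamp"] = pd.to_datetime(new_df["start"], unit="s", utc=True)
print(new_df.dtypes)
```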
71 changes: 53 additions & 18 deletions quotaclimat/data_processing/mediatree/s3/api_to_s3.py
@@ -19,7 +19,7 @@
from postgres.insert_data import save_to_pg
from postgres.schemas.models import create_tables, connect_to_db, get_db_session
from postgres.schemas.models import keywords_table

import shutil
from quotaclimat.data_processing.mediatree.keyword.keyword import THEME_KEYWORDS
from typing import List, Optional
from tenacity import *
@@ -60,40 +60,75 @@
endpoint_url=ENDPOINT_URL,
)

def get_bucket_key(date, channel):
def get_bucket_key(date, channel, filename:str="*", suffix:str="parquet"):
(year, month, day) = (date.year, date.month, date.day)
return f'year={year}/month={month:1}/day={day:1}/channel={channel}/{filename}.{suffix}'

def get_bucket_key_folder(date, channel):
(year, month, day) = (date.year, date.month, date.day)
return f'year={year}/month={month:02}/day={day:02}/channel={channel}/data.json.gz'
return f'year={year}/month={month:1}/day={day:1}/channel={channel}/'

# Function to upload folder to S3
def upload_folder_to_s3(local_folder, bucket_name, base_s3_path):
logging.info(f"Reading local folder {local_folder} and uploading to S3")
for root, _, files in os.walk(local_folder):
logging.info(f"Reading files {len(files)}")
for file in files:
logging.info(f"Reading {file}")
local_file_path = os.path.join(root, file)
relative_path = os.path.relpath(local_file_path, local_folder)
s3_key = os.path.join(base_s3_path, relative_path).replace("\\", "/") # Replace backslashes for S3 compatibility

# Upload file
s3_client.upload_file(local_file_path, bucket_name, s3_key)
logging.info(f"Uploaded: {s3_key}")
# Delete the local folder after successful upload
shutil.rmtree(local_folder)
logging.info(f"Deleted local folder: {local_folder}")

def save_to_s3(df: pd.DataFrame, channel: str, date: pd.Timestamp):
logging.info(f"Saving DF with {len(df)} elements to S3 for {date} and channel {channel}")

# to create partitions
object_key = get_bucket_key(date, channel)
logging.debug(f"Uploading partition: {object_key}")
# json_to_save = df.to_json(None, orient='records', lines=False)
logging.info(f"s3://{BUCKET_NAME}/{object_key}")

try:
json_buffer = BytesIO()
with gzip.GzipFile(fileobj=json_buffer, mode='w') as gz:
df.to_json(gz, orient='records', lines=False)

s3_client.put_object(Bucket=BUCKET_NAME, Key=object_key, Body=json_buffer.getvalue())

logging.info(f"Uploaded partition: {object_key}")
# add partition columns year, month, day to dataframe
df['year'] = date.year
df['month'] = date.month
df['day'] = date.day
df['channel'] = channel

df = df._to_pandas() # collect data across ray workers to avoid multiple subfolders
based_path = "s3/parquet"
df.to_parquet(based_path,
compression='gzip'
,partition_cols=['year', 'month', 'day', 'channel'])

# saving the local parquet folder to s3
s3_path = f"{get_bucket_key_folder(date, channel)}"
local_folder = f"{based_path}/{s3_path}"
upload_folder_to_s3(local_folder, BUCKET_NAME, s3_path)

except Exception as e:
logging.error(e)
exit()

def check_if_object_exists_in_s3(day, channel):
object_key = get_bucket_key(day, channel)
logging.info(f"Checking if object exists: {object_key}")
folder_prefix = get_bucket_key_folder(day, channel) # Adjust this to return the folder path

logging.debug(f"Checking if folder exists: {folder_prefix}")
try:
s3_client.head_object(Bucket=BUCKET_NAME, Key=object_key)
logging.debug(f"Object already exists, skipping")
return True
response = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=folder_prefix, MaxKeys=1)
if "Contents" in response:
logging.info(f"Folder exists in S3: {folder_prefix}")
return True
else:
logging.debug(f"Folder does not exist in S3: {folder_prefix}")
return False
except Exception as e:
logging.info(f"Object does not exist in s3, continuing \n{e}")
logging.error(f"Error while checking folder in S3: {folder_prefix}\n{e}")
return False

async def get_and_save_api_data(exit_event):
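`save_to_s3` now writes each day/channel as Hive-partitioned parquet (`year=.../month=.../day=.../channel=...`) under a local `s3/parquet` folder, uploads that folder file by file, and `check_if_object_exists_in_s3` probes the same prefix with `list_objects_v2` before re-importing. A hedged sketch of reading one partition back with pandas — the bucket name, the endpoint URL and the `pyarrow`/`s3fs` dependencies are assumptions, not part of this commit:

```python
import pandas as pd

# placeholder bucket; the real name comes from BUCKET_NAME in api_to_s3.py
bucket = "my-mediatree-bucket"
prefix = "year=2024/month=12/day=1/channel=tf1/"  # same layout as get_bucket_key_folder

# requires pyarrow + s3fs; credentials are read from the environment
df = pd.read_parquet(
    f"s3://{bucket}/{prefix}",
    storage_options={"client_kwargs": {"endpoint_url": "https://s3.fr-par.scw.cloud"}},  # assumed S3-compatible endpoint
)
print(len(df), "rows in partition", prefix)
```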
Empty file added s3/.empty
Empty file.
12 changes: 9 additions & 3 deletions test/s3/test_s3.py
@@ -1,17 +1,23 @@
import pytest
import pandas as pd
from quotaclimat.data_processing.mediatree.s3.api_to_s3 import get_bucket_key, save_to_s3
from quotaclimat.data_processing.mediatree.s3.api_to_s3 import get_bucket_key, save_to_s3, get_bucket_key_folder


def test_get_bucket_key():
friday_6h26 = 1726719981
date = pd.to_datetime(friday_6h26, unit='s', utc=True)
channel = "tf1"
assert get_bucket_key(date, channel) == "year=2024/month=09/day=19/channel=tf1/data.json.gz"
assert get_bucket_key(date, channel) == "year=2024/month=9/day=19/channel=tf1/*.parquet"


def test_get_bucket_key_first_of_the_month():
first_december = 1733040125
date = pd.to_datetime(first_december, unit='s', utc=True)
channel = "tf1"
assert get_bucket_key(date, channel) == "year=2024/month=12/day=01/channel=tf1/data.json.gz"
assert get_bucket_key(date, channel) == "year=2024/month=12/day=1/channel=tf1/*.parquet"

def test_get_bucket_key_folder_first_of_the_month():
first_december = 1733040125
date = pd.to_datetime(first_december, unit='s', utc=True)
channel = "tf1"
assert get_bucket_key_folder(date, channel) == "year=2024/month=12/day=1/channel=tf1/"
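
For reference, a quick check of the two partition-key helpers with the same timestamp the tests use (note the non-zero-padded month and day produced by the `{month:1}` format; this assumes the module's S3 client can be constructed from the environment, as in the test run):

```python
import pandas as pd
from quotaclimat.data_processing.mediatree.s3.api_to_s3 import get_bucket_key, get_bucket_key_folder

date = pd.to_datetime(1733040125, unit="s", utc=True)  # 2024-12-01 UTC
print(get_bucket_key(date, "tf1"))         # year=2024/month=12/day=1/channel=tf1/*.parquet
print(get_bucket_key_folder(date, "tf1"))  # year=2024/month=12/day=1/channel=tf1/
```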

1 comment on commit 1045a15

@github-actions
Coverage Report
| File | Stmts | Miss | Cover | Missing |
|------|-------|------|-------|---------|
| **postgres** | | | | |
| insert_data.py | 43 | 7 | 84% | 36–38, 56–58, 63 |
| insert_existing_data_example.py | 19 | 3 | 84% | 25–27 |
| **postgres/schemas** | | | | |
| models.py | 168 | 11 | 93% | 137–144, 157, 159–160, 225–226, 240–241 |
| **quotaclimat/data_ingestion** | | | | |
| scrap_sitemap.py | 134 | 17 | 87% | 27–28, 33–34, 66–71, 95–97, 138–140, 202, 223–228 |
| **quotaclimat/data_ingestion/ingest_db** | | | | |
| ingest_sitemap_in_db.py | 55 | 37 | 33% | 21–42, 45–58, 62–73 |
| **quotaclimat/data_ingestion/scrap_html** | | | | |
| scrap_description_article.py | 36 | 3 | 92% | 19–20, 32 |
| **quotaclimat/data_processing/mediatree** | | | | |
| api_import.py | 218 | 133 | 39% | 44–48, 53–74, 78–81, 87, 90–132, 138–153, 158, 171–183, 187–193, 206–218, 221–225, 231, 276–277, 280–311, 314–316 |
| channel_program.py | 162 | 57 | 65% | 21–23, 34–36, 53–54, 57–59, 98–99, 108, 124, 175–216 |
| config.py | 15 | 2 | 87% | 7, 16 |
| detect_keywords.py | 252 | 16 | 94% | 111–118, 126–127, 271, 341–348, 390 |
| update_pg_keywords.py | 67 | 49 | 27% | 15–130, 154, 157, 164–179, 213–250, 257 |
| utils.py | 79 | 25 | 68% | 29–53, 56, 65, 86–87, 117–120 |
| **quotaclimat/data_processing/mediatree/s3** | | | | |
| api_to_s3.py | 157 | 98 | 38% | 73–87, 90–116, 119–132, 135–184, 187–213, 216–218 |
| **quotaclimat/utils** | | | | |
| healthcheck_config.py | 29 | 14 | 52% | 22–24, 27–38 |
| logger.py | 24 | 11 | 54% | 22–24, 28–37 |
| sentry.py | 11 | 2 | 82% | 22–23 |
| **TOTAL** | 1496 | 485 | 68% | |

| Tests | Skipped | Failures | Errors | Time |
|-------|---------|----------|--------|------|
| 104 | 0 💤 | 0 ❌ | 0 🔥 | 7m 48s ⏱️ |
