This repository has been archived by the owner on May 18, 2023. It is now read-only.

Make cloud functions datasets processing use elpis core library #95

Open. Wants to merge 4 commits into main.

2 changes: 1 addition & 1 deletion .github/workflows/cloudFunctionTests.yml
@@ -24,7 +24,7 @@ jobs:
       - name: Setup poetry
         uses: abatilo/actions-poetry@v2
         with:
-          poetry-version: "1.1.13"
+          poetry-version: "1.2.2"

       - name: Poetry install
         run: |
4 changes: 2 additions & 2 deletions functions/functions/datasets/delete_datasets.py
@@ -4,7 +4,7 @@
 from functions_framework import Context
 from humps.main import decamelize
 from loguru import logger
-from models import Dataset
+from models.dataset import CloudDataset
 from utils.cloud_storage import delete_folder_blob
 from utils.firestore_event_converter import unpack

@@ -27,7 +27,7 @@ def delete_dataset_from_bucket(data: Dict, context: Context) -> None:
     logger.info(f"Raw firestore dataset event: {data}")

     dataset = decamelize(unpack(data["oldValue"]))
-    dataset = Dataset.from_dict(dataset)
+    dataset = CloudDataset.from_dict(dataset)

     delete_folder_blob(
         bucket_name=DATASET_BUCKET_NAME,
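
For context: this handler is triggered by a Firestore document delete, so data["oldValue"] arrives in Firestore's typed-value encoding with camelCase keys, and unpack plus decamelize turn it back into a plain snake_case dict before CloudDataset.from_dict is called. A minimal sketch of that step, assuming the standard Cloud Functions Firestore event shape; the document fields shown are invented for illustration:

from humps.main import decamelize
from models.dataset import CloudDataset
from utils.firestore_event_converter import unpack

# Hypothetical delete-event payload (field names are illustrative only).
data = {
    "oldValue": {
        "fields": {
            "name": {"stringValue": "my-dataset"},
            "userId": {"stringValue": "user-123"},
        }
    }
}

dataset = decamelize(unpack(data["oldValue"]))
# Assumed result: {"name": "my-dataset", "user_id": "user-123"}
dataset = CloudDataset.from_dict(dataset)
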
6 changes: 3 additions & 3 deletions functions/functions/datasets/process_dataset.py
@@ -4,7 +4,7 @@
 from functions_framework import Context
 from humps.main import decamelize
 from loguru import logger
-from models.dataset import Dataset, ProcessingJob
+from models.dataset import CloudDataset, ProcessingJob
 from utils.firestore_event_converter import unpack
 from utils.pubsub import publish_to_topic

@@ -23,9 +23,9 @@ def process_dataset(data: Dict, context: Context) -> None:
     """
     # Convert the firestore event into a dataset object.
     dataset = decamelize(unpack(data["value"]))
-    dataset = Dataset.from_dict(dataset)
+    dataset = CloudDataset.from_dict(dataset)

     logger.info(f"Firestore newly-created dataset information: {dataset}")

-    jobs = map(ProcessingJob.to_dict, dataset.to_batch())
+    jobs = map(ProcessingJob.to_dict, dataset.to_jobs())
     publish_to_topic(topic_name=TOPIC_NAME, data=jobs)
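
The fan-out here is one pub/sub message per file pairing in the newly created dataset: CloudDataset.from_dict rebuilds the dataset, to_jobs() (renamed from to_batch()) splits it into ProcessingJob objects, and each serialized job is published for process_file.py to pick up. A rough sketch under those assumptions; only the method names come from this diff, the dataset fields and topic name below are invented:

from models.dataset import CloudDataset, ProcessingJob
from utils.pubsub import publish_to_topic

TOPIC_NAME = "process-dataset-file"  # illustrative; the real topic is configured in this module

# Hypothetical unpacked, decamelized Firestore document.
dataset_dict = {
    "name": "my-dataset",
    "user_id": "user-123",
    "files": ["recording.eaf", "recording.wav"],
    "options": {},
}

dataset = CloudDataset.from_dict(dataset_dict)

# Assumed: to_jobs() yields one ProcessingJob per transcription/audio pairing.
jobs = [ProcessingJob.to_dict(job) for job in dataset.to_jobs()]
publish_to_topic(topic_name=TOPIC_NAME, data=jobs)
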
107 changes: 16 additions & 91 deletions functions/functions/datasets/process_file.py
@@ -1,18 +1,14 @@
 import base64
 import json
 import os
-from copy import copy
-from itertools import chain
 from pathlib import Path
 from typing import List, Tuple

-import utils.audio as audio
+from elpis.datasets import process_batch
 from firebase_admin import firestore
 from loguru import logger
-from models import Annotation, DatasetOptions, ProcessingJob
-from utils.clean_text import clean_text
+from models import ProcessingJob
 from utils.cloud_storage import download_blob, list_blobs_with_prefix, upload_blob
-from utils.extract_annotations import extract_annotations
 from utils.firebase import get_firestore_client

 DEFAULT_DIR = Path("/tmp/")
@@ -36,29 +32,18 @@ def process_dataset_file(event, context) -> None:
     # Decode and deserialize the processing job
     data = base64.b64decode(event["data"]).decode("utf-8")
     data = json.loads(data)
     logger.info(f"Event data: {data}")
-    job = ProcessingJob.from_dict(data)
+    try:
+        job = ProcessingJob.from_dict(data)
+    except Exception as error:
+        logger.error(f"Failed to deserialize job: {data}")
+        logger.error(error)
+        return

     transcription_file, audio_file = download_files(job)
-    annotations = extract_annotations(transcription_file, job.options.elan_options)
-
-    # Resample audio to standardise for training
-    audio.resample(
-        audio_path=audio_file, destination=audio_file, sample_rate=TARGET_SAMPLE_RATE
-    )
+    job.transcription_file = transcription_file
+    job.audio_file = audio_file
-
-    # Clean the annotations
-    annotations = map(
-        lambda annotation: clean_annotation(annotation, job.options), annotations
-    )
-
-    # Generate training files from the annotations
-    processed_files = chain(
-        *map(
-            lambda annotation: generate_training_files(annotation, audio_file),
-            annotations,
-        )
-    )
+    processed_files = process_batch(job, output_dir=DEFAULT_DIR)

     # Upload all the training files
     for file in processed_files:
@@ -73,91 +58,31 @@ def download_files(job: ProcessingJob, dir: Path = DEFAULT_DIR) -> Tuple[Path, Path]:
     """Download the required transcription and audio files for the job.

     Parameters:
-        job: The processing job.
+        job: The job to process.
         dir: The directory in which to store the files.

     Returns:
         A tuple containing the path of the downloaded transcription,
         and audio files.
     """
     # Download transcription file
-    transcription_file = dir / job.transcription_file_name
+    transcription_file = dir / job.transcription_file
     download_blob(
         bucket_name=FILES_BUCKET,
-        source_blob_name=f"{job.user_id}/{job.transcription_file_name}",
+        source_blob_name=f"{job.user_id}/{job.transcription_file}",
         destination_file_name=transcription_file,
     )

     # Download audio file
-    audio_file = dir / job.audio_file_name
+    audio_file = dir / job.audio_file
     download_blob(
         bucket_name=FILES_BUCKET,
-        source_blob_name=f"{job.user_id}/{job.audio_file_name}",
+        source_blob_name=f"{job.user_id}/{job.audio_file}",
         destination_file_name=audio_file,
     )
     return transcription_file, audio_file


-def clean_annotation(annotation: Annotation, options: DatasetOptions) -> Annotation:
-    """Cleans the text within an annotation.
-
-    Parameters:
-        annotation: The annotation to clean.
-        options: The cleaning options for the dataset.
-
-    Returns:
-        A new annotation whose transcript has been cleaned.
-    """
-    transcript = clean_text(annotation.transcript, options)
-    result = copy(annotation)
-    result.transcript = transcript
-    return result
-
-
-def generate_training_files(
-    annotation: Annotation, audio_file: Path, dir: Path = DEFAULT_DIR
-) -> Tuple[Path, Path]:
-    """Generates a transcript and audio file pairing for this annotation.
-
-    If the annotation is timed (has a start and stop time), we return a path
-    to a new audio file, which is constrained to the given times. Otherwise,
-    the annotation spans the entire audio path, and so we return this path,
-    unmodified.
-
-    Parameters:
-        annotation: The annotation for a given section of audio within the
-            supplied audio_file.
-        audio_file: The file which the annotation references.
-
-    Returns:
-        A tuple containing a transcription and audio file path for the given
-        annotation.
-    """
-    # Get a unique name prefix based on annotation start time
-    name = audio_file.stem
-    if annotation.start_ms is not None:
-        name = f"{name}_{annotation.start_ms}"
-
-    # Save transcription_file
-    transcription_file = dir / f"{name}.json"
-    with open(transcription_file, "w") as f:
-        json.dump(annotation.to_dict(), f)
-
-    # Save audio file.
-    # Type ignoring is because is_timed ensures sample_rate, start_ms and stop_ms exist
-    if annotation.is_timed():
-        cut_audio_file = dir / f"{name}.wav"
-        audio.cut(
-            audio_path=audio_file,
-            destination=cut_audio_file,
-            start_ms=annotation.start_ms,  # type: ignore
-            stop_ms=annotation.stop_ms,  # type: ignore
-        )
-        audio_file = cut_audio_file
-
-    return transcription_file, audio_file
-
-
 def post_processing_hook(job: ProcessingJob) -> None:
     """Determine if all the files in a dataset have been processed, and if so,
     updates the document accordingly in firestore.
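
The deleted clean_annotation and generate_training_files helpers (text cleaning, audio resampling and cutting, training-file generation) are now expected to be covered by the single process_batch call from the elpis core library. A minimal local sketch of the new flow, assuming only the call shape visible in this diff; the job payload fields are illustrative:

from pathlib import Path

from elpis.datasets import process_batch
from models import ProcessingJob

# Hypothetical job payload; only the field names appear in this diff.
job = ProcessingJob.from_dict(
    {
        "user_id": "user-123",
        "transcription_file": "recording.eaf",
        "audio_file": "recording.wav",
        "options": {},
    }
)

# Point the job at locally downloaded copies, as process_dataset_file now does
# after download_files().
job.transcription_file = Path("/tmp/recording.eaf")
job.audio_file = Path("/tmp/recording.wav")

# Assumed: process_batch performs the annotation extraction, text cleaning and
# audio cutting that this file previously did inline, and returns the paths of
# the generated training files.
for file in process_batch(job, output_dir=Path("/tmp")):
    print(file)
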
10 changes: 2 additions & 8 deletions functions/models/__init__.py
@@ -1,14 +1,8 @@
-from models.annotation import Annotation
-from models.dataset import Dataset, DatasetOptions, ElanOptions, ProcessingJob
-from models.elan_tier_selector import TierSelector
+from models.dataset import CloudDataset, ProcessingJob
 from models.model import Model

 __all__ = [
-    "Annotation",
-    "Dataset",
-    "DatasetOptions",
-    "ElanOptions",
+    "CloudDataset",
     "ProcessingJob",
-    "TierSelector",
     "Model",
 ]
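
After this change the local models package only re-exports the names kept in __all__ above; Annotation, Dataset, DatasetOptions, ElanOptions and TierSelector are gone from its public surface:

# Still importable from the local models package after this PR:
from models import CloudDataset, Model, ProcessingJob
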
41 changes: 0 additions & 41 deletions functions/models/annotation.py

This file was deleted.
