Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

915 - Add generate computed file command #936

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
92a7849
fix hidden computed_file query bug in Project::purge_computed_files
avrohomgottlieb Oct 14, 2024
82ce51e
add Sample::purge_computed_files method
avrohomgottlieb Oct 14, 2024
b578d27
add ComputedFile::purge, fix a few Project and Sample CF related name…
avrohomgottlieb Oct 15, 2024
b9d8476
add break up loader::_create_computed_file into loader::_create_compu…
avrohomgottlieb Oct 15, 2024
8df351d
add loader::generate_computed_file function
avrohomgottlieb Oct 15, 2024
19bd10e
convert all args to kwargs in loader::generate_computed_file
avrohomgottlieb Oct 15, 2024
77ae571
add generate_computed_file management command
avrohomgottlieb Oct 15, 2024
6a8c4f5
add validation to generate_computed_file command
avrohomgottlieb Oct 15, 2024
0bd2539
remove computed_file_name parameter from CF::get_project|sample_file
avrohomgottlieb Oct 16, 2024
4a30d4e
improve readability of loader::generate_computed_file
avrohomgottlieb Oct 16, 2024
f2c04b3
remove return early statements, clean up logging statements
avrohomgottlieb Oct 16, 2024
99a1175
Merge branch 'feature/batch' into avrohom/915-add-generate-computed-f…
avrohomgottlieb Oct 16, 2024
a2bf77c
add dispatch_to_batch command
avrohomgottlieb Oct 16, 2024
d12761f
improve logging statements, fix download config reference bug
avrohomgottlieb Oct 28, 2024
5b10871
improve code quality in projects query
avrohomgottlieb Oct 28, 2024
b8bfdbc
enforce usage of computed_file property in project and sample model m…
avrohomgottlieb Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions api/scpca_portal/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@

from scpca_portal import common, metadata_file, s3
from scpca_portal.config.logging import get_and_configure_logger
from scpca_portal.models import ComputedFile, Contact, ExternalAccession, Project, Publication
from scpca_portal.models import (
ComputedFile,
Contact,
ExternalAccession,
Project,
Publication,
Sample,
)

logger = get_and_configure_logger(__name__)

Expand Down Expand Up @@ -136,25 +143,61 @@ def create_project(
return project


def _create_computed_file(future, *, update_s3: bool, clean_up_output_data: bool) -> None:
def _create_computed_file(
computed_file: ComputedFile, update_s3: bool, clean_up_output_data: bool
) -> None:
"""
Save computed file returned from future to the db.
Upload file to s3 and clean up output data depending on passed options.
"""
if computed_file := future.result():
# Only upload and clean up projects and the last sample if multiplexed
if computed_file.project or computed_file.sample.is_last_multiplexed_sample:
if update_s3:
s3.upload_output_file(computed_file.s3_key, computed_file.s3_bucket)
if clean_up_output_data:
computed_file.clean_up_local_computed_file()
computed_file.save()

# Only upload and clean up projects and the last sample if multiplexed
if computed_file.project or computed_file.sample.is_last_multiplexed_sample:
if update_s3:
s3.upload_output_file(computed_file.s3_key, computed_file.s3_bucket)
if clean_up_output_data:
computed_file.clean_up_local_computed_file()
computed_file.save()

def _create_computed_file_callback(future, *, update_s3: bool, clean_up_output_data: bool) -> None:
"""
Wrap multiprocessing logic by grabbing computed file future and uploading it to s3.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sp

"""
if computed_file := future.result():
_create_computed_file(computed_file, update_s3, clean_up_output_data)

# Close DB connection for each thread.
connection.close()


def generate_computed_file(
    *,
    download_config: Dict,
    project: Project | None = None,
    sample: Sample | None = None,
    update_s3: bool = True,
) -> None:
    """
    Regenerate the computed file matching download_config for a project or a sample.

    The existing computed file for that download_config (if any) is purged first,
    with update_s3 passed through to purge, and a fresh one is then built and handed
    to _create_computed_file. If both project and sample are passed, project wins.

    Raises ValueError if neither project nor sample is passed.
    """
    target = project or sample
    if not target:
        # Fail loudly here instead of with an AttributeError on None below.
        raise ValueError("Either a project or a sample must be passed.")

    # Purge old computed file
    if old_computed_file := target.get_computed_file(download_config):
        old_computed_file.purge(update_s3)

    if project:
        computed_file = ComputedFile.get_project_file(
            project, download_config, project.get_output_file_name(download_config)
        )
    else:
        computed_file = ComputedFile.get_sample_file(
            sample,
            download_config,
            sample.get_output_file_name(download_config),
            Lock(),  # this should be removed when CF::get_sample_file is refactored
        )

    _create_computed_file(computed_file, update_s3, clean_up_output_data=False)

    # Only the sample path refreshes the parent project's downloadable sample count.
    if not project:
        sample.project.update_downloadable_sample_count()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if project:
computed_file = ComputedFile.get_project_file(
project, download_config, project.get_output_file_name(download_config)
)
_create_computed_file(computed_file, update_s3, clean_up_output_data=False)
elif sample:
computed_file = ComputedFile.get_sample_file(
sample,
download_config,
sample.get_output_file_name(download_config),
Lock(), # this should be removed when CF::get_sample_file is refactored
)
_create_computed_file(computed_file, update_s3, clean_up_output_data=False)
sample.project.update_downloadable_sample_count()
if project and (computed_file := ComputedFile.get_project_file(project, download_config)):
_create_computed_file(computed_file, update_s3, clean_up_output_data=False)
if sample and (computed_file := ComputedFile.get_sample_file(sample, download_config)):
_create_computed_file(computed_file, update_s3, clean_up_output_data=False)
sample.project.update_downloadable_sample_count()

I think you can make Lock() the default value for a lock in get_sample_file. Otherwise rewriting it this way makes it a bit clearer.



def generate_computed_files(
project: Project,
max_workers: int,
Expand All @@ -170,7 +213,7 @@ def generate_computed_files(

# Prep callback function
on_get_file = partial(
_create_computed_file,
_create_computed_file_callback,
update_s3=update_s3,
clean_up_output_data=clean_up_output_data,
)
Expand Down
72 changes: 72 additions & 0 deletions api/scpca_portal/management/commands/generate_computed_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import logging
from typing import Dict

from django.core.management.base import BaseCommand

from scpca_portal import common, loader
from scpca_portal.models import Project, Sample

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


class Command(BaseCommand):
help = """
This command is meant to be called as an entrypoint to AWS Batch Fargate job instance.
Individual files are computed according:
- To the project or sample id
- An appropriate corresponding download config

When computation is completed, files are uploaded to S3, and the job is marked as completed.

At which point the instance which generated this computed file will receive a new job
from the job queue and begin computing the next file.
"""

def add_arguments(self, parser):
    """
    Register the command line options accepted by this command.

    --project-id and --sample-id are plain strings.
    --download-config is a JSON object given as a string and is parsed into a dict.
    """
    # Local import so this method carries its own dependency.
    import json

    def json_object(raw: str) -> dict:
        """Parse raw as JSON and require the result to be an object (dict)."""
        parsed = json.loads(raw)
        if not isinstance(parsed, dict):
            # argparse turns a ValueError from a type callable into a usage error.
            raise ValueError("download config must be a JSON object")
        return parsed

    parser.add_argument("--project-id", type=str)
    parser.add_argument("--sample-id", type=str)
    # argparse applies `type` to the raw string; dict("...") raises for any real value,
    # so the string is parsed as JSON instead.
    parser.add_argument("--download-config", type=json_object)

def handle(self, *args, **kwargs):
    """Forward the parsed command options to generate_computed_file; positional args are unused."""
    options = kwargs
    self.generate_computed_file(**options)

def generate_computed_file(
self,
project_id: str,
sample_id: str,
download_config: Dict,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a string that references a dict in common.py

**kwargs,
) -> None:
"""Generates a project or sample computed file according to a predetermined download configuration"""
loader.prep_data_dirs()

if project_id:
project = Project.objects.filter(scpca_id=project_id).first()
sample = None
if not project:
logger.error("project doesn't exist")
return
if download_config not in common.GENERATED_PROJECT_DOWNLOAD_CONFIGS:
logger.error("download_config is not valid")
return
elif sample_id:
sample = Sample.objects.filter(scpca_id=sample_id).first()
project = None
if not sample:
logger.error("sample doesn't exist")
return
if download_config not in common.GENERATED_SAMPLE_DOWNLOAD_CONFIGS:
logger.error("download_config is not valid")
return

else:
logger.error(
"neither project_id nor sample_id were passed. at least one must be passed."
)
return

loader.generate_computed_file(
download_config=download_config, project=project, sample=sample
)
6 changes: 6 additions & 0 deletions api/scpca_portal/models/computed_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,3 +361,9 @@ def zip_file_path(self):
def clean_up_local_computed_file(self):
    """Remove the local zip file at zip_file_path; a missing file is not an error."""
    local_zip = self.zip_file_path
    local_zip.unlink(missing_ok=True)

def purge(self, delete_from_s3: bool = False) -> None:
    """
    Delete this computed file.

    When delete_from_s3 is set, the output file identified by s3_key and s3_bucket
    is deleted from S3 before the instance itself is deleted.
    """
    if delete_from_s3:
        key, bucket = self.s3_key, self.s3_bucket
        s3.delete_output_file(key, bucket)

    self.delete()
15 changes: 5 additions & 10 deletions api/scpca_portal/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ def get_output_file_name(self, download_config: Dict) -> str:
def get_computed_file(self, download_config: Dict) -> ComputedFile:
"Return the project computed file that matches the passed download_config."
if download_config["metadata_only"]:
return self.computed_files.filter(metadata_only=True).first()
return self.project_computed_files.filter(metadata_only=True).first()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self.project_computed_files.filter(metadata_only=True).first()
return self.computed_files.filter(metadata_only=True).first()

Let's keep using the property for now.


return self.computed_files.filter(
return self.project_computed_files.filter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self.project_computed_files.filter(
return self.computed_files.filter(

modality=download_config["modality"],
format=download_config["format"],
has_multiplexed_data=(not download_config["excludes_multiplexed"]),
Expand Down Expand Up @@ -315,16 +315,11 @@ def purge_computed_files(self, delete_from_s3: bool = False) -> None:
"""Purges all computed files associated with the project instance."""
# Delete project's sample computed files
for sample in self.samples.all():
for computed_file in sample.computed_files:
if delete_from_s3:
s3.delete_output_file(computed_file.s3_key, computed_file.s3_bucket)
computed_file.delete()
sample.purge_computed_files(delete_from_s3)

# Delete project's project computed files
for computed_file in self.computed_files:
if delete_from_s3:
s3.delete_output_file(computed_file.s3_key, computed_file.s3_bucket)
computed_file.delete()
for computed_file in self.project_computed_files.all():
computed_file.purge(delete_from_s3)

def update_sample_derived_properties(self):
"""
Expand Down
6 changes: 5 additions & 1 deletion api/scpca_portal/models/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_metadata(self) -> Dict:

def get_computed_file(self, download_config: Dict) -> ComputedFile:
"Return the sample computed file that matches the passed download_config."
return self.computed_files.filter(
return self.sample_computed_files.filter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self.sample_computed_files.filter(
return self.computed_files.filter(

modality=download_config["modality"],
format=download_config["format"],
).first()
Expand Down Expand Up @@ -265,3 +265,7 @@ def purge(self) -> None:
if library.samples.count() == 1:
library.delete()
self.delete()

def purge_computed_files(self, delete_from_s3: bool = False) -> None:
    """Purge every computed file in sample_computed_files, passing delete_from_s3 to each purge."""
    sample_files = self.sample_computed_files.all()
    for sample_file in sample_files:
        sample_file.purge(delete_from_s3)
Loading