Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for missing files #1374

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beagle/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@
# Celery queue names. Each one is read from the environment variable of the
# same name and falls back to the lowercase literal shown when it is unset.
BEAGLE_NATS_UPDATE_SAMPLE_QUEUE = os.environ.get("BEAGLE_NATS_UPDATE_SAMPLE_QUEUE", "beagle_nats_update_sample_queue")
BEAGLE_RUNNER_QUEUE = os.environ.get("BEAGLE_RUNNER_QUEUE", "beagle_runner_queue")
BEAGLE_DEFAULT_QUEUE = os.environ.get("BEAGLE_DEFAULT_QUEUE", "beagle_default_queue")
BEAGLE_CHECK_FILES_QUEUE = os.environ.get("BEAGLE_CHECK_FILES_QUEUE", "beagle_check_files_queue")
BEAGLE_JOB_SCHEDULER_QUEUE = os.environ.get("BEAGLE_JOB_SCHEDULER_QUEUE", "beagle_job_scheduler_queue")
# Directory path (not a queue); defaults to /juno/work/ci/temp when unset.
BEAGLE_SHARED_TMPDIR = os.environ.get("BEAGLE_SHARED_TMPDIR", "/juno/work/ci/temp")

Expand Down Expand Up @@ -474,9 +475,7 @@
# Defaults to an empty string when the environment variable is unset.
NOTIFIER_WES_CC = os.environ.get("BEAGLE_NOTIFIER_WHOLE_EXOME_SEQUENCING_CC", "")

# Parsed from the JSON text in BEAGLE_COPY_MAPPING; an unset variable yields {}.
DEFAULT_MAPPING = json.loads(os.environ.get("BEAGLE_COPY_MAPPING", "{}"))
# NOTE(review): the triple-quoted literal below holds only whitespace and is a
# bare expression statement; it has no effect at import time.
"""

"""
# Parsed from the JSON text in BEAGLE_FILE_MAPPING; an unset variable yields {}.
MAPPING = json.loads(os.environ.get("BEAGLE_FILE_MAPPING", "{}"))
# Octal permission modes: 0o644 is rw-r--r--, 0o750 is rwxr-x---.
COPY_FILE_PERMISSION = 0o644
COPY_DIR_PERMISSION = 0o750
Expand All @@ -502,3 +501,6 @@

# NOTE(review): os.environ.get returns a str whenever the variable is set, so
# this value is the int 4 only when MANUAL_RESTART_REPORT_THRESHOLD is unset.
MANUAL_RESTART_REPORT_THRESHOLD = os.environ.get("MANUAL_RESTART_REPORT_THRESHOLD", 4)
MANUAL_RESTART_REPORT_PATH = os.environ.get("MANUAL_RESTART_REPORT_PATH", "/tmp/report.txt")

# No fallback is given: this is None when BEAGLE_MISSING_FILES_REPORT_PATH is unset.
MISSING_FILES_REPORT_PATH = os.environ.get("BEAGLE_MISSING_FILES_REPORT_PATH")
# Always an int (the env value is passed through int()); defaults to 10.
MISSING_FILES_REPORT_COUNT = int(os.environ.get("BEAGLE_MISSING_FILES_REPORT_COUNT", 10))
6 changes: 6 additions & 0 deletions beagle_etl/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def at_start(sender, **k):
"runner.tasks.fail_job": {"queue": settings.BEAGLE_RUNNER_QUEUE},
"notifier.tasks.send_notification": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"file_system.tasks.populate_job_group_notifier_metadata": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"file_system.tasks.check_fastq_files": {"queue": settings.BEAGLE_CHECK_FILES_QUEUE},
"beagle_etl.tasks.job_processor": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"beagle_etl.tasks.process_smile_events": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"beagle_etl.tasks.fetch_request_nats": {"queue": settings.BEAGLE_NATS_NEW_REQUEST_QUEUE},
Expand Down Expand Up @@ -98,4 +99,9 @@ def at_start(sender, **k):
"schedule": settings.CHECK_JOB_TIMEOUTS,
"options": {"queue": settings.BEAGLE_RUNNER_QUEUE},
},
"check_missing_files": {
"task": "file_system.tasks.check_fastq_files",
"schedule": crontab(day_of_week=1, hour=0, minute=0),
"options": {"queue": settings.BEAGLE_CHECK_FILES_QUEUE},
},
}
21 changes: 16 additions & 5 deletions container/beagle.def
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,26 @@ Includecmd: no

export BEAGLE_CELERY_BEAT_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_beat.pid
export BEAGLE_CELERY_WORKER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_worker.pid
export BEAGLE_CELERY_CHECK_FILES_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_check_files.pid
export BEAGLE_CELERY_SCHEDULER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_scheduler.pid
export BEAGLE_CELERY_RUNNER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_runner.pid
export BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_NATS_NEW_REQUEST_QUEUE}.beagle_nats_new_request.pid

echo "Starting celery services..."
# Print the PID-file locations the celery services below will use.
echo "Exported parameters:"
echo "BEAGLE_CELERY_BEAT_PID_FILE=${BEAGLE_CELERY_BEAT_PID_FILE}"
echo "BEAGLE_CELERY_WORKER_PID_FILE=${BEAGLE_CELERY_WORKER_PID_FILE}"
echo "BEAGLE_CELERY_SCHEDULER_PID_FILE=${BEAGLE_CELERY_SCHEDULER_PID_FILE}"
echo "BEAGLE_CELERY_RUNNER_PID_FILE=${BEAGLE_CELERY_RUNNER_PID_FILE}"
# The NATS PID file is exported as BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE;
# BEAGLE_CELERY_NATS_PID_FILE is not among the exports, so it printed empty.
echo "BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE=${BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE}"
echo "BEAGLE_CELERY_BEAT_PID_FILE=$BEAGLE_CELERY_BEAT_PID_FILE"
echo "BEAGLE_CELERY_WORKER_PID_FILE=$BEAGLE_CELERY_WORKER_PID_FILE"
echo "BEAGLE_CELERY_CHECK_FILES_PID_FILE=$BEAGLE_CELERY_CHECK_FILES_PID_FILE"
echo "BEAGLE_CELERY_SCHEDULER_PID_FILE=$BEAGLE_CELERY_SCHEDULER_PID_FILE"
echo "BEAGLE_CELERY_RUNNER_PID_FILE=$BEAGLE_CELERY_RUNNER_PID_FILE"
echo "BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE=$BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE"

celery -A beagle_etl --workdir ${BEAGLE_PATH} beat -l info -f ${BEAGLE_CELERY_LOG_PATH}/beagle_beat.log --pidfile ${BEAGLE_CELERY_BEAT_PID_FILE} -s ${BEAGLE_CELERY_BEAT_SCHEDULE_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.celerybeat-schedule >/dev/null 2>&1 < /dev/null &

celery -A beagle_etl --workdir ${BEAGLE_PATH} worker --concurrency 100 -l info -Q ${BEAGLE_DEFAULT_QUEUE} -f ${BEAGLE_CELERY_LOG_PATH}/beagle_worker.log --pidfile ${BEAGLE_CELERY_WORKER_PID_FILE} -n ${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_worker >/dev/null 2>&1 < /dev/null &

# Dedicated single-process worker for the check-files queue. It gets its own
# node name: reusing ".beagle_worker" would duplicate the node name of the
# default-queue worker started just before it.
celery -A beagle_etl --workdir ${BEAGLE_PATH} worker --concurrency 1 -l info -Q ${BEAGLE_CHECK_FILES_QUEUE} -f ${BEAGLE_CELERY_LOG_PATH}/beagle_check_files.log --pidfile ${BEAGLE_CELERY_CHECK_FILES_PID_FILE} -n ${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_check_files >/dev/null 2>&1 < /dev/null &

celery -A beagle_etl --workdir ${BEAGLE_PATH} worker --concurrency 1 -l info -Q ${BEAGLE_JOB_SCHEDULER_QUEUE} -f ${BEAGLE_CELERY_LOG_PATH}/beagle_scheduler.log --pidfile ${BEAGLE_CELERY_SCHEDULER_PID_FILE} -n ${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_scheduler >/dev/null 2>&1 < /dev/null &

celery -A beagle_etl --workdir ${BEAGLE_PATH} worker --concurrency 1 -l debug -Q ${BEAGLE_NATS_NEW_REQUEST_QUEUE} -f ${BEAGLE_CELERY_LOG_PATH}/beagle_nats_new_request.log --pidfile ${BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE} -n ${BEAGLE_NATS_NEW_REQUEST_QUEUE}.beagle_nats_new_request >/dev/null 2>&1 < /dev/null &
Expand Down Expand Up @@ -143,6 +147,7 @@ Includecmd: no

export BEAGLE_CELERY_BEAT_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_beat.pid
export BEAGLE_CELERY_WORKER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_worker.pid
export BEAGLE_CELERY_CHECK_FILES_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_check_files.pid
export BEAGLE_CELERY_SCHEDULER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_scheduler.pid
export BEAGLE_CELERY_RUNNER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_runner.pid
export BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_NATS_NEW_REQUEST_QUEUE}.beagle_nats_new_request.pid
Expand All @@ -155,6 +160,7 @@ Includecmd: no
echo "BEAGLE_CELERY_EVENT_QUEUE_PREFIX:"$BEAGLE_CELERY_EVENT_QUEUE_PREFIX
echo "BEAGLE_CELERY_BEAT_PID_FILE:"$BEAGLE_CELERY_BEAT_PID_FILE
echo "BEAGLE_CELERY_WORKER_PID_FILE:"$BEAGLE_CELERY_WORKER_PID_FILE
echo "BEAGLE_CELERY_CHECK_FILES_PID_FILE=$BEAGLE_CELERY_CHECK_FILES_PID_FILE"
echo "BEAGLE_CELERY_SCHEDULER_PID_FILE:"$BEAGLE_CELERY_SCHEDULER_PID_FILE
echo "BEAGLE_CELERY_RUNNER_PID_FILE:"$BEAGLE_CELERY_RUNNER_PID_FILE
echo "BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE:"$BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE
Expand All @@ -169,6 +175,7 @@ Includecmd: no
# Remove the PID file of every celery service; errors (e.g. a file that is
# already gone) are discarded by the redirections.
echo "Removing PID files..."
rm $BEAGLE_CELERY_BEAT_PID_FILE > /dev/null 2>&1 < /dev/null &
rm $BEAGLE_CELERY_WORKER_PID_FILE > /dev/null 2>&1 < /dev/null &
# The "$" was missing here, so rm targeted a literal file name and the
# check-files PID file was never removed.
rm $BEAGLE_CELERY_CHECK_FILES_PID_FILE > /dev/null 2>&1 < /dev/null &
rm $BEAGLE_CELERY_SCHEDULER_PID_FILE > /dev/null 2>&1 < /dev/null &
rm $BEAGLE_CELERY_RUNNER_PID_FILE > /dev/null 2>&1 < /dev/null &
rm $BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE > /dev/null 2>&1 < /dev/null &
Expand Down Expand Up @@ -203,6 +210,7 @@ Includecmd: no

export BEAGLE_CELERY_BEAT_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_beat.pid
export BEAGLE_CELERY_WORKER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_worker.pid
export BEAGLE_CELERY_CHECK_FILES_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_check_files.pid
export BEAGLE_CELERY_SCHEDULER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_scheduler.pid
export BEAGLE_CELERY_RUNNER_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_CELERY_EVENT_QUEUE_PREFIX}.beagle_runner.pid
export BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE=${BEAGLE_CELERY_PID_PATH}/${BEAGLE_NATS_NEW_REQUEST_QUEUE}.beagle_nats_new_request.pid
Expand All @@ -215,13 +223,15 @@ Includecmd: no
echo "BEAGLE_CELERY_EVENT_QUEUE_PREFIX:"$BEAGLE_CELERY_EVENT_QUEUE_PREFIX
echo "BEAGLE_CELERY_BEAT_PID_FILE:"$BEAGLE_CELERY_BEAT_PID_FILE
echo "BEAGLE_CELERY_WORKER_PID_FILE:"$BEAGLE_CELERY_WORKER_PID_FILE
echo "BEAGLE_CELERY_CHECK_FILES_PID_FILE=$BEAGLE_CELERY_CHECK_FILES_PID_FILE"
echo "BEAGLE_CELERY_SCHEDULER_PID_FILE:"$BEAGLE_CELERY_SCHEDULER_PID_FILE
echo "BEAGLE_CELERY_RUNNER_PID_FILE:"$BEAGLE_CELERY_RUNNER_PID_FILE
echo "BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE:"$BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE
echo ""

export BEAGLE_CELERY_BEAT=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_BEAT_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
export BEAGLE_CELERY_WORKER=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_WORKER_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
export BEAGLE_CELERY_CHECK_FILES=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_CHECK_FILES_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
export BEAGLE_CELERY_SCHEDULER=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_SCHEDULER_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
export BEAGLE_CELERY_RUNNER=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_RUNNER_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
export BEAGLE_CELERY_NATS_NEW_REQUEST=`ps auxww | grep 'celery' | grep $BEAGLE_CELERY_NATS_NEW_REQUEST_PID_FILE | grep -v 'grep' | awk '{print $2}' | wc -l`
Expand All @@ -234,6 +244,7 @@ Includecmd: no
echo "Scheduler - $BEAGLE_CELERY_SCHEDULER"
echo "Runner - $BEAGLE_CELERY_RUNNER"
echo "NATS New Request - $BEAGLE_CELERY_NATS_NEW_REQUEST"
echo "Check Files" - $BEAGLE_CELERY_CHECK_FILES


%apprun beagle-env
Expand Down
18 changes: 18 additions & 0 deletions file_system/migrations/0041_file_available.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 2.2.28 on 2024-11-04 18:07

from django.db import migrations, models


class Migration(migrations.Migration):
# Schema migration that adds the `available` field to the `file` model.

# Must run after file_system migration 0040_auto_20230901_2152.
dependencies = [
("file_system", "0040_auto_20230901_2152"),
]

# A single AddField: a nullable BooleanField that defaults to True.
operations = [
migrations.AddField(
model_name="file",
name="available",
field=models.BooleanField(blank=True, default=True, null=True),
),
]
1 change: 1 addition & 0 deletions file_system/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class File(BaseModel):
sample = models.ForeignKey(Sample, null=True, on_delete=models.SET_NULL)
samples = ArrayField(models.CharField(max_length=100), default=list)
patient_id = models.CharField(max_length=100, null=True, blank=True)
available = models.BooleanField(default=True, null=True, blank=True)

def get_request(self):
# Look up the Request whose request_id equals this file's request_id and
# that is flagged latest=True; QuerySet.first() gives None when no row matches.
return Request.objects.filter(request_id=self.request_id, latest=True).first()
Expand Down
40 changes: 40 additions & 0 deletions file_system/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os
import logging
from datetime import datetime
from celery import shared_task
from django.apps import apps
from django.conf import settings
from notifier.models import JobGroupNotifier

logger = logging.getLogger(__name__)
Expand All @@ -20,3 +24,39 @@ def populate_job_group_notifier_metadata(request_id, pi, investigator, assay):
job_group_notifier.investigator = investigator
job_group_notifier.assay = assay
job_group_notifier.save()


@shared_task
def check_fastq_files():
    """Check that every file in the import file group still exists on disk.

    Each ``File`` row whose ``file_group`` is ``settings.IMPORT_FILE_GROUP`` is
    tested with ``os.path.exists``. A missing path is flagged
    ``available = False`` and appended to a report named
    ``missing_files_report_<MM_DD_YYYY>.txt`` inside
    ``settings.MISSING_FILES_REPORT_PATH``; a path that exists while the flag is
    falsy is flipped back to ``available = True``. Once the scan is done,
    ``remove_oldest_file`` is called on the report directory.
    """
    File = apps.get_model(app_label="file_system", model_name="File")
    files = File.objects.filter(file_group=settings.IMPORT_FILE_GROUP)
    current_date = datetime.now().strftime("%m_%d_%Y")
    file_name = f"missing_files_report_{current_date}.txt"
    report_path = os.path.join(settings.MISSING_FILES_REPORT_PATH, file_name)
    # The context manager closes (and flushes) the report even when a save()
    # or filesystem check raises part-way through the scan.
    with open(report_path, "w") as report_file:
        for f in files:
            if not os.path.exists(f.path):
                f.available = False
                f.save()
                report_file.write(f"{f.path}\n")
            elif not f.available:
                # File is back (or the flag was never set): mark it available.
                f.available = True
                f.save()
    remove_oldest_file(settings.MISSING_FILES_REPORT_PATH)


def remove_oldest_file(directory):
    """Delete the oldest missing-files report once too many have accumulated.

    Scans ``directory`` for files named
    ``missing_files_report_<MM_DD_YYYY>.txt``. When more than
    ``settings.MISSING_FILES_REPORT_COUNT`` dated reports are present, the one
    with the earliest date in its name is removed. At most one file is deleted
    per call.

    Args:
        directory: Path of the directory that holds the reports.
    """
    prefix = "missing_files_report_"
    suffix = ".txt"
    dated_reports = []
    for filename in os.listdir(directory):
        if not (filename.startswith(prefix) and filename.endswith(suffix)):
            continue
        date_str = filename[len(prefix) : -len(suffix)]
        try:
            file_date = datetime.strptime(date_str, "%m_%d_%Y")
        except ValueError:
            # A stray file that only looks like a report must not abort the
            # clean-up; it is ignored and not counted against the limit.
            logger.warning("Ignoring report file with unparsable date: %s", filename)
            continue
        dated_reports.append((file_date, filename))
    if len(dated_reports) > settings.MISSING_FILES_REPORT_COUNT:
        # Compare by the parsed date, not by name: "12_25_2023" is older than
        # "01_05_2024" even though it sorts later as text.
        _, oldest_file = min(dated_reports)
        os.remove(os.path.join(directory, oldest_file))