Add Celery job processing, routes, & BatchAggregation lib #783

Merged: 26 commits, May 28, 2024
15 changes: 14 additions & 1 deletion .github/workflows/python-versions.yml
@@ -9,6 +9,16 @@ on:
jobs:
build:
runs-on: ubuntu-latest
services:
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
strategy:
matrix:
python-version: [3.8, 3.9, "3.10", "3.11"]
@@ -30,11 +40,14 @@ jobs:
- name: Run tests
env:
TRAVIS: true # one test is skipped on CI and looks for this env value
REDIS_HOST: redis
CELERY_BROKER_URL: redis://localhost:6379/0
CELERY_RESULT_BACKEND: redis://localhost:6379/0
run: |
coverage run
coverage report
- name: Coveralls
if: ${{ matrix.python-version == 3.10 }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls --service=github
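The CELERY_BROKER_URL and CELERY_RESULT_BACKEND values added above are read at import time by the new Celery app (see panoptes_aggregation/batch_aggregation.py later in this diff); a minimal sketch of that lookup, assuming the same localhost defaults:

import os
from celery import Celery

# Mirrors the setup in panoptes_aggregation/batch_aggregation.py: fall back to a
# local Redis instance when the CI environment variables are not set.
celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")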
2 changes: 2 additions & 0 deletions .gitignore
@@ -113,3 +113,5 @@ endpoints.yml

.vscode/
.noseids
tmp/*
.DS_Store
6 changes: 4 additions & 2 deletions Dockerfile
@@ -21,10 +21,12 @@ COPY . .
RUN pip install -U .[online,test,doc]

# make documentation
RUN /bin/bash -lc ./make_docs.sh
RUN /bin/bash -lc ./scripts/make_docs.sh

ADD ./ /usr/src/aggregation

ARG REVISION=''
ENV REVISION=$REVISION

# load configs and start flask app
CMD ["bash", "./start-flask.sh"]
CMD ["bash", "./scripts/start-flask.sh"]
51 changes: 48 additions & 3 deletions docker-compose.yml
@@ -6,14 +6,59 @@ services:
args:
REVISION: fake-git-sha-id
volumes:
- ./panoptes_aggregation:/usr/src/aggregation/panoptes_aggregation
- ./:/usr/src/aggregation
- ~/.aws:/root/.aws
environment:
- AWS_REGION=${AWS_REGION}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}
- AWS_SECURITY_TOKEN=${AWS_SECURITY_TOKEN}
- LISTEN_PORT=5000
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- FLASK_DEBUG=1
- FLASK_ENV=development
- LISTEN_PORT=4000
ports:
- "5000:5000"
- "4000:4000"
links:
- redis:redis

worker:
build:
context: ./
args:
REVISION: fake-git-sha-id
command: celery --app panoptes_aggregation.tasks.celery worker --loglevel=info
volumes:
- ./:/usr/src/aggregation
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- FLASK_ENV=development
links:
- redis:redis
depends_on:
- redis

dashboard:
build: .
command: celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=redis://redis:6379/0
ports:
- 5556:5555
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
links:
- redis:redis
depends_on:
- redis
- worker

redis:
image: redis
command: redis-server --appendonly yes
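The app, worker, and dashboard services all share the Redis broker defined here, so a task queued by the Flask app in the app container is picked up by the worker container and can be monitored in Flower on localhost:5556. A quick connectivity check (hypothetical, not part of this PR) from inside the app container:

import os
from celery import Celery

# Ping any workers listening on the shared broker from docker-compose.yml;
# expect one reply dict per running worker process.
app = Celery(broker=os.environ["CELERY_BROKER_URL"])
print(app.control.ping(timeout=2.0))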
120 changes: 0 additions & 120 deletions kubernetes/deployment-production.tmpl

This file was deleted.

136 changes: 136 additions & 0 deletions panoptes_aggregation/batch_aggregation.py
@@ -0,0 +1,136 @@
from celery import Celery
import json
import pandas as pd
import os
import urllib3
from shutil import make_archive
import uuid

from azure.storage.blob import BlobServiceClient

from panoptes_client import Panoptes, Project, Workflow
from panoptes_aggregation.workflow_config import workflow_extractor_config
from panoptes_aggregation.scripts import batch_utils

import logging
logging.getLogger('panoptes_client').setLevel(logging.ERROR)

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")

@celery.task(name="run_aggregation")
def run_aggregation(project_id, workflow_id, user_id):
ba = BatchAggregator(project_id, workflow_id, user_id)
ba.save_exports()

wf_df = ba.process_wf_export(ba.wf_csv)
cls_df = ba.process_cls_export(ba.cls_csv)

extractor_config = workflow_extractor_config(ba.tasks)
extracted_data = batch_utils.batch_extract(cls_df, extractor_config)

batch_standard_reducers = {
'question_extractor': ['question_reducer', 'question_consensus_reducer'],
'survey_extractor': ['survey_reducer']
}

for task_type, extract_df in extracted_data.items():
extract_df.to_csv(f'{ba.output_path}/{ba.workflow_id}_{task_type}.csv')
reducer_list = batch_standard_reducers[task_type]
reduced_data = {}

for reducer in reducer_list:
# This is an override. The workflow_reducer_config method returns a config object
# that is incompatible with the batch_utils batch_reduce method
reducer_config = {'reducer_config': {reducer: {}}}
reduced_data[reducer] = batch_utils.batch_reduce(extract_df, reducer_config)
filename = f'{ba.output_path}/{ba.workflow_id}_reductions.csv'
reduced_data[reducer].to_csv(filename, mode='a')
ba.upload_files()

# TODO: notify Panoptes that this aggregation run has finished

class BatchAggregator:
"""
Manages a batch aggregation run: downloads the project and workflow exports, runs batch extraction and reduction, and uploads the results to blob storage.
"""

def __init__(self, project_id, workflow_id, user_id):
self.project_id = project_id
self.workflow_id = workflow_id
self.user_id = user_id
self._generate_uuid()
self._connect_api_client()

def save_exports(self):
self.output_path = f'tmp/{self.workflow_id}'
os.mkdir(self.output_path)

cls_export = Workflow(self.workflow_id).describe_export('classifications')
full_cls_url = cls_export['media'][0]['src']
cls_file = f'{self.output_path}/{self.workflow_id}_cls_export.csv'
self._download_export(full_cls_url, cls_file)

wf_export = Project(self.project_id).describe_export('workflows')
full_wf_url = wf_export['media'][0]['src']
wf_file = f'{self.output_path}/{self.workflow_id}_workflow_export.csv'
self._download_export(full_wf_url, wf_file)

self.cls_csv = cls_file
self.wf_csv = wf_file
return {'classifications': cls_file, 'workflows': wf_file}

def process_wf_export(self, wf_csv):
self.wf_df = pd.read_csv(wf_csv)
self.wf_maj_version = self.wf_df.query(f'workflow_id == {self.workflow_id}')['version'].max()
self.wf_min_version = self.wf_df.query(f'workflow_id == {self.workflow_id} & version == {self.wf_maj_version}')['minor_version'].max()
self.workflow_version = f'{self.wf_maj_version}.{self.wf_min_version}'
self.workflow_row = self.wf_df.query(f'workflow_id == {self.workflow_id} & minor_version == {self.wf_min_version}')
self.tasks = json.loads(self.workflow_row.iloc[0]['tasks'])
return self.wf_df

def process_cls_export(self, cls_csv):
cls_df = pd.read_csv(cls_csv)
self.cls_df = cls_df.query(f'workflow_version == {self.workflow_version}')
return self.cls_df

def connect_blob_storage(self):
connect_str = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
self.blob_service_client = BlobServiceClient.from_connection_string(connect_str)
self.blob_service_client.create_container(name=self.id)

def upload_file_to_storage(self, container_name, filepath):
blob = filepath.split('/')[-1]
blob_client = self.blob_service_client.get_blob_client(container=container_name, blob=blob)
with open(file=filepath, mode="rb") as data:
blob_client.upload_blob(data, overwrite=True)

def upload_files(self):
self.connect_blob_storage()
reductions_file = f'{self.output_path}/{self.workflow_id}_reductions.csv'
self.upload_file_to_storage(self.id, reductions_file)
zipfile = make_archive(f'tmp/{self.id}', 'zip', self.output_path)
self.upload_file_to_storage(self.id, zipfile)

def _generate_uuid(self):
self.id = uuid.uuid4().hex

def _download_export(self, url, filepath):
http = urllib3.PoolManager()
r = http.request('GET', url, preload_content=False)
with open(filepath, 'wb') as out:
while True:
data = r.read(65536)
if not data:
break
out.write(data)
r.release_conn()

def _connect_api_client(self):
# connect to the API only once for this function request
Panoptes.connect(
endpoint=os.getenv('PANOPTES_URL', 'https://panoptes.zooniverse.org/'),
client_id=os.getenv('PANOPTES_CLIENT_ID'),
client_secret=os.getenv('PANOPTES_CLIENT_SECRET')
)
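Queueing a run from application code then comes down to one call; a minimal usage sketch (the IDs are placeholders, and a running worker plus Redis broker are assumed, e.g. via the docker-compose stack above):

from panoptes_aggregation.batch_aggregation import run_aggregation

# Enqueue a batch aggregation run; the worker downloads the workflow and
# classification exports, extracts and reduces them, and uploads the results
# to Azure blob storage.
async_result = run_aggregation.delay(1234, 5678, 91011)  # project, workflow, user IDs
print(async_result.id)  # Celery task id, trackable via the result backend or Flower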