Add Celery job processing, routes, & BatchAggregation lib #783

Merged: 26 commits, May 28, 2024
15 changes: 14 additions & 1 deletion .github/workflows/python-versions.yml
@@ -9,6 +9,16 @@ on:
jobs:
build:
runs-on: ubuntu-latest
services:
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
strategy:
matrix:
python-version: [3.8, 3.9, "3.10", "3.11"]
@@ -30,11 +40,14 @@ jobs:
- name: Run tests
env:
TRAVIS: true # one test is skipped on CI and looks for this env value
REDIS_HOST: redis
CELERY_BROKER_URL: redis://localhost:6379/0
CELERY_RESULT_BACKEND: redis://localhost:6379/0
run: |
coverage run
coverage report
- name: Coveralls
if: ${{ matrix.python-version == 3.10 }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls --service=github
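The workflow publishes the Redis service container on localhost:6379, the same address the Celery env vars point at. As a quick sanity check outside this diff, the broker can be pinged the way the --health-cmd does; a minimal sketch, assuming the redis package is importable (it ships with Celery's redis transport):

import os
import redis  # assumption: available via the Celery redis transport

broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
assert redis.Redis.from_url(broker_url).ping()  # mirrors the health-cmd "redis-cli ping"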
2 changes: 2 additions & 0 deletions .gitignore
@@ -113,3 +113,5 @@ endpoints.yml

.vscode/
.noseids
tmp/*
.DS_Store
6 changes: 4 additions & 2 deletions Dockerfile
@@ -21,10 +21,12 @@ COPY . .
RUN pip install -U .[online,test,doc]

# make documentation
RUN /bin/bash -lc ./make_docs.sh
RUN /bin/bash -lc ./scripts/make_docs.sh

ADD ./ /usr/src/aggregation

ARG REVISION=''
ENV REVISION=$REVISION

# load configs and start flask app
CMD ["bash", "./start-flask.sh"]
CMD ["bash", "./scripts/start-flask.sh"]
51 changes: 48 additions & 3 deletions docker-compose.yml
@@ -6,14 +6,59 @@ services:
args:
REVISION: fake-git-sha-id
volumes:
- ./panoptes_aggregation:/usr/src/aggregation/panoptes_aggregation
- ./:/usr/src/aggregation
- ~/.aws:/root/.aws
environment:
- AWS_REGION=${AWS_REGION}
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN}
- AWS_SECURITY_TOKEN=${AWS_SECURITY_TOKEN}
- LISTEN_PORT=5000
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- FLASK_DEBUG=1
- FLASK_ENV=development
- LISTEN_PORT=4000
ports:
- "5000:5000"
- "4000:4000"
links:
- redis:redis

worker:
build:
context: ./
args:
REVISION: fake-git-sha-id
command: celery --app panoptes_aggregation.tasks.celery worker --loglevel=info
volumes:
- ./:/usr/src/aggregation
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- FLASK_ENV=development
links:
- redis:redis
depends_on:
- redis

dashboard:
build: .
command: celery --app panoptes_aggregation.tasks.celery flower --port=5555 --broker=redis://redis:6379/0
ports:
- 5556:5555
environment:
- FLASK_DEBUG=1
- APP_SETTINGS=project.server.config.DevelopmentConfig
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/0
links:
- redis:redis
depends_on:
- redis
- worker

redis:
image: redis
command: redis-server --appendonly yes
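With the stack running via docker-compose up, the Flask app listens on localhost:4000, the Flower dashboard on localhost:5556, and Redis backs both the broker and the result store. A job can then be submitted to the run_aggregation route added in routes.py below; a minimal sketch with placeholder IDs, assuming the requests package is available in the calling environment:

import requests  # assumption: installed in the calling environment

resp = requests.post(
    'http://localhost:4000/run_aggregation',
    json={'project_id': 1, 'workflow_id': 2, 'user_id': 3},  # placeholder IDs
)
print(resp.status_code)            # 202 when the task was queued
task_id = resp.json()['task_id']   # used to poll the status route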
120 changes: 0 additions & 120 deletions kubernetes/deployment-production.tmpl

This file was deleted.

88 changes: 88 additions & 0 deletions panoptes_aggregation/batch_aggregation.py
@@ -0,0 +1,88 @@
from celery import Celery
import json
import pandas as pd
import os
import urllib3
from os import getenv

from panoptes_client import Panoptes, Project, Workflow
from panoptes_aggregation.workflow_config import workflow_extractor_config, workflow_reducer_config
from panoptes_aggregation.scripts import batch_utils
from panoptes_client.panoptes import PanoptesAPIException

import logging
panoptes_client_logger = logging.getLogger('panoptes_client').setLevel(logging.ERROR)

celery = Celery(__name__)
celery.conf.broker_url = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
celery.conf.result_backend = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379")

@celery.task(name="run_aggregation")
def run_aggregation(project_id, workflow_id, user_id):
ba = BatchAggregator(project_id, workflow_id, user_id)
exports = ba.save_exports()
wf_df = ba.process_wf_export(ba.wf_csv)
cls_df = ba.process_cls_export(ba.cls_csv)

extractor_config = workflow_extractor_config(ba.tasks)
extracted_data = batch_utils.batch_extract(cls_df, extractor_config)

reducer_config = workflow_reducer_config(extractor_config)
reduced_data = batch_utils.batch_reduce(extracted_data, reducer_config)

class BatchAggregator:
"""
Bunch of stuff to manage a batch aggregation run
"""

def __init__(self, project_id, workflow_id, user_id):
self.project_id = project_id
self.workflow_id = workflow_id
self.user_id = user_id
self._connect_api_client()

def save_exports(self):
cls_export = Workflow(self.workflow_id).describe_export('classifications')
full_cls_url = cls_export['media'][0]['src']
wf_export = Project(self.project_id).describe_export('workflows')
full_wf_url = wf_export['media'][0]['src']
cls_file = f'tmp/{self.workflow_id}_cls_export.csv'
self._download_export(full_cls_url, cls_file)
wf_file = f'tmp/{self.project_id}_workflow_export.csv'
self._download_export(full_wf_url, wf_file)
self.cls_csv = cls_file
self.wf_csv = wf_file
return {'cls_csv': cls_file, 'wf_csv': wf_file}

def process_wf_export(self, wf_csv):
self.wf_df = pd.read_csv(wf_csv)
self.wf_maj_version = self.wf_df.query(f'workflow_id == {self.workflow_id}')['version'].max()
self.wf_min_version = self.wf_df.query(f'workflow_id == {self.workflow_id} & version == {self.wf_maj_version}')['minor_version'].max()
self.workflow_version = f'{self.wf_maj_version}.{self.wf_min_version}'
self.workflow_row = self.wf_df.query(f'workflow_id == {self.workflow_id} & minor_version == {self.wf_min_version}')
self.tasks = json.loads(self.workflow_row.iloc[0]['tasks'])
return self.wf_df

def process_cls_export(self, cls_csv):
cls_df = pd.read_csv(cls_csv)
self.cls_df = cls_df.query(f'workflow_version == {self.workflow_version}')
return self.cls_df

def _download_export(self, url, filepath):
http = urllib3.PoolManager()
r = http.request('GET', url, preload_content=False)
with open(filepath, 'wb') as out:
while True:
data = r.read(65536)
if not data:
break
out.write(data)
r.release_conn()

def _connect_api_client(self):
# connect to the API only once for this function request
Panoptes.connect(
endpoint=getenv('PANOPTES_URL', 'https://panoptes.zooniverse.org/'),
client_id=getenv('PANOPTES_CLIENT_ID'),
client_secret=getenv('PANOPTES_CLIENT_SECRET')
)
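Outside the HTTP layer, the same task can be queued directly with Celery's delay; a minimal sketch, assuming a running Redis broker, a worker consuming the queue, and PANOPTES_CLIENT_ID / PANOPTES_CLIENT_SECRET set in the environment (all IDs are placeholders):

from panoptes_aggregation.batch_aggregation import run_aggregation

# enqueue the job; a worker then downloads the exports, extracts, and reduces
async_result = run_aggregation.delay(1, 2, 3)  # placeholder project, workflow, and user IDs
print(async_result.id, async_result.status)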
20 changes: 20 additions & 0 deletions panoptes_aggregation/routes.py
@@ -15,8 +15,10 @@
from panoptes_aggregation import reducers
from panoptes_aggregation import extractors
from panoptes_aggregation import running_reducers
from panoptes_aggregation import batch_aggregation
from panoptes_aggregation import __version__
import numpy as np
from celery.result import AsyncResult


# see https://stackoverflow.com/a/75666126
@@ -115,6 +117,24 @@ def index():
for route, route_function in panoptes.panoptes.items():
application.route('/panoptes/{0}'.format(route), methods=['POST', 'PUT'])(lambda: route_function(request.args.to_dict(), request.get_json()))

@application.route('/run_aggregation', methods=['POST'])
def run_aggregation():
content = request.json
project_id = content['project_id']
workflow_id = content['workflow_id']
user_id = content['user_id']
task = batch_aggregation.run_aggregation.delay(project_id, workflow_id, user_id)
return json.dumps({"task_id": task.id}), 202

@application.route('/tasks/<task_id>', methods=['GET'])
def get_status(task_id):
task_result = AsyncResult(task_id)
result = {
'task_id': task_id,
'task_status': task_result.status
}
return jsonify(result), 200

@application.route('/docs')
def web_docs():
return application.send_static_file('index.html')
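Once run_aggregation has returned a task_id, progress can be checked by polling the status route above; a minimal sketch against the local compose stack on port 4000, assuming the requests package is available:

import time
import requests  # assumption: installed in the calling environment

def wait_for_task(task_id, base_url='http://localhost:4000'):
    # poll until Celery reports a terminal state
    while True:
        status = requests.get(f'{base_url}/tasks/{task_id}').json()['task_status']
        if status in ('SUCCESS', 'FAILURE', 'REVOKED'):
            return status
        time.sleep(5)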
Empty file.