DataFlow - Consolidate Observation Frames #29

Status: Open. Wants to merge 45 commits into base branch `develop`.

Commits
de45ad4
Make Observation PSC - DataFlow
wtgee Mar 17, 2019
515a2f5
* ProcessPICID will make a PSC for each non-filtered source and will
wtgee Mar 17, 2019
dd45c8b
Adding run scripts
wtgee Mar 17, 2019
c295fbf
pep8
wtgee Mar 17, 2019
2466c10
Remove unused code
wtgee Mar 17, 2019
0c02240
Add new params to metadata file
wtgee Mar 17, 2019
e5942ef
Making the ProcessPICID a transform
wtgee Mar 18, 2019
b7e9a4a
Adding script to run dataflow
wtgee Mar 18, 2019
efb3495
For now DataFlow simply concatenates the files.
wtgee Mar 18, 2019
33a9559
Comment out more things not used
wtgee Mar 18, 2019
c49c68d
Making the simple output work
wtgee Mar 18, 2019
500c7c8
Merge remote-tracking branch 'origin/master' into df-make-observation…
wtgee Mar 20, 2019
729d8cc
Adding a README
wtgee Mar 20, 2019
48a8e6a
Remove earlier dataflow tests.
wtgee Mar 20, 2019
5dd5498
Output all attributes to see if bucket event not working.
wtgee Mar 20, 2019
8023eb5
Use proper name for event bucket notification.
wtgee Mar 20, 2019
266d7d6
Update sha
wtgee Mar 20, 2019
bae7dc5
Merge branch 'master' of github.com:panoptes/panoptes-network into df…
wtgee Mar 20, 2019
f393e53
Don't update state upon receiving.
wtgee Mar 20, 2019
b56c2e9
Cleanup unused code.
wtgee Mar 20, 2019
1dedb24
Merge branch 'df-make-observation-psc' of github.com:panoptes/panopte…
wtgee Mar 20, 2019
e7115c2
Updating df template metadata file
wtgee Mar 20, 2019
c730b8f
* Small fixes
wtgee Mar 20, 2019
a3c9979
Merge branch 'df-make-observation-psc' of github.com:panoptes/panopte…
wtgee Mar 20, 2019
3cce404
Removing unused
wtgee Mar 20, 2019
94b3625
Merge branch 'df-make-observation-psc' of github.com:panoptes/panopte…
wtgee Mar 20, 2019
c95c179
Update container sha
wtgee Mar 21, 2019
afa3f86
Only respond to one message at a time (i.e. `pull` vs `subscribe`)
wtgee Mar 21, 2019
e761103
Better handling of ack and message.
wtgee Mar 21, 2019
cf2c01d
Adding script for deploying to kubernetes and clarifying script names.
wtgee Mar 21, 2019
ca42568
New build script for the `gce-find-similar-sources` that will automat…
wtgee Mar 21, 2019
3f401e4
Adding CF to respond to Observation PSC upload. This will in turn tri…
wtgee Mar 21, 2019
fde47a7
Fix re to find sequence_id
wtgee Mar 21, 2019
2650e04
* Adding a `force_new` parameter for not checking the existence of
wtgee Mar 23, 2019
53b56da
Auto-convert the CR2 files into separate RGB.
wtgee Mar 24, 2019
42a29f4
Fix method name
wtgee Mar 25, 2019
adac932
When comparing stamps align the index with the target.
wtgee Apr 30, 2019
1a8e3a7
Merge remote-tracking branch 'origin/develop' into df-make-observatio…
wtgee May 13, 2019
8ed1c6a
Merge branch 'develop' into df-make-observation-psc
wtgee May 14, 2019
e1dbc58
Removing github markdown
wtgee Oct 20, 2019
51fe696
Some minor updates and cleanup for moving to panoptes-exp
wtgee Oct 20, 2019
9f02e8d
Move directory name
wtgee Oct 20, 2019
08d9ed3
Small log cleanup
wtgee Oct 20, 2019
ce39882
Updates to the update observation cf
wtgee Oct 20, 2019
cfd0e5a
Moving foldering
wtgee Oct 20, 2019
25 changes: 12 additions & 13 deletions cf-get-state/README.md → cf-get-observation-state/README.md
@@ -1,37 +1,36 @@
-Get Sequence/Image State
-========================
+Get Observation State
+=====================

This folder defines a [Google Cloud Function](https://cloud.google.com/functions/).

Small helper function to lookup the `state` column on either a sequence or an
-image in the `metadata` db.
+image in the `metadata.observations` db.

-Endpoint: https://us-central1-panoptes-survey.cloudfunctions.net/get-state
+Endpoint: https://us-central1-panoptes-survey.cloudfunctions.net/get-observation-state

Can be passed either a `sequence_id` or an `image_id`.

Payload: JSON message of the form:

```json
{
-    'state': str,
    'sequence_id': str,
    'image_id': str
}
```
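As a quick sketch of how a client might call this endpoint: the helper below builds the payload described above and POSTs it with `requests` (which the repo's own cloud functions use). The function names and the sample `sequence_id` are illustrative, not part of the repo.

```python
def make_state_lookup_payload(sequence_id=None, image_id=None):
    """Build the JSON payload for the get-observation-state endpoint.

    Either a sequence_id or an image_id must be given.
    """
    if not (sequence_id or image_id):
        raise ValueError('sequence_id or image_id is required')
    payload = {}
    if sequence_id:
        payload['sequence_id'] = sequence_id
    if image_id:
        payload['image_id'] = image_id
    return payload


def lookup_state(payload):
    """POST the payload to the endpoint and return the JSON response."""
    import requests  # same HTTP library the repo's cloud functions use

    url = 'https://us-central1-panoptes-survey.cloudfunctions.net/get-observation-state'
    response = requests.post(url, json=payload)
    response.raise_for_status()
    return response.json()
```

For example, `lookup_state(make_state_lookup_payload(sequence_id='PAN001_14d3bd_20190321T120000'))` would return the stored state for that (hypothetical) sequence.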

Deploy
------

[Google Documentation](https://cloud.google.com/functions/docs/deploying/filesystem)

From the directory containing the cloud function. The `entry_point` is the
-name of the function in `main.py` that we want called and `header-to-db`
+name of the function in `main.py` that we want called and `get-observation-state`
is the name of the Cloud Function we want to create.

```bash
gcloud functions deploy \
-    get-state \
+    get-observation-state \
    --entry-point get_state \
    --runtime python37 \
    --trigger-http
```
4 changes: 2 additions & 2 deletions cf-get-state/deploy.sh → cf-get-observation-state/deploy.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/bin/bash -e

-echo "Deploying cloud function: cf-get-state"
+echo "Deploying cloud function: cf-get-observation-state"

gcloud functions deploy \
-    get-state \
+    get-observation-state \
    --entry-point get_state \
    --runtime python37 \
    --trigger-http
11 changes: 4 additions & 7 deletions cf-get-state/main.py → cf-get-observation-state/main.py
Original file line number Diff line number Diff line change
@@ -5,16 +5,13 @@
from psycopg2 import OperationalError
from psycopg2.pool import SimpleConnectionPool

-PROJECT_ID = os.getenv('POSTGRES_USER', 'panoptes-survey')
-BUCKET_NAME = os.getenv('BUCKET_NAME', 'panoptes-survey')
-
CONNECTION_NAME = os.getenv(
    'INSTANCE_CONNECTION_NAME',
-    'panoptes-survey:us-central1:panoptes-meta'
+    'panoptes-exp:us-central1:panoptes-metadata'
)
-DB_USER = os.getenv('POSTGRES_USER', 'panoptes')
-DB_PASSWORD = os.getenv('POSTGRES_PASSWORD', None)
-DB_NAME = os.getenv('POSTGRES_DATABASE', 'metadata')
+DB_USER = os.getenv('DB_USER', 'panoptes')
+DB_PASSWORD = os.getenv('DB_PASSWORD', None)
+DB_NAME = os.getenv('DB_NAME', 'observations')

pg_config = {
    'user': DB_USER,
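The truncated `pg_config` above is typically assembled from these env vars roughly as sketched below. The `/cloudsql/...` unix-socket host is an assumption about the Cloud SQL setup used by Cloud Functions and is not shown in this diff; `make_pool` is an illustrative name.

```python
import os

# Fallback values mirror the defaults in main.py above.
db_config = {
    'user': os.getenv('DB_USER', 'panoptes'),
    'password': os.getenv('DB_PASSWORD', None),
    'dbname': os.getenv('DB_NAME', 'observations'),
    # Assumed Cloud SQL unix-socket path for functions in the same project.
    'host': '/cloudsql/' + os.getenv('INSTANCE_CONNECTION_NAME',
                                     'panoptes-exp:us-central1:panoptes-metadata'),
}


def make_pool(minconn=1, maxconn=1):
    """Create a small psycopg2 connection pool from the config above."""
    from psycopg2.pool import SimpleConnectionPool

    return SimpleConnectionPool(minconn, maxconn, **db_config)
```

A pool is preferred over a bare connection here because a single Cloud Function instance serves many requests and reconnecting on each one is slow.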
2 changes: 1 addition & 1 deletion cf-image-received/main.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ def image_received(request):
Triggered when file is uploaded to bucket.

FITS: Set header variables and then forward to endpoint for adding headers
-to the metadatabase. The header is lokoed up from the file id, including the
+to the metadatabase. The header is looked up from the file id, including the
storage bucket file generation id, which are stored into the headers.

CR2: Trigger creation of timelapse and jpg images.
41 changes: 41 additions & 0 deletions cf-observation-psc-created/README.md
@@ -0,0 +1,41 @@
File Upload to Bucket Storage
=============================

This folder defines a [Google Cloud Function](https://cloud.google.com/functions/).

Triggered when a new PSC file is uploaded for an observation.

The Observation PSC is created by the `df-make-observation-psc` job, which will
upload a CSV file to the `panoptes-observation-psc` bucket. This CF will listen
to that bucket and process new files:

1. Update the `metadata.sequences` table with the RA/Dec boundaries for
   the sequence.
2. Send a PubSub message to the `find-similar-sources` topic to trigger
   creation of the similar sources.

Endpoint: No public endpoint


Deploy
------

[Google Documentation](https://cloud.google.com/functions/docs/deploying/filesystem)

> :bulb: There is also a small convenience script called `deploy.sh` that does the same thing.

```bash
./deploy.sh
```

From the directory containing the cloud function. The `entry_point` is the
name of the function in `main.py` that we want called and `observation-psc-created`
is the name of the Cloud Function we want to create.

```bash
gcloud functions deploy \
    observation-psc-created \
    --entry-point observation_psc_created \
    --runtime python37 \
    --trigger-resource panoptes-observation-psc \
    --trigger-event google.storage.object.finalize
```
8 changes: 8 additions & 0 deletions cf-observation-psc-created/deploy.sh
@@ -0,0 +1,8 @@
#!/bin/bash -e

gcloud functions deploy \
    observation-psc-created \
    --entry-point observation_psc_created \
    --runtime python37 \
    --trigger-resource panoptes-observation-psc \
    --trigger-event google.storage.object.finalize
53 changes: 53 additions & 0 deletions cf-observation-psc-created/main.py
@@ -0,0 +1,53 @@
import os
import re
import requests

from flask import jsonify
from google.cloud import pubsub

PROJECT_ID = os.getenv('PROJECT_ID', 'panoptes-survey')

publisher = pubsub.PublisherClient()
PUB_TOPIC = os.getenv('PUB_TOPIC', 'find-similar-sources')
pubsub_topic = f'projects/{PROJECT_ID}/topics/{PUB_TOPIC}'

update_state_url = os.getenv(
    'HEADER_ENDPOINT',
    'https://us-central1-panoptes-survey.cloudfunctions.net/update-state'
)


def observation_psc_created(data, context):
    """Triggered when a new PSC file is uploaded for an observation.

    The Observation PSC is created by the `df-make-observation-psc` job, which will
    upload a CSV file to the `panoptes-observation-psc` bucket. This CF will listen
    to that bucket and process new files:

    1. Update the `metadata.sequences` table with the RA/Dec boundaries for
       the sequence.
    2. Send a PubSub message to the `find-similar-sources` topic to trigger
       creation of the similar sources.
    """
    object_id = data['id']

    matches = re.match('panoptes-observation-psc/(PAN.{3}[/_].*[/_]20.{6}T.{6}).csv/*', object_id)
    if matches is not None:
        sequence_id = matches.group(1)
        print(f'Found sequence_id {sequence_id}')
    else:
        msg = f"Cannot find matching sequence_id in {object_id}"
        print(msg)
        return jsonify(success=False, msg=msg)

    # Update state
    state = 'observation_psc_created'
    print(f'Updating state for {sequence_id} to {state}')
    requests.post(update_state_url, json={'sequence_id': sequence_id, 'state': state})

    publisher.publish(pubsub_topic,
                      b'cf-observation-psc-created finished',
                      sequence_id=sequence_id)

    return jsonify(success=True, msg=f"Received file: {object_id}")
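The `sequence_id` extraction above can be exercised locally without any GCP setup. The object id below is hypothetical but follows the shape a `google.storage.object.finalize` event delivers (`<bucket>/<name>/<generation>`) and the `PAN...` naming the pattern expects:

```python
import re

# Same pattern as in main.py above.
PSC_OBJECT_RE = 'panoptes-observation-psc/(PAN.{3}[/_].*[/_]20.{6}T.{6}).csv/*'

# Hypothetical object id: "<bucket>/<name>/<generation>".
object_id = 'panoptes-observation-psc/PAN001_14d3bd_20190321T120000.csv/1553172000000000'

match = re.match(PSC_OBJECT_RE, object_id)
print(match.group(1))  # PAN001_14d3bd_20190321T120000
```

Note that the group accepts either `/` or `_` as separators, so ids stored as `PAN001/field/20190321T120000` match as well.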
3 changes: 3 additions & 0 deletions cf-observation-psc-created/requirements.txt
@@ -0,0 +1,3 @@
Flask
google-cloud-pubsub
requests
24 changes: 12 additions & 12 deletions cf-update-state/README.md → cf-update-observation-state/README.md
@@ -4,35 +4,35 @@ Get Sequence/Image State
This folder defines a [Google Cloud Function](https://cloud.google.com/functions/).

Small helper function to update the `state` column on either a sequence or an
-image in the `metadata` db.
+image in the `metadata.observations` db.

-Endpoint: https://us-central1-panoptes-survey.cloudfunctions.net/update-state
+Endpoint: https://us-central1-panoptes-survey.cloudfunctions.net/update-observation-state

Can be passed either a `sequence_id` or an `image_id`.

Payload: JSON message of the form:

```json
{
    'state': str,
    'sequence_id': str,
    'image_id': str
}
```
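A minimal sketch of a client for this endpoint, using the `state` value that `cf-observation-psc-created` sets; the ids are hypothetical and `send_state_update` is an illustrative name:

```python
URL = 'https://us-central1-panoptes-survey.cloudfunctions.net/update-observation-state'

# Hypothetical ids; pass either a sequence_id or an image_id, plus the new state.
payload = {'state': 'observation_psc_created',
           'sequence_id': 'PAN001_14d3bd_20190321T120000'}


def send_state_update(payload):
    """POST the state update, mirroring what cf-observation-psc-created does."""
    import requests  # same HTTP library the repo's cloud functions use

    response = requests.post(URL, json=payload)
    response.raise_for_status()
```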

Deploy
------

[Google Documentation](https://cloud.google.com/functions/docs/deploying/filesystem)

From the directory containing the cloud function. The `entry_point` is the
-name of the function in `main.py` that we want called and `header-to-db`
+name of the function in `main.py` that we want called and `update-observation-state`
is the name of the Cloud Function we want to create.

```bash
gcloud functions deploy \
-    update-state \
-    --entry-point update-state \
+    update-observation-state \
+    --entry-point update_state \
    --runtime python37 \
    --trigger-http
```
9 changes: 9 additions & 0 deletions cf-update-observation-state/deploy.sh
@@ -0,0 +1,9 @@
#!/bin/bash -e

echo "Deploying cloud function: cf-update-observation-state"

gcloud functions deploy \
    update-observation-state \
    --entry-point update_state \
    --runtime python37 \
    --trigger-http
11 changes: 4 additions & 7 deletions cf-update-state/main.py → cf-update-observation-state/main.py
@@ -5,16 +5,13 @@
from psycopg2 import OperationalError
from psycopg2.pool import SimpleConnectionPool

-PROJECT_ID = os.getenv('POSTGRES_USER', 'panoptes-survey')
-BUCKET_NAME = os.getenv('BUCKET_NAME', 'panoptes-survey')
-
CONNECTION_NAME = os.getenv(
    'INSTANCE_CONNECTION_NAME',
    'panoptes-survey:us-central1:panoptes-meta'
+    'panoptes-exp:us-central1:panoptes-metadata'
)
-DB_USER = os.getenv('POSTGRES_USER', 'panoptes')
-DB_PASSWORD = os.getenv('POSTGRES_PASSWORD', None)
-DB_NAME = os.getenv('POSTGRES_DATABASE', 'metadata')
+DB_USER = os.getenv('DB_USER', 'panoptes')
+DB_PASSWORD = os.getenv('DB_PASSWORD', None)
+DB_NAME = os.getenv('DB_NAME', 'observations')

pg_config = {
    'user': DB_USER,
9 changes: 0 additions & 9 deletions cf-update-state/deploy.sh

This file was deleted.

15 changes: 15 additions & 0 deletions df-make-observation-psc/README.md
@@ -0,0 +1,15 @@
# Make Observation PSC

Create a dataflow [job template](https://cloud.google.com/dataflow/docs/guides/templates/overview) that, given a sequence_id, will gather CSV files from the panoptes-detected-sources bucket and consolidate them into one large master PSC collection.

This file is then uploaded to the `panoptes-observation-psc` bucket, which will trigger a pubsub message (see the similar source finder [readme](https://github.com/panoptes/panoptes-network/tree/master/gce-find-similar-sources) for details).

The `deploy.sh` script will make a new version of the template stored in a storage
bucket and should be run any time the `makepsc.py` file changes.

> :bulb: Note: The `deploy.sh` script requires a python2.7 environment and the `apache-beam[gcp]` module.

The `run_dataflow.sh` script will run the job template with the DataFlow runner
(i.e. in the cloud) and requires a `sequence_id` parameter to run.

`run_locally.sh` will attempt to run the job locally.
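For reference, launching the deployed template from Python looks roughly like the sketch below, via the Dataflow REST API (`google-api-python-client`). The template path and the `sequence_id` parameter name come from this README and `deploy.sh`; the helper names and job-name scheme are assumptions.

```python
import re

PROJECT_ID = 'panoptes-survey'
TEMPLATE_PATH = 'gs://panoptes-dataflow/makepsc/makepsc'


def make_launch_body(sequence_id):
    """Build a Dataflow templates.launch request body for this template."""
    # Dataflow job names must use lowercase letters, digits, and dashes.
    job_name = 'makepsc-' + re.sub(r'[^a-z0-9-]', '-', sequence_id.lower())
    return {'jobName': job_name, 'parameters': {'sequence_id': sequence_id}}


def launch_template(sequence_id):
    """Launch the template with the Dataflow REST API."""
    from googleapiclient.discovery import build

    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId=PROJECT_ID,
        gcsPath=TEMPLATE_PATH,
        body=make_launch_body(sequence_id))
    return request.execute()
```

This is the programmatic equivalent of `run_dataflow.sh`, which wraps the same launch call via `gcloud`.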
13 changes: 13 additions & 0 deletions df-make-observation-psc/deploy.sh
@@ -0,0 +1,13 @@
#!/bin/bash -e

PROJECT_ID='panoptes-survey'
BUCKET_NAME='panoptes-dataflow'
JOB_NAME='makepsc'

echo "Sending DataFlow template to template bucket."

python ${JOB_NAME}.py --runner DataflowRunner \
    --project ${PROJECT_ID} \
    --stage_location gs://${BUCKET_NAME}/${JOB_NAME}/staging \
    --temp_location gs://${BUCKET_NAME}/${JOB_NAME}/temp \
    --template_location gs://${BUCKET_NAME}/${JOB_NAME}/${JOB_NAME}