Migrate pipeline to luigi. #454

Open
stepan-anokhin opened this issue Dec 15, 2021 · 3 comments
Labels: python (Pull requests that update Python code), tech debt

Comments

@stepan-anokhin
Collaborator

Experience from #444 shows that Luigi works really well for defining and managing the data-processing pipeline.

It has multiple advantages:

  • Provides a clear way to split the pipeline into subtasks with explicit dependencies, which is easy to use and understand.
  • Facilitates lazy subtask execution (tasks whose outputs already exist are skipped).
  • Provides a very flexible framework: for example, tasks can be grouped and existing logic reused (e.g. via task inheritance).
  • Existing pipeline routines should convert to Luigi tasks in a straightforward way (almost a 1-to-1 correspondence).

Limitations:

  • Luigi doesn't support very fine-grained tasks. It handles ~10k tasks in a single run, but not millions, so we cannot have e.g. one task per file. That's not a big problem: we simply need to batch the required work, and we already do that.
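
The batching mentioned above could look roughly like the sketch below. It is only an illustration: `batched` and `batch_size_for` are hypothetical helper names, not functions from the winnow codebase, and the default cap of 10,000 is taken from the "~10k tasks" estimate above rather than from any hard Luigi limit.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched(paths: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `batch_size` paths (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    iterator = iter(paths)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def batch_size_for(total_files: int, max_tasks: int = 10_000) -> int:
    """Smallest batch size that keeps the number of batches within `max_tasks`.

    The 10_000 default is an assumption based on the rough estimate above.
    """
    # Ceiling division without floats; never return less than 1.
    return max(1, -(-total_files // max_tasks))


if __name__ == "__main__":
    files = ["video_{}.mp4".format(i) for i in range(5)]
    for batch in batched(files, batch_size=2):
        print(batch)  # each batch would become the input of a single task
```

With this approach one task handles one batch, so a million files with `batch_size_for(1_000_000)` would yield 10,000 tasks of 100 files each.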

Required effort:

  • Investigate how well Luigi integrates with Celery.
  • Implement a progress-monitoring framework compatible with Luigi (each task should have its own progress monitor that emits messages using Luigi's callback API).
  • Convert existing pipeline routines to Luigi tasks.

ETA is 2d.
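
A per-task progress monitor of the kind described in the list above might be sketched as follows. `ProgressMonitor` here is a hypothetical, library-independent class, not the project's actual `LuigiRootProgressMonitor`. It only assumes that the reporting callback accepts a percentage; inside a Luigi task's `run()` that callback could presumably be the task's `set_progress_percentage`, if the installed Luigi version provides it.

```python
from typing import Callable


class ProgressMonitor:
    """Hypothetical per-task monitor that forwards progress to a callback."""

    def __init__(self, total: int, report: Callable[[float], None]):
        if total < 1:
            raise ValueError("total must be positive")
        self._total = total
        self._done = 0
        self._report = report  # assumed to accept a percentage in [0, 100]

    def advance(self, amount: int = 1) -> None:
        """Record `amount` finished work units and emit the new percentage."""
        self._done = min(self._total, self._done + amount)
        self._report(100.0 * self._done / self._total)


# Usage: collect the emitted percentages in a list instead of a real callback.
messages = []
monitor = ProgressMonitor(total=4, report=messages.append)
for _ in range(4):
    monitor.advance()
```

Keeping the monitor independent of Luigi means the same object can report to a Celery task, a log, or a test list, depending on which callback is passed in.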

@stepan-anokhin stepan-anokhin added the enhancement, python, and tech debt labels and removed the enhancement label Dec 15, 2021
@stepan-anokhin stepan-anokhin self-assigned this Dec 15, 2021
@johnhbenetech johnhbenetech reopened this Mar 18, 2022
@johnhbenetech
Member

johnhbenetech commented Mar 18, 2022

The current version fails when attempting to run process-directory. The error is logged in the rpc container:

[2022-03-18 17:09:05,565: INFO] [luigi-interface] Loaded []
[2022-03-18 17:09:05,821: ERROR] [task_queue.winnow_task] Task raised exception: cannot import name 'remote'
Traceback (most recent call last):
  File "/project/task_queue/winnow_task.py", line 55, in wrapper
    return task(*task_args, **task_kwargs)
  File "/project/task_queue/tasks.py", line 31, in process_directory
    from .luigi_support import LuigiRootProgressMonitor, run_luigi
  File "/project/task_queue/luigi_support.py", line 6, in <module>
    from winnow.pipeline.luigi.platform import JusticeAITask
  File "/project/winnow/pipeline/luigi/platform.py", line 10, in <module>
    from winnow.pipeline.pipeline_context import PipelineContext
  File "/project/winnow/pipeline/pipeline_context.py", line 18, in <module>
    from winnow import remote
ImportError: cannot import name 'remote'

@johnhbenetech
Member

johnhbenetech commented Mar 19, 2022

@stepan-anokhin process-directory reports a successful run but doesn't actually extract features. It seems to run only EXIF extraction:


[2022-03-19 21:34:45,601: INFO] [task_queue.winnow_task] Initiating task 'task_queue.tasks.process_directory[e0f322cd-9250-4d56-88d3-4fad4f904d09]': args=(<@task: task_queue.tasks.process_directory of winnow-pipeline at 0x7f0e8f143860>,), kwargs={'directory': '.', 'frame_sampling': None, 'save_frames': None, 'filter_dark': True, 'dark_threshold': None, 'extensions': ['mp4', 'ogv', 'webm', 'avi', 'mkv'], 'match_distance': None, 'min_duration': None}
[2022-03-19 21:34:45,605: WARNING] [py.warnings] /anaconda/envs/winnow/lib/python3.6/site-packages/luigi/configuration/core.py:65: UserWarning: Config file does not exist: /project/config/luigi.cfg
  warnings.warn("Config file does not exist: {path}".format(path=path))

[2022-03-19 21:34:45,654: INFO] [luigi-interface] Loaded []
[2022-03-19 21:34:47,896: INFO] [task_queue.tasks] Loading config file
[2022-03-19 21:34:47,906: INFO] [luigi-interface] Informed scheduler that task   DBMatchesTask____project_config__500_df370cec9e   has status   DONE
[2022-03-19 21:34:47,912: INFO] [luigi-interface] Informed scheduler that task   ExifTask__project_config____6869f0d921   has status   PENDING
[2022-03-19 21:34:47,913: INFO] [luigi-interface] Done scheduling tasks
[2022-03-19 21:34:47,913: INFO] [luigi-interface] Running Worker with 1 processes
[2022-03-19 21:34:47,914: INFO] [luigi-interface] [pid 37] Worker Worker(salt=945385268, workers=1, host=83c5957786cd, username=root, pid=37) running   ExifTask(config_path=/project/config/config.yaml, prefix=.)
[2022-03-19 21:34:47,914: INFO] [task.ExifTask] Extracting EXIF metadata from files with prefix '.' created after None
[2022-03-19 21:35:04,826: INFO] [task.ExifTask] Starting EXIF extraction for 306 files
[2022-03-19 21:35:12,617: INFO] [task.ExifTask] Extracted EXIF for 306 files
[2022-03-19 21:35:12,617: INFO] [task.ExifTask] Converting EXIF to DataFrame
[2022-03-19 21:35:12,780: INFO] [task.ExifTask] Done converting EXIF to DataFrame
[2022-03-19 21:35:12,781: INFO] [task.ExifTask] Saving EXIF metadata to data/representations/exif/./exif__2022_03_19_213447914679.csv
[2022-03-19 21:35:12,812: INFO] [luigi-interface] [pid 37] Worker Worker(salt=945385268, workers=1, host=83c5957786cd, username=root, pid=37) done      ExifTask(config_path=/project/config/config.yaml, prefix=.)
[2022-03-19 21:35:12,813: INFO] [luigi-interface] Informed scheduler that task   ExifTask__project_config____6869f0d921   has status   DONE
[2022-03-19 21:35:12,814: INFO] [luigi-interface] 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 DBMatchesTask(...)
* 1 ran successfully:
    - 1 ExifTask(config_path=/project/config/config.yaml, prefix=.)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====


@johnhbenetech
Member

johnhbenetech commented Mar 19, 2022

When manually attempting to run via terminal:

(winnow) tf-docker /project > python extract_features.py
[2022-03-19 21:40:30,768: INFO] Loaded config file
[2022-03-19 21:40:30,768: INFO] Searching for Dataset Video Files
[2022-03-19 21:40:30,778: INFO] Cannot detect storage type in /project/data/representations/frames. Using default factory instead.
[2022-03-19 21:40:30,779: INFO] Creating intermediate representations directory: /project/data/representations/frames
[2022-03-19 21:40:30,779: INFO] Writing a new storage manifset StorageManifest(type='simple', version=1), manifest path: /project/data/representations/frames/.storage.json
[2022-03-19 21:40:30,779: INFO] Cannot detect storage type in /project/data/representations/frame_level. Using default factory instead.
[2022-03-19 21:40:30,779: INFO] Creating intermediate representations directory: /project/data/representations/frame_level
[2022-03-19 21:40:30,779: INFO] Writing a new storage manifset StorageManifest(type='simple', version=1), manifest path: /project/data/representations/frame_level/.storage.json
[2022-03-19 21:40:30,779: INFO] Cannot detect storage type in /project/data/representations/video_level. Using default factory instead.
[2022-03-19 21:40:30,779: INFO] Creating intermediate representations directory: /project/data/representations/video_level
[2022-03-19 21:40:30,779: INFO] Writing a new storage manifset StorageManifest(type='simple', version=1), manifest path: /project/data/representations/video_level/.storage.json
[2022-03-19 21:40:30,780: INFO] Cannot detect storage type in /project/data/representations/video_signatures. Using default factory instead.
[2022-03-19 21:40:30,780: INFO] Creating intermediate representations directory: /project/data/representations/video_signatures
[2022-03-19 21:40:30,780: INFO] Writing a new storage manifset StorageManifest(type='simple', version=1), manifest path: /project/data/representations/video_signatures/.storage.json
Traceback (most recent call last):
  File "extract_features.py", line 55, in <module>
    main()
  File "/anaconda/envs/winnow/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/anaconda/envs/winnow/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/anaconda/envs/winnow/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/anaconda/envs/winnow/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "extract_features.py", line 50, in main
    extract_video_signatures(files=videos, pipeline=pipeline)
  File "/project/winnow/pipeline/extract_video_signatures.py", line 22, in extract_video_signatures
    remaining_video_paths = list(missing_video_signatures(files, pipeline))
  File "/project/winnow/pipeline/extract_video_signatures.py", line 49, in missing_video_signatures
    if not signatures.exists(pipeline.filekey(file_path)):
AttributeError: 'PipelineContext' object has no attribute 'filekey'


2 participants