Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow: Completion Service sample data / metadata #487

Merged
merged 14 commits into from
Mar 25, 2024
Merged
30 changes: 30 additions & 0 deletions app/helpers/blob_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

# Blob service helper to handle interactions with blobs
module BlobHelper
def download_decompress_parse_gziped_json(blob_file_path)
JSON.parse(
ActiveSupport::Gzip.decompress(
ActiveStorage::Blob.service.download(blob_file_path)
)
)
end

def download_and_make_new_blob(blob_file_path:)
blob_id = nil
Tempfile.open do |tempfile|
# chunked download of blob file so mem doesn't get overwhelmed
ActiveStorage::Blob.service.download(blob_file_path) do |chunk|
tempfile.write(chunk.force_encoding('UTF-8'))
end
tempfile.rewind
file_blob = ActiveStorage::Blob.create_and_upload!(
io: tempfile,
filename: File.basename(blob_file_path)
)
blob_id = file_blob.signed_id
end

blob_id
end
end
1 change: 1 addition & 0 deletions app/models/samples_workflow_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ class SamplesWorkflowExecution < ApplicationRecord
belongs_to :workflow_execution
belongs_to :sample
has_many_attached :inputs
has_many :outputs, dependent: :destroy, class_name: 'Attachment', as: :attachable
end
110 changes: 80 additions & 30 deletions app/services/workflow_executions/completion_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,35 @@
module WorkflowExecutions
# Service used to complete a WorkflowExecution
class CompletionService < BaseService
include BlobHelper

def initialize(workflow_execution, params = {})
super(workflow_execution.submitter, params)

@workflow_execution = workflow_execution
@storage_service = ActiveStorage::Blob.service
@file_blob_list = []
@attachable_blobs_tuple_list = []
@output_base_path = "#{@workflow_execution.blob_run_directory}/output/"
end

def execute
return false unless @workflow_execution.completed?

run_output_data = parse_base_output_file
run_output_data = download_decompress_parse_gziped_json("#{@output_base_path}iridanext.output.json.gz")

output_file_paths = get_output_file_paths(run_output_data:)
# global run output files
output_global_file_paths = get_output_global_file_paths(run_output_data:)
process_global_file_paths(output_global_file_paths:)

output_file_paths&.each do |file_path|
download_and_make_blob(file_path:)
end
# per sample output files
output_samples_file_paths = get_output_samples_file_paths(run_output_data:)
process_sample_file_paths(output_samples_file_paths:)

unless @file_blob_list.empty?
Attachments::CreateService.new(
current_user, @workflow_execution, { files: @file_blob_list }
).execute
end
# per sample metadata
process_samples_metadata(run_output_data:)

# attach blob lists to attachables
attach_blobs_to_attachables

@workflow_execution.state = 'finalized'

Expand All @@ -38,32 +42,78 @@ def execute

private

def parse_base_output_file
JSON.parse(
ActiveSupport::Gzip.decompress(
ActiveStorage::Blob.service.download("#{@output_base_path}iridanext.output.json.gz")
)
)
def get_output_global_file_paths(run_output_data:)
return nil unless run_output_data['files']['global']

get_path_mapping(run_output_data['files']['global'])
end

def get_output_file_paths(run_output_data:)
return nil unless run_output_data['files']['global']
def get_output_samples_file_paths(run_output_data:)
return nil unless run_output_data['files']['samples']

samples_paths = []
run_output_data['files']['samples'].each do |sample_puid, sample_data_paths|
data_paths = get_path_mapping(sample_data_paths)

samples_paths.append({ sample_puid:, data_paths: })
end
samples_paths
end

run_output_data['files']['global'].map { |entry| @output_base_path + entry['path'] }
def get_path_mapping(data_paths)
data_paths.map { |entry| @output_base_path + entry['path'] }
end

def download_and_make_blob(file_path:)
Tempfile.open do |tempfile|
# chunked download of blob file so mem doesn't get overwhelmed
@storage_service.download(file_path) do |chunk|
tempfile.write(chunk.force_encoding('UTF-8'))
def process_global_file_paths(output_global_file_paths:)
# Handle ouput files for workflow execution
global_file_blob_list = []
output_global_file_paths&.each do |blob_file_path|
global_file_blob_list.append(download_and_make_new_blob(blob_file_path:))
end
@attachable_blobs_tuple_list.append({ attachable: @workflow_execution,
blob_id_list: global_file_blob_list })
end

def process_sample_file_paths(output_samples_file_paths:)
# Handle output files for samples workflow execution
output_samples_file_paths&.each do |sample_file_paths_tuple| # :sample_puid, :data_paths
sample_file_blob_list = []
sample_file_paths_tuple[:data_paths]&.each do |blob_file_path|
sample_file_blob_list.append(download_and_make_new_blob(blob_file_path:))
end
tempfile.rewind
file_blob = ActiveStorage::Blob.create_and_upload!(
io: tempfile,
filename: File.basename(file_path)

# This assumes the sample puid matches, i.e. happy path
samples_workflow_execution = get_samples_workflow_executions_by_sample_puid(
puid: sample_file_paths_tuple[:sample_puid]
)
@file_blob_list.append(file_blob.signed_id)

@attachable_blobs_tuple_list.append({ attachable: samples_workflow_execution,
blob_id_list: sample_file_blob_list })
end
end

def get_samples_workflow_executions_by_sample_puid(puid:)
@workflow_execution.samples_workflow_executions.joins(:sample).find_by(sample: { puid: })
end

def attach_blobs_to_attachables
return if @attachable_blobs_tuple_list.empty?

@attachable_blobs_tuple_list&.each do |tuple| # :attachable, :blob_id_list
Attachments::CreateService.new(
current_user, tuple[:attachable], { files: tuple[:blob_id_list] }
).execute
end
end

def process_samples_metadata(run_output_data:)
return nil unless run_output_data['metadata']['samples']

run_output_data['metadata']['samples']&.each do |sample_puid, sample_metadata|
# This assumes the sample puid matches, i.e. happy path
samples_workflow_execution = get_samples_workflow_executions_by_sample_puid(puid: sample_puid)
samples_workflow_execution.metadata = sample_metadata
samples_workflow_execution.save!
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true

# Migration to add metadata column to samples_workflow_executions table
class AddMetadataToSamplesWorkflowExecutions < ActiveRecord::Migration[7.1]
def change
add_column :samples_workflow_executions, :metadata, :jsonb, null: false, default: {}
end
end
3 changes: 2 additions & 1 deletion db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions test/active_storage_test_case.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# frozen_string_literal: true

require 'test_helper'
require 'test_helpers/blob_helpers'
require 'test_helpers/blob_test_helpers'

class ActiveStorageTestCase < ActiveSupport::TestCase
include BlobHelpers
include BlobTestHelpers
end
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
even more text here
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"files": {
"global": [
{"path": "summary.txt"}
],
"samples": {
"workflow_execution_completion_test_puid_2": [
{"path": "analysis3.txt"}
]
}
},
"metadata": {
"samples": {
"workflow_execution_completion_test_puid_1": {
"organism": "an organism",
"number": 1
}
}
}
}
1 change: 1 addition & 0 deletions test/fixtures/files/blob_outputs/missing_entry/summary.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is a text file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"files": {
},
"metadata": {
}
}
Binary file not shown.
9 changes: 9 additions & 0 deletions test/fixtures/files/blob_outputs/normal/iridanext.output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"files": {
"global": [
{"path": "summary.txt"}
]
},
"metadata": {
}
}
Binary file not shown.
1 change: 1 addition & 0 deletions test/fixtures/files/blob_outputs/normal2/analysis1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
some text in here
1 change: 1 addition & 0 deletions test/fixtures/files/blob_outputs/normal2/analysis2.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
and more in here
1 change: 1 addition & 0 deletions test/fixtures/files/blob_outputs/normal2/analysis3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
even more text here
28 changes: 28 additions & 0 deletions test/fixtures/files/blob_outputs/normal2/iridanext.output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"files": {
"global": [
{"path": "summary.txt"}
],
"samples": {
"workflow_execution_completion_test_puid_1": [
{"path": "analysis1.txt"},
{"path": "analysis2.txt"}
],
"workflow_execution_completion_test_puid_2": [
{"path": "analysis3.txt"}
]
}
},
"metadata": {
"samples": {
"workflow_execution_completion_test_puid_1": {
"organism": "an organism",
"number": 1
},
"workflow_execution_completion_test_puid_2": {
"organism": "a different organism",
"number": 2
}
}
}
}
1 change: 1 addition & 0 deletions test/fixtures/files/blob_outputs/normal2/summary.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is a text file.
16 changes: 16 additions & 0 deletions test/fixtures/samples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,19 @@ sample40:
puid: <%= Irida::PersistentUniqueId.generate(object_class: Sample, time: 66.days.ago) %>
created_at: <%= 2.weeks.ago %>
updated_at: <%= 2.days.ago %>

sample41:
name: WorkflowExecutions test sample 1
description: Sample description.
project_id: <%= ActiveRecord::FixtureSet.identify(:project35, :uuid) %>
puid: "workflow_execution_completion_test_puid_1"
created_at: <%= 2.weeks.ago %>
updated_at: <%= 2.days.ago %>

sample42:
name: WorkflowExecutions test sample 2
description: Sample description.
project_id: <%= ActiveRecord::FixtureSet.identify(:project35, :uuid) %>
puid: "workflow_execution_completion_test_puid_2"
created_at: <%= 2.weeks.ago %>
updated_at: <%= 2.days.ago %>
36 changes: 36 additions & 0 deletions test/fixtures/samples_workflow_executions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,39 @@ sample1_irida_next_example_new:
"fastq_1": <%= "gid://irida/Attachment/#{ActiveRecord::FixtureSet.identify(:attachment1, :uuid)}" %>,
"fastq_2": ""
}

sample41_irida_next_example_completed_c:
sample_id: <%= ActiveRecord::FixtureSet.identify(:sample41, :uuid) %>
workflow_execution_id: <%= ActiveRecord::FixtureSet.identify(:irida_next_example_completed_c, :uuid) %>
samplesheet_params: {
"sample": <%= "Sample_#{ActiveRecord::FixtureSet.identify(:sample41, :uuid)}" %>,
"fastq_1": <%= "gid://irida/Attachment/#{ActiveRecord::FixtureSet.identify(:attachment1, :uuid)}" %>,
"fastq_2": ""
}

sample42_irida_next_example_completed_c:
sample_id: <%= ActiveRecord::FixtureSet.identify(:sample42, :uuid) %>
workflow_execution_id: <%= ActiveRecord::FixtureSet.identify(:irida_next_example_completed_c, :uuid) %>
samplesheet_params: {
"sample": <%= "Sample_#{ActiveRecord::FixtureSet.identify(:sample42, :uuid)}" %>,
"fastq_1": <%= "gid://irida/Attachment/#{ActiveRecord::FixtureSet.identify(:attachment2, :uuid)}" %>,
"fastq_2": ""
}

sample41_irida_next_example_completed_d:
sample_id: <%= ActiveRecord::FixtureSet.identify(:sample41, :uuid) %>
workflow_execution_id: <%= ActiveRecord::FixtureSet.identify(:irida_next_example_completed_d, :uuid) %>
samplesheet_params: {
"sample": <%= "Sample_#{ActiveRecord::FixtureSet.identify(:sample41, :uuid)}" %>,
"fastq_1": <%= "gid://irida/Attachment/#{ActiveRecord::FixtureSet.identify(:attachment1, :uuid)}" %>,
"fastq_2": ""
}

sample42_irida_next_example_completed_d:
sample_id: <%= ActiveRecord::FixtureSet.identify(:sample42, :uuid) %>
workflow_execution_id: <%= ActiveRecord::FixtureSet.identify(:irida_next_example_completed_d, :uuid) %>
samplesheet_params: {
"sample": <%= "Sample_#{ActiveRecord::FixtureSet.identify(:sample42, :uuid)}" %>,
"fastq_1": <%= "gid://irida/Attachment/#{ActiveRecord::FixtureSet.identify(:attachment2, :uuid)}" %>,
"fastq_2": ""
}
Loading
Loading