Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish integrating new parser into pipeline #186

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions async/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ go_library(
importpath = "github.com/RMI/pacta/async",
visibility = ["//visibility:public"],
deps = [
"//async/parsed",
"//blob",
"//pacta",
"//task",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap_exp//zapfield",
],
)
159 changes: 99 additions & 60 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
package async

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -17,11 +17,13 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventgrid/publisher"
"github.com/RMI/pacta/async/parsed"
"github.com/RMI/pacta/blob"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapfield"
)

type Config struct {
Expand Down Expand Up @@ -70,69 +72,125 @@ func New(cfg *Config) (*Handler, error) {

// TODO: Send a notification when parsing fails.
func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest, destPortfolioContainer string) error {
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where

// Make the directories we require first. We use these instead of
// /mnt/{input,output} because the base image (quite reasonably) uses a non-root
// user, so we can't be creating directories in the root filesystem all willy
// nilly.
inputDir := filepath.Join("/", "home", "portfolio-parser", "input")
outputDir := filepath.Join("/", "home", "portfolio-parser", "output")

if err := os.MkdirAll(inputDir, 0700); err != nil {
return fmt.Errorf("failed to create input dir to store input CSVs: %w", err)
}
if err := os.MkdirAll(outputDir, 0700); err != nil {
return fmt.Errorf("failed to create output dir to store output CSVs: %w", err)
}

// Load the portfolio from blob storage, place it in /mnt/inputs, where
// the `process_portfolios.R` script expects it to be.
localCSVToBlob := make(map[string]pacta.BlobURI)
for _, srcURI := range req.BlobURIs {
id := uuid.New().String()
// TODO: Probably set the CSV extension in the signed upload URL instead.
destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id))
fn := fmt.Sprintf("%s.csv", id)
destPath := filepath.Join(inputDir, fn)
if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil {
return fmt.Errorf("failed to download raw portfolio blob: %w", err)
}
localCSVToBlob[fn] = srcURI
}

processedDir := filepath.Join("/", "mnt", "processed_portfolios")
if err := os.MkdirAll(processedDir, 0600); err != nil {
return fmt.Errorf("failed to create directory to download blob to: %w", err)
}
cmd := exec.CommandContext(ctx,
"/usr/local/bin/Rscript",
"-e", "logger::log_threshold(Sys.getenv('LOG_LEVEL', 'INFO'));workflow.portfolio.parsing::process_directory('"+inputDir+"', '"+outputDir+"')",
)

// We don't expect log output to be particularly large, it's fine to write them to an in-memory buffer.
// TODO(#185): Find a good place to put these in storage, such that it can be correlated with the input file(s)
var stdout, stderr bytes.Buffer
cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/process_portfolios.R")
cmd.Stdout = io.MultiWriter(os.Stdout, &stdout)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderr)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to run process_portfolios script: %w", err)
}

sc := bufio.NewScanner(&stderr)
// After successful execution, the API contract is that there should be a 'processed_portfolios.json' file in the output directory.
outManifestPath := filepath.Join(outputDir, "processed_portfolios.json")
omf, err := os.Open(outManifestPath)
if err != nil {
return fmt.Errorf("failed to open output processed_portfolios.json file: %w", err)
}
defer omf.Close()

// TODO: Load from the output database file (or similar, like reading the processed_portfolios dir) instead of parsing stderr
var paths []string
for sc.Scan() {
line := sc.Text()
idx := strings.Index(line, "writing to file: " /* 17 chars */)
if idx == -1 {
continue
}
paths = append(paths, strings.TrimSpace(line[idx+17:]))
var sourceFiles []parsed.SourceFile
if err := json.NewDecoder(omf).Decode(&sourceFiles); err != nil {
return fmt.Errorf("failed to decode processed_portfolios.json as JSON: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this step fails, we probably want a more complete accounting of why. Would logging the full manifest as a string if this fails be inappropriate? Maybe upload it to a cloud bucket?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great call. The R code actually already logs the output processed_portfolios.json file to stdout (or stderr), so this will be covered by #185

}

// We keep track of the outputs we processed, and then check if there are any files in the output directory that we weren't expecting.
knownOutputFiles := map[string]bool{
"processed_portfolios.json": true,
}

// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
var out []*task.ParsePortfolioResponseItem
for _, p := range paths {
lineCount, err := countCSVLines(p)
if err != nil {
return fmt.Errorf("failed to count lines in file %q: %w", p, err)
for _, sf := range sourceFiles {
sourceURI, ok := localCSVToBlob[sf.InputFilename]
if !ok {
return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob)
}
fileName := filepath.Base(p)
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, fileName))
if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)

// TODO(#187): There's lots of metadata associated with the input files (e.g.
// sf.Errors, sf.GroupCols, etc), we should likely store that info somewhere.

for _, p := range sf.Portfolios {
outPath := filepath.Join(outputDir, p.OutputFilename)

// We generate a fresh UUID here for uploading the file to blob storage, so that
// we don't depend on the R code generating truly unique UUIDs.
uploadName := fmt.Sprintf("%s.csv", uuid.New().String())

blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, uploadName))

if err := h.uploadBlob(ctx, outPath, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
h.logger.Info("uploaded output CSV to blob storage", zap.Any("portfolio", p), zapfield.Str("blob_uri", blobURI))

extension := filepath.Ext(p.OutputFilename)
fileType, err := pacta.ParseFileType(extension)
bcspragu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", p.OutputFilename, err)
}
if fileType != pacta.FileType_CSV {
return fmt.Errorf("output portfolio %q was not of type CSV, was %q", p.OutputFilename, fileType)
}

knownOutputFiles[p.OutputFilename] = true
out = append(out, &task.ParsePortfolioResponseItem{
Source: sourceURI,
Blob: pacta.Blob{
FileName: p.OutputFilename,
FileType: fileType,
BlobURI: blobURI,
},
Portfolio: p,
})
}
extension := filepath.Ext(fileName)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
}

// Now that we're done uploading files, check the output directory and make sure
// there aren't any unaccounted for files.
dirEntries, err := os.ReadDir(outputDir)
if err != nil {
return fmt.Errorf("failed to read the output directory: %w", err)
}
for _, de := range dirEntries {
if !knownOutputFiles[de.Name()] {
h.logger.Error("output directory contained files not present in the generated 'processed_portfolios.json' manifest", zap.String("filename", de.Name()))
}
out = append(out, &task.ParsePortfolioResponseItem{
Blob: pacta.Blob{
FileName: fileName,
FileType: fileType,
BlobURI: blobURI,
},
LineCount: lineCount,
})
}

events := []publisher.Event{
Expand All @@ -159,25 +217,6 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.
return nil
}

// TODO(grady): Move this line counting into the image to prevent having our code do any read of the actual underlying data.
func countCSVLines(path string) (int, error) {
file, err := os.Open(path)
if err != nil {
return 0, fmt.Errorf("opening file failed: %w", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
if err := scanner.Err(); err != nil {
return 0, fmt.Errorf("scanner.error returned: %w", err)
}
// Subtract 1 for the header row
return lineCount - 1, nil
}

func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error {
return errors.New("not implemented")
}
Expand Down Expand Up @@ -261,7 +300,7 @@ func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.Cr

func (h *Handler) downloadBlob(ctx context.Context, srcURI, destPath string) error {
// Make sure the destination exists
if err := os.MkdirAll(filepath.Dir(destPath), 0600); err != nil {
if err := os.MkdirAll(filepath.Dir(destPath), 0700); err != nil {
return fmt.Errorf("failed to create directory to download blob to: %w", err)
}

Expand Down
8 changes: 8 additions & 0 deletions async/parsed/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Domain types for the parser's output manifest ('processed_portfolios.json').
# Consumed by //async (listed there under deps as "//async/parsed").
go_library(
    name = "parsed",
    srcs = ["parsed.go"],
    importpath = "github.com/RMI/pacta/async/parsed",
    visibility = ["//visibility:public"],
)
38 changes: 38 additions & 0 deletions async/parsed/parsed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Package parsed just holds the domain types for dealing with the output of the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps link to the repo that this relies upon? It's not obvious from this comment that this describes the contours of an external dependency.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GC, done

// ParsePortfolio async task. The code that generates output in this structure
// lives in [1], which provides the base image for our parser binary.
//
// [1] https://github.com/RMI-PACTA/workflow.portfolio.parsing
package parsed

// SourceFile describes a single input file handled by the parser, as decoded
// from the 'processed_portfolios.json' manifest. Field semantics are defined
// by the upstream workflow.portfolio.parsing repo; the JSON tags mirror its
// output keys exactly.
type SourceFile struct {
	// InputFilename is the file name of the raw input CSV (matched against
	// the local-filename -> blob URI map by the caller).
	InputFilename string `json:"input_filename"`
	// InputMD5 is a checksum of the input file — presumably hex-encoded;
	// TODO confirm against the upstream workflow.
	InputMD5 string `json:"input_md5"`
	// SystemInfo records the parser environment that produced this entry.
	SystemInfo SystemInfo `json:"system_info"`
	// InputEntries is an entry count for the input — exact meaning (rows vs.
	// records) is defined upstream; TODO confirm.
	InputEntries int `json:"input_entries"`
	// GroupCols lists the column names used to group the input — presumably
	// into subportfolios; TODO confirm upstream.
	GroupCols []string `json:"group_cols"`
	// SubportfoliosCount is the number of subportfolios derived from this
	// input file.
	SubportfoliosCount int `json:"subportfolios_count"`
	// Portfolios describes each output portfolio produced from this input.
	Portfolios []Portfolio `json:"portfolios"`
	// Errors holds parser-reported errors; the inner-slice shape is defined
	// by the upstream workflow — TODO confirm.
	Errors [][]string `json:"errors"`
}

// SystemInfo captures the parser runtime environment recorded in the output
// manifest. The mixed-case JSON tags (packageVersion, RVersion) intentionally
// match the upstream R workflow's output keys.
type SystemInfo struct {
	// Timestamp is when the parse ran; format is defined upstream — TODO
	// confirm (likely ISO 8601).
	Timestamp string `json:"timestamp"`
	// Package is the name of the R package that performed the parse.
	Package string `json:"package"`
	// PackageVersion is the version of that package.
	PackageVersion string `json:"packageVersion"`
	// RVersion is the version of the R interpreter used.
	RVersion string `json:"RVersion"`
	// Dependencies lists the R package dependencies in use at parse time.
	Dependencies []Dependency `json:"dependencies"`
}

// Dependency is a single package/version pair from the parser's runtime
// environment, as listed in SystemInfo.Dependencies.
type Dependency struct {
	// Package is the dependency's package name.
	Package string `json:"package"`
	// Version is the dependency's version string.
	Version string `json:"version"`
}

// Portfolio describes one output CSV produced by the parser from a source
// file (a SourceFile may yield several of these, one per subportfolio).
type Portfolio struct {
	// OutputMD5 is a checksum of the output file — presumably hex-encoded;
	// TODO confirm upstream.
	OutputMD5 string `json:"output_md5"`
	// OutputFilename is the name of the generated CSV in the output
	// directory; callers join it with the output dir to locate the file.
	OutputFilename string `json:"output_filename"`
	// OutputRows is the number of rows in the output portfolio (stored as
	// the portfolio's NumberOfRows by the event handler).
	OutputRows int `json:"output_rows"`
	// PortfolioName is the portfolio's name as extracted by the parser.
	PortfolioName string `json:"portfolio_name"`
	// InvestorName is the investor's name as extracted by the parser; not
	// yet persisted anywhere (see TODO #187 in the handler).
	InvestorName string `json:"investor_name"`
}
5 changes: 4 additions & 1 deletion azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,13 @@ func (s *Server) handleParsedPortfolio(id string, resp *task.ParsePortfolioRespo
if err != nil {
return fmt.Errorf("creating blob %d: %w", i, err)
}

// TODO(#187): There's other metadata in output.Portfolio, like `InvestorName`, that
// we aren't currently storing.
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
NumberOfRows: output.Portfolio.OutputRows,
Blob: &pacta.Blob{ID: blobID},
Properties: *properties,
})
Expand Down
13 changes: 0 additions & 13 deletions cmd/runner/taskrunner/BUILD.bazel

This file was deleted.

Loading
Loading