Finish integrating new parser into pipeline #186
Conversation
👍 Psyched to see the complete coupling come into focus!
async/async.go (outdated)

    return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
    for _, sf := range sourceFiles {

        // TODO: There's lots of metadata associated with the input files (e.g.
Can you create a TODO to track this? I feel like capturing this data and storing it seems like it is more likely than not to save us time and energy down the road if/when something goes wrong. Storing it as a blob (using our existing infrastructure) and then either attaching that blob to the incomplete_upload
or creating a new table for it is probably the right call.
Done, see #187
async/async.go (outdated)

    for _, p := range sf.Portfolios {
        outPath := filepath.Join(outputDir, p.OutputFilename)

        // XXX: One risk here is that we're depending on the R code to generate truly
Why rely on that at all? The info we actually need for semantics is in the portfolio metadata. Let's create UUIDs or similar here?
It was mostly because there are already a bunch of UUIDs flying around and being mapped to things, and I didn't want to introduce more cognitive load with another opaque UUID -> UUID mapping. But agreed that we shouldn't rely on this behavior, since we don't have to.
Done, and it was also simpler than I thought: we don't need to maintain the mapping, since we just store the blob URI as-is.
async/async.go (outdated)

    return fmt.Errorf("failed to parse file type from file name %q: %w", p.OutputFilename, err)
    }

    sourceURI, ok := localCSVToBlob[sf.InputFilename]
Optional: consider adding another check that no additional output files are present that aren't mentioned in the manifest. That kind of defensive programming is probably warranted over a trusted-but-blackbox interface like this one. (After the loop completes, obviously; a map/set should be sufficient.)
I considered this and then got lazy; this is done now.
    paths = append(paths, strings.TrimSpace(line[idx+17:]))
    var sourceFiles []parsed.SourceFile
    if err := json.NewDecoder(omf).Decode(&sourceFiles); err != nil {
        return fmt.Errorf("failed to decode processed_portfolios.json as JSON: %w", err)
If this step fails, we probably want a more complete accounting of why. Would logging the full manifest as a string on failure be inappropriate? Maybe upload it to a cloud bucket?
This is a great call. The R code actually already logs the output processed_portfolios.json file to stdout (or stderr), so this will be covered by #185
    @@ -0,0 +1,35 @@
    // Package parsed just holds the domain types for dealing with the output of the
Perhaps link to the repo that this relies upon? It's not obvious from this comment that this describes the contours of an external dependency.
Good call, done
azure/azevents/azevents.go (outdated)

    @@ -327,10 +327,13 @@ func (s *Server) handleParsedPortfolio(id string, resp *task.ParsePortfolioRespo
    if err != nil {
        return fmt.Errorf("creating blob %d: %w", i, err)
    }

    // TODO: There's other metadata in output.Portfolio, like `InvestorName`, that
Can you add a bug for this? I don't want to lose track of it.
Done
Good calls across the board. You've once again caught all the small validations and other things I cheaped out on, and the code is better for it!
This PR finishes integrating the new parser logic from [1] into our pipeline. It parses the `processed_portfolios.json` file from the output directory (in this case `/home/portfolio-parser/output`) and uses that to both correlate input + output files as well as upload the output CSV files. Since the R code now includes a row count, we no longer need to parse the files manually.

This all mostly works as expected. A few sharp edges (relying on UUIDs from the R code) are noted in the PR, and there's metadata produced by the new code (both at the input file level and the output file level) that we aren't currently recording anywhere.

Adjacent changes:

- In creating the `parser`, I also duplicated the `taskrunner` package. That has been hoisted to the top level and de-duped
- Assorted refactorings and renamings to make sure the `pactaparser` image gets invoked correctly

[1] https://github.com/RMI-PACTA/workflow.portfolio.parsing