As reported in leap-stc/data-management#33 (comment) (and discussed further in that thread), under certain circumstances on Dataflow, the values of the PCollection items passed from GroupByKey to MapTuple(combine_fragments) here:

```python
| beam.GroupByKey()  # this has major performance implications
| beam.MapTuple(combine_fragments)
```

(pangeo-forge-recipes/pangeo_forge_recipes/transforms.py, lines 322 to 323 at f0c7dac)

are not properly decoded into the correct Python type when they arrive in the body of combine_fragments. Specifically, the fragments argument is type-hinted as a List in the signature of combine_fragments, but it sometimes arrives as a _ConcatSequence. As noted in the PR linked above, it's unclear to me what circumstance causes this.
Discussion with @alxmrs elucidated the category of error as being related to Beam's coders, which generally are inferred from type hints, and handle serialization/deserialization of PCollection values for handoff between pipeline stages in distributed contexts (e.g., Dataflow), as described in this doc.
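To make the failure mode concrete, here is a toy illustration in pure Python (not Beam's actual coder machinery) of how a distributed runner that shards a grouped value can hand the consumer a lazy concatenation of decoded shards rather than a materialized list, analogous to what we see with _ConcatSequence:

```python
import itertools
import pickle

# Toy stand-in for what a distributed runner may do after GroupByKey: the
# grouped values arrive as several independently encoded shards, and the
# runner exposes them as a lazy concatenation rather than one list.
shards = [pickle.dumps([1, 2]), pickle.dumps([3, 4])]
fragments = itertools.chain.from_iterable(pickle.loads(s) for s in shards)

lazy = not isinstance(fragments, list)  # True -- like _ConcatSequence, not a list
materialized = list(fragments)          # [1, 2, 3, 4] -- casting recovers the values
```

The values are all recoverable; the surprise is purely that the container type does not match the List type hint.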
The commit history of #548 records various attempts I made to get the fragments coder to be correctly inferred (by trying a variety of different type-hinting strategies). Ultimately, I could not get type-hinting in any form (.with_input_types(...), PCollection[...], etc.) to fix this issue on Dataflow.
In its current form, #548 does solve this issue by simply explicitly casting fragments to list if it's found to be improperly decoded. This unblocks leap-stc/data-management#33, but doesn't seem like a great solution, insofar as it pushes deserialization concerns into the body of combine_fragments, which is the wrong place for that.
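The patch in #548 amounts to something like the following minimal sketch. Names and element types here are illustrative only; the real combine_fragments in pangeo-forge-recipes operates on (Index, xr.Dataset) pairs and merges them into a single dataset:

```python
from typing import Any, List, Sequence, Tuple

def combine_fragments(key: Any, fragments: Sequence) -> Tuple[Any, List]:
    # Defensive coercion: on Dataflow, `fragments` sometimes arrives as a
    # lazy _ConcatSequence instead of the type-hinted List, so cast before
    # doing anything that assumes list semantics (len, indexing, sort).
    if not isinstance(fragments, list):
        fragments = list(fragments)
    # ... the real function combines the fragments here; this sketch just
    # returns them unchanged.
    return key, fragments
```

The cast is cheap and idempotent, but as noted above, it leaks a runner-level deserialization concern into user code.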
Edit: As reported in #553 (comment), the below is not a fix for this issue.
Reading between the lines of apache/beam#9344 (comment), I am left wondering if an alternative to #548 might be to return from split_fragment's iteration, rather than yielding...
It seems plausible that in the distributed context, yielding introduces some statefulness which prevents Dataflow from parsing to list. Having written this, it does seem a bit unlikely that this would work, given that the PCollection values arriving in combine_fragments are emitted from GroupByKey (not passed directly from split_fragment), but this is a cheap enough theory to test, so I will do that quickly before merging #548 as the patch.
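For reference, the theory amounts to a change of this shape (a hypothetical sketch; the names and element types are illustrative, not the actual split_fragment signature):

```python
from typing import Iterator, List, Tuple

# Current style: a generator body, yielding one fragment at a time.
def split_fragment_yielding(key: str, pieces: List[int]) -> Iterator[Tuple[str, int]]:
    for piece in pieces:
        yield key, piece

# Proposed alternative: build and return the whole list at once. Beam
# flattens any iterable returned from a FlatMap-style fn the same way it
# consumes a generator, which is part of why this was a long-shot theory.
def split_fragment_returning(key: str, pieces: List[int]) -> List[Tuple[str, int]]:
    return [(key, piece) for piece in pieces]
```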
Regardless of which approach ends up as the patch here, it would be great to distill what we're learning into an upstream issue on Beam. A final note for now: I did try to catch typing issues related to this problem in local testing via #550, but while that PR did surface a few typing nits, none of them appeared relevant to the present issue.