[backend] dsl.importer can't use with dsl.ParallelFor #10860

Open
hexiaoliangRick opened this issue Jun 2, 2024 · 4 comments

@hexiaoliangRick

Environment

  • How did you deploy Kubeflow Pipelines (KFP)?
    Standalone deployment (Kubeflow Pipelines only)
  • KFP version:
  • KFP SDK version:

Steps to reproduce

  • In my pipeline, I try to download a list of artifacts first and then use that list in a container component, so I use dsl.importer inside dsl.ParallelFor and gather the results with dsl.Collected.

from typing import List

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, Artifact, Output


# Intended downstream container component (commented out for this reproduction):
# @dsl.container_component
# def mosaic_satellite_image_gdal(raster_files: str, out_raster_file: Output[Artifact]):
#     container = dsl.ContainerSpec(
#         image='harbor.host.com/bdh/remote-sensing-data-preprocessing:v1.10',
#         command=['python', 'main.py', 'MOSAIC'],
#         args=[raster_files]
#     )
#     return container


@dsl.component()
def get_artifact_local_path(local_raster_artifacts: List[Artifact]) -> str:
    values = []
    for artifact in local_raster_artifacts:
        values.append(artifact.path)
    return ".".join(values)


@dsl.component()
def fake_op(s: str):
    print(s)



@dsl.pipeline
def raster_mosaic_pipeline() -> str:
    rasters = ["minio://wh-gis-dev/thenorth_files/2011/2024/4/1794971570411155457/lhztestShape.geojson",
               "minio://wh-gis-dev/remote-sense-image/S2MSI2A/2024/5/15/S2A_MSIL2A_20240515T024551_N0510_R132_T51UXQ_20240515T055751.SAFE.zip"]
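    # Fan out: import each URI as an Artifact, one importer task per loop item.
    with dsl.ParallelFor(
            items=rasters, parallelism=10
    ) as raster_file:
        importer_file_task = dsl.importer(artifact_uri=raster_file, artifact_class=dsl.Artifact, reimport=False)
    # Fan in: collect the per-iteration importer outputs into a single List[Artifact].
    artifacts = dsl.Collected(importer_file_task.output)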
    get_op_task = get_artifact_local_path(local_raster_artifacts=artifacts)
    return get_op_task.output



compiler.Compiler().compile(raster_mosaic_pipeline, 'RasterMosaic.yaml')

  • Compile the code to YAML:
# PIPELINE DEFINITION
# Name: raster-mosaic-pipeline
# Outputs:
#    Output: str
components:
  comp-for-loop-2:
    dag:
      outputs:
        artifacts:
          pipelinechannel--importer-artifact:
            artifactSelectors:
            - outputArtifactKey: artifact
              producerSubtask: importer
      tasks:
        importer:
          cachingOptions:
            enableCache: true
          componentRef:
            name: comp-importer
          inputs:
            parameters:
              uri:
                componentInputParameter: pipelinechannel--loop-item-param-1
          taskInfo:
            name: importer
    inputDefinitions:
      parameters:
        pipelinechannel--loop-item-param-1:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        pipelinechannel--importer-artifact:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
          isArtifactList: true
  comp-get-artifact-local-path:
    executorLabel: exec-get-artifact-local-path
    inputDefinitions:
      artifacts:
        local_raster_artifacts:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
          isArtifactList: true
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRING
  comp-importer:
    executorLabel: exec-importer
    inputDefinitions:
      parameters:
        uri:
          parameterType: STRING
    outputDefinitions:
      artifacts:
        artifact:
          artifactType:
            schemaTitle: system.Artifact
            schemaVersion: 0.0.1
deploymentSpec:
  executors:
    exec-get-artifact-local-path:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - get_artifact_local_path
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef get_artifact_local_path(local_raster_artifacts: List[Artifact])\
          \ -> str:\n    values = []\n    for artifact in local_raster_artifacts:\n\
          \        values.append(artifact.path)\n    return \".\".join(values)\n\n"
        image: python:3.7
    exec-importer:
      importer:
        artifactUri:
          runtimeParameter: uri
        typeSchema:
          schemaTitle: system.Artifact
          schemaVersion: 0.0.1
pipelineInfo:
  name: raster-mosaic-pipeline
root:
  dag:
    outputs:
      parameters:
        Output:
          valueFromParameter:
            outputParameterKey: Output
            producerSubtask: get-artifact-local-path
    tasks:
      for-loop-2:
        componentRef:
          name: comp-for-loop-2
        iteratorPolicy:
          parallelismLimit: 10
        parameterIterator:
          itemInput: pipelinechannel--loop-item-param-1
          items:
            raw: '["minio://wh-gis-dev/thenorth_files/2011/2024/4/1794971570411155457/lhztestShape.geojson",
              "minio://wh-gis-dev/remote-sense-image/S2MSI2A/2024/5/15/S2A_MSIL2A_20240515T024551_N0510_R132_T51UXQ_20240515T055751.SAFE.zip"]'
        taskInfo:
          name: for-loop-2
      get-artifact-local-path:
        cachingOptions:
          enableCache: true
        componentRef:
          name: comp-get-artifact-local-path
        dependentTasks:
        - for-loop-2
        inputs:
          artifacts:
            local_raster_artifacts:
              taskOutputArtifact:
                outputArtifactKey: pipelinechannel--importer-artifact
                producerTask: for-loop-2
        taskInfo:
          name: get-artifact-local-path
  outputDefinitions:
    parameters:
      Output:
        parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
  • Test on KFP: upload the YAML to create a pipeline, then create a run.
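The compiled spec routes the collected artifacts out of the loop DAG and into `get-artifact-local-path`. As a minimal sketch, assuming a hand-copied subset of `RasterMosaic.yaml` written as a Python dict (so no YAML parser is needed), the `local_raster_artifacts` input can be traced back to the task that produces it. `SPEC` and `trace_input_artifact` are hypothetical names for illustration; the sketch only reads the spec and does not model how the KFP backend actually resolves these references.

```python
from typing import Any, Dict

# Hand-copied subset of the compiled RasterMosaic.yaml above (not the full file).
SPEC: Dict[str, Any] = {
    "components": {
        "comp-for-loop-2": {
            "dag": {
                "outputs": {
                    "artifacts": {
                        "pipelinechannel--importer-artifact": {
                            "artifactSelectors": [
                                {"outputArtifactKey": "artifact", "producerSubtask": "importer"}
                            ]
                        }
                    }
                },
                "tasks": {"importer": {"componentRef": {"name": "comp-importer"}}},
            },
            "outputDefinitions": {
                "artifacts": {"pipelinechannel--importer-artifact": {"isArtifactList": True}}
            },
        },
        "comp-importer": {"executorLabel": "exec-importer"},
    },
    "deploymentSpec": {
        "executors": {"exec-importer": {"importer": {"artifactUri": {"runtimeParameter": "uri"}}}}
    },
    "root": {
        "dag": {
            "tasks": {
                "for-loop-2": {"componentRef": {"name": "comp-for-loop-2"}},
                "get-artifact-local-path": {
                    "inputs": {
                        "artifacts": {
                            "local_raster_artifacts": {
                                "taskOutputArtifact": {
                                    "outputArtifactKey": "pipelinechannel--importer-artifact",
                                    "producerTask": "for-loop-2",
                                }
                            }
                        }
                    }
                },
            }
        }
    },
}


def trace_input_artifact(spec: Dict[str, Any], task_name: str, input_name: str) -> Dict[str, Any]:
    """Follow a root-level artifact input through a loop DAG to its inner producer task."""
    root_tasks = spec["root"]["dag"]["tasks"]
    ref = root_tasks[task_name]["inputs"]["artifacts"][input_name]["taskOutputArtifact"]
    loop_task, key = ref["producerTask"], ref["outputArtifactKey"]

    # Look up the loop's sub-DAG component and how it declares the output.
    loop_comp = spec["components"][root_tasks[loop_task]["componentRef"]["name"]]
    is_list = loop_comp["outputDefinitions"]["artifacts"][key].get("isArtifactList", False)

    # The artifact selector names the task inside the loop that emits each artifact.
    selector = loop_comp["dag"]["outputs"]["artifacts"][key]["artifactSelectors"][0]
    inner_task = selector["producerSubtask"]
    inner_comp_name = loop_comp["dag"]["tasks"][inner_task]["componentRef"]["name"]
    executor = spec["deploymentSpec"]["executors"][spec["components"][inner_comp_name]["executorLabel"]]

    return {
        "loop_task": loop_task,
        "loop_output_key": key,
        "is_artifact_list": is_list,
        "inner_task": inner_task,
        "inner_output_key": selector["outputArtifactKey"],
        "inner_executor_kind": "importer" if "importer" in executor else "container",
    }


trace = trace_input_artifact(SPEC, "get-artifact-local-path", "local_raster_artifacts")
print(trace)
```

On this fragment the trace ends at the `importer` task inside `for-loop-2`, whose executor is an importer rather than a container, and the loop output is declared with `isArtifactList: true`. That is the importer-inside-ParallelFor combination this issue reports as failing.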

Expected result

The pipeline run completes successfully.
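For reference, a successful run would hand `get_artifact_local_path` one artifact per loop item. The sketch below simulates that fan-out/fan-in in plain Python, without KFP. `StubArtifact` is a hypothetical stand-in for `dsl.Artifact`, and the `minio://` to `/minio/` path mapping is an assumption based on how the KFP SDK's `Artifact.path` appears to map remote URIs to local mounts; check it against your SDK version. The sketch also assumes the collected list keeps loop-item order, which it does not verify.

```python
from dataclasses import dataclass
from typing import List

# Assumed URI-scheme -> local-mount mapping, mirroring what the KFP SDK's
# Artifact.path appears to do; verify against your SDK version.
_LOCAL_MOUNTS = {"gs://": "/gcs/", "minio://": "/minio/", "s3://": "/s3/"}


@dataclass
class StubArtifact:
    """Hypothetical stand-in for kfp.dsl.Artifact, modelling only uri and path."""

    uri: str

    @property
    def path(self) -> str:
        for scheme, mount in _LOCAL_MOUNTS.items():
            if self.uri.startswith(scheme):
                return mount + self.uri[len(scheme):]
        # This stub raises for unknown schemes instead of modelling the SDK's behavior.
        raise ValueError(f"unsupported URI scheme: {self.uri}")


def get_artifact_local_path(local_raster_artifacts: List[StubArtifact]) -> str:
    # Same body as the component in the issue (note that it joins with ".").
    values = []
    for artifact in local_raster_artifacts:
        values.append(artifact.path)
    return ".".join(values)


rasters = [
    "minio://wh-gis-dev/thenorth_files/2011/2024/4/1794971570411155457/lhztestShape.geojson",
    "minio://wh-gis-dev/remote-sense-image/S2MSI2A/2024/5/15/S2A_MSIL2A_20240515T024551_N0510_R132_T51UXQ_20240515T055751.SAFE.zip",
]

# Fan out: one "imported" artifact per loop item; fan in: collect them into a list.
collected = [StubArtifact(uri=u) for u in rasters]
result = get_artifact_local_path(collected)
print(result)
```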

Materials and Reference




github-actions bot commented Aug 2, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Aug 2, 2024
@gregsheremeta
Contributor

@gmfrasca this looks just like the error I found in #10798 (comment).

Hmm 🤔

stale bot removed the lifecycle/stale label Aug 4, 2024

github-actions bot commented Oct 4, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions bot added the lifecycle/stale label Oct 4, 2024
@HumairAK
Collaborator

HumairAK commented Oct 4, 2024

/remove-lifecycle stale

google-oss-prow bot removed the lifecycle/stale label Oct 4, 2024