Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): configurable log level for driver / launcher images #11278

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion backend/src/v2/cmd/driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,21 @@ var (
// the value stored in the paths will be either 'true' or 'false'
cachedDecisionPath = flag.String("cached_decision_path", "", "Cached Decision output path")
conditionPath = flag.String("condition_path", "", "Condition output path")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

// func RootDAG(pipelineName string, runID string, component *pipelinespec.ComponentSpec, task *pipelinespec.PipelineTaskSpec, mlmd *metadata.Client) (*Execution, error) {

func main() {
flag.Parse()
err := drive()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

err = drive()
if err != nil {
glog.Exitf("%v", err)
}
Expand Down
7 changes: 7 additions & 0 deletions backend/src/v2/cmd/launcher-v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.")
mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.")
mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.")
logLevel = flag.String("log_level", "1", "The verbosity level to log.")
)

func main() {
Expand All @@ -54,6 +55,12 @@ func run() error {
flag.Parse()
ctx := context.Background()

glog.Infof("Setting log level to: '%s'", *logLevel)
err := flag.Set("v", *logLevel)
if err != nil {
glog.Warningf("Failed to set log level: %s", err.Error())
}

if *copy != "" {
// copy is used to copy this binary to a shared volume
// this is a special command, ignore all other flags by returning
Expand Down
32 changes: 32 additions & 0 deletions backend/src/v2/compiler/argocompiler/argo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"io/ioutil"
"os"
"strings"
"testing"

Expand All @@ -36,6 +37,7 @@ func Test_argo_compiler(t *testing.T) {
jobPath string // path of input PipelineJob to compile
platformSpecPath string // path of possible input PlatformSpec to compile
argoYAMLPath string // path of expected output argo workflow YAML
envVars []string
}{
{
jobPath: "../testdata/hello_world.json",
Expand All @@ -57,10 +59,40 @@ func Test_argo_compiler(t *testing.T) {
platformSpecPath: "../testdata/create_pod_metadata.json",
argoYAMLPath: "testdata/create_pod_metadata.yaml",
},
// With envOptions
{
jobPath: "../testdata/hello_world.json",
platformSpecPath: "",
argoYAMLPath: "testdata/with_logging/hello_world.yaml",
envVars: []string{"DRIVER_LOG_LEVEL=5", "LAUNCHER_LOG_LEVEL=5"},
},
{
jobPath: "../testdata/importer.json",
platformSpecPath: "",
argoYAMLPath: "testdata/with_logging/importer.yaml",
envVars: []string{"DRIVER_LOG_LEVEL=5", "LAUNCHER_LOG_LEVEL=5"},
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%+v", tt), func(t *testing.T) {
job, platformSpec := load(t, tt.jobPath, tt.platformSpecPath)
if tt.envVars != nil {
for _, envVar := range tt.envVars {
parts := strings.Split(strings.ReplaceAll(envVar, " ", ""), "=")
err := os.Setenv(parts[0], parts[1])
if err != nil {
t.Fatalf("Failed to set environment variable '%s' with error: %s", parts[0], err.Error())
}

// Unset after test cases has ended
defer func() {
err := os.Unsetenv(parts[0])
if err != nil {
t.Fatalf("Failed to unset env variable '%s' with error: %s", parts[0], err.Error())
}
}()
}
}
if *update {
wf, err := argocompiler.Compile(job, platformSpec, nil)
if err != nil {
Expand Down
52 changes: 34 additions & 18 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
LauncherImageEnvVar = "V2_LAUNCHER_IMAGE"
DefaultDriverImage = "gcr.io/ml-pipeline/kfp-driver@sha256:dc8b56a2eb071f30409828a8884d621092e68385af11a6c06aa9e9fbcfbb19de"
DriverImageEnvVar = "V2_DRIVER_IMAGE"
DriverLogLevelEnvVar = "DRIVER_LOG_LEVEL"
LauncherLogLevelEnvVar = "LAUNCHER_LOG_LEVEL"
gcsScratchLocation = "/gcs"
gcsScratchName = "gcs-scratch"
s3ScratchLocation = "/s3"
Expand Down Expand Up @@ -130,6 +132,25 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
}
if value, ok := os.LookupEnv(DriverLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -150,22 +171,9 @@ func (c *workflowCompiler) addContainerDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: GetDriverImage(),
Command: []string{"driver"},
Args: []string{
"--type", "CONTAINER",
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--container", inputValue(paramContainer),
"--iteration_index", inputValue(paramIterationIndex),
"--cached_decision_path", outputPath(paramCachedDecision),
"--pod_spec_patch_path", outputPath(paramPodSpecPatch),
"--condition_path", outputPath(paramCondition),
"--kubernetes_config", inputValue(paramKubernetesConfig),
},
Image: c.driverImage,
Command: []string{"driver"},
Args: args,
Resources: driverResources,
},
}
Expand Down Expand Up @@ -245,6 +253,13 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
c.templates[nameContainerExecutor] = container

args := []string{
"--copy", component.KFPLauncherPath,
}
if value, ok := os.LookupEnv(LauncherLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
executor := &wfapi.Template{
Name: nameContainerImpl,
Inputs: wfapi.Inputs{
Expand Down Expand Up @@ -303,8 +318,9 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
InitContainers: []wfapi.UserContainer{{
Container: k8score.Container{
Name: "kfp-launcher",
Image: GetLauncherImage(),
Command: []string{"launcher-v2", "--copy", component.KFPLauncherPath},
Image: c.launcherImage,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this change besides optimization?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No not really, just made it a bit concise to add flags and follows the pattern used in the other Container definitions for driver / launcher

Command: []string{"launcher-v2"},
Args: args,
VolumeMounts: []k8score.VolumeMount{
{
Name: volumeNameKFPLauncher,
Expand Down
37 changes: 22 additions & 15 deletions backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package argocompiler

import (
"fmt"
"os"
"sort"
"strings"

Expand Down Expand Up @@ -406,6 +407,24 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
if ok {
return name
}

args := []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
}
if value, ok := os.LookupEnv(DriverLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}

t := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -426,21 +445,9 @@ func (c *workflowCompiler) addDAGDriverTemplate() string {
},
},
Container: &k8score.Container{
Image: c.driverImage,
Command: []string{"driver"},
Args: []string{
"--type", inputValue(paramDriverType),
"--pipeline_name", c.spec.GetPipelineInfo().GetName(),
"--run_id", runID(),
"--dag_execution_id", inputValue(paramParentDagID),
"--component", inputValue(paramComponent),
"--task", inputValue(paramTask),
"--runtime_config", inputValue(paramRuntimeConfig),
"--iteration_index", inputValue(paramIterationIndex),
"--execution_id_path", outputPath(paramExecutionID),
"--iteration_count_path", outputPath(paramIterationCount),
"--condition_path", outputPath(paramCondition),
},
Image: c.driverImage,
Command: []string{"driver"},
Args: args,
Resources: driverResources,
},
}
Expand Down
8 changes: 6 additions & 2 deletions backend/src/v2/compiler/argocompiler/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package argocompiler

import (
"fmt"
"os"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
launcherArgs := []string{
args := []string{
"--executor_type", "importer",
"--task_spec", inputValue(paramTask),
"--component_spec", inputValue(paramComponent),
Expand All @@ -81,6 +82,9 @@ func (c *workflowCompiler) addImporterTemplate() string {
"--mlmd_server_port",
fmt.Sprintf("$(%s)", component.EnvMetadataPort),
}
if value, ok := os.LookupEnv(LauncherLogLevelEnvVar); ok {
args = append(args, "--log_level", value)
}
importerTemplate := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -94,7 +98,7 @@ func (c *workflowCompiler) addImporterTemplate() string {
Container: &k8score.Container{
Image: c.launcherImage,
Command: []string{"launcher-v2"},
Args: launcherArgs,
Args: args,
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
Env: commonEnvs,
Resources: driverResources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,11 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
command:
- launcher-v2
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
command:
- launcher-v2
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ spec:
- mountPath: /.config
name: dot-config-scratch
initContainers:
- command:
- launcher-v2
- args:
- --copy
- /kfp-launcher/launch
command:
- launcher-v2
image: gcr.io/ml-pipeline/kfp-launcher
name: kfp-launcher
resources:
Expand Down
Loading
Loading