From 8a7d5fd1ec566beb7b9396acefae8985ca19373c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 9 Dec 2024 22:00:24 +0530 Subject: [PATCH 01/13] Warn user to use ${resources.schemas...} syntax --- bundle/config/validate/schema_references.go | 130 ++++++++++ .../config/validate/schema_references_test.go | 228 ++++++++++++++++++ bundle/config/validate/validate.go | 1 + 3 files changed, 359 insertions(+) create mode 100644 bundle/config/validate/schema_references.go create mode 100644 bundle/config/validate/schema_references_test.go diff --git a/bundle/config/validate/schema_references.go b/bundle/config/validate/schema_references.go new file mode 100644 index 0000000000..063ea0be67 --- /dev/null +++ b/bundle/config/validate/schema_references.go @@ -0,0 +1,130 @@ +package validate + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" +) + +// Validate that any references to UC schemas defined in the DAB use the ${resources.schemas...} +// syntax to capture the deploy time dependency. +func SchemaReferences() bundle.ReadOnlyMutator { + return &schemaReferences{} +} + +type schemaReferences struct{} + +func (v *schemaReferences) Name() string { + return "validate:schema_dependency" +} + +func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string) ([]dyn.Location, dyn.Path, bool) { + for k, s := range rb.Config().Resources.Schemas { + if s.CatalogName != catalogName || s.Name != schemaName { + continue + } + return rb.Config().GetLocations("resources.schemas." + k), dyn.NewPath(dyn.Key("resources"), dyn.Key("schemas"), dyn.Key(k)), true + } + return nil, nil, false +} + +func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + diags := diag.Diagnostics{} + for k, p := range rb.Config().Resources.Pipelines { + // Skip if the pipeline uses hive metastore. + if p.Catalog == "" { + continue + } + + schemaName := "" + fieldPath := dyn.Path{} + schemaLocation := []dyn.Location{} + switch { + case p.Schema == "" && p.Target == "": + diags = append(diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: "Unity Catalog pipeline should have a schema or target defined", + Detail: `The target or schema field is required for UC pipelines. Reason: DLT +requires specifying a target schema for UC pipelines. Please use the +TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING +TABLE statement if you do not wish to publish your dataset.`, + Locations: rb.Config().GetLocations("resources.pipelines." + k), + Paths: []dyn.Path{ + dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")), + dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")), + }, + }) + continue + case p.Schema != "" && p.Target != "": + locations := rb.Config().GetLocations("resources.pipelines." + k + ".schema") + locations = append(locations, rb.Config().GetLocations("resources.pipelines."+k+".target")...) + + diags = append(diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: "Both schema and target are defined in a Unity Catalog pipeline. 
Only one of them should be defined.", + Locations: locations, + Paths: []dyn.Path{ + dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")), + dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")), + }, + }) + continue + case p.Schema != "": + schemaName = p.Schema + fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")) + schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".schema") + case p.Target != "": + schemaName = p.Target + fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")) + schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".target") + } + + // Check if the schema is defined in the bundle. + matchLocations, matchPath, found := findSchemaInBundle(rb, p.Catalog, schemaName) + if !found { + continue + } + + diags = append(diags, diag.Diagnostic{ + Severity: diag.Warning, + Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, schemaName), + Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this DLT pipeline +has on the schema %q and deploy changes to the schema before deploying the pipeline.`, matchPath, schemaName), + Locations: append(schemaLocation, matchLocations...), + Paths: []dyn.Path{ + fieldPath, + matchPath, + }, + }) + } + + for k, v := range rb.Config().Resources.Volumes { + if v.CatalogName == "" || v.SchemaName == "" { + continue + } + + matchLocations, matchPath, found := findSchemaInBundle(rb, v.CatalogName, v.SchemaName) + if !found { + continue + } + + fieldLocations := rb.Config().GetLocations("resources.volumes." + k + ".schema_name") + + diags = append(diags, diag.Diagnostic{ + Severity: diag.Warning, + Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, v.SchemaName), + Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this Volume +has on the schema %q and deploy changes to the schema before deploying the Volume.`, matchPath, v.SchemaName), + Locations: append(matchLocations, fieldLocations...), + Paths: []dyn.Path{ + dyn.NewPath(dyn.Key("resources"), dyn.Key("volumes"), dyn.Key(k), dyn.Key("schema")), + matchPath, + }, + }) + } + + return diags +} diff --git a/bundle/config/validate/schema_references_test.go b/bundle/config/validate/schema_references_test.go new file mode 100644 index 0000000000..0d7510a8fd --- /dev/null +++ b/bundle/config/validate/schema_references_test.go @@ -0,0 +1,228 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" +) + +func TestValidateSchemaReferencesForPipelines(t *testing.T) { + pipelineTargetL := dyn.Location{File: "file1", Line: 1, Column: 1} + pipelineSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2} + pipelineL := dyn.Location{File: "file3", Line: 3, Column: 3} + schemaL := dyn.Location{File: "file4", Line: 4, Column: 4} + + for _, tc := range []struct { + schemaV string + 
targetV string + catalogV string + want diag.Diagnostics + }{ + { + schemaV: "", + targetV: "", + catalogV: "", + want: diag.Diagnostics{}, + }, + { + schemaV: "", + targetV: "", + catalogV: "main", + want: diag.Diagnostics{{ + Summary: "Unity Catalog pipeline should have a schema or target defined", + Severity: diag.Error, + Detail: `The target or schema field is required for UC pipelines. Reason: DLT +requires specifying a target schema for UC pipelines. Please use the +TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING +TABLE statement if you do not wish to publish your dataset.`, + Locations: []dyn.Location{pipelineL}, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.pipelines.p1.schema"), + dyn.MustPathFromString("resources.pipelines.p1.target"), + }, + }}, + }, + { + schemaV: "both", + targetV: "both", + catalogV: "main", + want: diag.Diagnostics{{ + Severity: diag.Error, + Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.", + Locations: []dyn.Location{pipelineSchemaL, pipelineTargetL}, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.pipelines.p1.schema"), + dyn.MustPathFromString("resources.pipelines.p1.target"), + }, + }}, + }, + { + schemaV: "schema1", + targetV: "", + catalogV: "other", + want: diag.Diagnostics{}, + }, + { + schemaV: "schema1", + targetV: "", + catalogV: "main", + want: diag.Diagnostics{{ + Severity: diag.Warning, + Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, + Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline +has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`, + Locations: []dyn.Location{pipelineSchemaL, schemaL}, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.pipelines.p1.schema"), + dyn.MustPathFromString("resources.schemas.s1"), + }, + }}, + }, + { + schemaV: "", + targetV: "schema1", + catalogV: "main", + want: diag.Diagnostics{{ + Severity: diag.Warning, + Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, + Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline +has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`, + Locations: []dyn.Location{pipelineTargetL, schemaL}, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.pipelines.p1.target"), + dyn.MustPathFromString("resources.schemas.s1"), + }, + }}, + }, + { + schemaV: "${resources.schemas.s1.name}", + targetV: "", + catalogV: "main", + want: diag.Diagnostics{}, + }, + { + schemaV: "", + targetV: "${resources.schemas.s1.name}", + catalogV: "main", + want: diag.Diagnostics{}, + }, + } { + + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Schemas: map[string]*resources.Schema{ + "s1": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "main", + Name: "schema1", + }, + }, + }, + Pipelines: map[string]*resources.Pipeline{ + "p1": { + PipelineSpec: &pipelines.PipelineSpec{ + Name: "abc", + Schema: tc.schemaV, + Target: tc.targetV, + Catalog: tc.catalogV, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL}) + bundletest.SetLocation(b, "resources.pipelines.p1", []dyn.Location{pipelineL}) + if tc.schemaV != "" { + bundletest.SetLocation(b, 
"resources.pipelines.p1.schema", []dyn.Location{pipelineSchemaL}) + } + if tc.targetV != "" { + bundletest.SetLocation(b, "resources.pipelines.p1.target", []dyn.Location{pipelineTargetL}) + } + + diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences()) + assert.Equal(t, tc.want, diags) + } +} + +func TestValidateSchemaReferencesForVolumes(t *testing.T) { + schemaL := dyn.Location{File: "file1", Line: 1, Column: 1} + volumeSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2} + for _, tc := range []struct { + catalogV string + schemaV string + want diag.Diagnostics + }{ + { + catalogV: "main", + schemaV: "schema1", + want: diag.Diagnostics{{ + Severity: diag.Warning, + Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, + Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this Volume +has on the schema "schema1" and deploy changes to the schema before deploying the Volume.`, + Locations: []dyn.Location{schemaL, volumeSchemaL}, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.volumes.v1.schema"), + dyn.MustPathFromString("resources.schemas.s1"), + }, + }}, + }, + { + catalogV: "main", + schemaV: "${resources.schemas.s1.name}", + want: diag.Diagnostics{}, + }, + { + catalogV: "main", + schemaV: "other", + want: diag.Diagnostics{}, + }, + { + catalogV: "other", + schemaV: "schema1", + want: diag.Diagnostics{}, + }, + } { + b := bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Schemas: map[string]*resources.Schema{ + "s1": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "main", + Name: "schema1", + }, + }, + }, + Volumes: map[string]*resources.Volume{ + "v1": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + SchemaName: tc.schemaV, + CatalogName: tc.catalogV, + Name: "my_volume", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(&b, "resources.schemas.s1", []dyn.Location{schemaL}) + bundletest.SetLocation(&b, "resources.volumes.v1.schema_name", []dyn.Location{volumeSchemaL}) + + diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(&b), SchemaReferences()) + assert.Equal(t, tc.want, diags) + } +} diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index eb4c3c3cd2..d1420ee801 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -37,6 +37,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics JobTaskClusterSpec(), ValidateFolderPermissions(), SingleNodeCluster(), + SchemaReferences(), )) } From 332a6b01ed9d70c6a138a8914619d735c6bf0320 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 20 Dec 2024 11:44:28 +0530 Subject: [PATCH 02/13] add comments' --- bundle/config/validate/schema_references.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bundle/config/validate/schema_references.go b/bundle/config/validate/schema_references.go index 063ea0be67..733714d242 100644 --- a/bundle/config/validate/schema_references.go +++ b/bundle/config/validate/schema_references.go @@ -34,7 +34,8 @@ func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { diags := diag.Diagnostics{} for k, p := range rb.Config().Resources.Pipelines { - // Skip if the pipeline uses hive metastore. + // Skip if the pipeline uses hive metastore. 
The DLT API allows creating + // a pipeline without a schema or target when using hive metastore. if p.Catalog == "" { continue } @@ -44,6 +45,8 @@ func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) schemaLocation := []dyn.Location{} switch { case p.Schema == "" && p.Target == "": + // The error message is identical to the one DLT backend returns when + // a schema is not defined for a UC DLT pipeline (date: 20 Dec 2024). diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Unity Catalog pipeline should have a schema or target defined", @@ -62,6 +65,9 @@ TABLE statement if you do not wish to publish your dataset.`, locations := rb.Config().GetLocations("resources.pipelines." + k + ".schema") locations = append(locations, rb.Config().GetLocations("resources.pipelines."+k+".target")...) + // The Databricks Terraform provider already has client side validation + // that does not allow this today. Having this here allows us to float + // this validation on `bundle validate` and provide location information. diags = append(diags, diag.Diagnostic{ Severity: diag.Error, Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.", From 893118e289cd8d62f04460e0da5e5b107438cf4b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 27 Dec 2024 19:02:08 +0530 Subject: [PATCH 03/13] fix silently --- .../mutator/resolve_schema_dependency.go | 80 ++++++ .../mutator/resolve_schema_dependency_test.go | 259 ++++++++++++++++++ bundle/config/validate/schema_references.go | 136 --------- .../config/validate/schema_references_test.go | 228 --------------- bundle/phases/initialize.go | 1 + 5 files changed, 340 insertions(+), 364 deletions(-) create mode 100644 bundle/config/mutator/resolve_schema_dependency.go create mode 100644 bundle/config/mutator/resolve_schema_dependency_test.go delete mode 100644 bundle/config/validate/schema_references.go delete mode 100644 bundle/config/validate/schema_references_test.go diff --git a/bundle/config/mutator/resolve_schema_dependency.go b/bundle/config/mutator/resolve_schema_dependency.go new file mode 100644 index 0000000000..fbb389fd2a --- /dev/null +++ b/bundle/config/mutator/resolve_schema_dependency.go @@ -0,0 +1,80 @@ +package mutator + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/diag" +) + +type resolveSchemeDependency struct{} + +// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines +// or UC Volumes using the `${resources.schemas..name}` syntax. Using this +// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume +// has on the schema and deploy changes to the schema before deploying the pipeline or volume. +// +// This mutator translates any implicit schema references in DLT pipelines or UC Volumes +// to the explicit syntax. 
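A minimal sketch of the rewrite in practice, written against the same resource types this file uses; the bundle keys "raw_schema" and "files", and the catalog and schema names, are hypothetical, and resolveVolume is the unexported helper defined below, so the snippet assumes it lives in this package with the same imports as the accompanying test file:

b := &bundle.Bundle{
	Config: config.Root{
		Resources: config.Resources{
			Schemas: map[string]*resources.Schema{
				// Schema defined by the bundle: catalog "main", name "raw".
				"raw_schema": {CreateSchema: &catalog.CreateSchema{CatalogName: "main", Name: "raw"}},
			},
			Volumes: map[string]*resources.Volume{
				// Volume that refers to that schema by its literal name.
				"files": {CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
					CatalogName: "main",
					SchemaName:  "raw",
				}},
			},
		},
	},
}

resolveVolume(b.Config.Resources.Volumes["files"], b)
// b.Config.Resources.Volumes["files"].SchemaName is now
// "${resources.schemas.raw_schema.name}", so Terraform deploys the schema
// before the volume; pipelines get the same treatment via resolvePipeline.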
+func ResolveSchemaDependency() bundle.Mutator { + return &resolveSchemeDependency{} +} + +func (m *resolveSchemeDependency) Name() string { + return "ResolveSchemaDependency" +} + +func findSchema(b *bundle.Bundle, catalogName, name string) (string, *resources.Schema) { + if catalogName == "" || name == "" { + return "", nil + } + + for k, s := range b.Config.Resources.Schemas { + if s.CatalogName == catalogName && s.Name == name { + return k, s + } + } + return "", nil +} + +func resolveVolume(v *resources.Volume, b *bundle.Bundle) { + schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName) + if schema == nil { + return + } + + v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) +} + +func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) { + // schema and target have the same semantics in the DLT API but are mutually + // exclusive. If schema is set, the pipeline is in direct publishing mode + // and can write tables to multiple schemas (vs target which is a single schema). + schemaName := p.Schema + if schemaName == "" { + schemaName = p.Target + } + + schemaK, schema := findSchema(b, p.Catalog, schemaName) + if schema == nil { + return + } + + if p.Schema != "" { + p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + } else if p.Target != "" { + p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + } +} + +func (m *resolveSchemeDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + for _, p := range b.Config.Resources.Pipelines { + resolvePipeline(p, b) + } + for _, v := range b.Config.Resources.Volumes { + resolveVolume(v, b) + } + return nil +} diff --git a/bundle/config/mutator/resolve_schema_dependency_test.go b/bundle/config/mutator/resolve_schema_dependency_test.go new file mode 100644 index 0000000000..feb466983d --- /dev/null +++ b/bundle/config/mutator/resolve_schema_dependency_test.go @@ -0,0 +1,259 @@ +package mutator + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestResolveSchemaDependencyForVolume(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Schemas: map[string]*resources.Schema{ + "schema1": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "foobar", + }, + }, + "schema2": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog2", + Name: "foobar", + }, + }, + "schema3": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "barfoo", + }, + }, + }, + Volumes: map[string]*resources.Volume{ + "volume1": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog1", + SchemaName: "foobar", + }, + }, + "volume2": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog2", + SchemaName: "foobar", + }, + }, + "volume3": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog1", + SchemaName: "barfoo", + }, + }, + "volume4": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalogX", + SchemaName: "foobar", + }, + }, + "volume5": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog1", + 
SchemaName: "schemaX", + }, + }, + }, + }, + }, + } + + d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + require.Nil(t, d) + assert.Equal(t, b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema1.name}") + assert.Equal(t, b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema2.name}") + assert.Equal(t, b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema3.name}") + assert.Equal(t, b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName, "foobar") + assert.Equal(t, b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName, "schemaX") +} + +func TestResolveSchemaDependencyForPipelinesWithTarget(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Schemas: map[string]*resources.Schema{ + "schema1": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "foobar", + }, + }, + "schema2": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog2", + Name: "foobar", + }, + }, + "schema3": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "barfoo", + }, + }, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Schema: "foobar", + }, + }, + "pipeline2": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog2", + Schema: "foobar", + }, + }, + "pipeline3": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Schema: "barfoo", + }, + }, + "pipeline4": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalogX", + Schema: "foobar", + }, + }, + "pipeline5": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Schema: "schemaX", + }, + }, + "pipeline6": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "", + Schema: "foobar", + }, + }, + "pipeline7": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "", + Schema: "", + Name: "whatever", + }, + }, + }, + }, + }, + } + + d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + require.Nil(t, d) + assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Schema, "${resources.schemas.schema1.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Schema, "${resources.schemas.schema2.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Schema, "${resources.schemas.schema3.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Schema, "foobar") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Schema, "schemaX") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Schema, "foobar") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Schema, "") + + for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { + assert.Empty(t, b.Config.Resources.Pipelines[k].Target) + } +} + +func TestResolveSchemaDependencyForPipelinesWithSchema(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Schemas: map[string]*resources.Schema{ + "schema1": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "foobar", + }, + }, + "schema2": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog2", + Name: "foobar", + }, + }, + "schema3": { + CreateSchema: &catalog.CreateSchema{ + CatalogName: "catalog1", + Name: "barfoo", + }, + }, + }, + 
Pipelines: map[string]*resources.Pipeline{ + "pipeline1": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Target: "foobar", + }, + }, + "pipeline2": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog2", + Target: "foobar", + }, + }, + "pipeline3": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Target: "barfoo", + }, + }, + "pipeline4": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalogX", + Target: "foobar", + }, + }, + "pipeline5": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "catalog1", + Target: "schemaX", + }, + }, + "pipeline6": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "", + Target: "foobar", + }, + }, + "pipeline7": { + PipelineSpec: &pipelines.PipelineSpec{ + Catalog: "", + Target: "", + Name: "whatever", + }, + }, + }, + }, + }, + } + + d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + require.Nil(t, d) + assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Target, "${resources.schemas.schema1.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Target, "${resources.schemas.schema2.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Target, "${resources.schemas.schema3.name}") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Target, "foobar") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Target, "schemaX") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Target, "foobar") + assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Target, "") + + for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { + assert.Empty(t, b.Config.Resources.Pipelines[k].Schema) + } +} diff --git a/bundle/config/validate/schema_references.go b/bundle/config/validate/schema_references.go deleted file mode 100644 index 733714d242..0000000000 --- a/bundle/config/validate/schema_references.go +++ /dev/null @@ -1,136 +0,0 @@ -package validate - -import ( - "context" - "fmt" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/dyn" -) - -// Validate that any references to UC schemas defined in the DAB use the ${resources.schemas...} -// syntax to capture the deploy time dependency. -func SchemaReferences() bundle.ReadOnlyMutator { - return &schemaReferences{} -} - -type schemaReferences struct{} - -func (v *schemaReferences) Name() string { - return "validate:schema_dependency" -} - -func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string) ([]dyn.Location, dyn.Path, bool) { - for k, s := range rb.Config().Resources.Schemas { - if s.CatalogName != catalogName || s.Name != schemaName { - continue - } - return rb.Config().GetLocations("resources.schemas." + k), dyn.NewPath(dyn.Key("resources"), dyn.Key("schemas"), dyn.Key(k)), true - } - return nil, nil, false -} - -func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { - diags := diag.Diagnostics{} - for k, p := range rb.Config().Resources.Pipelines { - // Skip if the pipeline uses hive metastore. The DLT API allows creating - // a pipeline without a schema or target when using hive metastore. 
- if p.Catalog == "" { - continue - } - - schemaName := "" - fieldPath := dyn.Path{} - schemaLocation := []dyn.Location{} - switch { - case p.Schema == "" && p.Target == "": - // The error message is identical to the one DLT backend returns when - // a schema is not defined for a UC DLT pipeline (date: 20 Dec 2024). - diags = append(diags, diag.Diagnostic{ - Severity: diag.Error, - Summary: "Unity Catalog pipeline should have a schema or target defined", - Detail: `The target or schema field is required for UC pipelines. Reason: DLT -requires specifying a target schema for UC pipelines. Please use the -TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING -TABLE statement if you do not wish to publish your dataset.`, - Locations: rb.Config().GetLocations("resources.pipelines." + k), - Paths: []dyn.Path{ - dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")), - dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")), - }, - }) - continue - case p.Schema != "" && p.Target != "": - locations := rb.Config().GetLocations("resources.pipelines." + k + ".schema") - locations = append(locations, rb.Config().GetLocations("resources.pipelines."+k+".target")...) - - // The Databricks Terraform provider already has client side validation - // that does not allow this today. Having this here allows us to float - // this validation on `bundle validate` and provide location information. - diags = append(diags, diag.Diagnostic{ - Severity: diag.Error, - Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.", - Locations: locations, - Paths: []dyn.Path{ - dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")), - dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")), - }, - }) - continue - case p.Schema != "": - schemaName = p.Schema - fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")) - schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".schema") - case p.Target != "": - schemaName = p.Target - fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")) - schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".target") - } - - // Check if the schema is defined in the bundle. - matchLocations, matchPath, found := findSchemaInBundle(rb, p.Catalog, schemaName) - if !found { - continue - } - - diags = append(diags, diag.Diagnostic{ - Severity: diag.Warning, - Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, schemaName), - Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this DLT pipeline -has on the schema %q and deploy changes to the schema before deploying the pipeline.`, matchPath, schemaName), - Locations: append(schemaLocation, matchLocations...), - Paths: []dyn.Path{ - fieldPath, - matchPath, - }, - }) - } - - for k, v := range rb.Config().Resources.Volumes { - if v.CatalogName == "" || v.SchemaName == "" { - continue - } - - matchLocations, matchPath, found := findSchemaInBundle(rb, v.CatalogName, v.SchemaName) - if !found { - continue - } - - fieldLocations := rb.Config().GetLocations("resources.volumes." 
+ k + ".schema_name") - - diags = append(diags, diag.Diagnostic{ - Severity: diag.Warning, - Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, v.SchemaName), - Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this Volume -has on the schema %q and deploy changes to the schema before deploying the Volume.`, matchPath, v.SchemaName), - Locations: append(matchLocations, fieldLocations...), - Paths: []dyn.Path{ - dyn.NewPath(dyn.Key("resources"), dyn.Key("volumes"), dyn.Key(k), dyn.Key("schema")), - matchPath, - }, - }) - } - - return diags -} diff --git a/bundle/config/validate/schema_references_test.go b/bundle/config/validate/schema_references_test.go deleted file mode 100644 index 0d7510a8fd..0000000000 --- a/bundle/config/validate/schema_references_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package validate - -import ( - "context" - "testing" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/internal/bundletest" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/dyn" - "github.com/databricks/databricks-sdk-go/service/catalog" - "github.com/databricks/databricks-sdk-go/service/pipelines" - "github.com/stretchr/testify/assert" -) - -func TestValidateSchemaReferencesForPipelines(t *testing.T) { - pipelineTargetL := dyn.Location{File: "file1", Line: 1, Column: 1} - pipelineSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2} - pipelineL := dyn.Location{File: "file3", Line: 3, Column: 3} - schemaL := dyn.Location{File: "file4", Line: 4, Column: 4} - - for _, tc := range []struct { - schemaV string - targetV string - catalogV string - want diag.Diagnostics - }{ - { - schemaV: "", - targetV: "", - catalogV: "", - want: diag.Diagnostics{}, - }, - { - schemaV: "", - targetV: "", - catalogV: "main", - want: diag.Diagnostics{{ - Summary: "Unity Catalog pipeline should have a schema or target defined", - Severity: diag.Error, - Detail: `The target or schema field is required for UC pipelines. Reason: DLT -requires specifying a target schema for UC pipelines. Please use the -TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING -TABLE statement if you do not wish to publish your dataset.`, - Locations: []dyn.Location{pipelineL}, - Paths: []dyn.Path{ - dyn.MustPathFromString("resources.pipelines.p1.schema"), - dyn.MustPathFromString("resources.pipelines.p1.target"), - }, - }}, - }, - { - schemaV: "both", - targetV: "both", - catalogV: "main", - want: diag.Diagnostics{{ - Severity: diag.Error, - Summary: "Both schema and target are defined in a Unity Catalog pipeline. 
Only one of them should be defined.", - Locations: []dyn.Location{pipelineSchemaL, pipelineTargetL}, - Paths: []dyn.Path{ - dyn.MustPathFromString("resources.pipelines.p1.schema"), - dyn.MustPathFromString("resources.pipelines.p1.target"), - }, - }}, - }, - { - schemaV: "schema1", - targetV: "", - catalogV: "other", - want: diag.Diagnostics{}, - }, - { - schemaV: "schema1", - targetV: "", - catalogV: "main", - want: diag.Diagnostics{{ - Severity: diag.Warning, - Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, - Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline -has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`, - Locations: []dyn.Location{pipelineSchemaL, schemaL}, - Paths: []dyn.Path{ - dyn.MustPathFromString("resources.pipelines.p1.schema"), - dyn.MustPathFromString("resources.schemas.s1"), - }, - }}, - }, - { - schemaV: "", - targetV: "schema1", - catalogV: "main", - want: diag.Diagnostics{{ - Severity: diag.Warning, - Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, - Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline -has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`, - Locations: []dyn.Location{pipelineTargetL, schemaL}, - Paths: []dyn.Path{ - dyn.MustPathFromString("resources.pipelines.p1.target"), - dyn.MustPathFromString("resources.schemas.s1"), - }, - }}, - }, - { - schemaV: "${resources.schemas.s1.name}", - targetV: "", - catalogV: "main", - want: diag.Diagnostics{}, - }, - { - schemaV: "", - targetV: "${resources.schemas.s1.name}", - catalogV: "main", - want: diag.Diagnostics{}, - }, - } { - - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Schemas: map[string]*resources.Schema{ - "s1": { - CreateSchema: &catalog.CreateSchema{ - CatalogName: "main", - Name: "schema1", - }, - }, - }, - Pipelines: map[string]*resources.Pipeline{ - "p1": { - PipelineSpec: &pipelines.PipelineSpec{ - Name: "abc", - Schema: tc.schemaV, - Target: tc.targetV, - Catalog: tc.catalogV, - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL}) - bundletest.SetLocation(b, "resources.pipelines.p1", []dyn.Location{pipelineL}) - if tc.schemaV != "" { - bundletest.SetLocation(b, "resources.pipelines.p1.schema", []dyn.Location{pipelineSchemaL}) - } - if tc.targetV != "" { - bundletest.SetLocation(b, "resources.pipelines.p1.target", []dyn.Location{pipelineTargetL}) - } - - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences()) - assert.Equal(t, tc.want, diags) - } -} - -func TestValidateSchemaReferencesForVolumes(t *testing.T) { - schemaL := dyn.Location{File: "file1", Line: 1, Column: 1} - volumeSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2} - for _, tc := range []struct { - catalogV string - schemaV string - want diag.Diagnostics - }{ - { - catalogV: "main", - schemaV: "schema1", - want: diag.Diagnostics{{ - Severity: diag.Warning, - Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`, - Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this Volume -has on the schema "schema1" and deploy changes to the schema 
before deploying the Volume.`, - Locations: []dyn.Location{schemaL, volumeSchemaL}, - Paths: []dyn.Path{ - dyn.MustPathFromString("resources.volumes.v1.schema"), - dyn.MustPathFromString("resources.schemas.s1"), - }, - }}, - }, - { - catalogV: "main", - schemaV: "${resources.schemas.s1.name}", - want: diag.Diagnostics{}, - }, - { - catalogV: "main", - schemaV: "other", - want: diag.Diagnostics{}, - }, - { - catalogV: "other", - schemaV: "schema1", - want: diag.Diagnostics{}, - }, - } { - b := bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Schemas: map[string]*resources.Schema{ - "s1": { - CreateSchema: &catalog.CreateSchema{ - CatalogName: "main", - Name: "schema1", - }, - }, - }, - Volumes: map[string]*resources.Volume{ - "v1": { - CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ - SchemaName: tc.schemaV, - CatalogName: tc.catalogV, - Name: "my_volume", - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(&b, "resources.schemas.s1", []dyn.Location{schemaL}) - bundletest.SetLocation(&b, "resources.volumes.v1.schema_name", []dyn.Location{volumeSchemaL}) - - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(&b), SchemaReferences()) - assert.Equal(t, tc.want, diags) - } -} diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 6fa0e5fede..913961f33d 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -63,6 +63,7 @@ func Initialize() bundle.Mutator { "workspace", "variables", ), + mutator.ResolveSchemaDependency(), // Provide permission config errors & warnings after initializing all variables permissions.PermissionDiagnostics(), mutator.SetRunAs(), From 1d337c857123ce846eb5180a2f7ba305e9b5cdf6 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 27 Dec 2024 19:10:33 +0530 Subject: [PATCH 04/13] - --- bundle/config/validate/validate.go | 1 - 1 file changed, 1 deletion(-) diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 6b52b920af..131566fc90 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,7 +36,6 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics JobTaskClusterSpec(), ValidateFolderPermissions(), SingleNodeCluster(), - SchemaReferences(), )) } From aa52b1dab3354c56886cae7740b2cb73fb5ade69 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 27 Dec 2024 19:18:14 +0530 Subject: [PATCH 05/13] - --- bundle/config/mutator/resolve_schema_dependency.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bundle/config/mutator/resolve_schema_dependency.go b/bundle/config/mutator/resolve_schema_dependency.go index fbb389fd2a..e2468b78fb 100644 --- a/bundle/config/mutator/resolve_schema_dependency.go +++ b/bundle/config/mutator/resolve_schema_dependency.go @@ -51,7 +51,7 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) { // schema and target have the same semantics in the DLT API but are mutually // exclusive. If schema is set, the pipeline is in direct publishing mode - // and can write tables to multiple schemas (vs target which is a single schema). + // and can write tables to multiple schemas (vs target which is limited to a single schema). 
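	// Note that the literal value is rewritten only when it, together with
	// p.Catalog, matches a schema defined under resources.schemas in this bundle;
	// names that refer to schemas managed outside the bundle are left untouched.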
schemaName := p.Schema if schemaName == "" { schemaName = p.Target From fc8b5e757640ad9f8183f0630aaa8386b2879fde Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 7 Jan 2025 15:33:29 +0530 Subject: [PATCH 06/13] address comments --- ...ependency.go => capture_schema_dependency.go} | 16 ++++++++-------- ...test.go => capture_schema_dependency_test.go} | 4 ++++ 2 files changed, 12 insertions(+), 8 deletions(-) rename bundle/config/mutator/{resolve_schema_dependency.go => capture_schema_dependency.go} (81%) rename bundle/config/mutator/{resolve_schema_dependency_test.go => capture_schema_dependency_test.go} (98%) diff --git a/bundle/config/mutator/resolve_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go similarity index 81% rename from bundle/config/mutator/resolve_schema_dependency.go rename to bundle/config/mutator/capture_schema_dependency.go index e2468b78fb..979432185d 100644 --- a/bundle/config/mutator/resolve_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -9,7 +9,7 @@ import ( "github.com/databricks/cli/libs/diag" ) -type resolveSchemeDependency struct{} +type captureSchemaDependency struct{} // If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines // or UC Volumes using the `${resources.schemas..name}` syntax. Using this @@ -19,20 +19,20 @@ type resolveSchemeDependency struct{} // This mutator translates any implicit schema references in DLT pipelines or UC Volumes // to the explicit syntax. func ResolveSchemaDependency() bundle.Mutator { - return &resolveSchemeDependency{} + return &captureSchemaDependency{} } -func (m *resolveSchemeDependency) Name() string { - return "ResolveSchemaDependency" +func (m *captureSchemaDependency) Name() string { + return "CaptureSchemaDependency" } -func findSchema(b *bundle.Bundle, catalogName, name string) (string, *resources.Schema) { - if catalogName == "" || name == "" { +func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) { + if catalogName == "" || schemaName == "" { return "", nil } for k, s := range b.Config.Resources.Schemas { - if s.CatalogName == catalogName && s.Name == name { + if s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName { return k, s } } @@ -69,7 +69,7 @@ func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) { } } -func (m *resolveSchemeDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { +func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { for _, p := range b.Config.Resources.Pipelines { resolvePipeline(p, b) } diff --git a/bundle/config/mutator/resolve_schema_dependency_test.go b/bundle/config/mutator/capture_schema_dependency_test.go similarity index 98% rename from bundle/config/mutator/resolve_schema_dependency_test.go rename to bundle/config/mutator/capture_schema_dependency_test.go index feb466983d..4ef170405f 100644 --- a/bundle/config/mutator/resolve_schema_dependency_test.go +++ b/bundle/config/mutator/capture_schema_dependency_test.go @@ -36,6 +36,10 @@ func TestResolveSchemaDependencyForVolume(t *testing.T) { Name: "barfoo", }, }, + "nilschema": {}, + "emptyschema": { + CreateSchema: &catalog.CreateSchema{}, + }, }, Volumes: map[string]*resources.Volume{ "volume1": { From ed59500c1e1d753da9f6fa7e6b0e9c8cd302e121 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 7 Jan 2025 15:35:46 +0530 Subject: [PATCH 07/13] address comments --- 
bundle/config/mutator/capture_schema_dependency.go | 2 +- .../config/mutator/capture_schema_dependency_test.go | 12 ++++++------ bundle/phases/initialize.go | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index 979432185d..1bbc4e6ded 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -18,7 +18,7 @@ type captureSchemaDependency struct{} // // This mutator translates any implicit schema references in DLT pipelines or UC Volumes // to the explicit syntax. -func ResolveSchemaDependency() bundle.Mutator { +func CaptureSchemaDependency() bundle.Mutator { return &captureSchemaDependency{} } diff --git a/bundle/config/mutator/capture_schema_dependency_test.go b/bundle/config/mutator/capture_schema_dependency_test.go index 4ef170405f..335783c91e 100644 --- a/bundle/config/mutator/capture_schema_dependency_test.go +++ b/bundle/config/mutator/capture_schema_dependency_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestResolveSchemaDependencyForVolume(t *testing.T) { +func TestCaptureSchemaDependencyForVolume(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -77,7 +77,7 @@ func TestResolveSchemaDependencyForVolume(t *testing.T) { }, } - d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) assert.Equal(t, b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema1.name}") assert.Equal(t, b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema2.name}") @@ -86,7 +86,7 @@ func TestResolveSchemaDependencyForVolume(t *testing.T) { assert.Equal(t, b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName, "schemaX") } -func TestResolveSchemaDependencyForPipelinesWithTarget(t *testing.T) { +func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -159,7 +159,7 @@ func TestResolveSchemaDependencyForPipelinesWithTarget(t *testing.T) { }, } - d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Schema, "${resources.schemas.schema1.name}") assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Schema, "${resources.schemas.schema2.name}") @@ -174,7 +174,7 @@ func TestResolveSchemaDependencyForPipelinesWithTarget(t *testing.T) { } } -func TestResolveSchemaDependencyForPipelinesWithSchema(t *testing.T) { +func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -247,7 +247,7 @@ func TestResolveSchemaDependencyForPipelinesWithSchema(t *testing.T) { }, } - d := bundle.Apply(context.Background(), b, ResolveSchemaDependency()) + d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Target, "${resources.schemas.schema1.name}") assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Target, "${resources.schemas.schema2.name}") diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 
913961f33d..72e15b5806 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -63,7 +63,7 @@ func Initialize() bundle.Mutator { "workspace", "variables", ), - mutator.ResolveSchemaDependency(), + mutator.CaptureSchemaDependency(), // Provide permission config errors & warnings after initializing all variables permissions.PermissionDiagnostics(), mutator.SetRunAs(), From fcc47de4fe4a6ba90c265be0d25cd5bf2482dabb Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 7 Jan 2025 15:48:51 +0530 Subject: [PATCH 08/13] address comments --- .../mutator/capture_schema_dependency.go | 6 ++ .../mutator/capture_schema_dependency_test.go | 62 +++++++++++++------ 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index 1bbc4e6ded..4cc59143d6 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -40,6 +40,9 @@ func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *reso } func resolveVolume(v *resources.Volume, b *bundle.Bundle) { + if v.CreateVolumeRequestContent == nil { + return + } schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName) if schema == nil { return @@ -49,6 +52,9 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { } func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) { + if p.PipelineSpec == nil { + return + } // schema and target have the same semantics in the DLT API but are mutually // exclusive. If schema is set, the pipeline is in direct publishing mode // and can write tables to multiple schemas (vs target which is limited to a single schema). diff --git a/bundle/config/mutator/capture_schema_dependency_test.go b/bundle/config/mutator/capture_schema_dependency_test.go index 335783c91e..b26b1eacad 100644 --- a/bundle/config/mutator/capture_schema_dependency_test.go +++ b/bundle/config/mutator/capture_schema_dependency_test.go @@ -72,6 +72,10 @@ func TestCaptureSchemaDependencyForVolume(t *testing.T) { SchemaName: "schemaX", }, }, + "nilVolume": {}, + "emptyVolume": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{}, + }, }, }, }, @@ -79,11 +83,15 @@ func TestCaptureSchemaDependencyForVolume(t *testing.T) { d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) - assert.Equal(t, b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema1.name}") - assert.Equal(t, b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema2.name}") - assert.Equal(t, b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema3.name}") - assert.Equal(t, b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName, "foobar") - assert.Equal(t, b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName, "schemaX") + + assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName) + assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName) + assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName) + assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName) + assert.Equal(t, "schemaX", 
b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName) + + assert.Nil(t, b.Config.Resources.Volumes["nilVolume"].CreateVolumeRequestContent) + assert.Empty(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent) } func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { @@ -109,6 +117,10 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { Name: "barfoo", }, }, + "nilschema": {}, + "emptyschema": { + CreateSchema: &catalog.CreateSchema{}, + }, }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { @@ -154,6 +166,10 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { Name: "whatever", }, }, + "nilPipeline": {}, + "emptyPipeline": { + PipelineSpec: &pipelines.PipelineSpec{}, + }, }, }, }, @@ -161,13 +177,17 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) - assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Schema, "${resources.schemas.schema1.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Schema, "${resources.schemas.schema2.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Schema, "${resources.schemas.schema3.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Schema, "foobar") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Schema, "schemaX") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Schema, "foobar") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Schema, "") + + assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Schema) + assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Schema) + assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Schema) + assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Schema) + assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Schema) + assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema) + assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema) + + assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"].PipelineSpec) + assert.Empty(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec) for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { assert.Empty(t, b.Config.Resources.Pipelines[k].Target) @@ -197,6 +217,10 @@ func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) { Name: "barfoo", }, }, + "nilschema": {}, + "emptyschema": { + CreateSchema: &catalog.CreateSchema{}, + }, }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { @@ -249,13 +273,13 @@ func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) { d := bundle.Apply(context.Background(), b, CaptureSchemaDependency()) require.Nil(t, d) - assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Target, "${resources.schemas.schema1.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Target, "${resources.schemas.schema2.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Target, "${resources.schemas.schema3.name}") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Target, "foobar") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Target, "schemaX") - assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Target, "foobar") - 
assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Target, "") + assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Target) + assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Target) + assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Target) + assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Target) + assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Target) + assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Target) + assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Target) for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { assert.Empty(t, b.Config.Resources.Pipelines[k].Schema) From 4d09201588ad35ee850d0d44775976ea2ce32e8e Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 7 Jan 2025 15:54:33 +0530 Subject: [PATCH 09/13] two functions --- .../mutator/capture_schema_dependency.go | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index 4cc59143d6..c42eaadcb4 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -51,33 +51,43 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) } -func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) { +func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { if p.PipelineSpec == nil { return } - // schema and target have the same semantics in the DLT API but are mutually - // exclusive. If schema is set, the pipeline is in direct publishing mode - // and can write tables to multiple schemas (vs target which is limited to a single schema). - schemaName := p.Schema - if schemaName == "" { - schemaName = p.Target + if p.Schema == "" { + return } - - schemaK, schema := findSchema(b, p.Catalog, schemaName) + schemaK, schema := findSchema(b, p.Catalog, p.Schema) if schema == nil { return } - if p.Schema != "" { - p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) - } else if p.Target != "" { - p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) +} + +func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { + if p.PipelineSpec == nil { + return + } + if p.Target == "" { + return + } + schemaK, schema := findSchema(b, p.Catalog, p.Target) + if schema == nil { + return } + p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) } func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { for _, p := range b.Config.Resources.Pipelines { - resolvePipeline(p, b) + // "schema" and "target" have the same semantics in the DLT API but are mutually + // exclusive i.e. only one can be set at a time. If schema is set, the pipeline + // is in direct publishing mode and can write tables to multiple schemas + // (vs target which is limited to a single schema). 
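		// Since the two fields are mutually exclusive, at most one of the two calls
		// below rewrites anything; each helper returns early when its field is empty
		// or when no matching schema is defined in the bundle.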
+ resolvePipelineTarget(p, b) + resolvePipelineSchema(p, b) } for _, v := range b.Config.Resources.Volumes { resolveVolume(v, b) From 69d477ff0b7ff7a8c78c17509f656045823a329b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 7 Jan 2025 15:58:07 +0530 Subject: [PATCH 10/13] - --- bundle/config/mutator/capture_schema_dependency.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index c42eaadcb4..2a96fc6e20 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -51,7 +51,7 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) } -func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { +func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { if p.PipelineSpec == nil { return } @@ -66,7 +66,7 @@ func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) } -func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { +func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { if p.PipelineSpec == nil { return } From 58aca2a058dd333dc1d4555e70351d9417a2caab Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 16 Jan 2025 13:02:21 +0100 Subject: [PATCH 11/13] address comments --- .../mutator/capture_schema_dependency.go | 11 +++++- .../mutator/capture_schema_dependency_test.go | 38 +++++++------------ 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index 2a96fc6e20..e910857907 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -32,7 +32,7 @@ func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *reso } for k, s := range b.Config.Resources.Schemas { - if s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName { + if s != nil && s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName { return k, s } } @@ -40,6 +40,9 @@ func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *reso } func resolveVolume(v *resources.Volume, b *bundle.Bundle) { + if v == nil { + return + } if v.CreateVolumeRequestContent == nil { return } @@ -52,6 +55,9 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { } func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { + if p == nil { + return + } if p.PipelineSpec == nil { return } @@ -67,6 +73,9 @@ func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { } func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { + if p == nil { + return + } if p.PipelineSpec == nil { return } diff --git a/bundle/config/mutator/capture_schema_dependency_test.go b/bundle/config/mutator/capture_schema_dependency_test.go index b26b1eacad..0a94e77487 100644 --- a/bundle/config/mutator/capture_schema_dependency_test.go +++ b/bundle/config/mutator/capture_schema_dependency_test.go @@ -36,10 +36,8 @@ func TestCaptureSchemaDependencyForVolume(t *testing.T) { Name: "barfoo", }, }, - "nilschema": {}, - "emptyschema": { - CreateSchema: &catalog.CreateSchema{}, - }, + "nilschema": nil, + "emptyschema": {}, }, Volumes: map[string]*resources.Volume{ "volume1": { @@ -72,10 +70,8 @@ func 
TestCaptureSchemaDependencyForVolume(t *testing.T) { SchemaName: "schemaX", }, }, - "nilVolume": {}, - "emptyVolume": { - CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{}, - }, + "nilVolume": nil, + "emptyVolume": {}, }, }, }, @@ -90,8 +86,8 @@ func TestCaptureSchemaDependencyForVolume(t *testing.T) { assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName) assert.Equal(t, "schemaX", b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName) - assert.Nil(t, b.Config.Resources.Volumes["nilVolume"].CreateVolumeRequestContent) - assert.Empty(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent) + assert.Nil(t, b.Config.Resources.Volumes["nilVolume"]) + assert.Nil(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent) } func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { @@ -117,10 +113,8 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { Name: "barfoo", }, }, - "nilschema": {}, - "emptyschema": { - CreateSchema: &catalog.CreateSchema{}, - }, + "nilschema": nil, + "emptyschema": {}, }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { @@ -166,10 +160,8 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { Name: "whatever", }, }, - "nilPipeline": {}, - "emptyPipeline": { - PipelineSpec: &pipelines.PipelineSpec{}, - }, + "nilPipeline": nil, + "emptyPipeline": {}, }, }, }, @@ -186,8 +178,8 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema) assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema) - assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"].PipelineSpec) - assert.Empty(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec) + assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"]) + assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec) for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { assert.Empty(t, b.Config.Resources.Pipelines[k].Target) @@ -217,10 +209,8 @@ func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) { Name: "barfoo", }, }, - "nilschema": {}, - "emptyschema": { - CreateSchema: &catalog.CreateSchema{}, - }, + "nilschema": nil, + "emptyschema": {}, }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { From ba3a36ea0d279e41cb87003556e21124d086580d Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 16 Jan 2025 13:31:36 +0100 Subject: [PATCH 12/13] fix test --- bundle/phases/initialize.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index c5b8751961..b21b6d8e7a 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -66,6 +66,11 @@ func Initialize() bundle.Mutator { "workspace", "variables", ), + mutator.ResolveVariableReferences( + "bundle", + "workspace", + "variables", + ), mutator.MergeJobClusters(), mutator.MergeJobParameters(), From 5eb7f4ab2f9bb2eef1ead677567a39772d426d91 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 16 Jan 2025 14:08:22 +0100 Subject: [PATCH 13/13] address comments --- .../mutator/capture_schema_dependency.go | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index e910857907..5025c9a0d5 
100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -26,6 +26,10 @@ func (m *captureSchemaDependency) Name() string { return "CaptureSchemaDependency" } +func schemaNameRef(key string) string { + return fmt.Sprintf("${resources.schemas.%s.name}", key) +} + func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) { if catalogName == "" || schemaName == "" { return "", nil @@ -40,10 +44,7 @@ func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *reso } func resolveVolume(v *resources.Volume, b *bundle.Bundle) { - if v == nil { - return - } - if v.CreateVolumeRequestContent == nil { + if v == nil || v.CreateVolumeRequestContent == nil { return } schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName) @@ -51,14 +52,11 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { return } - v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + v.SchemaName = schemaNameRef(schemaK) } func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { - if p == nil { - return - } - if p.PipelineSpec == nil { + if p == nil || p.PipelineSpec == nil { return } if p.Schema == "" { @@ -69,14 +67,11 @@ func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { return } - p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + p.Schema = schemaNameRef(schemaK) } func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { - if p == nil { - return - } - if p.PipelineSpec == nil { + if p == nil || p.PipelineSpec == nil { return } if p.Target == "" { @@ -86,7 +81,7 @@ func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { if schema == nil { return } - p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK) + p.Target = schemaNameRef(schemaK) } func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
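
Editor's note, appended after the series rather than as part of any patch: the sketch below shows, end to end, what the final CaptureSchemaDependency mutator from patch 13 is expected to do to a pipeline that names a schema defined in the same bundle. It mirrors the table setup already used in capture_schema_dependency_test.go; the package clause, the resource keys ("my_schema", "my_pipeline"), and the catalog/schema names are illustrative assumptions, not values taken from the series.

package mutator

import (
	"context"
	"testing"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/databricks-sdk-go/service/catalog"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// A minimal sketch of exercising the mutator; field names and construction
// follow the existing tests in this package, while the concrete resource
// keys and schema/catalog names are illustrative.
func TestCaptureSchemaDependencySketch(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Schemas: map[string]*resources.Schema{
					"my_schema": {
						CreateSchema: &catalog.CreateSchema{
							CatalogName: "main",
							Name:        "sales",
						},
					},
				},
				Pipelines: map[string]*resources.Pipeline{
					"my_pipeline": {
						PipelineSpec: &pipelines.PipelineSpec{
							Catalog: "main",
							Schema:  "sales",
						},
					},
				},
			},
		},
	}

	d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
	require.Nil(t, d)

	// The literal schema name is rewritten to a reference to the bundle's
	// schema resource, so the schema is deployed before the pipeline.
	assert.Equal(t, "${resources.schemas.my_schema.name}", b.Config.Resources.Pipelines["my_pipeline"].Schema)
}

Because the schema name ends up as a ${resources.schemas.<key>.name} reference rather than a literal string, the deployment picks up the schema as a dependency of the pipeline, which is the behaviour the preceding patches converge on for pipelines (schema and target) and for volumes (schema_name).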