Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch references to UC schemas to capture dependencies automatically #1989

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
96 changes: 96 additions & 0 deletions bundle/config/mutator/capture_schema_dependency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)

type captureSchemaDependency struct{}

// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines
// or UC Volumes using the `${resources.schemas.<schema_key>.name}` syntax. Using this
// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume
// has on the schema and deploy changes to the schema before deploying the pipeline or volume.
//
// This mutator translates any implicit schema references in DLT pipelines or UC Volumes
// to the explicit syntax.
func CaptureSchemaDependency() bundle.Mutator {
return &captureSchemaDependency{}
}

func (m *captureSchemaDependency) Name() string {
return "CaptureSchemaDependency"
}

func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) {
if catalogName == "" || schemaName == "" {
return "", nil
}

for k, s := range b.Config.Resources.Schemas {
if s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName {
return k, s
}
}
return "", nil
}

func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
if v.CreateVolumeRequestContent == nil {
return
}
schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName)
if schema == nil {
return
}

v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}

func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
if p.PipelineSpec == nil {
return
}
if p.Schema == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Schema)
if schema == nil {
return
}

p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}

func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) {
if p.PipelineSpec == nil {
return
}
if p.Target == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Target)
if schema == nil {
return
}
p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}

func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, p := range b.Config.Resources.Pipelines {
// "schema" and "target" have the same semantics in the DLT API but are mutually
// exclusive i.e. only one can be set at a time. If schema is set, the pipeline
// is in direct publishing mode and can write tables to multiple schemas
// (vs target which is limited to a single schema).
resolvePipelineTarget(p, b)
resolvePipelineSchema(p, b)
}
for _, v := range b.Config.Resources.Volumes {
resolveVolume(v, b)
}
return nil
}
287 changes: 287 additions & 0 deletions bundle/config/mutator/capture_schema_dependency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
package mutator

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCaptureSchemaDependencyForVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": {},
"emptyschema": {
CreateSchema: &catalog.CreateSchema{},
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nilschema should be nil as in actual nil and empty as in what's now the nilschema.

},
Volumes: map[string]*resources.Volume{
"volume1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "foobar",
},
},
"volume2": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog2",
SchemaName: "foobar",
},
},
"volume3": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "barfoo",
},
},
"volume4": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalogX",
SchemaName: "foobar",
},
},
"volume5": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "schemaX",
},
},
"nilVolume": {},
"emptyVolume": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{},
},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "schemaX", b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName)

assert.Nil(t, b.Config.Resources.Volumes["nilVolume"].CreateVolumeRequestContent)
assert.Empty(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent)
}

func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": {},
"emptyschema": {
CreateSchema: &catalog.CreateSchema{},
},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Schema: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Schema: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "",
Name: "whatever",
},
},
"nilPipeline": {},
"emptyPipeline": {
PipelineSpec: &pipelines.PipelineSpec{},
},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)

assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Schema)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Schema)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Schema)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema)

assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"].PipelineSpec)
assert.Empty(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
}
}

func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": {},
"emptyschema": {
CreateSchema: &catalog.CreateSchema{},
},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Target: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Target: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "",
Name: "whatever",
},
},
},
},
},
}

d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Target)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Target)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Target)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Target)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Target)

for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Schema)
}
}
1 change: 1 addition & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func Initialize() bundle.Mutator {
"workspace",
"variables",
),
mutator.CaptureSchemaDependency(),
// Provide permission config errors & warnings after initializing all variables
permissions.PermissionDiagnostics(),
mutator.SetRunAs(),
Expand Down
Loading