diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java index 017b3970b5..604650f3ce 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java @@ -18,8 +18,8 @@ import io.openlineage.spark.agent.facets.builder.DatabricksEnvironmentFacetBuilder; import io.openlineage.spark.agent.facets.builder.DebugRunFacetBuilder; import io.openlineage.spark.agent.facets.builder.ErrorFacetBuilder; -import io.openlineage.spark.agent.facets.builder.GCPJobFacetBuilder; -import io.openlineage.spark.agent.facets.builder.GCPRunFacetBuilder; +import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder; +import io.openlineage.spark.agent.facets.builder.GcpRunFacetBuilder; import io.openlineage.spark.agent.facets.builder.LogicalPlanRunFacetBuilder; import io.openlineage.spark.agent.facets.builder.OutputStatisticsOutputDatasetFacetBuilder; import io.openlineage.spark.agent.facets.builder.OwnershipJobFacetBuilder; @@ -215,7 +215,7 @@ public Collection>> createOutputData if (DatabricksEnvironmentFacetBuilder.isDatabricksRuntime()) { listBuilder.add(new DatabricksEnvironmentFacetBuilder(context)); } else if (GCPUtils.isDataprocRuntime()) { - listBuilder.add(new GCPRunFacetBuilder(context)); + listBuilder.add(new GcpRunFacetBuilder(context)); } else if (context.getCustomEnvironmentVariables() != null) { listBuilder.add(new CustomEnvironmentFacetBuilder(context)); } @@ -234,7 +234,7 @@ public Collection>> createOutputData listBuilder.add(new OwnershipJobFacetBuilder(context)); if (GCPUtils.isDataprocRuntime()) { - listBuilder.add(new GCPJobFacetBuilder(context)); + listBuilder.add(new GcpJobFacetBuilder(context)); } return listBuilder.build(); } diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java index e5c587c876..b3a0e2d266 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java @@ -14,8 +14,8 @@ import io.openlineage.spark.agent.OpenLineageSparkListener; import io.openlineage.spark.agent.Versions; import io.openlineage.spark.agent.facets.ErrorFacet; -import io.openlineage.spark.agent.facets.builder.GCPJobFacetBuilder; -import io.openlineage.spark.agent.facets.builder.GCPRunFacetBuilder; +import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder; +import io.openlineage.spark.agent.facets.builder.GcpRunFacetBuilder; import io.openlineage.spark.agent.facets.builder.SparkJobDetailsFacetBuilder; import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate; import io.openlineage.spark.agent.facets.builder.SparkPropertyFacetBuilder; @@ -280,7 +280,7 @@ protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListene addProcessingEventFacet(runFacetsBuilder); addSparkPropertyFacet(runFacetsBuilder, event); - addGCPRunFacet(runFacetsBuilder, event); + addGcpRunFacet(runFacetsBuilder, event); addSparkJobDetailsFacet(runFacetsBuilder, event); return runFacetsBuilder.build(); @@ -299,11 +299,11 @@ private void addSparkPropertyFacet(OpenLineage.RunFacetsBuilder b0, SparkListene b0.put("spark_properties", new SparkPropertyFacetBuilder().buildFacet(event)); } - private void addGCPRunFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) { + private void addGcpRunFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) { if (!GCPUtils.isDataprocRuntime()) return; sparkContextOption.ifPresent( context -> { - GCPRunFacetBuilder b1 = new GCPRunFacetBuilder(context); + GcpRunFacetBuilder b1 = new GcpRunFacetBuilder(context); b1.accept(event, b0::put); }); } @@ -321,15 +321,15 @@ private OpenLineage.ParentRunFacet buildApplicationParentFacet() { protected OpenLineage.JobFacets buildJobFacets(SparkListenerEvent sparkListenerEvent) { OpenLineage.JobFacetsBuilder jobFacetsBuilder = openLineage.newJobFacetsBuilder(); - addGCPJobFacets(jobFacetsBuilder, sparkListenerEvent); + addGcpJobFacets(jobFacetsBuilder, sparkListenerEvent); return jobFacetsBuilder.build(); } - private void addGCPJobFacets(OpenLineage.JobFacetsBuilder b0, SparkListenerEvent event) { + private void addGcpJobFacets(OpenLineage.JobFacetsBuilder b0, SparkListenerEvent event) { if (!GCPUtils.isDataprocRuntime()) return; sparkContextOption.ifPresent( context -> { - GCPJobFacetBuilder b1 = new GCPJobFacetBuilder(context); + GcpJobFacetBuilder b1 = new GcpJobFacetBuilder(context); b1.accept(event, b0::put); }); } diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPDataprocRunFacet.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpDataprocRunFacet.java similarity index 89% rename from integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPDataprocRunFacet.java rename to integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpDataprocRunFacet.java index ef7154bdc9..5df9f6f179 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPDataprocRunFacet.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpDataprocRunFacet.java @@ -13,13 +13,13 @@ import java.util.LinkedHashMap; import java.util.Map; -public class GCPDataprocRunFacet implements OpenLineage.RunFacet { +public class GcpDataprocRunFacet implements OpenLineage.RunFacet { private final URI _producer; private final URI _schemaURL; @JsonAnySetter private final Map additionalProperties; - public GCPDataprocRunFacet(Map properties) { + public GcpDataprocRunFacet(Map properties) { this._producer = Versions.OPEN_LINEAGE_PRODUCER_URI; this._schemaURL = URI.create( diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPCommonJobFacet.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpLineageJobFacet.java similarity index 79% rename from integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPCommonJobFacet.java rename to integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpLineageJobFacet.java index 0d8c4c391f..dfafa57255 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GCPCommonJobFacet.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/GcpLineageJobFacet.java @@ -13,22 +13,22 @@ import java.util.LinkedHashMap; import java.util.Map; -public class GCPCommonJobFacet implements OpenLineage.JobFacet { +public class GcpLineageJobFacet implements OpenLineage.JobFacet { private final URI _producer; private final URI _schemaURL; private final Boolean _deleted; @JsonAnySetter private final Map additionalProperties; - public GCPCommonJobFacet(Map properties) { + public GcpLineageJobFacet(Map properties) { this(properties, null); } - public GCPCommonJobFacet(Map properties, Boolean _deleted) { + public GcpLineageJobFacet(Map properties, Boolean _deleted) { this._producer = Versions.OPEN_LINEAGE_PRODUCER_URI; this._schemaURL = URI.create( - "https://openlineage.io/spec/facets/1-0-0/GcpCommonJobFacet.json#/$defs/GcpCommonJobFacet"); + "https://openlineage.io/spec/facets/1-0-0/GcpLineageJobFacet.json#/$defs/GcpLineageJobFacet"); this._deleted = _deleted; this.additionalProperties = new LinkedHashMap<>(properties); } diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPJobFacetBuilder.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpJobFacetBuilder.java similarity index 75% rename from integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPJobFacetBuilder.java rename to integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpJobFacetBuilder.java index ad5481f345..d0813f850a 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPJobFacetBuilder.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpJobFacetBuilder.java @@ -6,7 +6,7 @@ package io.openlineage.spark.agent.facets.builder; import io.openlineage.client.OpenLineage.JobFacet; -import io.openlineage.spark.agent.facets.GCPCommonJobFacet; +import io.openlineage.spark.agent.facets.GcpLineageJobFacet; import io.openlineage.spark.agent.util.GCPUtils; import io.openlineage.spark.api.CustomFacetBuilder; import io.openlineage.spark.api.OpenLineageContext; @@ -20,24 +20,24 @@ * {@link CustomFacetBuilder} responsible for generating GCP-specific job facets when using * OpenLineage on Google Cloud Platform (GCP). */ -public class GCPJobFacetBuilder extends CustomFacetBuilder { +public class GcpJobFacetBuilder extends CustomFacetBuilder { private final SparkContext sparkContext; - public GCPJobFacetBuilder(OpenLineageContext openLineageContext) { + public GcpJobFacetBuilder(OpenLineageContext openLineageContext) { this.sparkContext = openLineageContext.getSparkContext().get(); } - public GCPJobFacetBuilder(SparkContext sparkContext) { + public GcpJobFacetBuilder(SparkContext sparkContext) { this.sparkContext = sparkContext; } @Override protected void build(SparkListenerEvent event, BiConsumer consumer) { - consumer.accept("gcp_common", new GCPCommonJobFacet(getCommonAttributes())); + consumer.accept("gcp_lineage", new GcpLineageJobFacet(getGcpLineageAttributes())); } - private Map getCommonAttributes() { + private Map getGcpLineageAttributes() { Map commonProperties = new HashMap<>(); commonProperties.put("origin", GCPUtils.getOriginFacetMap(sparkContext)); diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPRunFacetBuilder.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpRunFacetBuilder.java similarity index 81% rename from integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPRunFacetBuilder.java rename to integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpRunFacetBuilder.java index d3ca517613..46c7baa87c 100644 --- a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GCPRunFacetBuilder.java +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/GcpRunFacetBuilder.java @@ -6,7 +6,7 @@ package io.openlineage.spark.agent.facets.builder; import io.openlineage.client.OpenLineage.RunFacet; -import io.openlineage.spark.agent.facets.GCPDataprocRunFacet; +import io.openlineage.spark.agent.facets.GcpDataprocRunFacet; import io.openlineage.spark.agent.util.GCPUtils; import io.openlineage.spark.api.CustomFacetBuilder; import io.openlineage.spark.api.OpenLineageContext; @@ -20,17 +20,17 @@ * {@link CustomFacetBuilder} responsible for generating GCP-specific run facets when using * OpenLineage on Google Cloud Platform (GCP). */ -public class GCPRunFacetBuilder extends CustomFacetBuilder { +public class GcpRunFacetBuilder extends CustomFacetBuilder { private final SparkContext sparkContext; private final Optional maybeOLContext; - public GCPRunFacetBuilder(OpenLineageContext openLineageContext) { + public GcpRunFacetBuilder(OpenLineageContext openLineageContext) { this.sparkContext = openLineageContext.getSparkContext().get(); this.maybeOLContext = Optional.of(openLineageContext); } - public GCPRunFacetBuilder(SparkContext sparkContext) { + public GcpRunFacetBuilder(SparkContext sparkContext) { this.sparkContext = sparkContext; this.maybeOLContext = Optional.empty(); } @@ -38,7 +38,7 @@ public GCPRunFacetBuilder(SparkContext sparkContext) { @Override protected void build(SparkListenerEvent event, BiConsumer consumer) { if (GCPUtils.isDataprocRuntime()) - consumer.accept("gcp_dataproc", new GCPDataprocRunFacet(getDataprocAttributes())); + consumer.accept("gcp_dataproc_spark", new GcpDataprocRunFacet(getDataprocAttributes())); } private Map getDataprocAttributes() { diff --git a/proposals/2161/registry.md b/proposals/2161/registry.md index cc28319356..e58c9c2bb4 100644 --- a/proposals/2161/registry.md +++ b/proposals/2161/registry.md @@ -91,11 +91,11 @@ OpenLineage Producer: Producer root doc URL: https://… Produced facets: - { facets: [ {“URI}”, “{URI}”, … ]} + { produced_facets: [ {“URI}”, “{URI}”, … ]} Consumer: Consumer root doc URL: https://… Consumed facets: - { facets: [ “{URI}”, “{URI}”, … ]} + { consumed_facets: [ “{URI}”, “{URI}”, … ]} /facets/ <- where custom facet schemas are stored /facets/examples/{FacetName}/{number}.json <- where facet examples are stored ``` @@ -127,7 +127,7 @@ core/ producer: { root_doc_URL: "https://openlineage.io/spec/facets/", sample_URL: "https://github.com/OpenLineage/OpenLineage/tree/main/spec/tests/", - facets: [ + produced_facets: [ "ColumnLineageDatasetFacet.json": { "owner": "core" }, @@ -145,7 +145,7 @@ egeria/ producer: { root_doc_URL: … , sample_URL: … , - facets: [ + produced_facets: [ "ColumnLineageDatasetFacet.json": { "owner": "core" }, @@ -156,7 +156,7 @@ egeria/ }, consumer: { root_doc_URL: … - facets: [ + consumed_facets: [ "NewCustomFacet.json": { "owner": "egeria" } diff --git a/spec/registry/gcp/dataproc/facets/GcpDataprocSparkRunFacet.json b/spec/registry/gcp/dataproc/facets/GcpDataprocSparkRunFacet.json new file mode 100644 index 0000000000..81bf9d77f7 --- /dev/null +++ b/spec/registry/gcp/dataproc/facets/GcpDataprocSparkRunFacet.json @@ -0,0 +1,75 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://openlineage.io/spec/facets/1-0-0/GcpDataprocSparkRunFacet.json", + "$defs": { + "GcpDataprocSparkRunFacet": { + "allOf": [ + { + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" + }, + { + "type": "object", + "properties": { + "appId": { + "description": "Application ID set in the spark configuration of the current context. Its format depends on the resource manager.", + "type": "string" + }, + "appName": { + "description": "App name set in the spark configuration of the current context. It may be provided by the user.", + "type": "string" + }, + "batchId": { + "description": "Populated only for Dataproc serverless batches. The resource id of the batch.", + "type": "string" + }, + "batchUuid": { + "description": "Populated only for Dataproc serverless batches. A UUID generated by the service when it creates the batch.", + "type": "string" + }, + "clusterName": { + "description": "Populated only for Dataproc GCE workloads. The cluster name is unique within a GCP project.", + "type": "string" + }, + "clusterUuid": { + "description": "Populated only for Dataproc GCE workloads. A UUID generated by the service at the time of cluster creation.", + "type": "string" + }, + "jobId": { + "description": "Populated only for Dataproc GCE workloads. If not specified by the user, the job ID will be provided by the service.", + "type": "string" + }, + "jobUuid": { + "description": "Populated only for Dataproc GCE workloads. A UUID that uniquely identifies a job within the project over time.", + "type": "string" + }, + "projectId": { + "description": "The GCP project ID that the resource belongs to.", + "type": "string" + }, + "queryNodeName": { + "description": "The name of the query node in the executed Spark Plan. Often used to describe the command being executed.", + "type": "string" + }, + "sessionId": { + "description": "Populated only for Dataproc serverless interactive sessions. The resource id of the session, used for URL generation.", + "type": "string" + }, + "sessionUuid": { + "description": "Populated only for Dataproc serverless interactive sessions. A UUID generated by the service when it creates the session.", + "type": "string" + } + }, + "required": ["appId", "appName", "projectId"], + "additionalProperties": true + } + ], + "type": "object" + } + }, + "type": "object", + "properties": { + "gcp_dataproc_spark": { + "$ref": "#/$defs/GcpDataprocSparkRunFacet" + } + } +} diff --git a/spec/registry/gcp/dataproc/registry.json b/spec/registry/gcp/dataproc/registry.json new file mode 100644 index 0000000000..cabd2d89b5 --- /dev/null +++ b/spec/registry/gcp/dataproc/registry.json @@ -0,0 +1,6 @@ +{ + "producer": { + "root_doc_URL": "https://cloud.google.com/dataproc/docs/guides/lineage", + "produced_facets": ["ol:gcp:dataproc:GcpDataprocSparkRunFacet.json"] + } +} diff --git a/spec/registry/gcp/facets/GcpCommonJobFacet.json b/spec/registry/gcp/lineage/facets/GcpLineageJobFacet.json similarity index 89% rename from spec/registry/gcp/facets/GcpCommonJobFacet.json rename to spec/registry/gcp/lineage/facets/GcpLineageJobFacet.json index 9f37dc6f68..ae478c4b32 100644 --- a/spec/registry/gcp/facets/GcpCommonJobFacet.json +++ b/spec/registry/gcp/lineage/facets/GcpLineageJobFacet.json @@ -1,8 +1,8 @@ { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://openlineage.io/spec/facets/1-0-0/GcpCommonJobFacet.json", + "$id": "https://openlineage.io/spec/facets/1-0-0/GcpLineageJobFacet.json", "$defs": { - "GcpCommonJobFacet": { + "GcpLineageJobFacet": { "allOf": [ { "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/JobFacet" @@ -36,8 +36,8 @@ }, "type": "object", "properties": { - "gcp_common": { - "$ref": "#/$defs/GcpCommonJobFacet" + "gcp_lineage": { + "$ref": "#/$defs/GcpLineageJobFacet" } } } diff --git a/spec/registry/gcp/registry.json b/spec/registry/gcp/lineage/registry.json similarity index 66% rename from spec/registry/gcp/registry.json rename to spec/registry/gcp/lineage/registry.json index 63f75a3c3a..2fb625611e 100644 --- a/spec/registry/gcp/registry.json +++ b/spec/registry/gcp/lineage/registry.json @@ -1,6 +1,6 @@ { "consumer": { "root_doc_URL": "https://cloud.google.com/data-catalog/docs/reference/data-lineage/rpc/google.cloud.datacatalog.lineage.v1#google.cloud.datacatalog.lineage.v1.Lineage.ProcessOpenLineageRunEvent", - "produced_facets": ["ol:gcp:GcpCommonJobFacet.json"] + "consumed_facets": ["ol:gcp:lineage:GcpLineageJobFacet.json", "ol:gcp:dataproc:GcpDataprocSparkRunFacet.json"] } }