Skip to content

Commit

Permalink
[Spec] gcp dataproc facet (OpenLineage#2987)
Browse files Browse the repository at this point in the history
* gcp dataproc facet

Signed-off-by: tnazarew <[email protected]>

* change filenames

Signed-off-by: tnazarew <[email protected]>

---------

Signed-off-by: tnazarew <[email protected]>
  • Loading branch information
tnazarew authored Aug 28, 2024
1 parent b34218d commit 1a64c0e
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import io.openlineage.spark.agent.facets.builder.DatabricksEnvironmentFacetBuilder;
import io.openlineage.spark.agent.facets.builder.DebugRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.ErrorFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GCPJobFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GCPRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GcpRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.LogicalPlanRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.OutputStatisticsOutputDatasetFacetBuilder;
import io.openlineage.spark.agent.facets.builder.OwnershipJobFacetBuilder;
Expand Down Expand Up @@ -215,7 +215,7 @@ public Collection<PartialFunction<Object, List<OutputDataset>>> createOutputData
if (DatabricksEnvironmentFacetBuilder.isDatabricksRuntime()) {
listBuilder.add(new DatabricksEnvironmentFacetBuilder(context));
} else if (GCPUtils.isDataprocRuntime()) {
listBuilder.add(new GCPRunFacetBuilder(context));
listBuilder.add(new GcpRunFacetBuilder(context));
} else if (context.getCustomEnvironmentVariables() != null) {
listBuilder.add(new CustomEnvironmentFacetBuilder(context));
}
Expand All @@ -234,7 +234,7 @@ public Collection<PartialFunction<Object, List<OutputDataset>>> createOutputData

listBuilder.add(new OwnershipJobFacetBuilder(context));
if (GCPUtils.isDataprocRuntime()) {
listBuilder.add(new GCPJobFacetBuilder(context));
listBuilder.add(new GcpJobFacetBuilder(context));
}
return listBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import io.openlineage.spark.agent.OpenLineageSparkListener;
import io.openlineage.spark.agent.Versions;
import io.openlineage.spark.agent.facets.ErrorFacet;
import io.openlineage.spark.agent.facets.builder.GCPJobFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GCPRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GcpJobFacetBuilder;
import io.openlineage.spark.agent.facets.builder.GcpRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.SparkJobDetailsFacetBuilder;
import io.openlineage.spark.agent.facets.builder.SparkProcessingEngineRunFacetBuilderDelegate;
import io.openlineage.spark.agent.facets.builder.SparkPropertyFacetBuilder;
Expand Down Expand Up @@ -280,7 +280,7 @@ protected OpenLineage.RunFacets buildRunFacets(ErrorFacet jobError, SparkListene

addProcessingEventFacet(runFacetsBuilder);
addSparkPropertyFacet(runFacetsBuilder, event);
addGCPRunFacet(runFacetsBuilder, event);
addGcpRunFacet(runFacetsBuilder, event);
addSparkJobDetailsFacet(runFacetsBuilder, event);

return runFacetsBuilder.build();
Expand All @@ -299,11 +299,11 @@ private void addSparkPropertyFacet(OpenLineage.RunFacetsBuilder b0, SparkListene
b0.put("spark_properties", new SparkPropertyFacetBuilder().buildFacet(event));
}

private void addGCPRunFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) {
private void addGcpRunFacet(OpenLineage.RunFacetsBuilder b0, SparkListenerEvent event) {
if (!GCPUtils.isDataprocRuntime()) return;
sparkContextOption.ifPresent(
context -> {
GCPRunFacetBuilder b1 = new GCPRunFacetBuilder(context);
GcpRunFacetBuilder b1 = new GcpRunFacetBuilder(context);
b1.accept(event, b0::put);
});
}
Expand All @@ -321,15 +321,15 @@ private OpenLineage.ParentRunFacet buildApplicationParentFacet() {

protected OpenLineage.JobFacets buildJobFacets(SparkListenerEvent sparkListenerEvent) {
OpenLineage.JobFacetsBuilder jobFacetsBuilder = openLineage.newJobFacetsBuilder();
addGCPJobFacets(jobFacetsBuilder, sparkListenerEvent);
addGcpJobFacets(jobFacetsBuilder, sparkListenerEvent);
return jobFacetsBuilder.build();
}

private void addGCPJobFacets(OpenLineage.JobFacetsBuilder b0, SparkListenerEvent event) {
private void addGcpJobFacets(OpenLineage.JobFacetsBuilder b0, SparkListenerEvent event) {
if (!GCPUtils.isDataprocRuntime()) return;
sparkContextOption.ifPresent(
context -> {
GCPJobFacetBuilder b1 = new GCPJobFacetBuilder(context);
GcpJobFacetBuilder b1 = new GcpJobFacetBuilder(context);
b1.accept(event, b0::put);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import java.util.LinkedHashMap;
import java.util.Map;

public class GCPDataprocRunFacet implements OpenLineage.RunFacet {
public class GcpDataprocRunFacet implements OpenLineage.RunFacet {

private final URI _producer;
private final URI _schemaURL;
@JsonAnySetter private final Map<String, Object> additionalProperties;

public GCPDataprocRunFacet(Map<String, Object> properties) {
public GcpDataprocRunFacet(Map<String, Object> properties) {
this._producer = Versions.OPEN_LINEAGE_PRODUCER_URI;
this._schemaURL =
URI.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@
import java.util.LinkedHashMap;
import java.util.Map;

public class GCPCommonJobFacet implements OpenLineage.JobFacet {
public class GcpLineageJobFacet implements OpenLineage.JobFacet {

private final URI _producer;
private final URI _schemaURL;
private final Boolean _deleted;
@JsonAnySetter private final Map<String, Object> additionalProperties;

public GCPCommonJobFacet(Map<String, Object> properties) {
public GcpLineageJobFacet(Map<String, Object> properties) {
this(properties, null);
}

public GCPCommonJobFacet(Map<String, Object> properties, Boolean _deleted) {
public GcpLineageJobFacet(Map<String, Object> properties, Boolean _deleted) {
this._producer = Versions.OPEN_LINEAGE_PRODUCER_URI;
this._schemaURL =
URI.create(
"https://openlineage.io/spec/facets/1-0-0/GcpCommonJobFacet.json#/$defs/GcpCommonJobFacet");
"https://openlineage.io/spec/facets/1-0-0/GcpLineageJobFacet.json#/$defs/GcpLineageJobFacet");
this._deleted = _deleted;
this.additionalProperties = new LinkedHashMap<>(properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.openlineage.spark.agent.facets.builder;

import io.openlineage.client.OpenLineage.JobFacet;
import io.openlineage.spark.agent.facets.GCPCommonJobFacet;
import io.openlineage.spark.agent.facets.GcpLineageJobFacet;
import io.openlineage.spark.agent.util.GCPUtils;
import io.openlineage.spark.api.CustomFacetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
Expand All @@ -20,24 +20,24 @@
* {@link CustomFacetBuilder} responsible for generating GCP-specific job facets when using
* OpenLineage on Google Cloud Platform (GCP).
*/
public class GCPJobFacetBuilder extends CustomFacetBuilder<SparkListenerEvent, JobFacet> {
public class GcpJobFacetBuilder extends CustomFacetBuilder<SparkListenerEvent, JobFacet> {

private final SparkContext sparkContext;

public GCPJobFacetBuilder(OpenLineageContext openLineageContext) {
public GcpJobFacetBuilder(OpenLineageContext openLineageContext) {
this.sparkContext = openLineageContext.getSparkContext().get();
}

public GCPJobFacetBuilder(SparkContext sparkContext) {
public GcpJobFacetBuilder(SparkContext sparkContext) {
this.sparkContext = sparkContext;
}

@Override
protected void build(SparkListenerEvent event, BiConsumer<String, ? super JobFacet> consumer) {
consumer.accept("gcp_common", new GCPCommonJobFacet(getCommonAttributes()));
consumer.accept("gcp_lineage", new GcpLineageJobFacet(getGcpLineageAttributes()));
}

private Map<String, Object> getCommonAttributes() {
private Map<String, Object> getGcpLineageAttributes() {

Map<String, Object> commonProperties = new HashMap<>();
commonProperties.put("origin", GCPUtils.getOriginFacetMap(sparkContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.openlineage.spark.agent.facets.builder;

import io.openlineage.client.OpenLineage.RunFacet;
import io.openlineage.spark.agent.facets.GCPDataprocRunFacet;
import io.openlineage.spark.agent.facets.GcpDataprocRunFacet;
import io.openlineage.spark.agent.util.GCPUtils;
import io.openlineage.spark.api.CustomFacetBuilder;
import io.openlineage.spark.api.OpenLineageContext;
Expand All @@ -20,25 +20,25 @@
* {@link CustomFacetBuilder} responsible for generating GCP-specific run facets when using
* OpenLineage on Google Cloud Platform (GCP).
*/
public class GCPRunFacetBuilder extends CustomFacetBuilder<SparkListenerEvent, RunFacet> {
public class GcpRunFacetBuilder extends CustomFacetBuilder<SparkListenerEvent, RunFacet> {

private final SparkContext sparkContext;
private final Optional<OpenLineageContext> maybeOLContext;

public GCPRunFacetBuilder(OpenLineageContext openLineageContext) {
public GcpRunFacetBuilder(OpenLineageContext openLineageContext) {
this.sparkContext = openLineageContext.getSparkContext().get();
this.maybeOLContext = Optional.of(openLineageContext);
}

public GCPRunFacetBuilder(SparkContext sparkContext) {
public GcpRunFacetBuilder(SparkContext sparkContext) {
this.sparkContext = sparkContext;
this.maybeOLContext = Optional.empty();
}

@Override
protected void build(SparkListenerEvent event, BiConsumer<String, ? super RunFacet> consumer) {
if (GCPUtils.isDataprocRuntime())
consumer.accept("gcp_dataproc", new GCPDataprocRunFacet(getDataprocAttributes()));
consumer.accept("gcp_dataproc_spark", new GcpDataprocRunFacet(getDataprocAttributes()));
}

private Map<String, Object> getDataprocAttributes() {
Expand Down
10 changes: 5 additions & 5 deletions proposals/2161/registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ OpenLineage
Producer:
Producer root doc URL: https://…
Produced facets:
{ facets: [ “{URI}”, “{URI}”, … ]}
{ produced_facets: [ “{URI}”, “{URI}”, … ]}
Consumer:
Consumer root doc URL: https://…
Consumed facets:
{ facets: [ “{URI}”, “{URI}”, … ]}
{ consumed_facets: [ “{URI}”, “{URI}”, … ]}
/facets/ <- where custom facet schemas are stored
/facets/examples/{FacetName}/{number}.json <- where facet examples are stored
```
Expand Down Expand Up @@ -127,7 +127,7 @@ core/
producer: {
root_doc_URL: "https://openlineage.io/spec/facets/",
sample_URL: "https://github.com/OpenLineage/OpenLineage/tree/main/spec/tests/",
facets: [
produced_facets: [
"ColumnLineageDatasetFacet.json": {
"owner": "core"
},
Expand All @@ -145,7 +145,7 @@ egeria/
producer: {
root_doc_URL: … ,
sample_URL: … ,
facets: [
produced_facets: [
"ColumnLineageDatasetFacet.json": {
"owner": "core"
},
Expand All @@ -156,7 +156,7 @@ egeria/
},
consumer: {
root_doc_URL: …
facets: [
consumed_facets: [
"NewCustomFacet.json": {
"owner": "egeria"
}
Expand Down
75 changes: 75 additions & 0 deletions spec/registry/gcp/dataproc/facets/GcpDataprocSparkRunFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/GcpDataprocSparkRunFacet.json",
"$defs": {
"GcpDataprocSparkRunFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"appId": {
"description": "Application ID set in the spark configuration of the current context. Its format depends on the resource manager.",
"type": "string"
},
"appName": {
"description": "App name set in the spark configuration of the current context. It may be provided by the user.",
"type": "string"
},
"batchId": {
"description": "Populated only for Dataproc serverless batches. The resource id of the batch.",
"type": "string"
},
"batchUuid": {
"description": "Populated only for Dataproc serverless batches. A UUID generated by the service when it creates the batch.",
"type": "string"
},
"clusterName": {
"description": "Populated only for Dataproc GCE workloads. The cluster name is unique within a GCP project.",
"type": "string"
},
"clusterUuid": {
"description": "Populated only for Dataproc GCE workloads. A UUID generated by the service at the time of cluster creation.",
"type": "string"
},
"jobId": {
"description": "Populated only for Dataproc GCE workloads. If not specified by the user, the job ID will be provided by the service.",
"type": "string"
},
"jobUuid": {
"description": "Populated only for Dataproc GCE workloads. A UUID that uniquely identifies a job within the project over time.",
"type": "string"
},
"projectId": {
"description": "The GCP project ID that the resource belongs to.",
"type": "string"
},
"queryNodeName": {
"description": "The name of the query node in the executed Spark Plan. Often used to describe the command being executed.",
"type": "string"
},
"sessionId": {
"description": "Populated only for Dataproc serverless interactive sessions. The resource id of the session, used for URL generation.",
"type": "string"
},
"sessionUuid": {
"description": "Populated only for Dataproc serverless interactive sessions. A UUID generated by the service when it creates the session.",
"type": "string"
}
},
"required": ["appId", "appName", "projectId"],
"additionalProperties": true
}
],
"type": "object"
}
},
"type": "object",
"properties": {
"gcp_dataproc_spark": {
"$ref": "#/$defs/GcpDataprocSparkRunFacet"
}
}
}
6 changes: 6 additions & 0 deletions spec/registry/gcp/dataproc/registry.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"producer": {
"root_doc_URL": "https://cloud.google.com/dataproc/docs/guides/lineage",
"produced_facets": ["ol:gcp:dataproc:GcpDataprocSparkRunFacet.json"]
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/GcpCommonJobFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-0/GcpLineageJobFacet.json",
"$defs": {
"GcpCommonJobFacet": {
"GcpLineageJobFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/JobFacet"
Expand Down Expand Up @@ -36,8 +36,8 @@
},
"type": "object",
"properties": {
"gcp_common": {
"$ref": "#/$defs/GcpCommonJobFacet"
"gcp_lineage": {
"$ref": "#/$defs/GcpLineageJobFacet"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"consumer": {
"root_doc_URL": "https://cloud.google.com/data-catalog/docs/reference/data-lineage/rpc/google.cloud.datacatalog.lineage.v1#google.cloud.datacatalog.lineage.v1.Lineage.ProcessOpenLineageRunEvent",
"produced_facets": ["ol:gcp:GcpCommonJobFacet.json"]
"consumed_facets": ["ol:gcp:lineage:GcpLineageJobFacet.json", "ol:gcp:dataproc:GcpDataprocSparkRunFacet.json"]
}
}

0 comments on commit 1a64c0e

Please sign in to comment.