Skip to content

Commit

Permalink
feat(dual-write): add new ingestSkipPreUpdates resource endpoints (#430)
Browse files Browse the repository at this point in the history
* feat(dual-write): add new ingestSkipPreIngestionUpdates resource endpoints

* refactor ingest* methods to address comments

* add javadocs for new method

* add javadocs for new method
 typo

* update method names to rawIngest and rawAdd

* update some comments
  • Loading branch information
jsdonn authored Sep 19, 2024
1 parent aabb3e9 commit 0d7727e
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return addSkipPreIngestionUpdates(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
}

/**
Expand All @@ -862,7 +862,7 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT addSkipPreIngestionUpdates(@Nonnull URN urn, @Nonnull ASPECT newValue,
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
final IngestionParams nonNullIngestionParams =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,15 @@ private Task<BackfillResult> backfillWithNewValue(@ActionParam(PARAM_URNS) @Nonn
});
}

/**
* Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing
* GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write)
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
@Override
protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
Expand All @@ -308,46 +317,44 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternal, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternal for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error(
String.format("Couldn't ingest routing aspect %s for %s", aspect.getClass().getSimpleName(), urn),
exception);
}
} else {
getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams);
}
}
});
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false));
return null;
});
}

/**
* Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Route any aspects which
* have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect
* locally as well (i.e. dual write)
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
// TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused.
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true));
return null;
});
}

/**
* Internal ingest method for assets. First execute any pre-ingestion updates. Then, route any aspects which have a registered routing
* GMS client to the respective GMS for ingestion. Finally, continue to save the aspect locally as well (i.e. dual write)
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
@Override
protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
Expand All @@ -359,43 +366,90 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext trackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting(urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass())
.ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// since we already called any pre-update lambdas earlier, call a simple version of BaseLocalDAO::add
// which skips pre-update lambdas.
getLocalDAO().addSkipPreIngestionUpdates(urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception);
ModelUtils.getAspectsFromAsset(asset).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, false));
return null;
});
}

/**
* Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Route any aspects which
* have a registered routing GMS client to the respective GMS for ingestion. Finally, continue to save the aspect
* locally as well (i.e. dual write)
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternalAsset(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
@Nullable IngestionParams ingestionParams) {
// TODO: META-18950: add trackingContext to BaseAspectRoutingResource. currently the param is unused.
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromAsset(asset);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext trackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).forEach(aspect ->
ingestAspect(aspectsToIgnore, urn, aspect, trackingContext, ingestionParams, auditStamp, true));
return null;
});
}

/**
* Helper function to ingest an aspect either via routing or locally (or both). There is a flag that can be toggled
* to indicate whether to execute pre-, intra-, or post-ingestion updates if they exist.
* @param aspectsToIgnore set of aspect classes to ignore, if any
* @param urn urn associated with the aspect to ingest
* @param aspect aspect to ingest
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @param auditStamp audit information of the update
* @param skipExtraProcessing flag to indicate whether to execute pre-, intra-, or post-ingestion updates
*/
private void ingestAspect(Set<Class<? extends RecordTemplate>> aspectsToIgnore, Urn urn, RecordTemplate aspect,
IngestionTrackingContext trackingContext, IngestionParams ingestionParams, AuditStamp auditStamp,
boolean skipExtraProcessing) {
if (!aspectsToIgnore.contains(aspect.getClass())) {
if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) {
try {
// get the updated aspect if there is a preupdate routing lambda registered
RestliPreUpdateAspectRegistry registry = getLocalDAO().getRestliPreUpdateAspectRegistry();
if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) {
log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass()));
aspect = preUpdateRouting((URN) urn, aspect, registry);
log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect);
// Get the fqcn of the aspect class
String aspectFQCN = aspect.getClass().getCanonicalName();
//TODO: META-21112: Remove this check after adding annotations at model level; to handle SKIP/PROCEED for preUpdateRouting
if (SKIP_INGESTION_FOR_ASPECTS.contains(aspectFQCN)) {
log.info("Skip ingestion in ingestInternalAsset for urn: {}, aspectFQCN: {}", urn, aspectFQCN);
return;
}
}
if (trackingContext != null) {
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass())
.ingestWithTracking(urn, aspect, trackingContext, ingestionParams);
} else {
getLocalDAO().add(urn, aspect, auditStamp, trackingContext, ingestionParams);
getAspectRoutingGmsClientManager().getRoutingGmsClient(aspect.getClass()).ingest(urn, aspect);
}
// here, always call a simple version of BaseLocalDAO::add which skips pre-update lambdas regardless of
// the value of param skipExtraProcessing since any pre-update lambdas would have already been executed
// in the code above.
getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
} catch (Exception exception) {
log.error("Couldn't ingest routing aspect {} for {}", aspect.getClass().getSimpleName(), urn, exception);
}
});
return null;
});
} else {
if (skipExtraProcessing) {
// call a simple version of BaseLocalDAO::add which skips pre-update lambdas.
getLocalDAO().rawAdd((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
} else {
getLocalDAO().add((URN) urn, aspect, auditStamp, trackingContext, ingestionParams);
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,23 @@ public Task<Void> ingestWithTracking(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPS
}

/**
* An action method for automated ingestion pipeline.
* Deprecated to use {@link #rawIngestAsset(RecordTemplate, IngestionParams)} instead.
* Same as {@link #ingestWithTracking(RecordTemplate, IngestionTrackingContext, IngestionParams)} but skips any pre-ingestion updates.
* @param snapshot Snapshot of the metadata change to be ingested
* @param trackingContext {@link IngestionTrackingContext} to 1) track DAO-level metrics and 2) to pass on to MAE emission
* @return ingest task
*/
@Deprecated
@Action(name = ACTION_RAW_INGEST)
@Nonnull
public Task<Void> rawIngest(@ActionParam(PARAM_SNAPSHOT) @Nonnull SNAPSHOT snapshot,
@ActionParam(PARAM_TRACKING_CONTEXT) @Nonnull IngestionTrackingContext trackingContext,
@Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) {
return rawIngestInternal(snapshot, Collections.emptySet(), trackingContext, ingestionParams);
}

/**
* An action method for automated ingestion pipeline, also called high-level write.
* @param asset Asset of the metadata change to be ingested
* @return ingest task
*/
Expand All @@ -342,6 +358,26 @@ public Task<Void> ingestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset,
return ingestInternalAsset(asset, Collections.emptySet(), ingestionParams);
}

/**
* An action method for automated ingestion pipeline which skips any pre-ingestion updates, also called low-level write.
* @param asset Asset of the metadata change to be ingested
* @return ingest task
*/
@Action(name = ACTION_RAW_INGEST_ASSET)
@Nonnull
public Task<Void> rawIngestAsset(@ActionParam(PARAM_ASSET) @Nonnull ASSET asset,
@Optional @ActionParam(PARAM_INGESTION_PARAMS) IngestionParams ingestionParams) {
return rawIngestAssetInternal(asset, Collections.emptySet(), ingestionParams);
}

/**
* Internal ingest method for snapshots. First execute any pre-ingestion updates. Then, save the aspect locally.
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
Expand All @@ -358,6 +394,37 @@ protected Task<Void> ingestInternal(@Nonnull SNAPSHOT snapshot,
});
}

/**
* Raw internal ingest method for snapshots which skips any pre-, intra-, or post-processing. Save the aspect locally.
* @param snapshot snapshot to process
* @param aspectsToIgnore aspects to ignore
* @param trackingContext context for tracking ingestion health
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestInternal(@Nonnull SNAPSHOT snapshot,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromSnapshot(snapshot);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
ModelUtils.getAspectsFromSnapshot(snapshot).stream().forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
getLocalDAO().rawAdd(urn, aspect, auditStamp, trackingContext, ingestionParams);
}
});
return null;
});
}

/**
* Internal ingest method for assets. First execute any pre-ingestion updates. Then, save the aspect locally.
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
Expand All @@ -376,6 +443,31 @@ protected Task<Void> ingestInternalAsset(@Nonnull ASSET asset,
});
}

/**
* Raw internal ingest method for assets which skips any pre-, intra-, or post-processing. Save the aspect locally.
* @param asset asset to process
* @param aspectsToIgnore aspects to ignore
* @param ingestionParams optional ingestion parameters
* @return Restli Task for metadata ingestion
*/
@Nonnull
protected Task<Void> rawIngestAssetInternal(@Nonnull ASSET asset,
@Nonnull Set<Class<? extends RecordTemplate>> aspectsToIgnore,
@Nullable IngestionParams ingestionParams) {
return RestliUtils.toTask(() -> {
final URN urn = (URN) ModelUtils.getUrnFromAsset(asset);
final AuditStamp auditStamp = getAuditor().requestAuditStamp(getContext().getRawRequestContext());
IngestionTrackingContext ingestionTrackingContext =
ingestionParams != null ? ingestionParams.getIngestionTrackingContext() : null;
ModelUtils.getAspectsFromAsset(asset).stream().forEach(aspect -> {
if (!aspectsToIgnore.contains(aspect.getClass())) {
getLocalDAO().rawAdd(urn, aspect, auditStamp, ingestionTrackingContext, ingestionParams);
}
});
return null;
});
}

/**
* Deprecated to use {@link #getAsset(String, String[])} instead.
* An action method for getting a snapshot of aspects for an entity.
Expand Down
Loading

0 comments on commit 0d7727e

Please sign in to comment.