Skip to content

Commit

Permalink
Refactor to have a common interface for InUpdate Callback (#451)
Browse files Browse the repository at this point in the history
* Refactoring to InUpdate

* Update BaseAspectRoutingResource

* Fixed errors

* Addressed comments

* Addressed comments

* Removed preupdate

* Removed pre-update name

* Missed updating a method name

* Comment back

* Addressed comments

* Added java doc

* Added comments and fixed method names

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Oct 17, 2024
1 parent 9f93435 commit cf1e5ee
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 280 deletions.
123 changes: 89 additions & 34 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import com.linkedin.metadata.dao.equality.DefaultEqualityTester;
import com.linkedin.metadata.dao.equality.EqualityTester;
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.AspectCallbackResponse;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse;
import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry;
import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -160,6 +159,13 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
}
}

@Data
@AllArgsConstructor
protected static class AspectUpdateResult<ASPECT extends RecordTemplate> {
private ASPECT updatedAspect;
private boolean skipProcessing;
}

private static final String DEFAULT_ID_NAMESPACE = "global";

private static final String BACKFILL_EMITTER = "dao_backfill_endpoint";
Expand All @@ -183,7 +189,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private PreUpdateAspectRegistry _preUpdateAspectRegistry = null;
private AspectCallbackRegistry _aspectCallbackRegistry = null;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -216,6 +222,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {

private Clock _clock = Clock.systemUTC();


/**
* Constructor for BaseLocalDAO.
*
Expand Down Expand Up @@ -314,6 +321,7 @@ public void setClock(@Nonnull Clock clock) {
_clock = clock;
}


/**
* Sets {@link Retention} for a specific aspect type.
*/
Expand Down Expand Up @@ -401,21 +409,20 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
}

/**
* Set pre ingestion aspect registry.
* Set aspect callback registry.
*/
public void setPreUpdateAspectRegistry(
@Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) {
_preUpdateAspectRegistry = preUpdateAspectRegistry;
public void setAspectCallbackRegistry(
@Nullable AspectCallbackRegistry aspectCallbackRegistry) {
_aspectCallbackRegistry = aspectCallbackRegistry;
}

/**
* Get pre ingestion aspect registry.
* Get aspect callback registry.
*/
public PreUpdateAspectRegistry getPreUpdateAspectRegistry() {
return _preUpdateAspectRegistry;
public AspectCallbackRegistry getAspectCallbackRegistry() {
return _aspectCallbackRegistry;
}


/**
* Enables or disables atomic updates of multiple aspects.
*/
Expand Down Expand Up @@ -611,6 +618,11 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn, @Nonnull List<? extends Reco

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
return aspectUpdateHelper(urn, updateTuple, auditStamp, trackingContext, false);
}

private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN urn, AspectUpdateLambda<ASPECT> updateTuple,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) {
AspectEntry<ASPECT> latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode());

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
Expand All @@ -629,6 +641,15 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
if (_lambdaFunctionRegistry != null && _lambdaFunctionRegistry.isRegistered(updateTuple.getAspectClass())) {
newValue = updatePreIngestionLambdas(urn, oldValue, newValue);
}
// this will skip the pre/in update callbacks
if (!isRawUpdate) {
AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue);
newValue = (ASPECT) result.getUpdatedAspect();
// skip the normal ingestion to the DAO
if (result.isSkipProcessing()) {
return null;
}
}

checkValidAspect(newValue.getClass());

Expand Down Expand Up @@ -738,6 +759,18 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext);
}

/**
* Same as above {@link #add(Urn, Class, Function, AuditStamp, int, IngestionTrackingContext, IngestionParams)} but to skip aspect callback routing.
* DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext, true);
}

/**
* Adds a new version of an aspect for an entity.
*
Expand Down Expand Up @@ -766,13 +799,22 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
return add(urn, updateLambda, auditStamp, maxTransactionRetry, trackingContext, false);
}

/**
* Same as above {@link #add(Urn, AspectUpdateLambda, AuditStamp, int, IngestionTrackingContext)} but with a flag to skip aspect callback routing.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdateLambda<ASPECT> updateLambda,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) {
final Class<ASPECT> aspectClass = updateLambda.getAspectClass();
checkValidAspect(aspectClass);

// default test mode is false being set in
// {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}}
final AddResult<ASPECT> result =
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext),
runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext, isRawUpdate),
maxTransactionRetry);

// skip MAE producing and post update hook in test mode
Expand Down Expand Up @@ -835,6 +877,18 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull Cla
return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
}

/**
* Same as above {@link #add(Urn, Class, Function, AuditStamp, IngestionTrackingContext, IngestionParams)} but skips any aspect callbacks.
* DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull Function<Optional<ASPECT>, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {
return rawAdd(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams);
}

/**
* Similar to {@link #add(Urn, Class, Function, AuditStamp)} but takes the new value directly.
*/
Expand All @@ -847,20 +901,22 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional)} as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext,
@Nullable IngestionParams ingestionParams) {
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams);
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)} but
* skips any pre-update lambdas. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* skips any aspect callbacks. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM.
* Please use the regular add method linked above.
*/
@Nonnull
Expand All @@ -870,7 +926,7 @@ public <ASPECT extends RecordTemplate> ASPECT rawAdd(@Nonnull URN urn, @Nonnull
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
return rawAdd(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
Expand Down Expand Up @@ -1658,22 +1714,21 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* This method routes the aspect updates to the appropriate aspect callback clients and get the updated aspect as response.
* @param urn the urn of the asset
* @param newAspect the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newAspect) {
if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered(
newAspect.getClass())) {
PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass());
PreUpdateRoutingClient client =
preUpdateRoutingAccessor.getPreUpdateClient();
PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect);
ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect();
log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return (ASPECT) updatedAspect;
* @param newAspectValue the new aspect value
* @param oldAspectValue the old aspect value
* @return AspectUpdateResult which contains updated aspect value
*/
protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, Optional<ASPECT> oldAspectValue) {
if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered(
newAspectValue.getClass())) {
AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass());
AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue);
ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect();
log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return new AspectUpdateResult(updatedAspect, client.isSkipProcessing());
}
return newAspect;
return new AspectUpdateResult(newAspectValue, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


/**
* A registry which maintains mapping of aspects and their Aspect Routing Client.
*/
@Slf4j
public class AspectCallbackRegistry {

private final Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap;

/**
* Constructor to register aspect callback routing clients for aspects.
* @param aspectCallbackMap map containing aspect classes and their corresponding cleints
*/
public AspectCallbackRegistry(@Nonnull Map<Class<? extends RecordTemplate>, AspectCallbackRoutingClient> aspectCallbackMap) {
this.aspectCallbackMap = new HashMap<>(aspectCallbackMap);
log.info("Registered aspect callback clients for aspects: {}", aspectCallbackMap.keySet());
}

/**
* Get Aspect Callback Routing Client for an aspect class.
* @param aspectClass the class of the aspect to retrieve the client
* @return AspectCallbackRoutingClient for the given aspect class, or null if not found
*/
public <ASPECT extends RecordTemplate> AspectCallbackRoutingClient getAspectCallbackRoutingClient(
@Nonnull Class<ASPECT> aspectClass) {
return aspectCallbackMap.get(aspectClass);
}

/**
* Check if Aspect Callback Routing Client is registered for an aspect.
*/
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass) {
return aspectCallbackMap.containsKey(aspectClass);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import lombok.Data;

/**
* Response of in-update process that includes the updated aspect. It can be extended to include additional information.
*/
@Data
public class AspectCallbackResponse<ASPECT extends RecordTemplate> {
private final ASPECT updatedAspect;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import java.util.Optional;


/**
* An interface that defines the client for aspect callback routing.
*/
public interface AspectCallbackRoutingClient<ASPECT extends RecordTemplate> {
/**
* A method that routes the updates request to the appropriate custom API.
* @param urn the urn of the asset
* @param newAspectValue the aspect to be updated
* @param existingAspectValue the existing aspect value
* @return AspectCallbackResponse containing the updated aspect
*/
AspectCallbackResponse<ASPECT> routeAspectCallback(Urn urn, ASPECT newAspectValue, Optional<ASPECT> existingAspectValue);

/**
* A method that returns whether to skip processing further ingestion.
* @return true if the ingestion should be skipped, false otherwise
*/
default boolean isSkipProcessing() {
return false;
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit cf1e5ee

Please sign in to comment.