diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index d5016b644..5fe1a3463 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -22,12 +22,11 @@ import com.linkedin.metadata.dao.equality.DefaultEqualityTester; import com.linkedin.metadata.dao.equality.EqualityTester; import com.linkedin.metadata.dao.exception.ModelValidationException; +import com.linkedin.metadata.dao.ingestion.AspectCallbackResponse; import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction; import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.IndefiniteRetention; @@ -160,6 +159,13 @@ public static class AspectUpdateLambda { } } + @Data + @AllArgsConstructor + protected static class AspectUpdateResult { + private ASPECT updatedAspect; + private boolean skipProcessing; + } + private static final String DEFAULT_ID_NAMESPACE = "global"; private static final String BACKFILL_EMITTER = "dao_backfill_endpoint"; @@ -183,7 +189,7 @@ public static class AspectUpdateLambda { protected UrnPathExtractor _urnPathExtractor; private LambdaFunctionRegistry _lambdaFunctionRegistry; - private PreUpdateAspectRegistry _preUpdateAspectRegistry = null; + private AspectCallbackRegistry _aspectCallbackRegistry = null; // Maps an aspect class to the corresponding retention policy private final Map, Retention> _aspectRetentionMap = new HashMap<>(); @@ -216,6 +222,7 @@ public static class AspectUpdateLambda { private Clock _clock = Clock.systemUTC(); + /** * Constructor for BaseLocalDAO. * @@ -314,6 +321,7 @@ public void setClock(@Nonnull Clock clock) { _clock = clock; } + /** * Sets {@link Retention} for a specific aspect type. */ @@ -401,21 +409,20 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun } /** - * Set pre ingestion aspect registry. + * Set aspect callback registry. */ - public void setPreUpdateAspectRegistry( - @Nullable PreUpdateAspectRegistry preUpdateAspectRegistry) { - _preUpdateAspectRegistry = preUpdateAspectRegistry; + public void setAspectCallbackRegistry( + @Nullable AspectCallbackRegistry aspectCallbackRegistry) { + _aspectCallbackRegistry = aspectCallbackRegistry; } /** - * Get pre ingestion aspect registry. + * Get aspect callback registry. */ - public PreUpdateAspectRegistry getPreUpdateAspectRegistry() { - return _preUpdateAspectRegistry; + public AspectCallbackRegistry getAspectCallbackRegistry() { + return _aspectCallbackRegistry; } - /** * Enables or disables atomic updates of multiple aspects. */ @@ -611,6 +618,11 @@ public List addMany(@Nonnull URN urn, @Nonnull List AddResult aspectUpdateHelper(URN urn, AspectUpdateLambda updateTuple, @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) { + return aspectUpdateHelper(urn, updateTuple, auditStamp, trackingContext, false); + } + + private AddResult aspectUpdateHelper(URN urn, AspectUpdateLambda updateTuple, + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) { AspectEntry latest = getLatest(urn, updateTuple.getAspectClass(), updateTuple.getIngestionParams().isTestMode()); // TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards @@ -629,6 +641,15 @@ private AddResult aspectUpdateHelper(URN if (_lambdaFunctionRegistry != null && _lambdaFunctionRegistry.isRegistered(updateTuple.getAspectClass())) { newValue = updatePreIngestionLambdas(urn, oldValue, newValue); } + // this will skip the pre/in update callbacks + if (!isRawUpdate) { + AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue); + newValue = (ASPECT) result.getUpdatedAspect(); + // skip the normal ingestion to the DAO + if (result.isSkipProcessing()) { + return null; + } + } checkValidAspect(newValue.getClass()); @@ -738,6 +759,18 @@ public ASPECT add(@Nonnull URN urn, @Nonnull Cla return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext); } + /** + * Same as above {@link #add(Urn, Class, Function, AuditStamp, int, IngestionTrackingContext, IngestionParams)} but to skip aspect callback routing. + * DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM. + * Please use the regular add method linked above. + */ + @Nonnull + public ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class aspectClass, + @Nonnull Function, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp, + int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) { + return add(urn, new AspectUpdateLambda<>(aspectClass, updateLambda, ingestionParams), auditStamp, maxTransactionRetry, trackingContext, true); + } + /** * Adds a new version of an aspect for an entity. * @@ -766,13 +799,22 @@ public ASPECT add(@Nonnull URN urn, AspectUpdate @Nonnull public ASPECT add(@Nonnull URN urn, AspectUpdateLambda updateLambda, @Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) { + return add(urn, updateLambda, auditStamp, maxTransactionRetry, trackingContext, false); + } + + /** + * Same as above {@link #add(Urn, AspectUpdateLambda, AuditStamp, int, IngestionTrackingContext)} but with a flag to skip aspect callback routing. + */ + @Nonnull + public ASPECT add(@Nonnull URN urn, AspectUpdateLambda updateLambda, + @Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext, boolean isRawUpdate) { final Class aspectClass = updateLambda.getAspectClass(); checkValidAspect(aspectClass); // default test mode is false being set in // {@link #rawAdd(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)}} final AddResult result = - runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext), + runInTransactionWithRetry(() -> aspectUpdateHelper(urn, updateLambda, auditStamp, trackingContext, isRawUpdate), maxTransactionRetry); // skip MAE producing and post update hook in test mode @@ -835,6 +877,18 @@ public ASPECT add(@Nonnull URN urn, @Nonnull Cla return add(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams); } + /** + * Same as above {@link #add(Urn, Class, Function, AuditStamp, IngestionTrackingContext, IngestionParams)} but skips any aspect callbacks. + * DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM. + * Please use the regular add method linked above. + */ + @Nonnull + public ASPECT rawAdd(@Nonnull URN urn, @Nonnull Class aspectClass, + @Nonnull Function, ASPECT> updateLambda, @Nonnull AuditStamp auditStamp, + @Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) { + return rawAdd(urn, aspectClass, updateLambda, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext, ingestionParams); + } + /** * Similar to {@link #add(Urn, Class, Function, AuditStamp)} but takes the new value directly. */ @@ -847,20 +901,22 @@ public ASPECT add(@Nonnull URN urn, @Nonnull ASP /** * Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context. - * Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well + * Note: If you update the lambda function (ignored - newValue), make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional)} as well * to avoid any inconsistency between the lambda function and the add method. */ @Nonnull public ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue, @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext, @Nullable IngestionParams ingestionParams) { - ASPECT updatedAspect = preUpdateRouting(urn, newValue); - return rawAdd(urn, updatedAspect, auditStamp, trackingContext, ingestionParams); + final IngestionParams nonNullIngestionParams = + ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode( + IngestionMode.LIVE).setTestMode(false) : ingestionParams; + return add(urn, (Class) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams); } /** * Same as above {@link #add(Urn, RecordTemplate, AuditStamp, IngestionTrackingContext, IngestionParams)} but - * skips any pre-update lambdas. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM. + * skips any aspect callbacks. DO NOT USE THIS METHOD WITHOUT EXPLICIT PERMISSION FROM THE METADATA GRAPH TEAM. * Please use the regular add method linked above. */ @Nonnull @@ -870,7 +926,7 @@ public ASPECT rawAdd(@Nonnull URN urn, @Nonnull final IngestionParams nonNullIngestionParams = ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode( IngestionMode.LIVE).setTestMode(false) : ingestionParams; - return add(urn, (Class) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams); + return rawAdd(urn, (Class) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams); } /** @@ -1658,22 +1714,21 @@ protected ASPECT updatePreIngestionLambdas(@Nonn } /** - * This method routes the update request to the appropriate custom API for pre-ingestion processing. + * This method routes the aspect updates to the appropriate aspect callback clients and get the updated aspect as response. * @param urn the urn of the asset - * @param newAspect the new aspect value - * @return the updated aspect - */ - protected ASPECT preUpdateRouting(URN urn, ASPECT newAspect) { - if (_preUpdateAspectRegistry != null && _preUpdateAspectRegistry.isRegistered( - newAspect.getClass())) { - PreUpdateRoutingAccessor preUpdateRoutingAccessor = _preUpdateAspectRegistry.getPreUpdateRoutingAccessor(newAspect.getClass()); - PreUpdateRoutingClient client = - preUpdateRoutingAccessor.getPreUpdateClient(); - PreUpdateResponse preUpdateResponse = client.preUpdate(urn, newAspect); - ASPECT updatedAspect = (ASPECT) preUpdateResponse.getUpdatedAspect(); - log.info("PreUpdateRouting completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect); - return (ASPECT) updatedAspect; + * @param newAspectValue the new aspect value + * @param oldAspectValue the old aspect value + * @return AspectUpdateResult which contains updated aspect value + */ + protected AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, Optional oldAspectValue) { + if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered( + newAspectValue.getClass())) { + AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass()); + AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue); + ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect(); + log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect); + return new AspectUpdateResult(updatedAspect, client.isSkipProcessing()); } - return newAspect; + return new AspectUpdateResult(newAspectValue, false); } } diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRegistry.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRegistry.java new file mode 100644 index 000000000..268cebaed --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRegistry.java @@ -0,0 +1,44 @@ +package com.linkedin.metadata.dao.ingestion; + +import com.linkedin.data.template.RecordTemplate; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; + + +/** + * A registry which maintains mapping of aspects and their Aspect Routing Client. + */ +@Slf4j +public class AspectCallbackRegistry { + + private final Map, AspectCallbackRoutingClient> aspectCallbackMap; + + /** + * Constructor to register aspect callback routing clients for aspects. + * @param aspectCallbackMap map containing aspect classes and their corresponding cleints + */ + public AspectCallbackRegistry(@Nonnull Map, AspectCallbackRoutingClient> aspectCallbackMap) { + this.aspectCallbackMap = new HashMap<>(aspectCallbackMap); + log.info("Registered aspect callback clients for aspects: {}", aspectCallbackMap.keySet()); + } + + /** + * Get Aspect Callback Routing Client for an aspect class. + * @param aspectClass the class of the aspect to retrieve the client + * @return AspectCallbackRoutingClient for the given aspect class, or null if not found + */ + public AspectCallbackRoutingClient getAspectCallbackRoutingClient( + @Nonnull Class aspectClass) { + return aspectCallbackMap.get(aspectClass); + } + + /** + * Check if Aspect Callback Routing Client is registered for an aspect. + */ + public boolean isRegistered(@Nonnull final Class aspectClass) { + return aspectCallbackMap.containsKey(aspectClass); + } + +} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponse.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponse.java new file mode 100644 index 000000000..dafd519a3 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponse.java @@ -0,0 +1,12 @@ +package com.linkedin.metadata.dao.ingestion; + +import com.linkedin.data.template.RecordTemplate; +import lombok.Data; + +/** + * Response of in-update process that includes the updated aspect. It can be extended to include additional information. + */ +@Data +public class AspectCallbackResponse { + private final ASPECT updatedAspect; +} diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java new file mode 100644 index 000000000..ba0cc9a08 --- /dev/null +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/AspectCallbackRoutingClient.java @@ -0,0 +1,28 @@ +package com.linkedin.metadata.dao.ingestion; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import java.util.Optional; + + +/** + * An interface that defines the client for aspect callback routing. +*/ +public interface AspectCallbackRoutingClient { + /** + * A method that routes the updates request to the appropriate custom API. + * @param urn the urn of the asset + * @param newAspectValue the aspect to be updated + * @param existingAspectValue the existing aspect value + * @return AspectCallbackResponse containing the updated aspect + */ + AspectCallbackResponse routeAspectCallback(Urn urn, ASPECT newAspectValue, Optional existingAspectValue); + + /** + * A method that returns whether to skip processing further ingestion. + * @return true if the ingestion should be skipped, false otherwise + */ + default boolean isSkipProcessing() { + return false; + } +} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java deleted file mode 100644 index ef39b5df9..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateAspectRegistry.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; - -import com.linkedin.data.template.RecordTemplate; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nonnull; -import lombok.extern.slf4j.Slf4j; - - -/** - * A registry which maintains mapping of aspects and their PreUpdateRoutingClient. - */ -@Slf4j -public class PreUpdateAspectRegistry { - - private final Map, PreUpdateRoutingAccessor> _preUpdateLambdaMap; - - /** - * Constructor to register pre-update routing accessors for multiple aspects at once. - * @param preUpdateMap map containing aspect classes and their corresponding accessors - */ - public PreUpdateAspectRegistry(@Nonnull Map, PreUpdateRoutingAccessor> preUpdateMap) { - _preUpdateLambdaMap = new HashMap<>(preUpdateMap); - log.info("Registered pre-update routing accessors for aspects: {}", _preUpdateLambdaMap.keySet()); - } - - /** - * Get Pre Update Routing Accessor for an aspect class. - * @param aspectClass the class of the aspect to retrieve the accessor for - * @return PreUpdateRoutingAccessor for the given aspect class, or null if not found - */ - public PreUpdateRoutingAccessor getPreUpdateRoutingAccessor( - @Nonnull Class aspectClass) { - return _preUpdateLambdaMap.get(aspectClass); - } - - /** - * Check if Pre Update Routing Accessor is registered for an aspect. - */ - public boolean isRegistered(@Nonnull final Class aspectClass) { - return _preUpdateLambdaMap.containsKey(aspectClass); - } - -} \ No newline at end of file diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java deleted file mode 100644 index b77d12a45..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponse.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; - -import com.linkedin.data.template.RecordTemplate; -import lombok.Data; - -/** - * Response of pre-update process that includes the updated aspect. - */ -@Data -public class PreUpdateResponse { - private final ASPECT updatedAspect; -} diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java deleted file mode 100644 index 2fa6fda58..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessor.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; - -import com.linkedin.data.template.RecordTemplate; -import lombok.Data; - - -@Data -public class PreUpdateRoutingAccessor { - - public PreUpdateRoutingClient preUpdateClient; - - public enum RoutingAction { - PROCEED, SKIP - } -} diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java b/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java deleted file mode 100644 index 90441798f..000000000 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingClient.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; - -import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.RecordTemplate; - - -/** - * An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process. - */ - -public interface PreUpdateRoutingClient { - /** - * A method that routes the update request to the appropriate custom API. - * @param urn the urn of the asset - * @param aspect the aspect to be updated - * @return the updated aspect - */ - PreUpdateResponse preUpdate(Urn urn, ASPECT aspect); -} \ No newline at end of file diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index d23db0b2f..a8e371374 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -5,10 +5,10 @@ import com.linkedin.data.template.SetMode; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient; +import com.linkedin.metadata.dao.ingestion.SampleAspectCallbackRoutingClient; import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl; -import com.linkedin.metadata.dao.ingestion.SamplePreUpdateRoutingClient; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; import com.linkedin.metadata.dao.retention.TimeBasedRetention; @@ -656,40 +656,32 @@ public void testPreIngestionLambda() throws URISyntaxException { } @Test - public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException { + public void testAspectCallbackHelperFromFooToBar() throws URISyntaxException { // Setup test data FooUrn urn = new FooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); + Map, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>(); + aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); - - PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); - _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); - - AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo); - assertEquals(result, bar); + AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap); + _dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry); + BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null); + AspectFoo newAspect = (AspectFoo) result.getUpdatedAspect(); + assertEquals(newAspect, bar); } @Test - public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException { + public void testMAEEmissionForAspectCallbackHelper() throws URISyntaxException { FooUrn urn = new FooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); AspectFoo bar = new AspectFoo().setValue("bar"); _dummyLocalDAO.setAlwaysEmitAuditEvent(true); - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); - - PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); - - _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); + Map, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>(); + aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); + AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap); + _dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry); expectGetLatest(urn, AspectFoo.class, Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo, _dummyAuditStamp))); @@ -701,25 +693,21 @@ public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException { } @Test - public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxException { + public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxException { // Setup test data FooUrn urn = new FooUrn(1); AspectBar foo = new AspectBar().setValue("foo"); // Inject RestliPreIngestionAspectRegistry with no registered aspect - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); - - PreUpdateAspectRegistry preUpdateAspectRegistry = new PreUpdateAspectRegistry(preUpdateMap); - _dummyLocalDAO.setPreUpdateAspectRegistry(preUpdateAspectRegistry); + Map, AspectCallbackRoutingClient> aspectCallbackMap = new HashMap<>(); + aspectCallbackMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); + AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap); + _dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry); // Call the add method - AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo); + BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null); // Verify that the result is the same as the input aspect since it's not registered - assertEquals(result, foo); + assertEquals(result.getUpdatedAspect(), foo); } } diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponseTest.java similarity index 63% rename from dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java rename to dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponseTest.java index 74c7195ff..06cad5800 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateResponseTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/AspectCallbackResponseTest.java @@ -1,4 +1,4 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; +package com.linkedin.metadata.dao.ingestion; import com.linkedin.data.template.RecordTemplate; import org.testng.annotations.Test; @@ -7,15 +7,15 @@ import static org.testng.AssertJUnit.*; -public class PreUpdateResponseTest { +public class AspectCallbackResponseTest { @Test public void testConstructorAndGetter() { // Create a mock instance of RecordTemplate RecordTemplate mockAspect = mock(RecordTemplate.class); - // Create an instance of PreUpdateResponse with the mock aspect - PreUpdateResponse response = new PreUpdateResponse<>(mockAspect); + // Create an instance of AspectCallbackResponse with the mock aspect + AspectCallbackResponse response = new AspectCallbackResponse<>(mockAspect); // Verify that the getter returns the correct value assertEquals(mockAspect, response.getUpdatedAspect()); diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SampleAspectCallbackRoutingClient.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SampleAspectCallbackRoutingClient.java new file mode 100644 index 000000000..cd68511ee --- /dev/null +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SampleAspectCallbackRoutingClient.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.dao.ingestion; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.testing.AspectFoo; +import java.util.Optional; + + +public class SampleAspectCallbackRoutingClient implements AspectCallbackRoutingClient { + @Override + public AspectCallbackResponse routeAspectCallback(Urn urn, RecordTemplate newAspectValue, Optional existingAspectValue) { + AspectFoo aspectFoo = (AspectFoo) newAspectValue; + aspectFoo.setValue("bar"); + return new AspectCallbackResponse(aspectFoo); + } +} diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java deleted file mode 100644 index af5973f8b..000000000 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/SamplePreUpdateRoutingClient.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.linkedin.metadata.dao.ingestion; - -import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; -import com.linkedin.testing.AspectFoo; - - -public class SamplePreUpdateRoutingClient implements PreUpdateRoutingClient { - - @Override - public PreUpdateResponse preUpdate(Urn urn, RecordTemplate recordTemplate) { - AspectFoo aspectFoo = (AspectFoo) recordTemplate; - aspectFoo.setValue("bar"); - return new PreUpdateResponse(aspectFoo); - } -} diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java deleted file mode 100644 index 584be50a3..000000000 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/ingestion/preupdate/PreUpdateRoutingAccessorTest.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.linkedin.metadata.dao.ingestion.preupdate; - -import com.google.protobuf.Message; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import static org.testng.AssertJUnit.*; -import static org.mockito.Mockito.*; - - -public class PreUpdateRoutingAccessorTest { - private PreUpdateRoutingAccessor routingInfo; - private PreUpdateRoutingClient mockPreUpdateClient; - - @BeforeMethod - public void setUp() { - routingInfo = new PreUpdateRoutingAccessor(); - mockPreUpdateClient = mock(PreUpdateRoutingClient.class); - } - - @Test - public void testPreUpdateClientSetterAndGetter() { - routingInfo.setPreUpdateClient(mockPreUpdateClient); - assertEquals(mockPreUpdateClient, routingInfo.getPreUpdateClient()); - } - - @Test - public void testRoutingActionEnum() { - assertEquals("PROCEED", PreUpdateRoutingAccessor.RoutingAction.PROCEED.name()); - assertEquals("SKIP", PreUpdateRoutingAccessor.RoutingAction.SKIP.name()); - } -} diff --git a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java index 49633ad5a..43878b821 100644 --- a/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java +++ b/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseAspectRoutingResource.java @@ -7,10 +7,9 @@ import com.linkedin.data.template.StringArray; import com.linkedin.data.template.UnionTemplate; import com.linkedin.metadata.dao.AspectKey; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry; +import com.linkedin.metadata.dao.ingestion.AspectCallbackResponse; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.internal.IngestionParams; @@ -417,10 +416,10 @@ private void ingestAspect(Set> aspectsToIgnore, if (getAspectRoutingGmsClientManager().hasRegistered(aspect.getClass())) { try { // get the updated aspect if there is a preupdate routing lambda registered - PreUpdateAspectRegistry registry = getLocalDAO().getPreUpdateAspectRegistry(); + AspectCallbackRegistry registry = getLocalDAO().getAspectCallbackRegistry(); if (!skipExtraProcessing && registry != null && registry.isRegistered(aspect.getClass())) { log.info(String.format("Executing registered pre-update routing lambda for aspect class %s.", aspect.getClass())); - aspect = preUpdateRouting((URN) urn, aspect, registry); + aspect = aspectCallbackHelper((URN) urn, aspect, registry); log.info("PreUpdateRouting completed in ingestInternalAsset, urn: {}, updated aspect: {}", urn, aspect); // Get the fqcn of the aspect class String aspectFQCN = aspect.getClass().getCanonicalName(); @@ -688,12 +687,12 @@ private List getValueFromRoutingGms(@Nonnull URN urn, * This method routes the update request to the appropriate custom API for pre-ingestion processing. * @param urn the urn of the asset * @param aspect the new aspect value + * @param registry the aspect callback registry * @return the updated aspect */ - private RecordTemplate preUpdateRouting(URN urn, RecordTemplate aspect, PreUpdateAspectRegistry registry) { - PreUpdateRoutingAccessor preUpdateRoutingAccessor = registry.getPreUpdateRoutingAccessor(aspect.getClass()); - PreUpdateRoutingClient preUpdateClient = preUpdateRoutingAccessor.getPreUpdateClient(); - PreUpdateResponse preUpdateResponse = preUpdateClient.preUpdate(urn, aspect); - return preUpdateResponse.getUpdatedAspect(); + private RecordTemplate aspectCallbackHelper(URN urn, RecordTemplate aspect, AspectCallbackRegistry registry) { + AspectCallbackRoutingClient preUpdateClient = registry.getAspectCallbackRoutingClient(aspect.getClass()); + AspectCallbackResponse aspectCallbackResponse = preUpdateClient.routeAspectCallback(urn, aspect, null); + return aspectCallbackResponse.getUpdatedAspect(); } } diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java index c082c61a4..7b0cc09a3 100644 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/BaseAspectRoutingResourceTest.java @@ -8,12 +8,12 @@ import com.linkedin.metadata.dao.BaseBrowseDAO; import com.linkedin.metadata.dao.BaseLocalDAO; import com.linkedin.metadata.dao.BaseSearchDAO; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingAccessor; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateAspectRegistry; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRegistry; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient; import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.events.IngestionTrackingContext; -import com.linkedin.metadata.restli.ingestion.SamplePreUpdateRoutingClient; +import com.linkedin.metadata.restli.ingestion.SampleAspectCallbackRoutingClient; import com.linkedin.parseq.BaseEngineTest; import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; @@ -308,7 +308,7 @@ public void testIngestWithRoutingAspect() { verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(null), eq(null)); verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectAttributeGmsClient, times(1)).ingest(eq(urn), eq(attributes)); - verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getAspectCallbackRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @@ -328,7 +328,7 @@ public void testIngestWithTrackingWithRoutingAspect() { verify(_mockLocalDAO, times(1)).add(eq(urn), eq(bar), any(), eq(trackingContext), eq(null)); verify(_mockAspectFooGmsClient, times(1)).ingestWithTracking(eq(urn), eq(foo), eq(trackingContext), eq(null)); verify(_mockAspectAttributeGmsClient, times(1)).ingestWithTracking(eq(urn), eq(attributes), eq(trackingContext), eq(null)); - verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getAspectCallbackRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); } @@ -356,7 +356,7 @@ public void testIngestWithOnlyRoutingAspect() { runAndWait(_resource.ingest(snapshot)); - verify(_mockLocalDAO, times(2)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(2)).getAspectCallbackRegistry(); verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); // verify(_mockGmsClient, times(1)).ingest(eq(urn), eq(foo)); verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); @@ -555,26 +555,23 @@ public void testBackfillWithNewValue() { } @Test - public void testPreUpdateRoutingWithRegisteredAspect() { + public void testAspectCallbackHelperWithRegisteredAspect() { FooUrn urn = makeFooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); + Map, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); + AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(preUpdateMap); - PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); - - when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); + when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(aspectCallbackRegistry); // given: ingest a snapshot containing a routed aspect which has a registered pre-update lambda. runAndWait(_resource.ingest(snapshot)); - verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getAspectCallbackRegistry(); // expected: the pre-update lambda is executed first (aspect value is changed from foo to foobar) and then the aspect is dual-written. AspectFoo foobar = new AspectFoo().setValue("foobar"); // dual write pt1: ensure the ingestion request is forwarded to the routed GMS. @@ -585,7 +582,7 @@ public void testPreUpdateRoutingWithRegisteredAspect() { } @Test - public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() { + public void testPreUpdateRoutingWithNonRegisteredInUpdateAspect() { FooUrn urn = makeFooUrn(1); AspectFoo foo = new AspectFoo().setValue("foo"); @@ -604,21 +601,17 @@ public void testPreUpdateRoutingWithNonRegisteredPreUpdateAspect() { } @Test - public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() { + public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredInUpdate() { FooUrn urn = makeFooUrn(1); AspectBar bar = new AspectBar().setValue("bar"); List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); - - PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); + Map, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); + AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(preUpdateMap); - when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); + when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(aspectCallbackRegistry); // given: ingest a snapshot which contains a non-routed aspect which has a registered pre-update lambda. runAndWait(_resource.ingest(snapshot)); @@ -632,7 +625,7 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndRegisteredPreUpdate() { } @Test - public void testPreUpdateRoutingWithNonRoutedAspectAndNonRegisteredPreUpdate() { + public void testPreUpdateRoutingWithNonRoutedAspectAndNonRegisteredInUpdate() { FooUrn urn = makeFooUrn(1); AspectBar bar = new AspectBar().setValue("bar"); List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, bar)); @@ -648,7 +641,7 @@ public void testPreUpdateRoutingWithNonRoutedAspectAndNonRegisteredPreUpdate() { } @Test - public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, IllegalAccessException { + public void testAspectCallbackHelperWithSkipIngestion() throws NoSuchFieldException, IllegalAccessException { // Access the SKIP_INGESTION_FOR_ASPECTS field Field skipIngestionField = BaseAspectRoutingResource.class.getDeclaredField("SKIP_INGESTION_FOR_ASPECTS"); skipIngestionField.setAccessible(true); @@ -665,24 +658,22 @@ public void testPreUpdateRoutingWithSkipIngestion() throws NoSuchFieldException, List aspects = Arrays.asList(ModelUtils.newAspectUnion(EntityAspectUnion.class, foo)); EntitySnapshot snapshot = ModelUtils.newSnapshot(EntitySnapshot.class, urn, aspects); - PreUpdateRoutingAccessor preUpdateRoutingAccessor = new PreUpdateRoutingAccessor(); - preUpdateRoutingAccessor.setPreUpdateClient(new SamplePreUpdateRoutingClient()); - Map, PreUpdateRoutingAccessor> preUpdateMap = new HashMap<>(); - preUpdateMap.put(AspectFoo.class, preUpdateRoutingAccessor); - PreUpdateAspectRegistry registry = new PreUpdateAspectRegistry(preUpdateMap); + Map, AspectCallbackRoutingClient> preUpdateMap = new HashMap<>(); + preUpdateMap.put(AspectFoo.class, new SampleAspectCallbackRoutingClient()); + AspectCallbackRegistry registry = new AspectCallbackRegistry(preUpdateMap); - when(_mockLocalDAO.getPreUpdateAspectRegistry()).thenReturn(registry); + when(_mockLocalDAO.getAspectCallbackRegistry()).thenReturn(registry); runAndWait(_resource.ingest(snapshot)); verify(_mockAspectFooGmsClient, times(0)).ingest(any(), any()); - verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getAspectCallbackRegistry(); // Should not add to local DAO verifyNoMoreInteractions(_mockLocalDAO); } //Testing the case when aspect has no pre lambda but skipIngestion contains the aspect, so it should not skip ingestion @Test - public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFieldException, IllegalAccessException { + public void testPreUpdateRoutingWithSkipIngestionNoInLambda() throws NoSuchFieldException, IllegalAccessException { Field skipIngestionField = BaseAspectRoutingResource.class.getDeclaredField("SKIP_INGESTION_FOR_ASPECTS"); skipIngestionField.setAccessible(true); Field modifiersField = Field.class.getDeclaredField("modifiers"); @@ -700,7 +691,7 @@ public void testPreUpdateRoutingWithSkipIngestionNoPreLambda() throws NoSuchFiel // Should not skip ingestion verify(_mockAspectFooGmsClient, times(1)).ingest(eq(urn), eq(foo)); // Should check for pre lambda - verify(_mockLocalDAO, times(1)).getPreUpdateAspectRegistry(); + verify(_mockLocalDAO, times(1)).getAspectCallbackRegistry(); // Should continue to dual-write into local DAO verify(_mockLocalDAO, times(1)).rawAdd(eq(urn), eq(foo), any(), any(), any()); verifyNoMoreInteractions(_mockLocalDAO); diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SampleAspectCallbackRoutingClient.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SampleAspectCallbackRoutingClient.java new file mode 100644 index 000000000..87afbca5d --- /dev/null +++ b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SampleAspectCallbackRoutingClient.java @@ -0,0 +1,22 @@ +package com.linkedin.metadata.restli.ingestion; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.dao.ingestion.AspectCallbackResponse; +import com.linkedin.metadata.dao.ingestion.AspectCallbackRoutingClient; +import com.linkedin.testing.AspectFoo; +import java.util.Optional; + + +public class SampleAspectCallbackRoutingClient implements AspectCallbackRoutingClient { + + @Override + public AspectCallbackResponse routeAspectCallback(Urn urn, RecordTemplate newAspectValue, Optional existingAspectValue) { + + // For testing, change the aspect value to "bar" + RecordTemplate updatedAspect = new AspectFoo().setValue("foobar"); + // Return a new AspectCallbackResponse with the updated aspect + return new AspectCallbackResponse<>(updatedAspect); + } + +} diff --git a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java b/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java deleted file mode 100644 index a0ca313fa..000000000 --- a/restli-resources/src/test/java/com/linkedin/metadata/restli/ingestion/SamplePreUpdateRoutingClient.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.linkedin.metadata.restli.ingestion; - -import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateResponse; -import com.linkedin.metadata.dao.ingestion.preupdate.PreUpdateRoutingClient; -import com.linkedin.testing.AspectFoo; - - -public class SamplePreUpdateRoutingClient implements PreUpdateRoutingClient { - - @Override - public PreUpdateResponse preUpdate(Urn urn, RecordTemplate recordTemplate) { - - // For testing, change the aspect value to "bar" - RecordTemplate updatedAspect = new AspectFoo().setValue("foobar"); - // Return a new PreUpdateResponse with the updated aspect - return new PreUpdateResponse<>(updatedAspect); - } -}