Skip to content

Commit

Permalink
add ingestion params into CallbackRoutingClientAPI (#453)
Browse files Browse the repository at this point in the history
Co-authored-by: Zihan Li <[email protected]>
  • Loading branch information
ZihanLi58 and Zihan Li authored Oct 21, 2024
1 parent cf1e5ee commit d4a2261
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN
}
// this will skip the pre/in update callbacks
if (!isRawUpdate) {
AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue);
AspectUpdateResult result = aspectCallbackHelper(urn, newValue, oldValue, updateTuple.getIngestionParams());
newValue = (ASPECT) result.getUpdatedAspect();
// skip the normal ingestion to the DAO
if (result.isSkipProcessing()) {
Expand Down Expand Up @@ -901,7 +901,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional)} as well
* Note: If you update the lambda function (ignored - newValue),
* make sure to update {@link #aspectCallbackHelper(Urn, RecordTemplate, Optional, IngestionParams)}as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
Expand Down Expand Up @@ -1718,13 +1719,15 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
* @param urn the urn of the asset
* @param newAspectValue the new aspect value
* @param oldAspectValue the old aspect value
* @param ingestionParams the ingestionparams of the current update
* @return AspectUpdateResult which contains updated aspect value
*/
protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue, Optional<ASPECT> oldAspectValue) {
protected <ASPECT extends RecordTemplate> AspectUpdateResult aspectCallbackHelper(URN urn, ASPECT newAspectValue,
Optional<ASPECT> oldAspectValue, IngestionParams ingestionParams) {
if (_aspectCallbackRegistry != null && _aspectCallbackRegistry.isRegistered(
newAspectValue.getClass())) {
AspectCallbackRoutingClient client = _aspectCallbackRegistry.getAspectCallbackRoutingClient(newAspectValue.getClass());
AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue);
AspectCallbackResponse aspectCallbackResponse = client.routeAspectCallback(urn, newAspectValue, oldAspectValue, ingestionParams);
ASPECT updatedAspect = (ASPECT) aspectCallbackResponse.getUpdatedAspect();
log.info("Aspect callback routing completed in BaseLocalDao, urn: {}, updated aspect: {}", urn, updatedAspect);
return new AspectUpdateResult(updatedAspect, client.isSkipProcessing());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.internal.IngestionParams;
import java.util.Optional;


Expand All @@ -18,6 +19,18 @@ public interface AspectCallbackRoutingClient<ASPECT extends RecordTemplate> {
*/
AspectCallbackResponse<ASPECT> routeAspectCallback(Urn urn, ASPECT newAspectValue, Optional<ASPECT> existingAspectValue);

/**
* A method that routes the updates request to the appropriate custom API.
* @param urn the urn of the asset
* @param newAspectValue the aspect to be updated
* @param existingAspectValue the existing aspect value
* @param ingestionParams the ingestionParams of current update
* @return AspectCallbackResponse containing the updated aspect
*/
default AspectCallbackResponse<ASPECT> routeAspectCallback(Urn urn, ASPECT newAspectValue,
Optional<ASPECT> existingAspectValue, IngestionParams ingestionParams) {
return routeAspectCallback(urn, newAspectValue, existingAspectValue);
}
/**
* A method that returns whether to skip processing further ingestion.
* @return true if the ingestion should be skipped, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public void testAspectCallbackHelperFromFooToBar() throws URISyntaxException {

AspectCallbackRegistry aspectCallbackRegistry = new AspectCallbackRegistry(aspectCallbackMap);
_dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry);
BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null);
BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, Optional.empty(), null);
AspectFoo newAspect = (AspectFoo) result.getUpdatedAspect();
assertEquals(newAspect, bar);
}
Expand Down Expand Up @@ -705,7 +705,7 @@ public void testAspectCallbackHelperWithUnregisteredAspect() throws URISyntaxExc
_dummyLocalDAO.setAspectCallbackRegistry(aspectCallbackRegistry);

// Call the add method
BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, null);
BaseLocalDAO.AspectUpdateResult result = _dummyLocalDAO.aspectCallbackHelper(urn, foo, Optional.empty(), null);

// Verify that the result is the same as the input aspect since it's not registered
assertEquals(result.getUpdatedAspect(), foo);
Expand Down

0 comments on commit d4a2261

Please sign in to comment.