Skip to content

Commit

Permalink
Injecting PreIngestionLambda in BaseLocalDao (#410)
Browse files Browse the repository at this point in the history
* Addressed comments

* Added javadoc

* Adding Registry to get lambda

* Added unit tests

* Update test name

* Added one more unit test

* Addressed all comments

* Fixed unit typo

* Resetting all changes

* Adding unit tests again

* Updated java doc

* Indent

* Fix java doc error

* Java doc attempt 2

* Minor nitpicks

---------

Co-authored-by: Rakhi Agrawal <[email protected]>
  • Loading branch information
rakhiagr and Rakhi Agrawal authored Sep 4, 2024
1 parent e088d57 commit dc8ab8f
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
Expand All @@ -24,6 +25,8 @@
import com.linkedin.metadata.dao.exception.ModelValidationException;
import com.linkedin.metadata.dao.ingestion.BaseLambdaFunction;
import com.linkedin.metadata.dao.ingestion.LambdaFunctionRegistry;
import com.linkedin.metadata.dao.ingestion.RestliPreUpdateAspectRegistry;
import com.linkedin.metadata.dao.ingestion.RestliCompliantPreUpdateRoutingClient;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.IndefiniteRetention;
Expand Down Expand Up @@ -179,6 +182,7 @@ public static class AspectUpdateLambda<ASPECT extends RecordTemplate> {
protected UrnPathExtractor<URN> _urnPathExtractor;

private LambdaFunctionRegistry _lambdaFunctionRegistry;
private RestliPreUpdateAspectRegistry _restliPreUpdateAspectRegistry;

// Maps an aspect class to the corresponding retention policy
private final Map<Class<? extends RecordTemplate>, Retention> _aspectRetentionMap = new HashMap<>();
Expand Down Expand Up @@ -395,6 +399,15 @@ public void setLambdaFunctionRegistry(@Nullable LambdaFunctionRegistry lambdaFun
_lambdaFunctionRegistry = lambdaFunctionRegistry;
}

/**
* Set pre ingestion aspect registry.
*/
public void setRestliPreIngestionAspectRegistry(
@Nullable RestliPreUpdateAspectRegistry restliPreUpdateAspectRegistry) {
_restliPreUpdateAspectRegistry = restliPreUpdateAspectRegistry;
}


/**
* Enables or disables atomic updates of multiple aspects.
*/
Expand Down Expand Up @@ -825,6 +838,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP

/**
* Same as above {@link #add(Urn, RecordTemplate, AuditStamp)} but with tracking context.
* Note: If you update the lambda function (ignored - newValue), make sure to update {@link #preUpdateRouting(Urn, RecordTemplate)} as well
* to avoid any inconsistency between the lambda function and the add method.
*/
@Nonnull
public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASPECT newValue,
Expand All @@ -833,7 +848,8 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, @Nonnull ASP
final IngestionParams nonNullIngestionParams =
ingestionParams == null || !ingestionParams.hasTestMode() ? new IngestionParams().setIngestionMode(
IngestionMode.LIVE).setTestMode(false) : ingestionParams;
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> newValue, auditStamp, trackingContext, nonNullIngestionParams);
ASPECT updatedAspect = preUpdateRouting(urn, newValue);
return add(urn, (Class<ASPECT>) newValue.getClass(), ignored -> updatedAspect, auditStamp, trackingContext, nonNullIngestionParams);
}

/**
Expand Down Expand Up @@ -1619,4 +1635,23 @@ protected <ASPECT extends RecordTemplate> ASPECT updatePreIngestionLambdas(@Nonn
}
return newValue;
}

/**
* This method routes the update request to the appropriate custom API for pre-ingestion processing.
* @param urn the urn of the asset
* @param newValue the new aspect value
* @return the updated aspect
*/
protected <ASPECT extends RecordTemplate> ASPECT preUpdateRouting(URN urn, ASPECT newValue) {
if (_restliPreUpdateAspectRegistry != null && _restliPreUpdateAspectRegistry.isRegistered(
newValue.getClass())) {
RestliCompliantPreUpdateRoutingClient client =
_restliPreUpdateAspectRegistry.getPreUpdateRoutingClient(newValue);
Message updatedAspect =
client.routingLambda(client.convertUrnToMessage(urn), client.convertAspectToMessage(newValue));
RecordTemplate convertedAspect = client.convertAspectFromMessage(updatedAspect);
return (ASPECT) convertedAspect;
}
return newValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


/**
* An interface that defines methods to route update requests to the appropriate custom APIs.
* An interface that defines methods to route update requests to the appropriate custom APIs for pre-ingestion process.
*/

public interface PreUpdateRoutingClient<ASPECT extends Message> {
Expand All @@ -13,7 +13,6 @@ public interface PreUpdateRoutingClient<ASPECT extends Message> {
* @param urn the urn of the asset
* @param aspect the aspect to be updated
* @return the updated aspect
* @throws Exception if the routing fails
*/
ASPECT routingLambda(Message urn, ASPECT aspect) throws Exception;
ASPECT routingLambda(Message urn, ASPECT aspect);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.metadata.dao.ingestion;

import com.google.protobuf.Message;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;

/**
* A restli client to route update request to the appropriate to custom APIs.
* <p>This interface extends {@link PreUpdateRoutingClient} and provides additional methods for converting
* * URNs and aspects between different representations (e.g., from Pegasus to Protobuf).</p>
* *
*/

public interface RestliCompliantPreUpdateRoutingClient<ASPECT extends Message> extends PreUpdateRoutingClient {

/**
* Converts a URN to a Protobuf message.
*
* @param pegasusUrn the URN to be converted
* @return the converted Protobuf message
*/
Message convertUrnToMessage(Urn pegasusUrn);

/**
* Converts a {@link RecordTemplate} aspect to a Protobuf message aspect.
*
* @param pegasusAspect the aspect to be converted
* @return the converted Protobuf message aspect
*/
ASPECT convertAspectToMessage(RecordTemplate pegasusAspect);

/**
* Converts a Protobuf message aspect to a {@link RecordTemplate} aspect.
*
* @param messageAspect the Protobuf message aspect to be converted
* @return the converted {@link RecordTemplate} aspect
*/
RecordTemplate convertAspectFromMessage(ASPECT messageAspect);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.metadata.dao.ingestion;

import com.linkedin.data.template.RecordTemplate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
* A registry which maintains mapping of aspects and their getPreUpdateRoutingClient.
*/
public interface RestliPreUpdateAspectRegistry {

/**
* Get PreUpdateRoutingClient for an aspect.
*/
@Nullable
<ASPECT extends RecordTemplate> RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull final ASPECT aspect);

/**
* Check if PreUpdateRoutingClient is registered for an aspect.
*/
<ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull final Class<ASPECT> aspectClass);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.data.template.UnionTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.ingestion.SampleLambdaFunctionRegistryImpl;
import com.linkedin.metadata.dao.ingestion.SamplePreUpdateAspectRegistryImpl;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
import com.linkedin.metadata.dao.retention.TimeBasedRetention;
Expand Down Expand Up @@ -650,4 +651,48 @@ public void testPreIngestionLambda() throws URISyntaxException {

verifyNoMoreInteractions(_mockTrackingEventProducer);
}

@Test
public void testPreUpdateRoutingFromFooToBar() throws URISyntaxException {
// Setup test data
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
AspectFoo result = _dummyLocalDAO.preUpdateRouting(urn, foo);
assertEquals(result, bar);
}

@Test
public void testMAEEmissionForPreUpdateRouting() throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo foo = new AspectFoo().setValue("foo");
AspectFoo bar = new AspectFoo().setValue("bar");
_dummyLocalDAO.setAlwaysEmitAuditEvent(true);
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());
expectGetLatest(urn, AspectFoo.class,
Arrays.asList(makeAspectEntry(null, null), makeAspectEntry(foo, _dummyAuditStamp)));

_dummyLocalDAO.add(urn, foo, _dummyAuditStamp);

verify(_mockEventProducer, times(1)).produceMetadataAuditEvent(urn, null, bar);
verify(_mockEventProducer, times(1)).produceAspectSpecificMetadataAuditEvent(urn, null, bar, _dummyAuditStamp, IngestionMode.LIVE);
verifyNoMoreInteractions(_mockEventProducer);
}

@Test
public void testPreUpdateRoutingWithUnregisteredAspect() throws URISyntaxException {
// Setup test data
FooUrn urn = new FooUrn(1);
AspectBar foo = new AspectBar().setValue("foo");

// Inject RestliPreIngestionAspectRegistry with no registered aspect
_dummyLocalDAO.setRestliPreIngestionAspectRegistry(new SamplePreUpdateAspectRegistryImpl());

// Call the add method
AspectBar result = _dummyLocalDAO.preUpdateRouting(urn, foo);

// Verify that the result is the same as the input aspect since it's not registered
assertEquals(result, foo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.linkedin.metadata.dao.ingestion;

import com.google.common.collect.ImmutableMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.testing.AspectFoo;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


public class SamplePreUpdateAspectRegistryImpl implements RestliPreUpdateAspectRegistry {
private final ImmutableMap<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient> registry;

public SamplePreUpdateAspectRegistryImpl() {
registry = new ImmutableMap.Builder<Class<? extends RecordTemplate>, RestliCompliantPreUpdateRoutingClient>()
.put(AspectFoo.class, new SamplePreUpdateRoutingClient())
.build();
}
@Nullable
@Override
public <ASPECT extends RecordTemplate> RestliCompliantPreUpdateRoutingClient getPreUpdateRoutingClient(@Nonnull ASPECT aspect) {
return registry.get(aspect.getClass());
}

@Override
public <ASPECT extends RecordTemplate> boolean isRegistered(@Nonnull Class<ASPECT> aspectClass) {
return registry.containsKey(aspectClass);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.metadata.dao.ingestion;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.testing.AspectFoo;


public class SamplePreUpdateRoutingClient implements RestliCompliantPreUpdateRoutingClient {
@Override
public Message routingLambda(Message urn, Message aspect) {
// For testing, change the aspect value to "bar"
return Any.pack(StringValue.of("bar"));
}

@Override
public Message convertUrnToMessage(Urn urn) {
// Directly wrap the URN string into a Protobuf message for testing
return Any.pack(StringValue.of(urn.toString()));
}

@Override
public Message convertAspectToMessage(RecordTemplate pegasusAspect) {
// For testing, convert AspectFoo to a TestMessageProtos.AspectMessage
// Assuming the aspect has a `value` field and its string representation can be used for now
String aspectString = pegasusAspect.toString(); // Extracting the aspect as a string (e.g., {value=foo})

// Wrap the aspect string into a simple Protobuf message for testing
return Any.pack(StringValue.of(aspectString));
}

@Override
public RecordTemplate convertAspectFromMessage(Message messageAspect) {
// For testing, convert TestMessageProtos.AspectMessage back to AspectFoo
// Create a new RecordTemplate (AspectFoo in this case) and set the value field
return new AspectFoo().setValue("bar");
}
}

0 comments on commit dc8ab8f

Please sign in to comment.