Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support functions with complex parameters #3356

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@
<artifactId>legend-engine-xt-elasticsearch-V7-executionPlan</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-ingest-executionPlan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-language-pure-dsl-service-execution</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@
import org.eclipse.collections.api.set.MutableSet;
import org.eclipse.collections.impl.factory.Sets;
import org.finos.legend.engine.external.format.arrow.ArrowRuntimeExtension;
import org.finos.legend.engine.external.format.flatdata.FlatDataJavaCompilerExtension;
import org.finos.legend.engine.external.format.flatdata.FlatDataRuntimeExtension;
import org.finos.legend.engine.external.format.flatdata.driver.spi.FlatDataDriverDescription;
import org.finos.legend.engine.external.format.json.JsonJavaCompilerExtension;
import org.finos.legend.engine.external.format.json.JsonSchemaRuntimeExtension;
import org.finos.legend.engine.external.format.xml.XmlJavaCompilerExtension;
import org.finos.legend.engine.external.format.xml.XsdRuntimeExtension;
import org.finos.legend.engine.external.shared.ExternalFormatJavaCompilerExtension;
import org.finos.legend.engine.external.shared.runtime.ExternalFormatExecutionExtension;
import org.finos.legend.engine.external.shared.runtime.ExternalFormatRuntimeExtension;
import org.finos.legend.engine.language.pure.dsl.service.execution.AbstractServicePlanExecutor;
import org.finos.legend.engine.plan.execution.extension.ExecutionExtension;
import org.finos.legend.engine.plan.execution.ingest.compiler.IngestJavaCompilerExtension;
import org.finos.legend.engine.plan.execution.nodes.helpers.platform.ExecutionPlanJavaCompilerExtension;
import org.finos.legend.engine.plan.execution.stores.StoreExecutorBuilder;
import org.finos.legend.engine.plan.execution.stores.inMemory.plugin.InMemoryStoreExecutorBuilder;
import org.finos.legend.engine.plan.execution.stores.mongodb.compiler.MongoDBDocumentFormatJavaCompilerExtension;
import org.finos.legend.engine.plan.execution.stores.relational.AthenaConnectionExtension;
import org.finos.legend.engine.plan.execution.stores.relational.BigQueryConnectionExtension;
import org.finos.legend.engine.plan.execution.stores.relational.DatabricksConnectionExtension;
Expand Down Expand Up @@ -88,6 +95,23 @@ protected MutableList<Class<? extends ConnectionExtension>> expectedConnectionEx
.with(TrinoConnectionExtension.class);
}

@Test
public void testJavaCompilerExtensions()
{
assertHasExtensions(expectedJavaCompilerExtensions(), ExecutionPlanJavaCompilerExtension.class);
}

protected MutableList<Class<? extends ExecutionPlanJavaCompilerExtension>> expectedJavaCompilerExtensions()
{
return Lists.mutable.<Class<? extends ExecutionPlanJavaCompilerExtension>>empty()
.with(MongoDBDocumentFormatJavaCompilerExtension.class)
.with(IngestJavaCompilerExtension.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed

.with(ExternalFormatJavaCompilerExtension.class)
.with(FlatDataJavaCompilerExtension.class)
.with(JsonJavaCompilerExtension.class)
.with(XmlJavaCompilerExtension.class);
}

@Test
public void testExternalFormatRuntimeExtensions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,21 @@
<!-- OpenAPI -->

<!-- ENGINE XT -->
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-ingest-protocol</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-ingest-executionPlan</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-ingest-javaPlatformBinding-pure</artifactId>
</dependency>


<!-- ECLIPSE COLLECTIONS -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ protected Iterable<? extends Class<? extends LegendPureCoreExtension>> getExpect
.with(ServiceLegendPureCoreExtension.class)
.with(RelationalJavaBindingLegendPureCoreExtension.class)
.with(ArrowLegendPureCoreExtension.class)
.with(IngestJavaBindingLegendPureCoreExtension.class)
;
}

Expand Down Expand Up @@ -602,6 +603,8 @@ protected Iterable<String> getExpectedCodeRepositories()
.with("core_external_store_relational_sql_dialect_translation")
.with("core_external_store_relational_sql_dialect_translation_duckdb")
.with("core_external_store_relational_postgres_sql_parser")
.with("core_ingest_java_platform_binding")
.with("core_ingest")
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-nonrelationalStore-mongodb-executionPlan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-ingest-executionPlan</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-language-pure-modelManager</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

package org.finos.legend.engine.plan.execution.nodes.helpers.platform;

import org.eclipse.collections.api.factory.Maps;
import org.eclipse.collections.api.list.MutableList;
import org.eclipse.collections.impl.factory.Lists;
import org.finos.legend.engine.shared.core.extension.LegendPlanExtension;
import org.finos.legend.engine.shared.javaCompiler.ClassPathFilter;
import java.util.Map;

public interface ExecutionPlanJavaCompilerExtension extends LegendPlanExtension
{
default Map<String, Class<?>> dependencies()
{
return Maps.fixedSize.empty();
}

ClassPathFilter getExtraClassPathFilter();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ private static void processStreamParameter(Variable param, List<ParameterValidat
ValidationResult validationResult = FunctionParametersParametersValidation.validate(param, parameterValidationContext, val);
if (validationResult.isValid())
{
return FunctionParametersNormalizer.normalizeParameterValue(param, val);
if (parameterValidationContext != null)
{
return FunctionParametersNormalizer.normalizeParameterValue(param, parameterValidationContext.stream().filter(x -> x.varName.equals(param.name)).findAny().orElse(null), val);
}
else
{
return FunctionParametersNormalizer.normalizeParameterValue(param, null, val);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

package org.finos.legend.engine.plan.execution.validation;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.collections.api.RichIterable;
import org.finos.legend.engine.plan.execution.nodes.helpers.platform.ExecutionPlanJavaCompilerExtension;
import org.finos.legend.engine.plan.execution.nodes.helpers.platform.ExecutionPlanJavaCompilerExtensionLoader;
import org.finos.legend.engine.plan.execution.nodes.state.ExecutionState;
import org.finos.legend.engine.plan.execution.result.ConstantResult;
import org.finos.legend.engine.plan.execution.result.Result;
import org.finos.legend.engine.plan.execution.result.date.EngineDate;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.ParameterValidationContext;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.ProtocolObjectValidationContext;
import org.finos.legend.engine.protocol.pure.v1.model.type.PackageableType;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.Variable;
import org.finos.legend.engine.shared.core.ObjectMapperFactory;

import java.math.BigDecimal;
import java.time.Instant;
Expand All @@ -29,22 +36,30 @@
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class FunctionParametersNormalizer
{
static void normalizeParameters(RichIterable<Variable> functionParameters, ExecutionState executionState)
static void normalizeParameters(RichIterable<Variable> functionParameters,List<ParameterValidationContext> parameterValidationContexts, ExecutionState executionState)
{
functionParameters.forEach(p -> normalizeParameter(p, executionState));
if (parameterValidationContexts == null)
{
functionParameters.forEach(p -> normalizeParameter(p, null, executionState));
}
else
{
functionParameters.forEach(p -> normalizeParameter(p, parameterValidationContexts.stream().filter(x -> x.varName.equals(p.name)).findAny().orElse(null), executionState));
}
}

private static void normalizeParameter(Variable parameter, ExecutionState executionState)
private static void normalizeParameter(Variable parameter, ParameterValidationContext parameterValidationContext, ExecutionState executionState)
{
Result paramResult = executionState.getResult(parameter.name);
if (paramResult instanceof ConstantResult)
{
Object paramValue = ((ConstantResult) paramResult).getValue();
Object normalized = normalizeParameterValue(parameter, paramValue);
Object normalized = normalizeParameterValue(parameter, parameterValidationContext, paramValue);
if (normalized != paramValue)
{
ConstantResult updatedDateTime = new ConstantResult(normalized);
Expand All @@ -53,7 +68,7 @@ private static void normalizeParameter(Variable parameter, ExecutionState execut
}
}

public static Object normalizeParameterValue(Variable parameter, Object paramValue)
public static Object normalizeParameterValue(Variable parameter, ParameterValidationContext parameterValidationContext, Object paramValue)
{
if (paramValue == null)
{
Expand Down Expand Up @@ -91,11 +106,47 @@ public static Object normalizeParameterValue(Variable parameter, Object paramVal
}
default:
{
if (parameterValidationContext instanceof ProtocolObjectValidationContext)
{
return normalizeParameterValue(paramValue, x -> normalizeProtocolObjectParameterValue((ProtocolObjectValidationContext) parameterValidationContext, x));
}
return paramValue;
}
}
}

private static Object normalizeProtocolObjectParameterValue(ProtocolObjectValidationContext parameterValidationContext, Object paramValue)
{
String protocolObjectClassName = parameterValidationContext.protocolClassName;
try
{
Class<?> protocolObjectClass = ExecutionPlanJavaCompilerExtensionLoader.extensions().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class.forName(protocolObjectClassName)

.map(ExecutionPlanJavaCompilerExtension::dependencies)
.map(map -> map.get(protocolObjectClassName))
.filter(Objects::nonNull)
.findAny()
.orElseThrow(() -> new IllegalArgumentException("Function Parameter class not found in package dependencies for class:." + protocolObjectClassName));

if (protocolObjectClass.isInstance(paramValue))
{
return paramValue;
}
else if (paramValue instanceof String)
{
ObjectMapper objectMapper = ObjectMapperFactory.getNewStandardObjectMapperWithPureProtocolExtensionSupports();
return objectMapper.readValue((String) paramValue, protocolObjectClass);
}
else
{
throw new IllegalArgumentException("Function Parameter should be of type JSON String or " + protocolObjectClass.getName());
}
}
catch (JsonProcessingException e)
{
throw new RuntimeException(e);
}
}

private static Object normalizeParameterValue(Object value, Function<Object, ?> normalizer)
{
if (value == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static void validate(RichIterable<Variable> functionParameters, List<Para
Map<String, Result> providedParameterValues = executionState.getResults();
validateNoMissingMandatoryParamaters(functionParameters, providedParameterValues);
validateParameterValues(functionParameters, parameterValidationContext, providedParameterValues);
FunctionParametersNormalizer.normalizeParameters(functionParameters, executionState);
FunctionParametersNormalizer.normalizeParameters(functionParameters,parameterValidationContext, executionState);
FunctionParameterProcessor.processParameters(functionParameters, parameterValidationContext, executionState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.EnumValidationContext;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.ParameterValidationContextVisitor;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes.ProtocolObjectValidationContext;
import org.finos.legend.engine.protocol.pure.v1.model.type.PackageableType;
import org.finos.legend.engine.protocol.pure.v1.model.valueSpecification.Variable;

Expand All @@ -42,4 +43,10 @@ public ValidationResult visit(EnumValidationContext enumValidationContext)
}
return (validEnumValues.contains(value.toString())) ? ValidationResult.successValidationResult() : ValidationResult.errorValidationResult("Invalid enum value " + value + " for " + ((PackageableType) var.genericType.rawType).fullPath + ", valid enum values: " + validEnumValues);
}

@Override
public ValidationResult visit(ProtocolObjectValidationContext protocolObjectValidationContext)
{
return ValidationResult.successValidationResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "_type")
@JsonSubTypes({
@JsonSubTypes.Type(value = EnumValidationContext.class, name = "enumValidationContext")
@JsonSubTypes.Type(value = EnumValidationContext.class, name = "enumValidationContext"),
@JsonSubTypes.Type(value = ProtocolObjectValidationContext.class, name = "protocolObjectValidationContext")
})

// NOTE: due to plan generator producing duplicated _type field, we need to enable this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@
public interface ParameterValidationContextVisitor<T>
{
T visit(EnumValidationContext enumValidationContext);

T visit(ProtocolObjectValidationContext protocolObjectValidationContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.protocol.pure.v1.model.executionPlan.nodes;

public class ProtocolObjectValidationContext extends ParameterValidationContext
{
public String protocolClassName;

@Override
public <T> T accept(ParameterValidationContextVisitor<T> validationContextVisitor)
{
return validationContextVisitor.visit(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ Class meta::pure::executionPlan::EnumValidationContext extends ParameterValidati
validEnumValues: String[*];
}

Class meta::pure::executionPlan::ProtocolObjectValidationContext extends ParameterValidationContext
{
protocolClassName: String[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parameterClass: Class[1];
protocolClassName: String[0..1];

}

Class meta::pure::executionPlan::FunctionParameter
{
name : String[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,18 @@ function <<access.private>> meta::pure::executionPlan::addFunctionParametersVali
{
let collectionEnumParams = $planVarPlaceHolders->filter(p | $p.type->instanceOf(Enumeration) && $p.multiplicity == ZeroMany);
assert($collectionEnumParams->size() == 0, |'Collection of Enums is not supported as service parameter ' + $collectionEnumParams->map(p | $p.name)->makeString('[',', ',']'));
let enumValidationContext = $planVarPlaceHolders->filter(p | $p.type->instanceOf(Enumeration))->map(e | ^meta::pure::executionPlan::EnumValidationContext(varName = $e.name, validEnumValues = $e.type->cast(@Enumeration<Any>)->enumValues()->map(e |$e->id())));
let parameterValidationContexts = $planVarPlaceHolders->map(p |
if(
[
pair(|$p.type->instanceOf(Enumeration), |^meta::pure::executionPlan::EnumValidationContext(varName = $p.name, validEnumValues = $p.type->cast(@Enumeration<Any>)->enumValues()->map(e |$e->id()))),
pair(|$p.type->instanceOf(Class), |^meta::pure::executionPlan::ProtocolObjectValidationContext(varName = $p.name, protocolClassName = $p.type->elementToPath()))
],
|[]
)
);
let paramsSupportedForStreamInput = $routedFunction->findParamsSupportedForStreamInput();
if($planVarPlaceHolders->isNotEmpty(),| let functionParameters = $planVarPlaceHolders->map(p|^FunctionParameter(name=$p.name, supportsStream = $p.name->in($paramsSupportedForStreamInput), multiplicity=$p.multiplicity, type=$p.type));
let functionParametersValidationNode = ^FunctionParametersValidationNode(functionParameters = $functionParameters, resultType=^ResultType(type = Boolean), parameterValidationContext = $enumValidationContext);
let functionParametersValidationNode = ^FunctionParametersValidationNode(functionParameters = $functionParameters, resultType=^ResultType(type = Boolean), parameterValidationContext = $parameterValidationContexts);
$node->match([s:SequenceExecutionNode[1]|^$s(executionNodes=$functionParametersValidationNode->concatenate($s.executionNodes)),
e:ExecutionNode[1]|^SequenceExecutionNode(executionNodes=[$functionParametersValidationNode, $e], resultType=$e.resultType, resultSizeRange=$e.resultSizeRange)]);
,| $node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ Class meta::protocols::pure::v1_33_0::metamodel::executionPlan::EnumValidationCo
validEnumValues: String[*];
}

Class meta::protocols::pure::v1_33_0::metamodel::executionPlan::ProtocolObjectValidationContext extends meta::protocols::pure::v1_33_0::metamodel::executionPlan::ParameterValidationContext
{
protocolClassName: String[1];
}

Class <<typemodifiers.abstract>> meta::protocols::pure::v1_33_0::metamodel::executionPlan::graphFetch::GlobalGraphFetchExecutionNode extends ExecutionNode
{
graphFetchTree : meta::protocols::pure::v1_33_0::metamodel::valueSpecification::raw::GraphFetchTree[1];
Expand Down
Loading
Loading