Skip to content

Commit

Permalink
repl: add getExecutionPlan() endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
akphi committed Jan 10, 2025
1 parent dc77330 commit 3ebcbbf
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private MongoCollection<Document> getQueryEventCollection()
{
return this.getQueryDatabase().getCollection(Vault.INSTANCE.getValue("query.mongo.collection.dataCubeEvent"));
}
throw new RuntimeException("Query event MongoDB collection has not been configured properly");
throw new RuntimeException("DataCube Query event MongoDB collection has not been configured properly");
}

private <T> T documentToClass(Document document, Class<T> _class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.finos.legend.engine.plan.generation.PlanGenerator;
import org.finos.legend.engine.plan.generation.transformers.LegendPlanTransformers;
import org.finos.legend.engine.protocol.pure.v1.model.context.PureModelContextData;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.SingleExecutionPlan;
import org.finos.legend.engine.protocol.pure.v1.model.domain.Function;
import org.finos.legend.engine.protocol.pure.v1.model.domain.Multiplicity;
import org.finos.legend.engine.protocol.pure.v1.model.executionPlan.SingleExecutionPlan;
import org.finos.legend.engine.protocol.pure.v1.model.type.GenericType;
import org.finos.legend.engine.protocol.pure.v1.model.type.PackageableType;
import org.finos.legend.engine.protocol.pure.v1.model.type.relationType.RelationType;
Expand All @@ -48,6 +48,7 @@
import org.finos.legend.engine.repl.client.Client;
import org.finos.legend.engine.repl.core.legend.LegendInterface;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeExecutionResult;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeGetExecutionPlanResult;
import org.finos.legend.engine.shared.core.api.grammar.RenderStyle;
import org.finos.legend.engine.shared.core.identity.Identity;
import org.finos.legend.engine.shared.core.kerberos.SubjectTools;
Expand All @@ -65,7 +66,6 @@ public class DataCubeHelpers
{
public static DataCubeExecutionResult executeQuery(Client client, LegendInterface legendInterface, PlanExecutor planExecutor, PureModelContextData data, boolean debug) throws IOException
{

Function func = (Function) ListIterate.select(data.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst();
String queryCode = getQueryCode(func.body.get(0), false);

Expand Down Expand Up @@ -129,24 +129,49 @@ public static DataCubeExecutionResult executeQuery(Client client, LegendInterfac
}
}

public static RelationType getRelationReturnType(LegendInterface legendInterface, Lambda lambda, PureModelContextData data)
public static DataCubeGetExecutionPlanResult getExecutionPlan(Client client, LegendInterface legendInterface, PureModelContextData data, boolean debug) throws IOException
{
PureModelContextData pmcd;
if (data != null)
Function func = (Function) ListIterate.select(data.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst();
String queryCode = getQueryCode(func.body.get(0), false);

if (client != null && debug)
{
pmcd = DataCubeHelpers.injectNewFunction(data, lambda).getOne();
client.println("Debugging query execution...");
client.printDebug("---------------------------------------- INPUT ----------------------------------------");
client.println("Function: " + queryCode);
}
else

PureModel pureModel = legendInterface.compile(data);
RichIterable<? extends Root_meta_pure_extension_Extension> extensions = PureCoreExtensionLoader.extensions().flatCollect(e -> e.extraPureCoreExtensions(pureModel.getExecutionSupport()));

// Plan
if (client != null && debug)
{
pmcd = PureModelContextData.newBuilder().withElement(wrapLambda(lambda)).build();
client.printDebug("---------------------------------------- PLAN ----------------------------------------");
}
return getRelationReturnType(legendInterface, pmcd);
// TODO: Since H2 does not support pivot(), when pivot() is used, the debugger will fail as it defaults to use H2
// when we switch out to use DuckDB as the core testing DB, then this issue should be resolved
Root_meta_pure_executionPlan_ExecutionPlan _plan = legendInterface.generatePlan(pureModel, false);
String planStr = PlanGenerator.serializeToJSON(_plan, "vX_X_X", pureModel, extensions, LegendPlanTransformers.transformers);
if (client != null && debug)
{
client.println("Generated Plan: " + planStr);
}

DataCubeGetExecutionPlanResult result = new DataCubeGetExecutionPlanResult();
result.plan = (SingleExecutionPlan) PlanExecutor.readExecutionPlan(planStr);
return result;
}

public static RelationType getRelationReturnType(LegendInterface legendInterface, PureModelContextData data)
public static RelationType getRelationReturnType(LegendInterface legendInterface, Lambda lambda, PureModelContextData model)
{
PureModel pureModel = legendInterface.compile(data);
return RelationTypeHelper.convert((org.finos.legend.pure.m3.coreinstance.meta.pure.metamodel.relation.RelationType) pureModel.getConcreteFunctionDefinition(REPL_RUN_FUNCTION_QUALIFIED_PATH, null)._expressionSequence().getLast()._genericType()._typeArguments().getFirst()._rawType());
return getRelationReturnType(legendInterface, DataCubeHelpers.injectNewFunction(model, lambda).getOne());
}

public static RelationType getRelationReturnType(LegendInterface legendInterface, PureModelContextData model)
{
PureModel pureModel = legendInterface.compile(model);
return RelationTypeHelper.convert((org.finos.legend.pure.m3.coreinstance.meta.pure.metamodel.relation.RelationType<?>) pureModel.getConcreteFunctionDefinition(REPL_RUN_FUNCTION_QUALIFIED_PATH, null)._expressionSequence().getLast()._genericType()._typeArguments().getFirst()._rawType());
}

public static ValueSpecification parseQuery(String code, Boolean returnSourceInformation)
Expand All @@ -159,20 +184,16 @@ public static String getQueryCode(ValueSpecification valueSpecification, Boolean
return valueSpecification.accept(DEPRECATED_PureGrammarComposerCore.Builder.newInstance().withRenderStyle(pretty != null && pretty ? RenderStyle.PRETTY : RenderStyle.STANDARD).build());
}

public static CompletionResult getCodeTypeahead(String code, Lambda lambda, PureModelContextData data, MutableList<CompleterExtension> extensions, LegendInterface legendInterface)
public static CompletionResult getCodeTypeahead(String code, Lambda lambda, PureModelContextData model, MutableList<CompleterExtension> extensions, LegendInterface legendInterface)
{
try
{
String graphCode = "";
if (data != null)
{
PureModelContextData newData = PureModelContextData.newBuilder()
.withOrigin(data.getOrigin())
.withSerializer(data.getSerializer())
.withElements(ListIterate.select(data.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.build();
graphCode = PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(newData);
}
PureModelContextData newData = PureModelContextData.newBuilder()
.withOrigin(model.getOrigin())
.withSerializer(model.getSerializer())
.withElements(ListIterate.select(model.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.build();
String graphCode = PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(newData);
String baseQueryCode = lambda != null ? getQueryCode(lambda.body.get(0), false) : null;
String queryCode = (baseQueryCode != null ? baseQueryCode : "") + code;
Completer completer = new Completer(graphCode, extensions, legendInterface);
Expand All @@ -189,38 +210,46 @@ public static CompletionResult getCodeTypeahead(String code, Lambda lambda, Pure
}
}

public static Function wrapLambda(Lambda lambda)
{
Function func = new Function();
func.name = REPL_RUN_FUNCTION_QUALIFIED_PATH.substring(REPL_RUN_FUNCTION_QUALIFIED_PATH.lastIndexOf("::") + 2);
func._package = REPL_RUN_FUNCTION_QUALIFIED_PATH.substring(0, REPL_RUN_FUNCTION_QUALIFIED_PATH.lastIndexOf("::"));
func.returnGenericType = new GenericType(new PackageableType(M3Paths.Any));
func.returnMultiplicity = new Multiplicity(0, null);
func.body = lambda.body;
return func;
}

/**
* Replace the magic function in the given graph data by a new function with the body of the specified lambda
* Replace the magic function (if exists) in the given graph data by a new function with the body of the specified lambda
*/
public static Pair<PureModelContextData, Function> injectNewFunction(PureModelContextData originalData, Lambda lambda)
public static Pair<PureModelContextData, Function> injectNewFunction(PureModelContextData model, Lambda lambda)
{
Function originalFunction = (Function) ListIterate.select(originalData.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst();
Function func = new Function();
func.name = originalFunction.name;
func._package = originalFunction._package;
func.parameters = originalFunction.parameters;
func.returnGenericType = originalFunction.returnGenericType;
func.returnMultiplicity = originalFunction.returnMultiplicity;
func.body = lambda != null ? lambda.body : func.body; // if no lambda is specified, we'll just use the original function

PureModelContextData data = PureModelContextData.newBuilder()
.withOrigin(originalData.getOrigin())
.withSerializer(originalData.getSerializer())
.withElements(ListIterate.select(originalData.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.withElement(func)
.build();

return Tuples.pair(data, func);
PureModelContextData newModel;
Function func;
if (model.getElements().stream().anyMatch(e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
{
Function originalFunction = (Function) ListIterate.select(model.getElements(), e -> e.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)).getFirst();
func = new Function();
func.name = originalFunction.name;
func._package = originalFunction._package;
func.parameters = originalFunction.parameters;
func.returnGenericType = originalFunction.returnGenericType;
func.returnMultiplicity = originalFunction.returnMultiplicity;
func.body = lambda != null ? lambda.body : func.body; // if no lambda is specified, we'll just use the original function

newModel = PureModelContextData.newBuilder()
.withOrigin(model.getOrigin())
.withSerializer(model.getSerializer())
.withElements(ListIterate.select(model.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.withElement(func)
.build();
}
else
{
func = new Function();
func.name = REPL_RUN_FUNCTION_QUALIFIED_PATH.substring(REPL_RUN_FUNCTION_QUALIFIED_PATH.lastIndexOf("::") + 2);
func._package = REPL_RUN_FUNCTION_QUALIFIED_PATH.substring(0, REPL_RUN_FUNCTION_QUALIFIED_PATH.lastIndexOf("::"));
func.returnGenericType = new GenericType(new PackageableType(M3Paths.Any));
func.returnMultiplicity = new Multiplicity(0, null);
func.body = lambda.body;
newModel = PureModelContextData.newBuilder()
.withOrigin(model.getOrigin())
.withSerializer(model.getSerializer())
.withElements(model.getElements())
.withElement(func)
.build();
}
return Tuples.pair(newModel, func);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public void initialize() throws Exception
.withKeyValue("/api/dataCube/getRelationReturnType", new DataCubeQueryBuilder.GetRelationReturnType())
.withKeyValue("/api/dataCube/getRelationReturnType/code", new DataCubeQueryBuilder.GetQueryCodeRelationReturnType())
.withKeyValue("/api/dataCube/executeQuery", new DataCubeQueryExecutor.ExecuteQuery())
.withKeyValue("/api/dataCube/getExecutionPlan", new DataCubeQueryExecutor.GetExecutionPlan())
.keyValuesView().collect(config -> server.createContext(config.getOne(), config.getTwo().getHandler(this.state))).toList();

// CORS filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public HttpHandler getHandler(REPLServerState state)
BufferedReader bufferReader = new BufferedReader(inputStreamReader);
String requestBody = bufferReader.lines().collect(Collectors.joining());
DataCubeQueryTypeaheadInput input = state.objectMapper.readValue(requestBody, DataCubeQueryTypeaheadInput.class);
CompletionResult result = DataCubeHelpers.getCodeTypeahead(input.code, input.baseQuery, input.isolated ? null : state.getCurrentPureModelContextData(), state.client.getCompleterExtensions(), state.legendInterface);
CompletionResult result = DataCubeHelpers.getCodeTypeahead(input.code, input.baseQuery, input.model != null ? input.model : state.getCurrentPureModelContextData(), state.client.getCompleterExtensions(), state.legendInterface);
handleJSONResponse(exchange, 200, state.objectMapper.writeValueAsString(result.getCompletion()), state);
}
catch (Exception e)
Expand All @@ -182,7 +182,7 @@ public HttpHandler getHandler(REPLServerState state)
BufferedReader bufferReader = new BufferedReader(inputStreamReader);
String requestBody = bufferReader.lines().collect(Collectors.joining());
DataCubeGetQueryRelationReturnTypeInput input = state.objectMapper.readValue(requestBody, DataCubeGetQueryRelationReturnTypeInput.class);
handleJSONResponse(exchange, 200, state.objectMapper.writeValueAsString(DataCubeHelpers.getRelationReturnType(state.legendInterface, input.query, input.isolated ? null : state.getCurrentPureModelContextData())), state);
handleJSONResponse(exchange, 200, state.objectMapper.writeValueAsString(DataCubeHelpers.getRelationReturnType(state.legendInterface, input.query, input.model != null ? input.model : state.getCurrentPureModelContextData())), state);
}
catch (Exception e)
{
Expand Down Expand Up @@ -217,17 +217,14 @@ public HttpHandler getHandler(REPLServerState state)
DataCubeGetQueryCodeRelationReturnTypeInput input = state.objectMapper.readValue(requestBody, DataCubeGetQueryCodeRelationReturnTypeInput.class);

String graphCode = "";
if (!input.isolated)
{
PureModelContextData currentData = state.getCurrentPureModelContextData();
PureModelContextData newData = PureModelContextData.newBuilder()
.withOrigin(currentData.getOrigin())
.withSerializer(currentData.getSerializer())
.withElements(ListIterate.select(currentData.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.build();
graphCode += PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(newData);
graphCode += "\n###Pure\n";
}
PureModelContextData currentData = input.model != null ? input.model : state.getCurrentPureModelContextData();
PureModelContextData newData = PureModelContextData.newBuilder()
.withOrigin(currentData.getOrigin())
.withSerializer(currentData.getSerializer())
.withElements(ListIterate.select(currentData.getElements(), el -> !el.getPath().equals(REPL_RUN_FUNCTION_QUALIFIED_PATH)))
.build();
graphCode += PureGrammarComposer.newInstance(PureGrammarComposerContext.Builder.newInstance().build()).renderPureModelContextData(newData);
graphCode += "\n###Pure\n";
graphCode += "function " + REPL_RUN_FUNCTION_SIGNATURE + "{\n";
graphCode += DataCubeHelpers.getQueryCode(input.baseQuery.body.get(0), false) + "\n";
int lineOffset = StringUtils.countMatches(graphCode, "\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import org.finos.legend.engine.repl.dataCube.server.DataCubeHelpers;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeExecutionInput;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeExecutionResult;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeGetExecutionPlanInput;
import org.finos.legend.engine.repl.dataCube.server.model.DataCubeGetExecutionPlanResult;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

import static org.finos.legend.engine.repl.dataCube.server.DataCubeHelpers.executeQuery;
import static org.finos.legend.engine.repl.dataCube.server.DataCubeHelpers.getExecutionPlan;
import static org.finos.legend.engine.repl.dataCube.server.REPLServerHelpers.*;

public class DataCubeQueryExecutor
Expand Down Expand Up @@ -60,4 +63,34 @@ public HttpHandler getHandler(REPLServerState state)
};
}
}

public static class GetExecutionPlan implements DataCubeServerHandler
{
@Override
public HttpHandler getHandler(REPLServerState state)
{
return exchange ->
{
if ("POST".equals(exchange.getRequestMethod()))
{
try
{
InputStreamReader inputStreamReader = new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8);
BufferedReader bufferReader = new BufferedReader(inputStreamReader);
String requestBody = bufferReader.lines().collect(Collectors.joining());
DataCubeGetExecutionPlanInput input = state.objectMapper.readValue(requestBody, DataCubeGetExecutionPlanInput.class);
boolean debug = input.debug != null && input.debug;
Lambda lambda = input.query;
PureModelContextData model = DataCubeHelpers.injectNewFunction(state.getCurrentPureModelContextData(), lambda).getOne();
DataCubeGetExecutionPlanResult result = getExecutionPlan(state.client, state.legendInterface, model, debug);
handleJSONResponse(exchange, 200, state.objectMapper.writeValueAsString(result), state);
}
catch (Exception e)
{
handleTextResponse(exchange, 500, e.getMessage(), state);
}
}
};
}
}
}
Loading

0 comments on commit 3ebcbbf

Please sign in to comment.