Skip to content
This repository has been archived by the owner on Jun 26, 2024. It is now read-only.

chore | adding the support of joining nodes for eds source query #204

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public QueryNode build() {
executionContext.getExpressionContext(), EDS.name());
if (entitiesRequest.getIncludeNonLiveEntities() && areFiltersOnlyOnEds) {
ExecutionTreeUtils.removeDuplicateSelectionAttributes(executionContext, EDS.name());

QueryNode rootNode = new DataFetcherNode(EDS.name(), entitiesRequest.getFilter());
// if the filter by and order by are from the same source, pagination can be pushed down to
// EDS
Expand All @@ -84,12 +83,16 @@ public QueryNode build() {
entitiesRequest.getOrderByList(),
entitiesRequest.getFetchTotal());
executionContext.setSortAndPaginationNodeAdded(true);
rootNode.acceptVisitor(new ExecutionContextBuilderVisitor(executionContext));
QueryNode executionTree = buildExecutionTree(executionContext, rootNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Execution Tree:{}", executionTree.acceptVisitor(new PrintVisitor()));
}
return executionTree;
}

rootNode.acceptVisitor(new ExecutionContextBuilderVisitor(executionContext));

QueryNode executionTree = buildExecutionTree(executionContext, rootNode);

QueryNode executionTree = buildExecutionTreeWithJoinNode(executionContext, rootNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Execution Tree:{}", executionTree.acceptVisitor(new PrintVisitor()));
}
Expand Down Expand Up @@ -223,6 +226,37 @@ QueryNode buildExecutionTree(EntityExecutionContext executionContext, QueryNode
metricSourcesForOrderBy.forEach(executionContext::removePendingMetricAggregationSources);
}

return buildPaginationAndSelectionsNode(executionContext, rootNode);
}

@VisibleForTesting
QueryNode buildExecutionTreeWithJoinNode(
EntityExecutionContext executionContext, QueryNode filterTree) {
QueryNode rootNode = filterTree;
// Select attributes from sources in order by but not part of the filter tree
Set<String> attrSourcesForOrderBy = executionContext.getPendingSelectionSourcesForOrderBy();
if (!attrSourcesForOrderBy.isEmpty()) {
rootNode =
new JoinNode.Builder(filterTree).setAttrSelectionSources(attrSourcesForOrderBy).build();
attrSourcesForOrderBy.forEach(executionContext::removePendingSelectionSource);
}

// Select agg attributes from sources in order by
Set<String> metricSourcesForOrderBy =
executionContext.getPendingMetricAggregationSourcesForOrderBy();
if (!metricSourcesForOrderBy.isEmpty()) {
rootNode =
new JoinNode.Builder(rootNode)
.setAggMetricSelectionSources(metricSourcesForOrderBy)
.build();
metricSourcesForOrderBy.forEach(executionContext::removePendingMetricAggregationSources);
}

return buildPaginationAndSelectionsNode(executionContext, rootNode);
}

private QueryNode buildPaginationAndSelectionsNode(
EntityExecutionContext executionContext, QueryNode rootNode) {
// Try adding SortAndPaginateNode
rootNode = checkAndAddSortAndPaginationNode(rootNode, executionContext);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.hypertrace.gateway.service.entity.query;

import java.util.Collections;
import java.util.Set;
import org.hypertrace.gateway.service.entity.query.visitor.Visitor;

public class JoinNode implements QueryNode {
private final Set<String> attrSelectionSources;
private final Set<String> aggMetricSelectionSources;

private final QueryNode childNode;

private JoinNode(
QueryNode childNode,
Set<String> attrSelectionSources,
Set<String> aggMetricSelectionSources) {
this.attrSelectionSources = attrSelectionSources;
this.aggMetricSelectionSources = aggMetricSelectionSources;
this.childNode = childNode;
}

public Set<String> getAttrSelectionSources() {
return attrSelectionSources;
}

public Set<String> getAggMetricSelectionSources() {
return aggMetricSelectionSources;
}

public QueryNode getChildNode() {
return childNode;
}

@Override
public <R> R acceptVisitor(Visitor<R> v) {
return v.visit(this);
}

@Override
public String toString() {
return "SelectionNode{"
+ "attrSelectionSources="
+ attrSelectionSources
+ ", aggMetricSelectionSources="
+ aggMetricSelectionSources
+ ", childNode="
+ childNode
+ '}';
}

public static class Builder {
private final QueryNode childNode;
private Set<String> attrSelectionSources = Collections.emptySet();
private Set<String> aggMetricSelectionSources = Collections.emptySet();

public Builder(QueryNode childNode) {
this.childNode = childNode;
}

public JoinNode.Builder setAttrSelectionSources(Set<String> attrSelectionSources) {
this.attrSelectionSources = attrSelectionSources;
return this;
}

public JoinNode.Builder setAggMetricSelectionSources(Set<String> aggMetricSelectionSources) {
this.aggMetricSelectionSources = aggMetricSelectionSources;
return this;
}

public JoinNode build() {
return new JoinNode(childNode, attrSelectionSources, aggMetricSelectionSources);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.hypertrace.gateway.service.entity.query.AndNode;
import org.hypertrace.gateway.service.entity.query.DataFetcherNode;
import org.hypertrace.gateway.service.entity.query.EntityExecutionContext;
import org.hypertrace.gateway.service.entity.query.JoinNode;
import org.hypertrace.gateway.service.entity.query.NoOpNode;
import org.hypertrace.gateway.service.entity.query.OrNode;
import org.hypertrace.gateway.service.entity.query.PaginateOnlyNode;
Expand Down Expand Up @@ -119,6 +120,11 @@ public Void visit(PaginateOnlyNode paginateOnlyNode) {
return paginateOnlyNode.getChildNode().acceptVisitor(this);
}

@Override
public Void visit(JoinNode joinNode) {
return joinNode.getChildNode().acceptVisitor(this);
}

private Set<String> getRedundantPendingSelectionSources(
Set<String> fetchedAttributes,
Set<String> pendingAttributeSelectionSources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -29,6 +30,7 @@
import org.hypertrace.gateway.service.entity.query.AndNode;
import org.hypertrace.gateway.service.entity.query.DataFetcherNode;
import org.hypertrace.gateway.service.entity.query.EntityExecutionContext;
import org.hypertrace.gateway.service.entity.query.JoinNode;
import org.hypertrace.gateway.service.entity.query.NoOpNode;
import org.hypertrace.gateway.service.entity.query.OrNode;
import org.hypertrace.gateway.service.entity.query.PaginateOnlyNode;
Expand Down Expand Up @@ -93,6 +95,25 @@ protected static EntityResponse intersect(List<EntityResponse> entityResponses)
return new EntityResponse(entityFetcherResponse, entityFetcherResponse.size());
}

private static EntityFetcherResponse mergeEntities(
EntityFetcherResponse rootEntityResponse, EntityFetcherResponse otherResponse) {
return new EntityFetcherResponse(
rootEntityResponse.getEntityKeyBuilderMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Entry::getKey,
entry -> {
Map<EntityKey, Builder> entityKeyBuilderMap =
otherResponse.getEntityKeyBuilderMap();
if (entityKeyBuilderMap.containsKey(entry.getKey())) {
return entry
.getValue()
.mergeFrom(entityKeyBuilderMap.get(entry.getKey()).build());
}
return entry.getValue();
})));
}

private static EntityFetcherResponse unionEntities(List<EntityFetcherResponse> builders) {
return new EntityFetcherResponse(
builders.stream()
Expand Down Expand Up @@ -209,6 +230,92 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) {
}
}

@Override
public EntityResponse visit(JoinNode joinNode) {
EntityResponse childNodeResponse = joinNode.getChildNode().acceptVisitor(this);
EntityFetcherResponse childEntityFetcherResponse = childNodeResponse.getEntityFetcherResponse();
// If the result was empty when the filter is non-empty, it means no entities matched the filter
// and hence no need to do any more follow up calls.
if (childEntityFetcherResponse.isEmpty()
&& !Filter.getDefaultInstance().equals(executionContext.getEntitiesRequest().getFilter())) {
LOG.debug("No results matched the filter so not fetching aggregate/timeseries metrics.");
return childNodeResponse;
}

List<EntityFetcherResponse> resultMapList = new ArrayList<>();
resultMapList.addAll(
joinNode.getAttrSelectionSources().parallelStream()
.map(
source -> {
EntitiesRequest request =
EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
.clearSelection()
.clearTimeAggregation()
.clearFilter()
// TODO: Should we push order by, limit and offet down to the data source?
// If we want to push the order by down, we would also have to divide
// order by into sourceToOrderBySelectionExpressionMap,
// sourceToOrderByMetricExpressionMap, sourceToOrderByTimeAggregationMap
.clearOrderBy()
.clearLimit()
.clearOffset()
.addAllSelection(
executionContext
.getExpressionContext()
.getSourceToSelectionExpressionMap()
.get(source))
.setFilter(addSourceFilters(executionContext, source))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add id node filter if say ids lenght is less than 10K or something?

.build();
IEntityFetcher entityFetcher = queryHandlerRegistry.getEntityFetcher(source);
EntitiesRequestContext context =
new EntitiesRequestContext(
executionContext.getEntitiesRequestContext().getGrpcContext(),
request.getStartTimeMillis(),
request.getEndTimeMillis(),
request.getEntityType(),
executionContext.getTimestampAttributeId());
return entityFetcher.getEntities(context, request);
})
.collect(Collectors.toList()));
resultMapList.addAll(
joinNode.getAggMetricSelectionSources().parallelStream()
.map(
source -> {
EntitiesRequest request =
EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
.clearSelection()
.clearTimeAggregation()
.clearFilter()
.clearOrderBy()
.clearOffset()
.clearLimit()
.addAllSelection(
executionContext
.getExpressionContext()
.getSourceToMetricExpressionMap()
.get(source))
.setFilter(addSourceFilters(executionContext, source))
.build();
IEntityFetcher entityFetcher = queryHandlerRegistry.getEntityFetcher(source);
EntitiesRequestContext context =
new EntitiesRequestContext(
executionContext.getEntitiesRequestContext().getGrpcContext(),
request.getStartTimeMillis(),
request.getEndTimeMillis(),
request.getEntityType(),
executionContext.getTimestampAttributeId());
return entityFetcher.getEntities(context, request);
})
.collect(Collectors.toList()));

EntityFetcherResponse response =
resultMapList.stream()
.reduce(new EntityFetcherResponse(), (r1, r2) -> unionEntities(Arrays.asList(r1, r2)));

return new EntityResponse(
mergeEntities(childEntityFetcherResponse, response), childNodeResponse.getTotal());
}

@Override
public EntityResponse visit(AndNode andNode) {
return intersect(
Expand Down Expand Up @@ -357,6 +464,10 @@ public EntityResponse visit(SelectionNode selectionNode) {
}
}

private Filter addSourceFilters(EntityExecutionContext executionContext, String source) {
return addSourceFilters(executionContext, source, null);
}

private Filter addSourceFilters(
EntityExecutionContext executionContext, String source, Filter filter) {
Optional<Filter> sourceFilterOptional =
Expand All @@ -365,6 +476,10 @@ private Filter addSourceFilters(
.getExpressionContext()
.getSourceToFilterMap()
.get(AttributeSource.valueOf(source)));
if (filter == null) {
return sourceFilterOptional.orElse(Filter.getDefaultInstance());
}

return sourceFilterOptional
.map(
sourceFilter ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.stream.Stream;
import org.hypertrace.gateway.service.entity.query.AndNode;
import org.hypertrace.gateway.service.entity.query.DataFetcherNode;
import org.hypertrace.gateway.service.entity.query.JoinNode;
import org.hypertrace.gateway.service.entity.query.NoOpNode;
import org.hypertrace.gateway.service.entity.query.OrNode;
import org.hypertrace.gateway.service.entity.query.PaginateOnlyNode;
Expand Down Expand Up @@ -194,4 +195,9 @@ public QueryNode visit(PaginateOnlyNode paginateOnlyNode) {
return new PaginateOnlyNode(
childNode, paginateOnlyNode.getLimit(), paginateOnlyNode.getOffset());
}

@Override
public QueryNode visit(JoinNode joinNode) {
return joinNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.stream.Collectors;
import org.hypertrace.gateway.service.entity.query.AndNode;
import org.hypertrace.gateway.service.entity.query.DataFetcherNode;
import org.hypertrace.gateway.service.entity.query.JoinNode;
import org.hypertrace.gateway.service.entity.query.NoOpNode;
import org.hypertrace.gateway.service.entity.query.OrNode;
import org.hypertrace.gateway.service.entity.query.PaginateOnlyNode;
Expand Down Expand Up @@ -66,4 +67,9 @@ public String visit(PaginateOnlyNode paginateOnlyNode) {
+ ") --> \n"
+ paginateOnlyNode.getChildNode().acceptVisitor(this);
}

@Override
public String visit(JoinNode joinNode) {
return "JOIN_NODE(" + joinNode + ") --> \n" + joinNode.getChildNode().acceptVisitor(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.hypertrace.gateway.service.entity.query.AndNode;
import org.hypertrace.gateway.service.entity.query.DataFetcherNode;
import org.hypertrace.gateway.service.entity.query.JoinNode;
import org.hypertrace.gateway.service.entity.query.NoOpNode;
import org.hypertrace.gateway.service.entity.query.OrNode;
import org.hypertrace.gateway.service.entity.query.PaginateOnlyNode;
Expand All @@ -28,4 +29,6 @@ public interface Visitor<R> {
R visit(NoOpNode noOpNode);

R visit(PaginateOnlyNode paginateOnlyNode);

R visit(JoinNode joinNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public static Value getStringValue(String value) {
return Value.newBuilder().setString(value).setValueType(ValueType.STRING).build();
}

public static Value getLongValue(long value) {
return Value.newBuilder().setLong(value).setValueType(ValueType.LONG).build();
}

public static AggregatedMetricValue getAggregatedMetricValue(
FunctionType functionType, double value) {
return AggregatedMetricValue.newBuilder()
Expand Down
Loading
Loading