diff --git a/server/src/main/java/org/opensearch/action/search/SearchResponse.java b/server/src/main/java/org/opensearch/action/search/SearchResponse.java index 067141025afb4..de14db5cc187a 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/opensearch/action/search/SearchResponse.java @@ -33,6 +33,7 @@ package org.opensearch.action.search; import org.apache.lucene.search.TotalHits; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.StatusToXContentObject; @@ -63,6 +64,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -113,7 +115,11 @@ public SearchResponse(StreamInput in) throws IOException { clusters = new Clusters(in); scrollId = in.readOptionalString(); tookInMillis = in.readVLong(); - phaseTook = new PhaseTook(in); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + phaseTook = new PhaseTook(in); + } else { + phaseTook = PhaseTook.NULL; + } skippedShards = in.readVInt(); pointInTimeId = in.readOptionalString(); } @@ -428,34 +434,23 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE } } } else if (PhaseTook.PHASE_TOOK.match(currentFieldName, parser.getDeprecationHandler())) { - long dfsPreQueryTotal = -1; - long canMatchTotal = -1; - long queryTotal = -1; - long fetchTotal = -1; - long expandSearchTotal = -1; + Map phaseTookMap = new HashMap<>(); while ((token = parser.nextToken()) != Token.END_OBJECT) { if (token == Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token.isValue()) { - if (PhaseTook.DFS_PREQUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - dfsPreQueryTotal = parser.longValue(); // we don't need it but need to consume it - } else if (PhaseTook.CAN_MATCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - canMatchTotal = parser.longValue(); - } else if (PhaseTook.QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - queryTotal = parser.longValue(); - } else if (PhaseTook.FETCH_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - fetchTotal = parser.longValue(); - } else if (PhaseTook.EXPAND_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { - expandSearchTotal = parser.longValue(); - } else { + try { + Enum.valueOf(SearchPhaseName.class, currentFieldName); + phaseTookMap.put(currentFieldName, parser.longValue()); + } catch (final IllegalArgumentException ex) { parser.skipChildren(); } } else { parser.skipChildren(); } } - phaseTook = new PhaseTook(dfsPreQueryTotal, canMatchTotal, queryTotal, fetchTotal, expandSearchTotal); + phaseTook = new PhaseTook(phaseTookMap); } else if (Clusters._CLUSTERS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { int successful = -1; int total = -1; @@ -547,7 +542,9 @@ public void writeTo(StreamOutput out) throws IOException { clusters.writeTo(out); out.writeOptionalString(scrollId); out.writeVLong(tookInMillis); - phaseTook.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + phaseTook.writeTo(out); + } out.writeVInt(skippedShards); out.writeOptionalString(pointInTimeId); } @@ -668,89 +665,45 @@ public String toString() { * @opensearch.internal */ public static class PhaseTook implements ToXContentFragment, Writeable { - public static final PhaseTook NULL = new PhaseTook(-1, -1, -1, -1, -1); + public static final PhaseTook NULL = new PhaseTook(); static final ParseField PHASE_TOOK = new ParseField("phase_took"); - static final ParseField DFS_PREQUERY_FIELD = new ParseField("dfs_prequery"); - static final ParseField CAN_MATCH_FIELD = new ParseField("can_match"); - static final ParseField QUERY_FIELD = new ParseField("query"); - static final ParseField FETCH_FIELD = new ParseField("fetch"); - static final ParseField EXPAND_FIELD = new ParseField("expand_search"); - - private final long dfsPreQueryTotal; - private final long canMatchTotal; - private final long queryTotal; - private final long fetchTotal; - private final long expandSearchTotal; - - public PhaseTook(long dfsPreQueryTotal, long canMatchTotal, long queryTotal, long fetchTotal, long expandSearchTotal) { - this.dfsPreQueryTotal = dfsPreQueryTotal; - this.canMatchTotal = canMatchTotal; - this.queryTotal = queryTotal; - this.fetchTotal = fetchTotal; - this.expandSearchTotal = expandSearchTotal; + private final Map phaseStatsMap; + + private PhaseTook() { + Map nullPhaseTookMap = new HashMap<>(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + nullPhaseTookMap.put(searchPhaseName.getName(), (long) -1); + } + this.phaseStatsMap = nullPhaseTookMap; + } + + public PhaseTook(Map phaseStatsMap) { + this.phaseStatsMap = phaseStatsMap; } private PhaseTook(StreamInput in) throws IOException { - this(in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong()); + this(in.readMap(StreamInput::readString, StreamInput::readLong)); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeLong(dfsPreQueryTotal); - out.writeLong(canMatchTotal); - out.writeLong(queryTotal); - out.writeLong(fetchTotal); - out.writeLong(expandSearchTotal); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + out.writeLong(phaseStatsMap.get(searchPhaseName.getName())); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(PHASE_TOOK.getPreferredName()); - builder.field(DFS_PREQUERY_FIELD.getPreferredName(), dfsPreQueryTotal); - builder.field(CAN_MATCH_FIELD.getPreferredName(), canMatchTotal); - builder.field(QUERY_FIELD.getPreferredName(), queryTotal); - builder.field(FETCH_FIELD.getPreferredName(), fetchTotal); - builder.field(EXPAND_FIELD.getPreferredName(), expandSearchTotal); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + builder.field(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName.getName())); + } builder.endObject(); return builder; } - /** - * Returns time spent in DFS Prequery phase during the execution of the search request - */ - public long getDfsPreQueryTotal() { - return dfsPreQueryTotal; - } - - /** - * Returns time spent in canMatch phase during the execution of the search request - */ - public long getCanMatchTotal() { - return canMatchTotal; - } - - /** - * Returns time spent in query phase during the execution of the search request - */ - public long getQueryTotal() { - return queryTotal; - } - - /** - * Returns time spent in fetch phase during the execution of the search request - */ - public long getFetchTotal() { - return fetchTotal; - } - - /** - * Returns time spent in expand phase during the execution of the search request - */ - public long getExpandSearchTotal() { - return expandSearchTotal; - } - @Override public boolean equals(Object o) { if (this == o) { @@ -760,16 +713,19 @@ public boolean equals(Object o) { return false; } PhaseTook phaseTook = (PhaseTook) o; - return dfsPreQueryTotal == phaseTook.dfsPreQueryTotal - && queryTotal == phaseTook.queryTotal - && canMatchTotal == phaseTook.canMatchTotal - && fetchTotal == phaseTook.fetchTotal - && expandSearchTotal == phaseTook.expandSearchTotal; + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + String searchPhaseNameString = searchPhaseName.getName(); + if (!phaseStatsMap.get(searchPhaseNameString).equals(phaseTook.phaseStatsMap.get(searchPhaseNameString))) { + return false; + } + } + return true; } @Override public int hashCode() { - return Objects.hash(dfsPreQueryTotal, queryTotal, canMatchTotal, fetchTotal, expandSearchTotal); + return Objects.hash(phaseStatsMap); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 678bad222fd8a..b476a342e9232 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -57,7 +57,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.unit.TimeValue; @@ -99,6 +98,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -111,7 +111,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -293,6 +292,9 @@ static final class SearchTimeProvider implements SearchRequestOperationsListener this.absoluteStartMillis = absoluteStartMillis; this.relativeStartNanos = relativeStartNanos; this.relativeCurrentNanosProvider = relativeCurrentNanosProvider; + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseStatsMap.put(searchPhaseName, 0L); + } } long getAbsoluteStartMillis() { @@ -313,107 +315,32 @@ public boolean isPhaseTookEnabled() { SearchResponse.PhaseTook getPhaseTook() { if (phaseTookEnabled) { - return new SearchResponse.PhaseTook( - totalStats.dfsPreQueryTotal.count(), - totalStats.canMatchTotal.count(), - totalStats.queryTotal.count(), - totalStats.fetchTotal.count(), - totalStats.expandSearchTotal.count() - ); + Map phaseTookMap = new HashMap<>(); + // Convert Map to Map for SearchResponse() + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName)); + } + return new SearchResponse.PhaseTook(phaseTookMap); } else { return SearchResponse.PhaseTook.NULL; } } - public SearchTimeProvider.RequestStatsHolder totalStats = new RequestStatsHolder(); - - public static final class RequestStatsHolder { - public CounterMetric dfsPreQueryTotal = new CounterMetric(); - public CounterMetric canMatchTotal = new CounterMetric(); - public CounterMetric queryTotal = new CounterMetric(); - public CounterMetric fetchTotal = new CounterMetric(); - public CounterMetric expandSearchTotal = new CounterMetric(); - } - - // call these to populate return values - public long getDFSPreQueryTotal() { - return totalStats.dfsPreQueryTotal.count(); - } - - public long getCanMatchTotal() { - return totalStats.canMatchTotal.count(); - } - - public long getQueryTotal() { - return totalStats.queryTotal.count(); - } - - public long getFetchTotal() { - return totalStats.fetchTotal.count(); - } - - public long getExpandSearchTotal() { - return totalStats.expandSearchTotal.count(); - } - - private void computeStats(Consumer consumer) { - consumer.accept(totalStats); - } - - @Override - public void onDFSPreQueryPhaseStart(SearchPhaseContext context) {} - - @Override - public void onDFSPreQueryPhaseEnd(SearchPhaseContext context, long tookTime) { - computeStats(statsHolder -> { totalStats.dfsPreQueryTotal.inc(tookTime); }); - } - - @Override - public void onDFSPreQueryPhaseFailure(SearchPhaseContext context) {} - - @Override - public void onCanMatchPhaseStart(SearchPhaseContext context) {} - - @Override - public void onCanMatchPhaseEnd(SearchPhaseContext context, long tookTime) { - computeStats(statsHolder -> { totalStats.canMatchTotal.inc(tookTime); }); - } - - @Override - public void onCanMatchPhaseFailure(SearchPhaseContext context) {} - - @Override - public void onQueryPhaseStart(SearchPhaseContext context) {} - - @Override - public void onQueryPhaseEnd(SearchPhaseContext context, long tookTime) { - computeStats(statsHolder -> { totalStats.queryTotal.inc(tookTime); }); - } - - @Override - public void onQueryPhaseFailure(SearchPhaseContext context) {} - - @Override - public void onFetchPhaseStart(SearchPhaseContext context) {} - - @Override - public void onFetchPhaseEnd(SearchPhaseContext context, long tookTime) { - computeStats(statsHolder -> { totalStats.fetchTotal.inc(tookTime); }); - } - - @Override - public void onFetchPhaseFailure(SearchPhaseContext context) {} + Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); @Override - public void onExpandSearchPhaseStart(SearchPhaseContext context) {} + public void onPhaseStart(SearchPhaseContext context) {} @Override - public void onExpandSearchPhaseEnd(SearchPhaseContext context, long tookTime) { - computeStats(statsHolder -> { totalStats.expandSearchTotal.inc(tookTime); }); + public void onPhaseEnd(SearchPhaseContext context) { + phaseStatsMap.put( + context.getCurrentPhase().getSearchPhaseName(), + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()) + ); } @Override - public void onExpandSearchPhaseFailure(SearchPhaseContext context) {} + public void onPhaseFailure(SearchPhaseContext context) {} } @Override @@ -455,13 +382,6 @@ public void executeRequest( SinglePhaseSearchAction phaseSearchAction, ActionListener listener ) { - final List searchListenersList = createSearchListenerList(); - final SearchRequestOperationsListener searchRequestOperationsListener; - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); - } else { - searchRequestOperationsListener = null; - } executeRequest(task, searchRequest, new SearchAsyncActionProvider() { @Override public AbstractSearchAsyncAction asyncSearchAction( @@ -478,14 +398,9 @@ public AbstractSearchAsyncAction asyncSearchAction( ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { - if (searchRequest.isPhaseTookQueryParamEnabled() == SearchRequest.ParamValue.TRUE - || (searchRequest.isPhaseTookQueryParamEnabled() == SearchRequest.ParamValue.UNSET - && clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED))) { - timeProvider.setPhaseTookEnabled(true); - } - return new AbstractSearchAsyncAction( actionName, logger, @@ -548,11 +463,23 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); + // 2nd executeRequest(), called by 1st executeRequest() + final List searchListenersList = createSearchListenerList(); + if (originalSearchRequest.isPhaseTookQueryParamEnabled() == SearchRequest.ParamValue.TRUE || (originalSearchRequest.isPhaseTookQueryParamEnabled() == SearchRequest.ParamValue.UNSET && clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED))) { timeProvider.setPhaseTookEnabled(true); + searchListenersList.add(timeProvider); } + + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } + PipelinedRequest searchRequest; ActionListener listener; try { @@ -596,7 +523,8 @@ private void executeRequest( clusterState, listener, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); } else { if (shouldMinimizeRoundtrips(searchRequest)) { @@ -617,7 +545,8 @@ private void executeRequest( clusterState, l, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ) ); } else { @@ -667,7 +596,8 @@ private void executeRequest( listener, new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()), searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); }, listener::onFailure) ); @@ -946,7 +876,8 @@ private void executeLocalSearch( ClusterState clusterState, ActionListener listener, SearchContextId searchContext, - SearchAsyncActionProvider searchAsyncActionProvider + SearchAsyncActionProvider searchAsyncActionProvider, + SearchRequestOperationsListener searchRequestOperationsListener ) { executeSearch( (SearchTask) task, @@ -960,7 +891,8 @@ private void executeLocalSearch( listener, SearchResponse.Clusters.EMPTY, searchContext, - searchAsyncActionProvider + searchAsyncActionProvider, + searchRequestOperationsListener ); } @@ -1078,7 +1010,8 @@ private void executeSearch( ActionListener listener, SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext, - SearchAsyncActionProvider searchAsyncActionProvider + SearchAsyncActionProvider searchAsyncActionProvider, + SearchRequestOperationsListener searchRequestOperationsListener ) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name @@ -1179,7 +1112,8 @@ private void executeSearch( listener, preFilterSearchShards, threadPool, - clusters + clusters, + searchRequestOperationsListener ).start(); } @@ -1262,7 +1196,8 @@ AbstractSearchAsyncAction asyncSearchAction( ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ); } @@ -1288,15 +1223,9 @@ private AbstractSearchAsyncAction searchAsyncAction ActionListener listener, boolean preFilter, ThreadPool threadPool, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { - final List searchListenersList = createSearchListenerList(); - final SearchRequestOperationsListener searchRequestOperationsListener; - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); - } else { - searchRequestOperationsListener = null; - } if (preFilter) { return new CanMatchPreFilterSearchPhase( logger, @@ -1327,7 +1256,8 @@ private AbstractSearchAsyncAction searchAsyncAction listener, false, threadPool, - clusters + clusters, + searchRequestOperationsListener ); return new SearchPhase(action.getName()) { @Override