Skip to content

Commit

Permalink
Make searchRequestOperationsListener part of SearchRequestContext
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Nov 1, 2023
1 parent 8171745 commit 677f453
Show file tree
Hide file tree
Showing 18 changed files with 53 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -121,8 +120,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten

private final List<Releasable> releasables = new ArrayList<>();

private final Optional<SearchRequestOperationsListener> searchRequestOperationsListener;

AbstractSearchAsyncAction(
String name,
Logger logger,
Expand All @@ -141,7 +138,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
SearchPhaseResults<Result> resultConsumer,
int maxConcurrentRequestsPerNode,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
super(name);
Expand Down Expand Up @@ -178,7 +174,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.indexRoutings = indexRoutings;
this.results = resultConsumer;
this.clusters = clusters;
this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener);
this.searchRequestContext = searchRequestContext;
}

Expand Down Expand Up @@ -436,20 +431,16 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
}

private void onPhaseEnd(SearchRequestContext searchRequestContext) {
this.searchRequestOperationsListener.ifPresent(
searchRequestOperations -> searchRequestOperations.onPhaseEnd(this, searchRequestContext)
);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
}

private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseStart(this));
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestOperationsListener.ifPresent(
searchRequestOperations -> searchRequestOperations.onRequestEnd(this, searchRequestContext)
);
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
}

private void executePhase(SearchPhase phase) {
Expand Down Expand Up @@ -729,7 +720,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this));
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
SearchTask task,
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
Expand All @@ -113,7 +112,6 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
new CanMatchSearchPhaseResults(shardsIts.size()),
shardsIts.size(),
clusters,
searchRequestOperationsListener,
searchRequestContext
);
this.phaseFactory = phaseFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final ClusterState clusterState,
final SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
super(
Expand All @@ -98,7 +97,6 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener,
searchRequestContext
);
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
ClusterState clusterState,
SearchTask task,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
super(
Expand All @@ -103,7 +102,6 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
resultConsumer,
request.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener,
searchRequestContext
);
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,69 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* This class holds request-level context for search queries at the coordinator node
*
* @opensearch.internal
*/
public class SearchRequestContext {
private long absoluteStartNanos;
public final class SearchRequestContext {
private final SearchRequestOperationsListener searchRequestOperationsListener;
private final long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private Map<String, Integer> shardStats;

public final String SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL = "total";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL = "successful";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED = "skipped";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_FAILED = "failed";

public SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}

public SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.phaseTookMap = new HashMap<>();
this.totalHits = null;
this.absoluteStartNanos = System.nanoTime();
}

protected void updatePhaseTookMap(String phaseName, Long tookTime) {
this.phaseTookMap.put(phaseName, tookTime);
SearchRequestOperationsListener getSearchRequestOperationsListener() {
return searchRequestOperationsListener;
}

protected Map<String, Long> phaseTookMap() {
return phaseTookMap;
void updatePhaseTookMap(String phaseName, Long tookTime) {
this.phaseTookMap.put(phaseName, tookTime);
}

protected void setAbsoluteStartNanos(long absoluteStartNanos) {
this.absoluteStartNanos = absoluteStartNanos;
Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

protected long absoluteStartNanos() {
long getAbsoluteStartNanos() {
return absoluteStartNanos;
}

protected void setTotalHits(TotalHits totalHits) {
void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

protected TotalHits totalHits() {
TotalHits totalHits() {
return totalHits;
}

protected void setShardStats(Map<String, Integer> shardStats) {
void setShardStats(Map<String, Integer> shardStats) {
this.shardStats = shardStats;
}

protected Map<String, Integer> shardStats() {
return shardStats;
}

public final String SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL = "total";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL = "successful";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED = "skipped";
public final String SEARCH_REQUEST_SLOWLOG_SHARD_FAILED = "failed";

protected String formattedShardStats() {
String formattedShardStats() {
if (shardStats != null) {
return String.format(
"{%s: %s, %s: %s, %s: %s, %s: %s}",
Expand All @@ -82,6 +86,5 @@ protected String formattedShardStats() {
} else {
return "{}";
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.OpenSearchLogMessage;
import org.opensearch.common.logging.SlowLogLevel;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.SlowLogLevel;
import org.opensearch.tasks.Task;

import java.nio.charset.Charset;
Expand All @@ -53,7 +53,7 @@
import java.util.concurrent.TimeUnit;

/**
* The search time slow log implementation
* The request-level search slow log implementation
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -142,13 +142,11 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe
public void onPhaseFailure(SearchPhaseContext context) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {
searchRequestContext.setAbsoluteStartNanos(System.nanoTime());
}
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
long tookInNanos = System.nanoTime() - searchRequestContext.absoluteStartNanos();
long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos();

if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) {
logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,6 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
return new AbstractSearchAsyncAction<SearchPhaseResult>(
Expand All @@ -447,7 +446,6 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
new ArraySearchPhaseResults<>(shardsIts.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters,
searchRequestOperationsListener,
searchRequestContext
) {
@Override
Expand Down Expand Up @@ -494,12 +492,10 @@ private void executeRequest(
);

final List<SearchRequestOperationsListener> searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider);
final SearchRequestOperationsListener searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(
searchListenersList,
logger
SearchRequestContext searchRequestContext = new SearchRequestContext(
new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger)
);
SearchRequestContext searchRequestContext = new SearchRequestContext();
searchRequestOperationsListener.onRequestStart(searchRequestContext);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
Expand Down Expand Up @@ -553,7 +549,6 @@ private void executeRequest(
listener,
searchContext,
searchAsyncActionProvider,
searchRequestOperationsListener,
searchRequestContext
);
} else {
Expand All @@ -576,7 +571,6 @@ private void executeRequest(
l,
searchContext,
searchAsyncActionProvider,
searchRequestOperationsListener,
searchRequestContext
)
);
Expand Down Expand Up @@ -628,7 +622,6 @@ private void executeRequest(
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext,
searchAsyncActionProvider,
searchRequestOperationsListener,
searchRequestContext
);
}, listener::onFailure)
Expand Down Expand Up @@ -909,7 +902,6 @@ private void executeLocalSearch(
ActionListener<SearchResponse> listener,
SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
executeSearch(
Expand All @@ -925,7 +917,6 @@ private void executeLocalSearch(
SearchResponse.Clusters.EMPTY,
searchContext,
searchAsyncActionProvider,
searchRequestOperationsListener,
searchRequestContext
);
}
Expand Down Expand Up @@ -1045,7 +1036,6 @@ private void executeSearch(
SearchResponse.Clusters clusters,
@Nullable SearchContextId searchContext,
SearchAsyncActionProvider searchAsyncActionProvider,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
Expand Down Expand Up @@ -1148,7 +1138,6 @@ private void executeSearch(
preFilterSearchShards,
threadPool,
clusters,
searchRequestOperationsListener,
searchRequestContext
).start();
}
Expand Down Expand Up @@ -1233,7 +1222,6 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
);
}
Expand Down Expand Up @@ -1282,7 +1270,6 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters,
SearchRequestOperationsListener searchRequestOperationsListener,
SearchRequestContext searchRequestContext
) {
if (preFilter) {
Expand Down Expand Up @@ -1316,7 +1303,6 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
false,
threadPool,
clusters,
searchRequestOperationsListener,
searchRequestContext
);
return new SearchPhase(action.getName()) {
Expand All @@ -1327,7 +1313,6 @@ public void run() {
};
},
clusters,
searchRequestOperationsListener,
searchRequestContext
);
} else {
Expand Down Expand Up @@ -1359,7 +1344,6 @@ public void run() {
clusterState,
task,
clusters,
searchRequestOperationsListener,
searchRequestContext
);
break;
Expand All @@ -1381,7 +1365,6 @@ public void run() {
clusterState,
task,
clusters,
searchRequestOperationsListener,
searchRequestContext
);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* GitHub history for details.
*/

package org.opensearch.index;
package org.opensearch.common.logging;

import java.util.Locale;

Expand Down
Loading

0 comments on commit 677f453

Please sign in to comment.