From 8283fc31890cb8339ec8a5324a120904f76f86c5 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Fri, 31 Jan 2025 10:11:15 -0800 Subject: [PATCH] Add dedicated threadpool for search powered by streams Signed-off-by: Rishabh Maurya --- .../search/QueryPhaseResultConsumer.java | 9 +++++- .../action/search/TransportSearchAction.java | 1 + .../search/query/StreamSearchPhase.java | 1 + .../org/opensearch/threadpool/ThreadPool.java | 28 +++++++++++++++++++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java index b4879c2e4b6dd..62966d859b785 100644 --- a/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java @@ -56,6 +56,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -75,6 +76,8 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults streamExecutor = Optional.empty(); + private final CircuitBreaker circuitBreaker; private final SearchPhaseController controller; private final SearchProgressListener progressListener; @@ -120,6 +123,10 @@ public QueryPhaseResultConsumer( this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo()); } + public void setSearchStreamExecutor(Executor searchStreamExecutor) { + this.streamExecutor = Optional.of(searchStreamExecutor); + } + @Override public void close() { Releasables.close(pendingMerges); @@ -154,7 +161,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { results.asList().stream().map(r -> (StreamSearchResult) r).collect(Collectors.toList()), aggReduceContextBuilder, performFinalReduce, - executor + streamExecutor.orElse(executor) ); logger.info("Will reduce results for {}", results.get(0)); } else { diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 5d7279a55073c..3c558e321e93d 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -1267,6 +1267,7 @@ private AbstractSearchAsyncAction searchAsyncAction shardIterators.size(), exc -> cancelTask(task, exc) ); + queryResultConsumer.setSearchStreamExecutor(threadPool.executor(ThreadPool.Names.SEARCH_STREAM)); AbstractSearchAsyncAction searchAsyncAction; switch (searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: diff --git a/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java index 9b5fa2558af1e..a6444c684e9c4 100644 --- a/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java +++ b/server/src/main/java/org/opensearch/search/query/StreamSearchPhase.java @@ -197,6 +197,7 @@ public VectorSchemaRoot createRoot(BufferAllocator allocator) { Field countField = new Field("count", FieldType.nullable(new ArrowType.Int(64, false)), null); arrowFields.put("count", countField); arrowFields.put("ord", new Field("ord", FieldType.nullable(new ArrowType.Utf8()), null)); + schema[0] = new Schema(arrowFields.values()); root[0] = Optional.of(VectorSchemaRoot.create(schema[0], allocator)); return root[0].get(); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 59d3b110aeca8..38f165bc4a80a 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -104,6 +104,8 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String SEARCH_STREAM = "search_stream"; + public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; @@ -181,6 +183,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.ANALYZE, ThreadPoolType.FIXED); map.put(Names.WRITE, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.RESIZABLE); + map.put(Names.SEARCH_STREAM, ThreadPoolType.SCALING); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -235,6 +238,19 @@ public Collection builders() { Setting.Property.NodeScope ); + public static final Setting SEARCH_STREAM_CORE_POOL_SIZE = Setting.intSetting( + "thread_pool.search_stream.core", + 4, + 1, + Setting.Property.NodeScope + ); + public static final Setting SEARCH_STREAM_MAX_POOL_SIZE = Setting.intSetting( + "thread_pool.search_stream.max", + 64, + 1, + Setting.Property.NodeScope + ); + public ThreadPool(final Settings settings, final ExecutorBuilder... customBuilders) { this(settings, null, customBuilders); } @@ -261,6 +277,18 @@ public ThreadPool( Names.SEARCH, new ResizableExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, runnableTaskListener) ); + builders.put( + Names.SEARCH_STREAM, + new ScalingExecutorBuilder( + Names.SEARCH_STREAM, + SEARCH_STREAM_CORE_POOL_SIZE.get(settings), + Math.max(allocatedProcessors * 4, SEARCH_STREAM_MAX_POOL_SIZE.get(settings)), // important for handling concurrent streams + // executions + TimeValue.timeValueSeconds(120) // this is an important setting for streaming use cases where threads go into waiting state + // quite often + ) + ); + builders.put(Names.SEARCH_THROTTLED, new ResizableExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, runnableTaskListener)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded