Skip to content

Commit

Permalink
Add dedicated threadpool for search powered by streams
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Jan 31, 2025
1 parent f28a036 commit 9076e85
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -75,6 +76,8 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
private static final Logger logger = LogManager.getLogger(QueryPhaseResultConsumer.class);

private final Executor executor;
private Optional<Executor> streamExecutor = Optional.empty();

private final CircuitBreaker circuitBreaker;
private final SearchPhaseController controller;
private final SearchProgressListener progressListener;
Expand Down Expand Up @@ -120,6 +123,10 @@ public QueryPhaseResultConsumer(
this.pendingMerges = new PendingMerges(batchReduceSize, request.resolveTrackTotalHitsUpTo());
}

public void setSearchStreamExecutor(Executor searchStreamExecutor) {
this.streamExecutor = Optional.of(searchStreamExecutor);
}

@Override
public void close() {
Releasables.close(pendingMerges);
Expand Down Expand Up @@ -154,7 +161,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
results.asList().stream().map(r -> (StreamSearchResult) r).collect(Collectors.toList()),
aggReduceContextBuilder,
performFinalReduce,
executor
streamExecutor.orElse(executor)
);
logger.info("Will reduce results for {}", results.get(0));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
tracer
);
} else {
;
final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(
executor,
circuitBreaker,
Expand All @@ -1267,6 +1268,7 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
shardIterators.size(),
exc -> cancelTask(task, exc)
);
queryResultConsumer.setSearchStreamExecutor(threadPool.executor(ThreadPool.Names.SEARCH_STREAM));
AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction;
switch (searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public VectorSchemaRoot createRoot(BufferAllocator allocator) {
Field countField = new Field("count", FieldType.nullable(new ArrowType.Int(64, false)), null);
arrowFields.put("count", countField);
arrowFields.put("ord", new Field("ord", FieldType.nullable(new ArrowType.Utf8()), null));

schema[0] = new Schema(arrowFields.values());
root[0] = Optional.of(VectorSchemaRoot.create(schema[0], allocator));
return root[0].get();
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public static class Names {
public static final String ANALYZE = "analyze";
public static final String WRITE = "write";
public static final String SEARCH = "search";
public static final String SEARCH_STREAM = "search_stream";

public static final String SEARCH_THROTTLED = "search_throttled";
public static final String MANAGEMENT = "management";
public static final String FLUSH = "flush";
Expand Down Expand Up @@ -181,6 +183,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.ANALYZE, ThreadPoolType.FIXED);
map.put(Names.WRITE, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.RESIZABLE);
map.put(Names.SEARCH_STREAM, ThreadPoolType.RESIZABLE);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
map.put(Names.FLUSH, ThreadPoolType.SCALING);
map.put(Names.REFRESH, ThreadPoolType.SCALING);
Expand Down Expand Up @@ -235,6 +238,19 @@ public Collection<ExecutorBuilder> builders() {
Setting.Property.NodeScope
);

public static final Setting<Integer> SEARCH_STREAM_CORE_POOL_SIZE = Setting.intSetting(
"thread_pool.search_stream.core",
4,
1,
Setting.Property.NodeScope
);
public static final Setting<Integer> SEARCH_STREAM_MAX_POOL_SIZE = Setting.intSetting(
"thread_pool.search_stream.max",
64,
1,
Setting.Property.NodeScope
);

public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
this(settings, null, customBuilders);
}
Expand All @@ -261,6 +277,18 @@ public ThreadPool(
Names.SEARCH,
new ResizableExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, runnableTaskListener)
);
builders.put(
Names.SEARCH_STREAM,
new ScalingExecutorBuilder(
Names.SEARCH_STREAM,
SEARCH_STREAM_CORE_POOL_SIZE.get(settings),
Math.max(allocatedProcessors * 4, SEARCH_STREAM_MAX_POOL_SIZE.get(settings)), // important for handling concurrent streams
// executions
TimeValue.timeValueSeconds(120) // this is an important setting for streaming use cases where threads go into waiting state
// quite often
)
);

builders.put(Names.SEARCH_THROTTLED, new ResizableExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, runnableTaskListener));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
Expand Down

0 comments on commit 9076e85

Please sign in to comment.