Skip to content

Commit

Permalink
SOLR-17447 : Support for maxHitsPerShard.
Browse files Browse the repository at this point in the history
Adds the capability to limit hits per shard.

The "maxHitsPerShard" request parameter controls how many hits the searcher should run over per shard. Once the searcher has run over the specified number of documents, it terminates the search with an EarlyTerminatingCollectorException. This is indicated by a new response header, "terminatedEarly"; the "partialResults" header will also indicate that the results are partial.
This parameter is supported in multi-threaded (MT) search mode as well.

Though there are other mechanisms to control runaway queries, such as CPU usage limits and time limits, this one is simpler for certain use cases, especially high-recall queries and rerank use cases.
  • Loading branch information
Siju Varghese committed Jan 10, 2025
1 parent 440e70d commit e48c636
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,17 @@ public void process(ResponseBuilder rb) throws IOException {
params.getBool(
CommonParams.SEGMENT_TERMINATE_EARLY, CommonParams.SEGMENT_TERMINATE_EARLY_DEFAULT));

// max hits per shard: optional request parameter, read as a nullable Integer
// (the null check below is what distinguishes "not supplied" from a value).
final Integer maxHits = params.getInt(CommonParams.MAX_HITS_PER_SHARD);

if (maxHits != null) {
// The effective limit never drops below cmd.getLen()
// (presumably the number of rows requested -- confirm against QueryCommand).
int hitsPerShard = Math.max(maxHits, cmd.getLen());
// QueryCommand initialises minExactCount to Integer.MAX_VALUE, so a smaller value
// means it was set explicitly; in that case the limit is raised to at least that value.
if (cmd.getMinExactCount() < Integer.MAX_VALUE) {
hitsPerShard = Math.max(cmd.getMinExactCount(), hitsPerShard);
}
cmd.setMaxHits(hitsPerShard);
}

//
// grouping / field collapsing
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ public void setResult(QueryResult result) {
SolrQueryResponse.RESPONSE_HEADER_SEGMENT_TERMINATED_EARLY_KEY,
segmentTerminatedEarly);
}
// Copy the early-termination flag into the response header, but only when the
// QueryResult carries one: getTerminatedEarly() returns a nullable Boolean, and
// a null value leaves the header untouched.
final Boolean terminatedEarly = result.getTerminatedEarly();
if (terminatedEarly != null) {
rsp.getResponseHeader()
.add(
SolrQueryResponse.RESPONSE_HEADER_TERMINATED_EARLY_KEY,
terminatedEarly);
}
if (null != cursorMark) {
assert null != result.getNextCursorMark() : "using cursor but no next cursor set";
this.setNextCursorMark(result.getNextCursorMark());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class SolrQueryResponse {
public static final String RESPONSE_HEADER_PARTIAL_RESULTS_DETAILS_KEY = "partialResultsDetails";
public static final String RESPONSE_HEADER_SEGMENT_TERMINATED_EARLY_KEY =
"segmentTerminatedEarly";
public static final String RESPONSE_HEADER_TERMINATED_EARLY_KEY =
"terminatedEarly";
public static final String RESPONSE_HEADER_KEY = "responseHeader";
private static final String RESPONSE_KEY = "response";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.solr.search;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterCollector;
Expand All @@ -29,11 +31,16 @@
*/
public class EarlyTerminatingCollector extends FilterCollector {

// Number of locally collected docs between two updates of pendingDocsToCollect;
// collect() only touches the atomic counter when numCollected is a multiple of this.
private final int chunkSize = 100; // Check across threads only at a chunk size

// Limit for this collector instance; collect() compares it against numCollected.
private final int maxDocsToCollect;

// Docs collected so far by this instance (incremented once per collect() call).
private int numCollected = 0;
// Running total of currentReaderSize values accumulated in getLeafCollector();
// used to compute the "number scanned" reported in the exception.
private int prevReaderCumulativeSize = 0;
// NOTE(review): presumably the doc count of the segment currently being collected --
// the assignment is not visible in this hunk; confirm in getLeafCollector().
private int currentReaderSize = 0;
// Remaining budget, supplied through the constructor and decremented by chunkSize in
// collect(); presumably shared by the collectors of one multi-threaded search -- confirm.
private final AtomicInteger pendingDocsToCollect;
// Outcome of the most recent termination check in collect(); exposed by isTerminatedEarly().
private boolean terminatedEarly = false;


/**
* Wraps a {@link Collector}, throwing {@link EarlyTerminatingCollectorException} once the
Expand All @@ -43,13 +50,14 @@ public class EarlyTerminatingCollector extends FilterCollector {
* @param maxDocsToCollect - the maximum number of documents to Collect
*/
public EarlyTerminatingCollector(Collector delegate, int maxDocsToCollect) {
super(delegate);
assert 0 < maxDocsToCollect;
assert null != delegate;
this(delegate,maxDocsToCollect, new AtomicInteger(maxDocsToCollect));
}

public EarlyTerminatingCollector(Collector delegate, int maxDocsToCollect,AtomicInteger docsToCollect) {
super(delegate);
this.maxDocsToCollect = maxDocsToCollect;
this.pendingDocsToCollect = docsToCollect;
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
prevReaderCumulativeSize += currentReaderSize; // not current any more
Expand All @@ -61,11 +69,22 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
public void collect(int doc) throws IOException {
super.collect(doc);
numCollected++;
if (maxDocsToCollect <= numCollected) {
throw new EarlyTerminatingCollectorException(
numCollected, prevReaderCumulativeSize + (doc + 1));
terminatedEarly = maxDocsToCollect <= numCollected;
if (numCollected % chunkSize == 0) {
terminatedEarly = pendingDocsToCollect.addAndGet(-1 * chunkSize) < 0;
}
if (terminatedEarly) {
throw new EarlyTerminatingCollectorException(maxDocsToCollect,
prevReaderCumulativeSize + (doc + 1));
}
}
};
}
/**
 * Reports the outcome of the most recent termination check performed while collecting.
 *
 * @return {@code true} if the last check decided that collection must stop, {@code false}
 *     otherwise (including when nothing has been collected yet)
 */
public boolean isTerminatedEarly() {
  return this.terminatedEarly;
}

/**
 * Exposes the collector wrapped by this instance so callers can unwrap it.
 *
 * @return the wrapped collector held by the {@code FilterCollector} superclass
 */
public Collector getDelegate() {
  final Collector wrapped = super.in;
  return wrapped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
*/
public class EarlyTerminatingCollectorException extends RuntimeException {
private static final long serialVersionUID = 5939241340763428118L;
private int numberScanned;
private int numberCollected;
private final int numberScanned;
private final int numberCollected;

public EarlyTerminatingCollectorException(int numberCollected, int numberScanned) {
assert numberCollected <= numberScanned : numberCollected + "<=" + numberScanned;
assert 0 < numberCollected;
assert 0 < numberScanned;

this.numberCollected = numberCollected;
this.numberScanned = numberScanned;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ SearchResult searchCollectorManagers(

@SuppressWarnings({"unchecked", "rawtypes"})
CollectorManager<Collector, Object>[] colls = collectors.toArray(new CollectorManager[0]);
SolrMultiCollectorManager manager = new SolrMultiCollectorManager(colls);
final SolrMultiCollectorManager manager = new SolrMultiCollectorManager(cmd, colls);
boolean isTerminatedEarly = false;
Object[] ret;
try {
ret = searcher.search(query, manager);
} catch (Exception ex) {
if (ex instanceof RuntimeException
if (ex instanceof EarlyTerminatingCollectorException) {
ret = manager.reduce();
isTerminatedEarly = true;
} else if (ex instanceof RuntimeException
&& ex.getCause() != null
&& ex.getCause() instanceof ExecutionException
&& ex.getCause().getCause() != null
Expand All @@ -108,7 +112,7 @@ SearchResult searchCollectorManagers(

ScoreMode scoreMode = SolrMultiCollectorManager.scoreMode(firstCollectors);

return new SearchResult(scoreMode, ret);
return new SearchResult(scoreMode, ret, isTerminatedEarly);
}

static boolean allowMT(DelegatingCollector postFilter, QueryCommand cmd) {
Expand Down Expand Up @@ -190,10 +194,16 @@ public ScoreMode scoreMode() {
static class SearchResult {
final ScoreMode scoreMode;
private final Object[] result;
final boolean isTerminatedEarly;


/**
 * Creates a result for a search that was not terminated early.
 *
 * @param scoreMode score mode of the collectors that produced {@code result}
 * @param result per-collector-manager results
 */
public SearchResult(ScoreMode scoreMode, Object[] result) {
  this(scoreMode, result, false);
}

/**
 * Creates a result, recording whether the search was cut short.
 *
 * @param scoreMode score mode of the collectors that produced {@code result}
 * @param result per-collector-manager results
 * @param isTerminatedEarly {@code true} when the search was terminated early
 */
public SearchResult(ScoreMode scoreMode, Object[] result, boolean isTerminatedEarly) {
  this.isTerminatedEarly = isTerminatedEarly;
  this.result = result;
  this.scoreMode = scoreMode;
}

public TopDocsResult getTopDocsResult() {
Expand Down Expand Up @@ -255,7 +265,13 @@ public Object reduce(Collection collectors) throws IOException {
for (Iterator var4 = collectors.iterator();
var4.hasNext();
maxScore = Math.max(maxScore, collector.getMaxScore())) {
collector = (MaxScoreCollector) var4.next();
Collector next = (Collector) var4.next();
if (next instanceof EarlyTerminatingCollector) {
EarlyTerminatingCollector earlyTerminatingCollector = (EarlyTerminatingCollector) next;
next = earlyTerminatingCollector.getDelegate();
}
collector = (MaxScoreCollector) next;

}

return new MaxScoreResult(maxScore);
Expand Down Expand Up @@ -324,6 +340,10 @@ public Object reduce(Collection collectors) throws IOException {
Collector collector;
for (Object o : collectors) {
collector = (Collector) o;
if (collector instanceof EarlyTerminatingCollector) {
EarlyTerminatingCollector earlyTerminatingCollector = (EarlyTerminatingCollector) collector;
collector = earlyTerminatingCollector.getDelegate();
}
if (collector instanceof TopDocsCollector) {
TopDocs td = ((TopDocsCollector) collector).topDocs(0, len);
assert td != null : Arrays.asList(topDocs);
Expand Down
11 changes: 10 additions & 1 deletion solr/core/src/java/org/apache/solr/search/QueryCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class QueryCommand {
private int minExactCount = Integer.MAX_VALUE;
private CursorMark cursorMark;
private boolean distribStatsDisabled;
private int maxHits = Integer.MAX_VALUE;

public CursorMark getCursorMark() {
return cursorMark;
Expand Down Expand Up @@ -194,7 +195,7 @@ public QueryCommand setNeedDocSet(boolean needDocSet) {
}

public boolean getTerminateEarly() {
return (flags & SolrIndexSearcher.TERMINATE_EARLY) != 0;
return (flags & SolrIndexSearcher.TERMINATE_EARLY) != 0 || maxHits < Integer.MAX_VALUE;
}

public QueryCommand setTerminateEarly(boolean segmentTerminateEarly) {
Expand Down Expand Up @@ -245,4 +246,12 @@ public boolean isDistribStatsDisabled() {
public QueryResult search(SolrIndexSearcher searcher) throws IOException {
return searcher.search(this);
}

/**
 * Returns the configured hit limit for this command.
 *
 * @return the limit, or {@code Integer.MAX_VALUE} (the field's initial value) when no limit
 *     has been set
 */
public int getMaxHits() {
  return this.maxHits;
}

/**
 * Sets the hit limit for this command.
 *
 * <p>Any value below {@code Integer.MAX_VALUE} also causes {@code getTerminateEarly()} to
 * report {@code true}.
 *
 * @param maxHits the limit to apply; {@code Integer.MAX_VALUE} means "no limit"
 * @return this command, so the call can be chained like the other setters of this class
 */
public QueryCommand setMaxHits(int maxHits) {
  this.maxHits = maxHits;
  return this;
}
}
9 changes: 9 additions & 0 deletions solr/core/src/java/org/apache/solr/search/QueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class QueryResult {
// Object for back compatibility so that we render true not "true" in json
private Object partialResults;
private Boolean segmentTerminatedEarly;
private Boolean terminatedEarly;
private DocListAndSet docListAndSet;
private CursorMark nextCursorMark;

Expand Down Expand Up @@ -85,4 +86,12 @@ public void setNextCursorMark(CursorMark next) {
public CursorMark getNextCursorMark() {
return nextCursorMark;
}

/**
 * Returns the early-termination flag recorded for this result.
 *
 * @return {@code null} if the flag was never set, otherwise the value passed to {@code
 *     setTerminatedEarly}
 */
public Boolean getTerminatedEarly() {
  return this.terminatedEarly;
}

/**
 * Records whether the search that produced this result was terminated early.
 *
 * @param terminatedEarly {@code true} if the search stopped before visiting all hits
 */
public void setTerminatedEarly(boolean terminatedEarly) {
  // Explicit boxing; equivalent to assigning the primitive to the Boolean field.
  this.terminatedEarly = Boolean.valueOf(terminatedEarly);
}
}
13 changes: 10 additions & 3 deletions solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private Collector buildAndRunCollectorChain(

final boolean terminateEarly = cmd.getTerminateEarly();
if (terminateEarly) {
collector = new EarlyTerminatingCollector(collector, cmd.getLen());
collector = new EarlyTerminatingCollector(collector, cmd.getMaxHits());
}

final long timeAllowed = cmd.getTimeAllowed();
Expand Down Expand Up @@ -329,7 +329,8 @@ private Collector buildAndRunCollectorChain(
if (collector instanceof DelegatingCollector) {
((DelegatingCollector) collector).complete();
}
throw etce;
qr.setPartialResults(true);

} finally {
if (earlyTerminatingSortingCollector != null) {
qr.setSegmentTerminatedEarly(earlyTerminatingSortingCollector.terminatedEarly());
Expand All @@ -338,6 +339,9 @@ private Collector buildAndRunCollectorChain(
if (cmd.isQueryCancellable()) {
core.getCancellableQueryTracker().removeCancellableQuery(cmd.getQueryID());
}
// Record the collector's early-termination state on the query result.
// NOTE(review): this only fires when `collector` itself is the EarlyTerminatingCollector;
// if a later wrapper (e.g. for timeAllowed) is applied on top of it, the check is false
// and the flag is never set -- confirm against the full method.
if (collector instanceof EarlyTerminatingCollector) {
qr.setTerminatedEarly(((EarlyTerminatingCollector) collector).isTerminatedEarly());
}
}
if (collector instanceof DelegatingCollector) {
((DelegatingCollector) collector).complete();
Expand Down Expand Up @@ -1979,7 +1983,10 @@ public ScoreMode scoreMode() {
MultiThreadedSearcher.TopDocsResult topDocsResult = searchResult.getTopDocsResult();
totalHits = topDocsResult.totalHits;
topDocs = topDocsResult.topDocs;

// Multi-threaded path: when the search was cut short, flag the result both as
// terminated early and as partial.
if(searchResult.isTerminatedEarly){
qr.setTerminatedEarly(true);
qr.setPartialResults(Boolean.TRUE);
}
maxScore = searchResult.getMaxScore(totalHits);
}

Expand Down
Loading

0 comments on commit e48c636

Please sign in to comment.