Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve query iterator source management #2694

Open
wants to merge 7 commits into
base: integration
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public Iterator<Entry<Key,Document>> mapDocument(SortedKeyValueIterator<Key,Valu
Aggregation aggregation = new Aggregation(this.getTimeFilter(), this.typeMetadataWithNonIndexed, compositeMetadata, this.isIncludeGroupingContext(),
this.includeRecordId, this.parentDisableIndexOnlyDocuments, null);

KeyToDocumentData k2d = new KeyToDocumentData(deepSourceCopy, this.myEnvironment, this.documentOptions, getEquality(), null,
SortedKeyValueIterator<Key,Value> docMapperSource = getSourceDeepCopy("map document - key to document");
KeyToDocumentData k2d = new KeyToDocumentData(docMapperSource, this.myEnvironment, this.documentOptions, getEquality(), null,
this.includeHierarchyFields, this.includeHierarchyFields);
k2d.withRangeProvider(getRangeProvider());
k2d.withAggregationThreshold(getDocAggregationThresholdMs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -166,6 +167,9 @@ public class QueryIterator extends QueryOptions implements YieldingKeyValueItera

protected SortedKeyValueIterator<Key,Value> source;
protected SortedKeyValueIterator<Key,Value> sourceForDeepCopies;
protected int numberOfDeepCopies = 0;
protected List<String> deepCopyStages = new LinkedList<>();

protected Map<String,String> documentOptions;
protected NestedIterator<Key> initKeySource, seekKeySource;
protected Iterator<Entry<Key,Document>> documentIterator;
Expand Down Expand Up @@ -259,7 +263,7 @@ public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> op
}

if (isDebugMultithreadedSources()) {
this.source = new SourceThreadTrackingIterator(this.source);
this.source = new SourceThreadTrackingIterator<>(this.source);
}

this.sourceForDeepCopies = this.source.deepCopy(this.myEnvironment);
Expand Down Expand Up @@ -434,9 +438,17 @@ else if (documentRange != null && (!this.isContainsIndexOnlyTerms() && this.getT

// Create the pipeline iterator for document aggregation and
// evaluation within a thread pool

SortedKeyValueIterator<Key,Value> pipelineSource = null;
if (getMaxEvaluationPipelines() > 1) {
// only need to create a source copy IFF more than one evaluation pipeline will be used
// else, the query iterator itself is passed in and the method call to createDocumentPipeline
// can use the proper method for requesting sources
pipelineSource = getSourceDeepCopy("pipeline source");
}
PipelineIterator pipelineIter = PipelineFactory.createIterator(this.seekKeySource, getMaxEvaluationPipelines(), getMaxPipelineCachedResults(),
getSerialPipelineRequest(), querySpanCollector, trackingSpan, this, sourceForDeepCopies.deepCopy(myEnvironment), myEnvironment,
yield, yieldThresholdMs, columnFamilies, inclusive);
getSerialPipelineRequest(), querySpanCollector, trackingSpan, this, pipelineSource, myEnvironment, yield, yieldThresholdMs,
columnFamilies, inclusive);

pipelineIter.setCollectTimingDetails(collectTimingDetails);
// TODO pipelineIter.setStatsdHostAndPort(statsdHostAndPort);
Expand Down Expand Up @@ -660,12 +672,26 @@ private boolean getSerialPipelineRequest() {
*
* @return the deep copy of the source
*/
@Override
public SortedKeyValueIterator<Key,Value> getSourceDeepCopy() {
SortedKeyValueIterator<Key,Value> sourceDeepCopy;
return getSourceDeepCopy(null);
}

/**
* A version of {@link #getSourceDeepCopy()} that tracks the number of deep copies made and optionally tracks which stages requested a source
*
* @param stage
* the stage name, may be null
* @return a source deep copy
*/
@Override
@SuppressWarnings("SynchronizeOnNonFinalField")
public SortedKeyValueIterator<Key,Value> getSourceDeepCopy(String stage) {
synchronized (sourceForDeepCopies) {
sourceDeepCopy = sourceForDeepCopies.deepCopy(this.myEnvironment);
numberOfDeepCopies++;
deepCopyStages.add(stage);
return sourceForDeepCopies.deepCopy(this.myEnvironment);
}
return sourceDeepCopy;
}

/**
Expand Down Expand Up @@ -762,7 +788,8 @@ public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
};
} else {
// @formatter:off
docMapper = new KeyToDocumentData(deepSourceCopy, myEnvironment, documentOptions, getEquality(), getEventEvaluationFilter(), this.includeHierarchyFields,
SortedKeyValueIterator<Key, Value> docMapperSource = getSourceDeepCopy("document pipeline - key to document data");
docMapper = new KeyToDocumentData(docMapperSource, myEnvironment, documentOptions, getEquality(), getEventEvaluationFilter(), this.includeHierarchyFields,
this.includeHierarchyFields)
.withRangeProvider(getRangeProvider())
.withAggregationThreshold(getDocAggregationThresholdMs());
Expand Down Expand Up @@ -806,11 +833,13 @@ public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
}
}

SortedKeyValueIterator<Key,Value> evaluationSource = getSourceDeepCopy("document pipeline - evaluation");

if (gatherTimingDetails()) {
documents = new EvaluationTrackingIterator(QuerySpan.Stage.DocumentEvaluation, trackingSpan, getEvaluation(documentSpecificSource, deepSourceCopy,
documents = new EvaluationTrackingIterator(QuerySpan.Stage.DocumentEvaluation, trackingSpan, getEvaluation(documentSpecificSource, evaluationSource,
documents, compositeMetadata, typeMetadataWithNonIndexed, columnFamilies, inclusive));
} else {
documents = getEvaluation(documentSpecificSource, deepSourceCopy, documents, compositeMetadata, typeMetadataWithNonIndexed, columnFamilies,
documents = getEvaluation(documentSpecificSource, evaluationSource, documents, compositeMetadata, typeMetadataWithNonIndexed, columnFamilies,
inclusive);
}

Expand All @@ -828,9 +857,10 @@ public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
// query logics
// or if the document was not aggregated in the first place because the
// field index fields completely satisfied the query
documents = mapDocument(deepSourceCopy, documents, compositeMetadata);
// No source is passed, mapDocument() uses the correct method to request a source
documents = mapDocument(null, documents, compositeMetadata);

// apply any configured post processing
// apply any configured post-processing
documents = getPostProcessingChain(documents);
if (gatherTimingDetails()) {
documents = new EvaluationTrackingIterator(QuerySpan.Stage.PostProcessing, trackingSpan, documents);
Expand Down Expand Up @@ -937,9 +967,11 @@ protected Iterator<Entry<Key,Document>> getEvaluation(NestedQueryIterator<Key> d
final Iterator<Tuple3<Key,Document,Map<String,Object>>> itrWithContext;
if (this.isTermFrequenciesRequired()) {

SortedKeyValueIterator<Key,Value> tfSource = getSourceDeepCopy("tf aggregation");

TermFrequencyConfig tfConfig = new TermFrequencyConfig();
tfConfig.setScript(getScript(documentSource));
tfConfig.setSource(sourceDeepCopy.deepCopy(myEnvironment));
tfConfig.setSource(tfSource);
tfConfig.setContentExpansionFields(getContentExpansionFields());
tfConfig.setTfFields(getTermFrequencyFields());
tfConfig.setTypeMetadata(getTypeMetadata());
Expand Down Expand Up @@ -1092,7 +1124,10 @@ protected Iterator<Entry<Key,Document>> mapDocument(SortedKeyValueIterator<Key,V
}
if (fieldIndexSatisfiesQuery) {
// @formatter:off
final KeyToDocumentData docMapper = new KeyToDocumentData(deepSourceCopy, this.myEnvironment, this.documentOptions, getEquality(),

SortedKeyValueIterator<Key, Value> docMapperSource = getSourceDeepCopy("map document - key to document");

final KeyToDocumentData docMapper = new KeyToDocumentData(docMapperSource, this.myEnvironment, this.documentOptions, getEquality(),
getEventEvaluationFilter(), this.includeHierarchyFields, this.includeHierarchyFields)
.withRangeProvider(getRangeProvider())
.withAggregationThreshold(getDocAggregationThresholdMs());
Expand Down Expand Up @@ -1158,6 +1193,15 @@ protected void prepareKeyValue() {
}
this.key = null;
this.value = null;

if (log.isTraceEnabled()) {
synchronized (this) {
log.trace("=== produced " + numberOfDeepCopies + " source deep copies ===");
for (String stage : deepCopyStages) {
log.trace(stage);
}
}
}
}
}

Expand Down Expand Up @@ -1473,7 +1517,7 @@ private BasePoolableObjectFactory<SortedKeyValueIterator<Key,Value>> createIvara
return new BasePoolableObjectFactory<SortedKeyValueIterator<Key,Value>>() {
@Override
public SortedKeyValueIterator<Key,Value> makeObject() throws Exception {
return sourceFactory.getSourceDeepCopy();
return sourceFactory.getSourceDeepCopy("ivarator");
}
};
}
Expand Down Expand Up @@ -1621,7 +1665,9 @@ protected ExcerptTransform getExcerptTransform() {
synchronized (getExcerptFields()) {
if (excerptTransform == null) {
try {
excerptTransform = new ExcerptTransform(excerptFields, myEnvironment, sourceForDeepCopies.deepCopy(myEnvironment),
SortedKeyValueIterator<Key,Value> excerptSource = getSourceDeepCopy("excerpt transform");

excerptTransform = new ExcerptTransform(excerptFields, myEnvironment, excerptSource,
excerptIterator.getDeclaredConstructor().newInstance());
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Could not create excerpt transform", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,13 @@ public interface SourceFactory<K extends WritableComparable<?>,V extends Writabl
* @return the deep copy
*/
SortedKeyValueIterator<K,V> getSourceDeepCopy();

/**
* Create a deep copy of a source with optional stage name
*
* @param stage
* a hint as to who requested a deep copy
* @return a source deep copy
*/
SortedKeyValueIterator<K,V> getSourceDeepCopy(String stage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class Pipeline implements Runnable {

private static final Logger log = Logger.getLogger(Pipeline.class);
/**
* A source list for which the iterator will automatically reset to the beginning upon comodification. This allows us to have an iterator that will always
* A source list for which the iterator will automatically reset to the beginning upon co-modification. This allows us to have an iterator that will always
* return results as long as elements are added to the list.
*/
private DocumentSpecificNestedIterator documentSpecificSource = new DocumentSpecificNestedIterator(null);
private final DocumentSpecificNestedIterator documentSpecificSource = new DocumentSpecificNestedIterator(null);

// the result
private Entry<Key,Document> result = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,17 @@ public Pipeline checkOut(Key key, Document doc, NestedQuery<Key> nestedQuery, Co
NestedQueryIterator<Key> nq = pipeline.getDocumentSpecificSource();
if (null != nestedQuery) {
nq.setCurrentQuery(nestedQuery);
pipeline.setSourceIterator(
sourceIterator.createDocumentPipeline(sourceForDeepCopy.deepCopy(env), nq, columnFamilies, inclusive, querySpanCollector));
pipeline.setSourceIterator(sourceIterator.createDocumentPipeline(getSource(), nq, columnFamilies, inclusive, querySpanCollector));
}
}
} else if (checkedIn.size() + checkedOut.size() < maxPipelines) {
pipeline = new Pipeline(this.querySpanCollector, sourceForDeepCopy.deepCopy(env));
// this pipeline constructor doesn't actually use any of the objects passed in, so don't even bother
pipeline = new Pipeline(this.querySpanCollector, null);
NestedQueryIterator<Key> nq = pipeline.getDocumentSpecificSource();
if (null != nestedQuery) {
nq.setCurrentQuery(nestedQuery);
}
pipeline.setSourceIterator(
sourceIterator.createDocumentPipeline(sourceForDeepCopy.deepCopy(env), nq, columnFamilies, inclusive, querySpanCollector));
pipeline.setSourceIterator(sourceIterator.createDocumentPipeline(getSource(), nq, columnFamilies, inclusive, querySpanCollector));
}
if (pipeline != null) {
checkedOut.add(pipeline);
Expand All @@ -102,4 +101,14 @@ public void checkIn(Pipeline pipeline) {
checkedOut.remove(pipeline);
checkedIn.add(pipeline);
}

private SortedKeyValueIterator<Key,Value> getSource() {
if (maxPipelines == 1) {
// if the pipeline pool services a serial iterator do not deep copy the source
return sourceForDeepCopy;
}

// if more than one pipeline is allowed to run at once, assume a fresh copy of the source is a required
return sourceForDeepCopy.deepCopy(env);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1642,8 +1642,8 @@ public IteratorBuildingVisitor limitOverride(boolean limitOverride) {
return this;
}

public IteratorBuildingVisitor setSource(SourceFactory sourceFactory, IteratorEnvironment env) {
SortedKeyValueIterator<Key,Value> skvi = sourceFactory.getSourceDeepCopy();
public IteratorBuildingVisitor setSource(SourceFactory<Key,Value> sourceFactory, IteratorEnvironment env) {
SortedKeyValueIterator<Key,Value> skvi = sourceFactory.getSourceDeepCopy("IBV");
this.source = new SourceManager(skvi);
this.env = env;
Map<String,String> options = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ private void eval(ASTJexlScript query, Range docRange, Key docKeyHit, List<Map.E
}

private static class SourceFactory implements datawave.query.iterator.SourceFactory<Key,Value> {
private Iterator<Map.Entry<Key,Value>> iterator;
private final Iterator<Map.Entry<Key,Value>> iterator;

public SourceFactory(Iterator<Map.Entry<Key,Value>> iterator) {
this.iterator = iterator;
Expand All @@ -986,6 +986,11 @@ public SourceFactory(Iterator<Map.Entry<Key,Value>> iterator) {
public SortedKeyValueIterator<Key,Value> getSourceDeepCopy() {
return new SortedListKeyValueIterator(iterator);
}

@Override
public SortedKeyValueIterator<Key,Value> getSourceDeepCopy(String stage) {
return new SortedListKeyValueIterator(iterator);
}
}

private static class TestIteratorEnvironment implements IteratorEnvironment {
Expand Down
Loading