Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics showing the number of events evaluated #2138

Open
wants to merge 12 commits into
base: integration
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public class TimingMetadata extends Metadata {
private static final String NEXT_COUNT = "NEXT_COUNT";
private static final String SOURCE_COUNT = "SOURCE_COUNT";
private static final String SEEK_COUNT = "SEEK_COUNT";
private static final String EVALUATED_COUNT = "EVALUATED_COUNT";
private static final String REJECTED_COUNT = "REJECTED_COUNT";
private static final String YIELD_COUNT = "YIELD_COUNT";
private static final String STAGE_TIMERS = "STAGE_TIMERS";
private static final String HOST = "HOST";
Expand Down Expand Up @@ -55,6 +57,32 @@ public void setSeekCount(long seekCount) {
put(SEEK_COUNT, new Numeric(seekCount, this.getMetadata(), this.isToKeep()));
}

public long getEvaluatedCount() {
Numeric numericValue = (Numeric) get(EVALUATED_COUNT);
if (numericValue != null) {
return ((Number) numericValue.getData()).longValue();
} else {
return 0;
}
}

public void setEvaluatedCount(long evaluatedCount) {
put(EVALUATED_COUNT, new Numeric(evaluatedCount, this.getMetadata(), this.isToKeep()));
}

public long getRejectedCount() {
Numeric numericValue = (Numeric) get(REJECTED_COUNT);
if (numericValue != null) {
return ((Number) numericValue.getData()).longValue();
} else {
return 0;
}
}

public void setRejectedCount(long rejectedCount) {
put(REJECTED_COUNT, new Numeric(rejectedCount, this.getMetadata(), this.isToKeep()));
}

public long getYieldCount() {
Numeric numericValue = (Numeric) get(YIELD_COUNT);
if (numericValue != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public static void addTimingMetadata(Document document, QuerySpan querySpan) {
timingMetadata.setHost(host);
timingMetadata.setSourceCount(querySpan.getSourceCount());
timingMetadata.setSeekCount(querySpan.getSeekCount());
timingMetadata.setEvaluatedCount(querySpan.getEvaluatedCount());
timingMetadata.setRejectedCount(querySpan.getRejectedCount());
timingMetadata.setNextCount(querySpan.getNextCount());
if (querySpan.getYield()) {
timingMetadata.setYieldCount(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ public SortedKeyValueIterator<Key,Value> getSourceDeepCopy() {
* type for the iterator
* @return an iterator to elements that satisfy the predicate
*/
public static <T> UnmodifiableIterator<T> statelessFilter(final Iterator<T> unfiltered, final Predicate<? super T> predicate) {
public static <T> UnmodifiableIterator<T> statelessFilter(final Iterator<T> unfiltered, final Predicate<? super T> predicate, QuerySpan trackingSpan) {
checkNotNull(unfiltered);
checkNotNull(predicate);
return new UnmodifiableIterator<T>() {
Expand All @@ -701,7 +701,15 @@ protected T computeNext() {
while (unfiltered.hasNext()) {
T element = unfiltered.next();
if (predicate.apply(element)) {
if (trackingSpan != null) {
trackingSpan.evaluatedIncrement(1);
}
return element;
} else {
if (trackingSpan != null) {
trackingSpan.evaluatedIncrement(1);
trackingSpan.rejectedIncrement(1);
}
}
}
return null;
Expand Down Expand Up @@ -837,7 +845,7 @@ public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
documents = mapDocument(deepSourceCopy, documents, compositeMetadata);

// apply any configured post processing
documents = getPostProcessingChain(documents);
documents = getPostProcessingChain(documents, trackingSpan);
if (gatherTimingDetails()) {
documents = new EvaluationTrackingIterator(QuerySpan.Stage.PostProcessing, trackingSpan, documents);
}
Expand Down Expand Up @@ -876,11 +884,11 @@ public Entry<DocumentData,Document> apply(@Nullable Entry<Key,Document> input) {
// projection or visibility filtering)
if (gatherTimingDetails()) {
documents = statelessFilter(documents,
new EvaluationTrackingPredicate<>(QuerySpan.Stage.EmptyDocumentFilter, trackingSpan, new EmptyDocumentFilter()));
new EvaluationTrackingPredicate<>(QuerySpan.Stage.EmptyDocumentFilter, trackingSpan, new EmptyDocumentFilter()), trackingSpan);
documents = Iterators.transform(documents,
new EvaluationTrackingFunction<>(QuerySpan.Stage.DocumentMetadata, trackingSpan, new DocumentMetadata()));
} else {
documents = statelessFilter(documents, new EmptyDocumentFilter());
documents = statelessFilter(documents, new EmptyDocumentFilter(), trackingSpan);
documents = Iterators.transform(documents, new DocumentMetadata());
}

Expand Down Expand Up @@ -976,7 +984,8 @@ protected Iterator<Entry<Key,Document>> getEvaluation(NestedQueryIterator<Key> d
}

final Iterator<Tuple3<Key,Document,DatawaveJexlContext>> itrWithDatawaveJexlContext = Iterators.transform(itrWithContext, contextCreator);
Iterator<Tuple3<Key,Document,DatawaveJexlContext>> matchedDocuments = statelessFilter(itrWithDatawaveJexlContext, jexlEvaluationFunction);
Iterator<Tuple3<Key,Document,DatawaveJexlContext>> matchedDocuments = statelessFilter(itrWithDatawaveJexlContext, jexlEvaluationFunction,
trackingSpan);
if (log.isTraceEnabled()) {
log.trace("arithmetic:" + arithmetic + " range:" + getDocumentRange(documentSource) + ", thread:" + Thread.currentThread());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import datawave.query.iterator.ivarator.IvaratorCacheDirConfig;
import datawave.query.iterator.logic.IndexIterator;
import datawave.query.iterator.logic.TermFrequencyExcerptIterator;
import datawave.query.iterator.profile.QuerySpan;
import datawave.query.jexl.DefaultArithmetic;
import datawave.query.jexl.HitListArithmetic;
import datawave.query.jexl.JexlASTHelper;
Expand Down Expand Up @@ -1703,7 +1704,7 @@ public boolean validateOptions(Map<String,String> options) {
if (options.containsKey(POSTPROCESSING_CLASSES)) {
this.postProcessingFunctions = options.get(POSTPROCESSING_CLASSES);
// test parsing of the functions
getPostProcessingChain(new WrappingIterator<>());
getPostProcessingChain(new WrappingIterator<>(), null);
}

if (options.containsKey(NON_INDEXED_DATATYPES)) {
Expand Down Expand Up @@ -2121,7 +2122,7 @@ public static Set<String> buildIgnoredColumnFamilies(String colFams) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
public Iterator<Entry<Key,Document>> getPostProcessingChain(Iterator<Entry<Key,Document>> postProcessingBase) {
public Iterator<Entry<Key,Document>> getPostProcessingChain(Iterator<Entry<Key,Document>> postProcessingBase, QuerySpan trackingSpan) {
String functions = postProcessingFunctions;
if (functions != null && !functions.isEmpty()) {
try {
Expand All @@ -2146,7 +2147,7 @@ public Iterator<Entry<Key,Document>> getPostProcessingChain(Iterator<Entry<Key,D
((ConfiguredPredicate) p).configure(options);
}

tforms = QueryIterator.statelessFilter(tforms, p);
tforms = QueryIterator.statelessFilter(tforms, p, trackingSpan);
} else {
log.error(fClass + " is not a function or predicate.");
throw new RuntimeException(fClass + " is not a function or predicate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public long getNextCount() {
return getThreadSpecificQuerySpan().getNextCount();
}

public long getEvaluatedCount() {
return getThreadSpecificQuerySpan().getEvaluatedCount();
}

public long getRejectedCount() {
return getThreadSpecificQuerySpan().getRejectedCount();
}

public long getSeekCount() {
return getThreadSpecificQuerySpan().getSeekCount();
}
Expand Down Expand Up @@ -97,6 +105,14 @@ public void setSeek(long seek) {
getThreadSpecificQuerySpan().setSeek(seek);
}

public void setEvaluatedCount(long evaluatedCount) {
getThreadSpecificQuerySpan().setEvaluatedCount(evaluatedCount);
}

public void setRejectedCount(long rejectedCount) {
getThreadSpecificQuerySpan().setRejectedCount(rejectedCount);
}

@Override
public void setNext(long next) {
getThreadSpecificQuerySpan().setNext(next);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class QuerySpan {

protected long seek = 0;

protected long evaluated = 0;
protected long rejected = 0;

protected boolean yield = false;

private Map<String,Long> stageTimers = new LinkedHashMap<>();
Expand Down Expand Up @@ -90,6 +93,34 @@ public long getNextCount() {
return nextCount;
}

public void evaluatedIncrement(long evaluatedCount) {
evaluated += evaluatedCount;
System.out.println("Evaluated Count: " + evaluated);
}

public long getEvaluatedCount() {
long evaluatedCount = evaluated;
for (QuerySpan subSpan : sources) {
evaluatedCount += subSpan.getEvaluatedCount();
}
System.out.println("Evaluated Count: " + evaluatedCount);
return evaluatedCount;
}

public void rejectedIncrement(long rejectedCount) {
rejected += rejectedCount;
System.out.println("Rejected Count: " + rejected);
}

public long getRejectedCount() {
long rejectedCount = rejected;
for (QuerySpan subSpan : sources) {
rejectedCount += subSpan.getRejectedCount();
}
System.out.println("Rejected Count: " + rejectedCount);
return rejectedCount;
}

public long getSeekCount() {
long seekCount = seek;
for (QuerySpan subSpan : sources) {
Expand All @@ -113,7 +144,7 @@ public boolean getYield() {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(super.toString()).append(" sources:").append(getSourceCount()).append(" next:").append(getNextCount()).append(" seek:").append(getSeekCount())
.append(" yield:").append(getYield());
.append(" evaluated:").append(getEvaluatedCount()).append(" rejected:").append(getRejectedCount()).append(" yield:").append(getYield());
return sb.toString();
}

Expand Down Expand Up @@ -165,6 +196,8 @@ public void reset() {
sourceCount = 0;
next = 0;
seek = 0;
rejected = 0;
evaluated = 0;
yield = false;
stageTimerTotal = 0;
stageTimers.clear();
Expand Down Expand Up @@ -202,6 +235,14 @@ public void setSeek(long seek) {
this.seek = seek;
}

public void setEvaluatedCount(long evaluatedCount) {
this.evaluated = evaluatedCount;
}

public void setRejectedCount(long rejectedCount) {
this.rejected = rejectedCount;
}

public void setNext(long next) {
this.next = next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
public class QuerySpanCollector {
private AtomicLong seekCount = new AtomicLong();
private AtomicLong nextCount = new AtomicLong();
private AtomicLong evaluatedCount = new AtomicLong();
private AtomicLong rejectedCount = new AtomicLong();
private AtomicBoolean yield = new AtomicBoolean();
private AtomicLong sourceCount = new AtomicLong();
private Map<String,Long> stageTimers = new LinkedHashMap<>();
Expand All @@ -22,6 +24,8 @@ public void addQuerySpan(QuerySpan querySpan) {
synchronized (this) {
seekCount.addAndGet(querySpan.getSeekCount());
nextCount.addAndGet(querySpan.getNextCount());
evaluatedCount.addAndGet(querySpan.getEvaluatedCount());
rejectedCount.addAndGet(querySpan.getRejectedCount());
yield.set(querySpan.getYield());
sourceCount.addAndGet(querySpan.getSourceCount());
Map<String,Long> timers = querySpan.getStageTimers();
Expand Down Expand Up @@ -50,6 +54,8 @@ private QuerySpan combineQuerySpans() {
combinedQuerySpan = new QuerySpan(null);
combinedQuerySpan.setNext(this.nextCount.getAndSet(0));
combinedQuerySpan.setSeek(this.seekCount.getAndSet(0));
combinedQuerySpan.setEvaluatedCount(this.evaluatedCount.getAndSet(0));
combinedQuerySpan.setRejectedCount(this.rejectedCount.getAndSet(0));
combinedQuerySpan.setYield(this.yield.getAndSet(false));
combinedQuerySpan.setSourceCount(this.sourceCount.getAndSet(0));
combinedQuerySpan.setStageTimers(this.stageTimers);
Expand All @@ -60,8 +66,8 @@ private QuerySpan combineQuerySpans() {
}

public boolean hasEntries() {
if (this.seekCount.intValue() > 0 || this.nextCount.intValue() > 0 || this.yield.get() || this.sourceCount.intValue() > 0
|| !this.stageTimers.isEmpty()) {
if (this.seekCount.intValue() > 0 || this.nextCount.intValue() > 0 || this.yield.get() || this.sourceCount.intValue() > 0 || !this.stageTimers.isEmpty()
|| this.evaluatedCount.intValue() > 0 || this.rejectedCount.intValue() > 0) {
return true;
} else {
return false;
Expand All @@ -85,8 +91,8 @@ public QuerySpan getCombinedQuerySpan(QuerySpan querySpan) {

public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(super.toString()).append(" seek:").append(seekCount).append(" next:").append(nextCount).append(" yield:").append(yield).append(" sources:")
.append(sourceCount);
sb.append(super.toString()).append(" seek:").append(seekCount).append(" next:").append(nextCount).append(" evaluated:").append(evaluatedCount)
.append(" rejected:").append(rejectedCount).append(" yield:").append(yield).append(" sources:").append(sourceCount);
return sb.toString();
}

Expand All @@ -111,6 +117,14 @@ public long getNextCount() {
return nextCount.longValue();
}

public long getEvaluatedCount() {
return evaluatedCount.longValue();
}

public long getRejectedCount() {
return rejectedCount.longValue();
}

public boolean getYield() {
return yield.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ public Multimap<String,String> getEventFieldsToWrite(T updatedQueryMetric) {
fields.put("SOURCE_COUNT", Long.toString(updatedQueryMetric.getSourceCount()));
fields.put("NEXT_COUNT", Long.toString(updatedQueryMetric.getNextCount()));
fields.put("SEEK_COUNT", Long.toString(updatedQueryMetric.getSeekCount()));
fields.put("EVALUATED_COUNT", Long.toString(updatedQueryMetric.getEvaluatedCount()));
fields.put("REJECTED_COUNT", Long.toString(updatedQueryMetric.getRejectedCount()));
ivakegg marked this conversation as resolved.
Show resolved Hide resolved
fields.put("YIELD_COUNT", Long.toString(updatedQueryMetric.getYieldCount()));
fields.put("DOC_RANGES", Long.toString(updatedQueryMetric.getDocRanges()));
fields.put("FI_RANGES", Long.toString(updatedQueryMetric.getFiRanges()));
Expand Down Expand Up @@ -286,6 +288,12 @@ public Multimap<String,String> getEventFieldsToDelete(T updatedQueryMetric, T st
if (updatedQueryMetric.getSeekCount() != storedQueryMetric.getSeekCount()) {
fields.put("SEEK_COUNT", Long.toString(storedQueryMetric.getSeekCount()));
}
if (updatedQueryMetric.getEvaluatedCount() != storedQueryMetric.getEvaluatedCount()) {
fields.put("EVALUATED_COUNT", Long.toString(storedQueryMetric.getEvaluatedCount()));
}
if (updatedQueryMetric.getRejectedCount() != storedQueryMetric.getRejectedCount()) {
fields.put("REJECTED_COUNT", Long.toString(storedQueryMetric.getRejectedCount()));
}
if (updatedQueryMetric.getYieldCount() != storedQueryMetric.getYieldCount()) {
fields.put("YIELD_COUNT", Long.toString(storedQueryMetric.getYieldCount()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class ActiveQuery {
private long lastSourceCount = 0;
private long lastNextCount = 0;
private long lastSeekCount = 0;
private long lastEvaluatedCount = 0;
private long lastRejectedCount = 0;
private long documentSizeBytes = 0;
private int windowSize = 0;

Expand All @@ -48,8 +50,8 @@ public ActiveQuery(String queryId, int windowSize, String activeQueryLogName) {

synchronized public ActiveQuerySnapshot snapshot() {
return new ActiveQuerySnapshot(this.activeQueryLogName, this.queryId, this.lastSourceCount, this.lastNextCount, this.lastSeekCount,
this.documentSizeBytes, this.activeRanges.size(), this.totalElapsedTime(), this.isInCall(), this.currentCallTime(), this.numCallsMap,
this.timerMap);
this.lastEvaluatedCount, this.lastRejectedCount, this.documentSizeBytes, this.activeRanges.size(), this.totalElapsedTime(),
this.isInCall(), this.currentCallTime(), this.numCallsMap, this.timerMap);
}

synchronized public void beginCall(Range range, CallType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class ActiveQuerySnapshot {
private final long lastSourceCount;
private final long lastNextCount;
private final long lastSeekCount;
private final long lastEvaluatedCount;
private final long lastRejectedCount;
private final long documentSizeBytes;

private final long totalElapsedTime;
Expand All @@ -26,14 +28,16 @@ public class ActiveQuerySnapshot {
private final Map<ActiveQuery.CallType,Long> numCallsMap = new HashMap<>();
private final Map<ActiveQuery.CallType,Snapshot> snapshotMap = new HashMap<>();

public ActiveQuerySnapshot(String activeQueryLogName, String queryId, long lastSourceCount, long lastNextCount, long lastSeekCount, long documentSizeBytes,
int numActiveRanges, long totalElapsedTime, boolean isInCall, long currentCallTime, Map<ActiveQuery.CallType,Long> numCallsMap,
Map<ActiveQuery.CallType,Timer> timerMap) {
public ActiveQuerySnapshot(String activeQueryLogName, String queryId, long lastSourceCount, long lastNextCount, long lastSeekCount, long lastEvaluatedCount,
long lastRejectedCount, long documentSizeBytes, int numActiveRanges, long totalElapsedTime, boolean isInCall, long currentCallTime,
Map<ActiveQuery.CallType,Long> numCallsMap, Map<ActiveQuery.CallType,Timer> timerMap) {
this.activeQueryLogName = activeQueryLogName;
this.queryId = queryId;
this.lastSourceCount = lastSourceCount;
this.lastNextCount = lastNextCount;
this.lastSeekCount = lastSeekCount;
this.lastEvaluatedCount = lastEvaluatedCount;
this.lastRejectedCount = lastRejectedCount;
this.documentSizeBytes = documentSizeBytes;
this.numActiveRanges = numActiveRanges;

Expand Down Expand Up @@ -87,6 +91,8 @@ public String toString() {
sb.append(this.documentSizeBytes).append("/");
sb.append(this.lastSourceCount).append("/");
sb.append(this.lastSeekCount).append("/");
sb.append(this.lastEvaluatedCount).append("/");
sb.append(this.lastRejectedCount).append("/");
sb.append(this.lastNextCount);
}
return sb.toString();
Expand Down
Loading
Loading