Skip to content

Commit

Permalink
Add timer to record the time it takes in AccumuloMapStore to fetch la…
Browse files Browse the repository at this point in the history
…stWrittenQuertMetric when necessary
  • Loading branch information
billoley committed Dec 8, 2023
1 parent e0ac962 commit c7a8739
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ public Map<String,String> formatStats(Map<String,Double> stats, boolean useSepar
public Map<String,Double> getServiceStats() {
Map<String,Double> stats = new LinkedHashMap<>();
addTimerStats("store", getTimer(TIMERS.STORE), stats);
addTimerStats("accumulo", this.mapStore.getWriteTimer(), stats);
addTimerStats("accumulo.write", this.mapStore.getWriteTimer(), stats);
addTimerStats("accumulo.read", this.mapStore.getReadTimer(), stats);
addTimerStats("message.send", getTimer(TIMERS.MESSAGE_SEND), stats);
addTimerStats("rest", getTimer(TIMERS.REST), stats);
addMeterStats("message.receive", getMeter(METERS.MESSAGE_RECEIVE), stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import com.codahale.metrics.Timer;
import com.google.common.cache.CacheBuilder;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.hazelcast.core.MapLoader;
import com.hazelcast.core.MapStore;
import com.hazelcast.core.MapStoreFactory;
import datawave.microservice.querymetric.BaseQueryMetric;
import datawave.microservice.querymetric.MergeLockLifecycleListener;
import datawave.microservice.querymetric.QueryMetricType;
import datawave.microservice.querymetric.QueryMetricUpdateHolder;
import datawave.microservice.querymetric.handler.ShardTableQueryMetricHandler;
Expand All @@ -29,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.MINUTES;
Expand All @@ -53,9 +51,9 @@ public class AccumuloMapStore<T extends BaseQueryMetric> extends AccumuloMapLoad
private static AccumuloMapStore instance;
private Logger log = LoggerFactory.getLogger(AccumuloMapStore.class);
private Cache lastWrittenQueryMetricCache;
private MergeLockLifecycleListener mergeLock;
private com.google.common.cache.Cache failures;
private com.github.benmanes.caffeine.cache.Cache failures;
private Timer writeTimer = new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES));
private Timer readTimer = new Timer(new SlidingTimeWindowArrayReservoir(1, MINUTES));
private boolean shuttingDown = false;

public static class Factory implements MapStoreFactory<String,BaseQueryMetric> {
Expand All @@ -66,10 +64,13 @@ public MapLoader<String,BaseQueryMetric> newMapStore(String mapName, Properties
}

@Autowired
public AccumuloMapStore(ShardTableQueryMetricHandler handler, MergeLockLifecycleListener mergeLock) {
public AccumuloMapStore(ShardTableQueryMetricHandler handler) {
this.handler = handler;
this.mergeLock = mergeLock;
this.failures = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
// @formatter:off
this.failures = Caffeine.newBuilder()
.expireAfterWrite(60, TimeUnit.MINUTES)
.build();
// @formatter:on
AccumuloMapStore.instance = this;
}

Expand Down Expand Up @@ -130,14 +131,19 @@ public void store(QueryMetricUpdateHolder<T> queryMetricUpdate) throws Exception
if (!queryMetricUpdate.isNewMetric()) {
lastQueryMetricUpdate = (QueryMetricUpdateHolder<T>) lastWrittenQueryMetricCache.get(queryId, () -> {
log.debug("getting metric {} from accumulo", queryId);
T m = handler.getQueryMetric(queryId, ignoreFieldsOnQuery);
if (m == null) {
return null;
} else {
// these fields will not be populated in the returned metric,
// so we should not compare them later for writing mutations
ignoredFields.addAll(ignoreFieldsOnWrite);
return new QueryMetricUpdateHolder(m, metricType);
Timer.Context readTimerContext = readTimer.time();
try {
T m = handler.getQueryMetric(queryId, ignoreFieldsOnQuery);
if (m == null) {
return null;
} else {
// these fields will not be populated in the returned metric,
// so we should not compare them later for writing mutations
ignoredFields.addAll(ignoreFieldsOnWrite);
return new QueryMetricUpdateHolder(m, metricType);
}
} finally {
readTimerContext.stop();
}
});
}
Expand Down Expand Up @@ -208,8 +214,8 @@ private boolean retryOnException(QueryMetricUpdate update, Exception e) {
String queryId = update.getMetric().getQueryId();
Integer numFailures = 1;
try {
numFailures = (Integer) this.failures.get(queryId, () -> 0) + 1;
} catch (ExecutionException e1) {
numFailures = (Integer) this.failures.get(queryId, o -> 0) + 1;
} catch (Exception e1) {
log.error(e1.getMessage(), e1);
}
if (numFailures < 3) {
Expand Down Expand Up @@ -265,4 +271,8 @@ public void deleteAll(Collection<String> keys) {
public Timer getWriteTimer() {
return writeTimer;
}

public Timer getReadTimer() {
return readTimer;
}
}

0 comments on commit c7a8739

Please sign in to comment.