Skip to content

Commit

Permalink
Write metric field metadata to accumulo on startup to avoid AllFieldM…
Browse files Browse the repository at this point in the history
…etadataHelper caching empty values when table is empty
  • Loading branch information
billoley committed Jul 8, 2024
1 parent 7993914 commit b0e6472
Showing 1 changed file with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -137,6 +138,7 @@ public ShardTableQueryMetricHandler(QueryMetricHandlerProperties queryMetricHand
if (this.tablesChecked.compareAndSet(false, true)) {
verifyTables();
}
initializeMetadata();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
Expand All @@ -147,6 +149,44 @@ public ShardTableQueryMetricHandler(QueryMetricHandlerProperties queryMetricHand
}
}

protected void initializeMetadata() throws Exception {
T metric = (T) metricFactory.createMetric(true);
populateFieldsForMetadata(metric);
writeMetadata(metric);
flush();
}

protected void populateFieldsForMetadata(T metric) {
// these values are not written and are only being set to trigger the ingest
// framework to write the entries for these fields into the metadata table
Date d = new Date();
metric.setQueryId(UUID.randomUUID().toString());
metric.setQuery("QUERY");
metric.setSetupTime(1);
metric.setCreateCallTime(1);
metric.setBeginDate(d);
metric.setEndDate(d);
metric.setPlan("PLAN");
metric.setError(new RuntimeException());
metric.setErrorMessage("ERROR");
metric.setErrorCode("ERROR");
metric.setQueryAuthorizations("AUTHS");
metric.setHost("localhost");
metric.setLastUpdated(d);
metric.setLoginTime(1);
metric.setNegativeSelectors(Collections.singletonList("SELECTOR"));
metric.setPositiveSelectors(Collections.singletonList("SELECTOR"));
metric.setParameters(new HashSet<>(Arrays.asList(new QueryImpl.Parameter())));
metric.setPredictions(new HashSet<>(Arrays.asList(new Prediction())));
metric.setProxyServers(Collections.singletonList("SERVER"));
metric.setQueryLogic("LOGIC");
metric.setQueryName("QUERY");
metric.setQueryType("TYPE");
metric.setUser("USER");
metric.setUserDN("USERDN");
metric.addPageTime(100, 100, 1000, 1000);
}

public void shutdown() throws Exception {
if (this.recordWriter != null) {
this.accumuloRecordWriterLock.writeLock().lock();
Expand Down Expand Up @@ -192,24 +232,62 @@ public AbstractContentIngestHelper getContentIndexingDataTypeHelper() {
}
}

private void writeMetadata(ContentIndexingColumnBasedHandler handler) throws Exception {
if (handler.getMetadata() != null) {
for (Entry<BulkIngestKey,Value> e : handler.getMetadata().getBulkMetadata().entries()) {
recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue()));
}
}
}

private void writeMetric(T updated, T stored, long timestamp, boolean delete, ContentIndexingColumnBasedHandler handler) throws Exception {
Multimap<BulkIngestKey,Value> r = getEntries(handler, updated, stored, timestamp);
if (r != null) {
for (Entry<BulkIngestKey,Value> e : r.entries()) {
recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue()));
}
}
if (!delete && handler.getMetadata() != null) {
for (Entry<BulkIngestKey,Value> e : handler.getMetadata().getBulkMetadata().entries()) {
recordWriter.write(e.getKey().getTableName(), getMutation(e.getKey().getKey(), e.getValue()));
}
if (!delete) {
writeMetadata(handler);
}
}

public void writeMetric(T updatedQueryMetric, List<T> storedQueryMetrics, long timestamp, boolean delete) throws Exception {
writeMetric(updatedQueryMetric, storedQueryMetrics, timestamp, delete, Collections.EMPTY_LIST);
}

public void writeMetadata(T metric) throws Exception {
try {
TaskAttemptID taskId = new TaskAttemptID(new TaskID(new JobID(JOB_ID, 1), TaskType.MAP, 1), 1);
this.accumuloRecordWriterLock.readLock().lock();

try {
MapContext<Text,RawRecordContainer,Text,Mutation> context = new MapContextImpl<>(conf, taskId, null, this.recordWriter, null, reporter, null);
ContentIndexingColumnBasedHandler handler = new ContentIndexingColumnBasedHandler() {
@Override
public AbstractContentIngestHelper getContentIndexingDataTypeHelper() {
return getQueryMetricsIngestHelper(false, Collections.emptyList());
}
};
handler.setup(context);
getEntries(handler, metric, null, System.currentTimeMillis());
writeMetadata(handler);
} finally {
this.accumuloRecordWriterLock.readLock().unlock();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
// assume that an error happened with the AccumuloRecordWriter
// mark recordWriter as unhealthy -- the first thread to get the writeLock in
// reload will create a new one that will be marked healthy
this.recordWriter.setHealthy(false);
reload();
// we have no way of knowing if the rejected mutation is this one or a previously
// written one throw the exception so that the metric will be re-written
throw e;
}
}

public void writeMetric(T updatedQueryMetric, List<T> storedQueryMetrics, long timestamp, boolean delete, Collection<String> ignoredFields)
throws Exception {
try {
Expand Down

0 comments on commit b0e6472

Please sign in to comment.