Skip to content

Commit

Permalink
Support field cardinality across a date range (#37)
Browse files Browse the repository at this point in the history
* Add seeking filter for the F column to support getting field cardinality across a date range

* guard against empty ranges

* move log messages from debug to trace
  • Loading branch information
apmoriarty authored Jun 24, 2024
1 parent 44482bd commit 6f2d4d4
Show file tree
Hide file tree
Showing 4 changed files with 711 additions and 1 deletion.
173 changes: 173 additions & 0 deletions src/main/java/datawave/iterators/MetadataFColumnSeekingFilter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package datawave.iterators;

import java.io.IOException;
import java.util.Map;
import java.util.TreeSet;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.user.SeekingFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Splitter;

import datawave.data.ColumnFamilyConstants;

/**
 * A {@link SeekingFilter} that operates on the metadata table's {@link ColumnFamilyConstants#COLF_F} column.
 * <p>
 * This filter solves the problem of calculating field cardinality for a small date range on a system that contains many days worth of data, i.e., it is not
 * practical to simply filter based on date and/or datatype.
 * <p>
 * Given that the F column qualifier is simply the datatype and date concatenated with a null byte, it is easy to calculate a seek range that limits the time
 * spent iterating across useless keys.
 * <p>
 * NOTE(review): date strings are compared lexicographically, so this assumes a zero-padded, sortable date format (e.g. yyyyMMdd) — confirm with the caller.
 */
public class MetadataFColumnSeekingFilter extends SeekingFilter implements OptionDescriber {

    private static final Logger log = LoggerFactory.getLogger(MetadataFColumnSeekingFilter.class);

    public static final String DATATYPES_OPT = "datatypes";
    public static final String START_DATE = "start.date";
    public static final String END_DATE = "end.date";

    // Sorted so getSeekToNextDatatypeKey can use higher() to find the next configured datatype.
    private TreeSet<String> datatypes;
    private String startDate;
    private String endDate;

    /**
     * Initialize the filter from its configured options.
     *
     * @param source
     *            the source iterator
     * @param options
     *            iterator options; must contain {@link #DATATYPES_OPT}, {@link #START_DATE} and {@link #END_DATE}
     * @param env
     *            the iterator environment
     * @throws IOException
     *             if the parent init fails
     * @throws IllegalArgumentException
     *             if a required option is missing
     */
    @Override
    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
        if (!validateOptions(options)) {
            throw new IllegalArgumentException("Iterator not configured with correct options");
        }

        String opt = options.get(DATATYPES_OPT);
        if (StringUtils.isBlank(opt)) {
            // an empty set means "accept every datatype"
            datatypes = new TreeSet<>();
        } else {
            datatypes = new TreeSet<>(Splitter.on(',').splitToList(opt));
        }

        startDate = options.get(START_DATE);
        endDate = options.get(END_DATE);

        super.init(source, options, env);
    }

    @Override
    public IteratorOptions describeOptions() {
        IteratorOptions opts = new IteratorOptions(getClass().getName(), "Filter keys by datatype and date range", null, null);
        opts.addNamedOption(DATATYPES_OPT, "The set of datatypes used as a filter");
        opts.addNamedOption(START_DATE, "The start date, used for seeking");
        opts.addNamedOption(END_DATE, "The end date, used for seeking");
        // BUG FIX: previously returned null, discarding the options just described
        return opts;
    }

    @Override
    public boolean validateOptions(Map<String,String> options) {
        return options.containsKey(DATATYPES_OPT) && options.containsKey(START_DATE) && options.containsKey(END_DATE);
    }

    /**
     * A key is filtered if one of the following three conditions is met. Otherwise, the source will call next.
     * <ol>
     * <li>datatype miss</li>
     * <li>key date is before the start date</li>
     * <li>key date is after the end date</li>
     * </ol>
     * <p>
     * NOTE(review): the column qualifier is assumed to always contain a null byte separator; a qualifier without one would throw from substring — confirm
     * this invariant holds for all F column keys.
     *
     * @param k
     *            a key
     * @param v
     *            a value
     * @return a {@link FilterResult}
     */
    @Override
    public FilterResult filter(Key k, Value v) {
        if (log.isTraceEnabled()) {
            log.trace("filter key: {}", k.toStringNoTime());
        }
        String cq = k.getColumnQualifier().toString();
        int index = cq.indexOf('\u0000');
        String datatype = cq.substring(0, index);
        if (!datatypes.isEmpty() && !datatypes.contains(datatype)) {
            // datatype miss: seek to the next configured datatype
            return new FilterResult(false, AdvanceResult.USE_HINT);
        }

        String date = cq.substring(index + 1);
        if (date.compareTo(startDate) < 0) {
            // before the range: seek forward to the start date
            return new FilterResult(false, AdvanceResult.USE_HINT);
        }

        if (date.compareTo(endDate) > 0) {
            // past the range for this datatype: seek to the next datatype
            return new FilterResult(false, AdvanceResult.USE_HINT);
        }

        return new FilterResult(true, AdvanceResult.NEXT);
    }

    /**
     * Compute the seek hint for a filtered key, based on which of the three filter conditions rejected it.
     *
     * @param k
     *            the rejected key
     * @param v
     *            the corresponding value
     * @return the key to seek to
     */
    @Override
    public Key getNextKeyHint(Key k, Value v) {
        if (log.isTraceEnabled()) {
            log.trace("get next hint for key: {}", k.toStringNoTime());
        }

        Key hint;
        String cq = k.getColumnQualifier().toString();
        int index = cq.indexOf('\u0000');
        String datatype = cq.substring(0, index);

        if (!datatypes.isEmpty() && !datatypes.contains(datatype)) {
            hint = getSeekToNextDatatypeKey(k, datatype);
        } else {
            String date = cq.substring(index + 1);
            if (date.compareTo(startDate) < 0) {
                hint = getSeekToStartDateKey(k, datatype);
            } else if (date.compareTo(endDate) > 0) {
                hint = getDatatypeRolloverKey(k, datatype);
            } else {
                // key was accepted between filter and hint calculation; just advance past it
                hint = k.followingKey(PartialKey.ROW_COLFAM_COLQUAL);
            }
        }

        log.trace("hint: {}", hint);
        return hint;
    }

    /**
     * Build a hint that skips to the next datatype of interest, or past this row/family entirely when no configured datatype remains.
     *
     * @param key
     *            the current key
     * @param datatype
     *            the current key's datatype
     * @return the seek hint
     */
    private Key getSeekToNextDatatypeKey(Key key, String datatype) {
        if (datatypes.isEmpty()) {
            // no datatypes provided, so we must instead produce a 'rollover' start key
            return getDatatypeRolloverKey(key, datatype);
        }

        // otherwise datatypes were provided
        String nextDatatype = datatypes.higher(datatype);
        if (nextDatatype != null) {
            log.trace("seek to next datatype");
            Text nextColumnQualifier = new Text(nextDatatype + '\u0000' + startDate);
            return new Key(key.getRow(), key.getColumnFamily(), nextColumnQualifier);
        } else {
            log.trace("seek to next ROW_COLFAM");
            // out of datatypes, we're done. This partial range will trigger a "beyond source" condition
            return key.followingKey(PartialKey.ROW_COLFAM);
        }
    }

    /**
     * Build a hint that skips past every remaining date for the current datatype ({@code \uffff} sorts after any date string).
     *
     * @param key
     *            the current key
     * @param datatype
     *            the current key's datatype
     * @return the seek hint
     */
    private Key getDatatypeRolloverKey(Key key, String datatype) {
        log.trace("seek to rollover datatype");
        Text cq = new Text(datatype + '\u0000' + '\uffff');
        return new Key(key.getRow(), key.getColumnFamily(), cq);
    }

    /**
     * Build a hint that skips directly to the start date for the current datatype.
     *
     * @param k
     *            the current key
     * @param datatype
     *            the current key's datatype
     * @return the seek hint
     */
    private Key getSeekToStartDateKey(Key k, String datatype) {
        log.trace("seek to start date");
        Text cq = new Text(datatype + '\u0000' + startDate);
        return new Key(k.getRow(), k.getColumnFamily(), cq);
    }
}
147 changes: 146 additions & 1 deletion src/main/java/datawave/query/util/MetadataHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimeZone;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -26,6 +27,7 @@
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.IteratorSetting;
Expand Down Expand Up @@ -53,6 +55,7 @@
import org.springframework.stereotype.Component;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
Expand All @@ -67,6 +70,7 @@
import datawave.data.MetadataCardinalityCounts;
import datawave.data.type.Type;
import datawave.iterators.EdgeMetadataCombiner;
import datawave.iterators.MetadataFColumnSeekingFilter;
import datawave.iterators.filter.EdgeMetadataCQStrippingIterator;
import datawave.marking.MarkingFunctions;
import datawave.query.composite.CompositeMetadata;
Expand Down Expand Up @@ -1533,6 +1537,147 @@ protected HashMap<String,Long> getCountsByFieldInDayWithTypes(String fieldName,
return datatypeToCounts;
}

/**
 * Get counts for each field across the date range, without any datatype filtering.
 * <p>
 * Note: it is highly recommended to use this method instead of {@link #getCountsForFieldsInDateRange(Set, Set, Date, Date)} when no datatype filter is
 * needed; it simply delegates with an empty datatype set.
 *
 * @param fields
 *            the fields
 * @param begin
 *            the start date
 * @param end
 *            the end date
 * @return a map of field counts
 */
public Map<String,Long> getCountsForFieldsInDateRange(Set<String> fields, Date begin, Date end) {
return getCountsForFieldsInDateRange(fields, Collections.emptySet(), begin, end);
}

/**
 * Get counts for each field across the date range. Optionally filter by datatypes if provided.
 * <p>
 * Both endpoints are normalized to day precision before being formatted as date strings and handed to the string-based overload.
 *
 * @param fields
 *            the fields
 * @param datatypes
 *            the datatypes
 * @param begin
 *            the start date
 * @param end
 *            the end date
 * @return a map of field counts
 */
public Map<String,Long> getCountsForFieldsInDateRange(Set<String> fields, Set<String> datatypes, Date begin, Date end) {
    // truncate to midnight, then format for the string-based overload
    String formattedBegin = DateHelper.format(DateUtils.truncate(begin, Calendar.DATE));
    String formattedEnd = DateHelper.format(DateUtils.truncate(end, Calendar.DATE));
    return getCountsForFieldsInDateRange(fields, datatypes, formattedBegin, formattedEnd);
}

/**
 * Get counts for each field across the date range. Optionally filter by datatypes if provided.
 * <p>
 * Scans the metadata table's F column with a {@link MetadataFColumnSeekingFilter} so only keys within the datatype/date bounds are returned, and sums the
 * per-day counts into a single total per field.
 *
 * @param fields
 *            the fields
 * @param datatypes
 *            the datatypes
 * @param beginDate
 *            the start date
 * @param endDate
 *            the end date
 * @return a map of field counts
 */
public Map<String,Long> getCountsForFieldsInDateRange(Set<String> fields, Set<String> datatypes, String beginDate, String endDate) {

    SortedSet<String> sortedDatatypes = new TreeSet<>(datatypes);
    Map<String,Long> fieldCounts = new HashMap<>();
    Set<Range> ranges = createFieldCountRanges(fields, sortedDatatypes, beginDate, endDate);

    // guard against empty ranges; a BatchScanner rejects an empty range collection
    if (ranges.isEmpty()) {
        return fieldCounts;
    }

    try (BatchScanner bs = ScannerHelper.createBatchScanner(accumuloClient, getMetadataTableName(), getAuths(), fields.size())) {

        bs.setRanges(ranges);
        bs.fetchColumnFamily(ColumnFamilyConstants.COLF_F);

        IteratorSetting setting = new IteratorSetting(50, "MetadataFrequencySeekingIterator", MetadataFColumnSeekingFilter.class);
        setting.addOption(MetadataFColumnSeekingFilter.DATATYPES_OPT, Joiner.on(',').join(sortedDatatypes));
        setting.addOption(MetadataFColumnSeekingFilter.START_DATE, beginDate);
        setting.addOption(MetadataFColumnSeekingFilter.END_DATE, endDate);
        bs.addScanIterator(setting);

        for (Entry<Key,Value> entry : bs) {
            // row is the field name; value is a serialized per-datatype, per-day count
            String field = entry.getKey().getRow().toString();
            Long count = readLongFromValue(entry.getValue());
            // merge() replaces the containsKey/get/put sequence and performs a single map lookup
            fieldCounts.merge(field, count, Long::sum);
        }

    } catch (TableNotFoundException | IOException e) {
        throw new RuntimeException(e);
    }
    return fieldCounts;
}

/**
 * Build ranges for the {@link #getCountsForFieldsInDateRange(Set, Set, String, String)} method.
 * <p>
 * The {@link MetadataFColumnSeekingFilter} can handle a full field range, but providing datatypes enables more precise ranges.
 *
 * @param fields
 *            the fields
 * @param datatypes
 *            the datatypes
 * @param beginDate
 *            the start date
 * @param endDate
 *            the end date
 * @return a set of ranges for the provided fields, bounded by date and optionally datatypes
 */
private Set<Range> createFieldCountRanges(Set<String> fields, SortedSet<String> datatypes, String beginDate, String endDate) {
    Set<Range> ranges = new HashSet<>();
    for (String field : fields) {
        if (!datatypes.isEmpty()) {
            // more precise range bounded by the first and last datatype; the MetadataFColumnSeekingFilter
            // will handle seeking between datatypes as necessary
            Key startKey = new Key(field, "f", datatypes.first() + '\u0000' + beginDate);
            Key endKey = new Key(field, "f", datatypes.last() + '\u0000' + endDate + '\u0000');
            ranges.add(new Range(startKey, true, endKey, false));
        } else {
            // no datatypes: cover the whole F column and punt the hard work to the MetadataFColumnSeekingFilter
            ranges.add(Range.exact(field, "f"));
        }
    }
    return ranges;
}

/**
 * Deserialize a Value that contains a Long
 *
 * @param value
 *            an accumulo Value
 * @return a long
 * @throws IOException
 *             if there is a deserialization problem
 */
private Long readLongFromValue(Value value) throws IOException {
    // a single try-with-resources suffices: closing the DataInputStream closes the wrapped ByteArrayInputStream
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(value.get()))) {
        return WritableUtils.readVLong(in);
    }
}

/**
* Get the earliest occurrence of a field across all datatypes
*
Expand Down
Loading

0 comments on commit 6f2d4d4

Please sign in to comment.