From 6f3339529b5bc81382c8e2ba2488b625993b94b2 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 22 Oct 2020 12:58:07 +0000 Subject: [PATCH] Closes #1 - remove use of Accumulo IteratorUtil, replace with IteratorUtils --- NOTICE | 4 + .../inmemory/InMemoryScannerBase.java | 5 +- .../accumulo/inmemory/IteratorUtils.java | 139 ++++++++++++++++++ 3 files changed, 145 insertions(+), 3 deletions(-) create mode 100644 NOTICE create mode 100644 src/main/java/datawave/accumulo/inmemory/IteratorUtils.java diff --git a/NOTICE b/NOTICE new file mode 100644 index 0000000..c18e787 --- /dev/null +++ b/NOTICE @@ -0,0 +1,4 @@ +This project reuses or contains code derived from the +Apache Accumulo project (Apache 2.0) +https://github.com/apache/accumulo +Copyright 2011-2020 The Apache Software Foundation. All Rights Reserved. diff --git a/src/main/java/datawave/accumulo/inmemory/InMemoryScannerBase.java b/src/main/java/datawave/accumulo/inmemory/InMemoryScannerBase.java index ba3440a..752b52a 100644 --- a/src/main/java/datawave/accumulo/inmemory/InMemoryScannerBase.java +++ b/src/main/java/datawave/accumulo/inmemory/InMemoryScannerBase.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; @@ -136,8 +135,8 @@ public SortedKeyValueIterator createFilter(SortedKeyValueIterator injectedIterators = applyInjectedIterators(wrappedFilter); - SortedKeyValueIterator result = iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, injectedIterators, null, conf, - serverSideIteratorList, serverSideIteratorOptions, iterEnv, false)); + SortedKeyValueIterator result = iterEnv.getTopLevelIterator(IteratorUtils.loadIterators(IteratorScope.scan, injectedIterators, null, conf, + serverSideIteratorList, serverSideIteratorOptions, iterEnv)); return result; } diff --git a/src/main/java/datawave/accumulo/inmemory/IteratorUtils.java b/src/main/java/datawave/accumulo/inmemory/IteratorUtils.java new file mode 100644 index 0000000..da29141 --- /dev/null +++ b/src/main/java/datawave/accumulo/inmemory/IteratorUtils.java @@ -0,0 +1,139 @@ +package datawave.accumulo.inmemory; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.IteratorUtil.IterInfoComparator; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.system.SynchronizedIterator; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IteratorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(IteratorUtils.class); + + /** + * Fetch the correct configuration key prefix for the given scope. Throws an IllegalArgumentException if no property exists for the given scope. + */ + private static Property getProperty(IteratorScope scope) { + requireNonNull(scope); + switch (scope) { + case scan: + return Property.TABLE_ITERATOR_SCAN_PREFIX; + case minc: + return Property.TABLE_ITERATOR_MINC_PREFIX; + case majc: + return Property.TABLE_ITERATOR_MAJC_PREFIX; + default: + throw new IllegalStateException("Could not find configuration property for IteratorScope"); + } + } + + private static void parseIteratorConfiguration(IteratorScope scope, List iters, Map> ssio, + Map> allOptions, AccumuloConfiguration conf) { + parseIterConf(scope, iters, allOptions, conf); + + mergeOptions(ssio, allOptions); + } + + private static void mergeOptions(Map> ssio, Map> allOptions) { + for (Entry> entry : ssio.entrySet()) { + if (entry.getValue() == null) + continue; + Map options = allOptions.get(entry.getKey()); + if (options == null) { + allOptions.put(entry.getKey(), entry.getValue()); + } else { + options.putAll(entry.getValue()); + } + } + } + + private static void parseIterConf(IteratorScope scope, List iters, Map> allOptions, AccumuloConfiguration conf) { + final Property scopeProperty = getProperty(scope); + final String scopePropertyKey = scopeProperty.getKey(); + + for (Entry entry : conf.getAllPropertiesWithPrefix(scopeProperty).entrySet()) { + String suffix = entry.getKey().substring(scopePropertyKey.length()); + String suffixSplit[] = suffix.split("\\.", 3); + + if (suffixSplit.length == 1) { + String sa[] = entry.getValue().split(","); + int prio = Integer.parseInt(sa[0]); + String className = sa[1]; + iters.add(new IterInfo(prio, className, suffixSplit[0])); + } else if (suffixSplit.length == 3 && suffixSplit[1].equals("opt")) { + String iterName = suffixSplit[0]; + String optName = suffixSplit[2]; + + Map options = allOptions.get(iterName); + if (options == null) { + options = new HashMap<>(); + allOptions.put(iterName, options); + } + + options.put(optName, entry.getValue()); + + } else { + throw new IllegalArgumentException("Invalid iterator format: " + entry.getKey()); + } + } + + Collections.sort(iters, new IterInfoComparator()); + } + + @SuppressWarnings("unchecked") + public static ,V extends Writable> SortedKeyValueIterator loadIterators(IteratorScope scope, + SortedKeyValueIterator source, KeyExtent extent, AccumuloConfiguration conf, List ssiList, + Map> ssio, IteratorEnvironment env) throws IOException { + + List iters = new ArrayList<>(ssiList); + Map> allOptions = new HashMap<>(); + parseIteratorConfiguration(scope, iters, ssio, allOptions, conf); + // wrap the source in a SynchronizedIterator in case any of the additional configured iterators + // want to use threading + SortedKeyValueIterator prev = new SynchronizedIterator<>(source); + + try { + for (IterInfo iterInfo : iters) { + + Class> clazz = (Class>) Class.forName(iterInfo.className) + .asSubclass(SortedKeyValueIterator.class); + + SortedKeyValueIterator skvi = clazz.newInstance(); + Map options = allOptions.get(iterInfo.iterName); + if (options == null) + options = Collections.emptyMap(); + skvi.init(prev, options, env); + prev = skvi; + } + } catch (ClassNotFoundException e) { + LOG.error(e.toString()); + throw new RuntimeException(e); + } catch (InstantiationException e) { + LOG.error(e.toString()); + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + LOG.error(e.toString()); + throw new RuntimeException(e); + } + return prev; + } + +}