diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index e9b0f3902f96..60d5bb557f5b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -20,8 +20,8 @@ import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction import org.locationtech.geomesa.fs.storage.api.StorageMetadata._ import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig} -import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.{CompositeObserver, NoOpObserver} -import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory} +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver +import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory} import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType import org.locationtech.geomesa.fs.storage.common.utils.{PathCache, StorageUtils} @@ -73,7 +73,7 @@ abstract class AbstractFileSystemStorage( * @param observer observer to report stats on the data written * @return */ - protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter + protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter /** * Create a path reader with the given filter and transform @@ -236,9 +236,9 @@ abstract class AbstractFileSystemStorage( def pathAndObserver: WriterConfig = { val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType) PathCache.register(context.fc, path) - val noopObserver = NoOpObserver - val observer = if (observers.isEmpty) { noopObserver } else { - new CompositeObserver(observers.map(_.apply(path)).+:(noopObserver)).asInstanceOf[BoundsObserver] + val observer = if (observers.isEmpty) { None } else { + val compositeObserver = new CompositeObserver(observers.map(_.apply(path))) + Some(compositeObserver) } WriterConfig(partition, action, path, observer) } @@ -372,7 +372,7 @@ object AbstractFileSystemStorage { /** * Tracks metadata during writes */ - abstract class MetadataObserver extends BoundsObserver { + abstract class MetadataObserver extends FileSystemObserver { private var count: Long = 0L private val bounds: Envelope = new Envelope() @@ -386,8 +386,6 @@ object AbstractFileSystemStorage { } } - def getBoundingBox: Envelope = bounds - override def flush(): Unit = {} override def close(): Unit = onClose(bounds, count) @@ -395,5 +393,5 @@ object AbstractFileSystemStorage { protected def onClose(bounds: Envelope, count: Long): Unit } - private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: BoundsObserver) + private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: Option[FileSystemObserver]) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala index 497f4fbec32d..fb6f6bb02e82 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserver.scala @@ -9,13 +9,8 @@ package org.locationtech.geomesa.fs.storage.common.observer import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter -import org.locationtech.jts.geom.Envelope /** * Marker trait for writer hooks */ -trait FileSystemObserver extends FileSystemWriter - -trait BoundsObserver extends FileSystemObserver { - def getBoundingBox: Envelope -} \ No newline at end of file +trait FileSystemObserver extends FileSystemWriter \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserverFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserverFactory.scala index 4e7dbee92bab..2b397e242208 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserverFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/observer/FileSystemObserverFactory.scala @@ -13,7 +13,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.locationtech.geomesa.utils.io.{CloseQuietly, FlushQuietly} -import org.locationtech.jts.geom.Envelope import java.io.Closeable @@ -42,9 +41,8 @@ trait FileSystemObserverFactory extends Closeable { object FileSystemObserverFactory { - object NoOpObserver extends BoundsObserver { + object NoOpObserver extends FileSystemObserver { override def write(feature: SimpleFeature): Unit = {} - override def getBoundingBox: Envelope = new Envelope() override def flush(): Unit = {} override def close(): Unit = {} } @@ -54,14 +52,8 @@ object FileSystemObserverFactory { * * @param observers observers */ - class CompositeObserver(observers: Seq[FileSystemObserver]) extends BoundsObserver { + class CompositeObserver(observers: Seq[FileSystemObserver]) extends FileSystemObserver { override def write(feature: SimpleFeature): Unit = observers.foreach(_.write(feature)) - - // Get the bounding box for the UpdateObserver instance (the first one in the list) - override def getBoundingBox: Envelope = { - observers.head.asInstanceOf[BoundsObserver].getBoundingBox - } - override def flush(): Unit = FlushQuietly(observers).foreach(e => throw e) override def close(): Unit = CloseQuietly(observers).foreach(e => throw e) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 59da23b8610d..55ee96f880a1 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -31,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co // actually need to be closed, and since they will only open a single connection per converter, the // impact should be low - override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = throw new NotImplementedError() override protected def createReader( diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala index 41d74ccf2fe6..0b5b5f421a5c 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala @@ -21,7 +21,7 @@ import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver} import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver -import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver} +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType import org.locationtech.jts.geom.{Envelope, Geometry} @@ -34,13 +34,19 @@ import org.locationtech.jts.geom.{Envelope, Geometry} class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) { - private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver with BoundsObserver { + private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver { override protected def onClose(bounds: Envelope, count: Long): Unit = new FileBasedMetadataCallback(partition, action, file)(bounds, count) } - override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = { - val compositeObserver = new CompositeObserver(Seq(new SingleGeometryObserver(partition, action, file), observer)) - new OrcFileSystemWriter(metadata.sft, context.conf, file, compositeObserver) + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = { + val singleGeometryObserver = new SingleGeometryObserver(partition, action, file) + + observer match { + case Some(_) => + val compositeObserver = new CompositeObserver(Seq(singleGeometryObserver, observer.get)) + new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(compositeObserver)) + case None => new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(singleGeometryObserver)) + } } override protected def createReader( diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala index 58f4ae822638..21a7156041a0 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemWriter.scala @@ -24,7 +24,7 @@ class OrcFileSystemWriter( sft: SimpleFeatureType, config: Configuration, file: Path, - observer: FileSystemObserver = NoOpObserver + observer: Option[FileSystemObserver] = None ) extends FileSystemWriter { private val schema = OrcFileSystemStorage.createTypeDescription(sft) @@ -34,6 +34,7 @@ class OrcFileSystemWriter( private val batch = schema.createRowBatch() private val attributeWriter = OrcAttributeWriter(sft, batch) + private val observerVal = observer.getOrElse(NoOpObserver) override def write(sf: SimpleFeature): Unit = { attributeWriter.apply(sf, batch.size) @@ -43,19 +44,19 @@ class OrcFileSystemWriter( writer.addRowBatch(batch) batch.reset() } - observer.write(sf) + observerVal.write(sf) } override def flush(): Unit = { flushBatch() - observer.flush() + observerVal.flush() } override def close(): Unit = { try { flushBatch() } catch { - case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e + case NonFatal(e) => CloseQuietly(Seq(writer, observerVal)).foreach(e.addSuppressed); throw e } - CloseQuietly.raise(Seq(writer, observer)) + CloseQuietly.raise(Seq(writer, observerVal)) } private def flushBatch(): Unit = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala index af9fe87e6cb6..08329a2434ef 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala @@ -37,7 +37,7 @@ import org.locationtech.jts.geom.Envelope class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) { - override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = { + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = { val sftConf = new Configuration(context.conf) StorageConfiguration.setSft(sftConf, metadata.sft) new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new FileBasedMetadataCallback(partition, action, file)) @@ -76,19 +76,21 @@ object ParquetFileSystemStorage extends LazyLogging { sft: SimpleFeatureType, file: Path, conf: Configuration, - observer: FileSystemObserver = NoOpObserver, + observer: Option[FileSystemObserver] = None, callback: (Envelope, Long) => Unit = ((_, _) => {}) ) extends FileSystemWriter { private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build() + private val observerVal = observer.getOrElse(NoOpObserver) override def write(f: SimpleFeature): Unit = { writer.write(f) - observer.write(f) + observerVal.write(f) } - override def flush(): Unit = observer.flush() + override def flush(): Unit = observerVal.flush() override def close(): Unit = { - CloseQuietly(Seq(writer, observer)).foreach(e => throw e) + CloseQuietly(Seq(writer, observerVal)).foreach(e => throw e) + if (FileValidationEnabled.get.toBoolean) { validateParquetFile(file) }