Skip to content

Commit

Permalink
Make observer an Option, and remove BoundsObserver
Browse files Browse the repository at this point in the history
  • Loading branch information
adeet1 committed Jun 20, 2024
1 parent 2040c4e commit 830d1c6
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction
import org.locationtech.geomesa.fs.storage.api.StorageMetadata._
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.{CompositeObserver, NoOpObserver}
import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType
import org.locationtech.geomesa.fs.storage.common.utils.{PathCache, StorageUtils}
Expand Down Expand Up @@ -73,7 +73,7 @@ abstract class AbstractFileSystemStorage(
* @param observer observer to report stats on the data written
* @return
*/
protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter
protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter

/**
* Create a path reader with the given filter and transform
Expand Down Expand Up @@ -236,9 +236,9 @@ abstract class AbstractFileSystemStorage(
def pathAndObserver: WriterConfig = {
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
PathCache.register(context.fc, path)
val noopObserver = NoOpObserver
val observer = if (observers.isEmpty) { noopObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(noopObserver)).asInstanceOf[BoundsObserver]
val observer = if (observers.isEmpty) { None } else {
val compositeObserver = new CompositeObserver(observers.map(_.apply(path)))
Some(compositeObserver)
}
WriterConfig(partition, action, path, observer)
}
Expand Down Expand Up @@ -372,7 +372,7 @@ object AbstractFileSystemStorage {
/**
* Tracks metadata during writes
*/
abstract class MetadataObserver extends BoundsObserver {
abstract class MetadataObserver extends FileSystemObserver {

private var count: Long = 0L
private val bounds: Envelope = new Envelope()
Expand All @@ -386,14 +386,12 @@ object AbstractFileSystemStorage {
}
}

def getBoundingBox: Envelope = bounds

override def flush(): Unit = {}

override def close(): Unit = onClose(bounds, count)

protected def onClose(bounds: Envelope, count: Long): Unit
}

private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: BoundsObserver)
private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: Option[FileSystemObserver])
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,8 @@
package org.locationtech.geomesa.fs.storage.common.observer

import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.jts.geom.Envelope

/**
* Marker trait for writer hooks
*/
trait FileSystemObserver extends FileSystemWriter

trait BoundsObserver extends FileSystemObserver {
def getBoundingBox: Envelope
}
trait FileSystemObserver extends FileSystemWriter
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.utils.io.{CloseQuietly, FlushQuietly}
import org.locationtech.jts.geom.Envelope

import java.io.Closeable

Expand Down Expand Up @@ -42,9 +41,8 @@ trait FileSystemObserverFactory extends Closeable {

object FileSystemObserverFactory {

object NoOpObserver extends BoundsObserver {
object NoOpObserver extends FileSystemObserver {
override def write(feature: SimpleFeature): Unit = {}
override def getBoundingBox: Envelope = new Envelope()
override def flush(): Unit = {}
override def close(): Unit = {}
}
Expand All @@ -54,14 +52,8 @@ object FileSystemObserverFactory {
*
* @param observers observers
*/
class CompositeObserver(observers: Seq[FileSystemObserver]) extends BoundsObserver {
class CompositeObserver(observers: Seq[FileSystemObserver]) extends FileSystemObserver {
override def write(feature: SimpleFeature): Unit = observers.foreach(_.write(feature))

// Get the bounding box for the UpdateObserver instance (the first one in the list)
override def getBoundingBox: Envelope = {
observers.head.asInstanceOf[BoundsObserver].getBoundingBox
}

override def flush(): Unit = FlushQuietly(observers).foreach(e => throw e)
override def close(): Unit = CloseQuietly(observers).foreach(e => throw e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
// actually need to be closed, and since they will only open a single connection per converter, the
// impact should be low

override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter =
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter =
throw new NotImplementedError()

override protected def createReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.jts.geom.{Envelope, Geometry}
Expand All @@ -34,13 +34,19 @@ import org.locationtech.jts.geom.{Envelope, Geometry}
class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver with BoundsObserver {
private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver {
override protected def onClose(bounds: Envelope, count: Long): Unit = new FileBasedMetadataCallback(partition, action, file)(bounds, count)
}

override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
val compositeObserver = new CompositeObserver(Seq(new SingleGeometryObserver(partition, action, file), observer))
new OrcFileSystemWriter(metadata.sft, context.conf, file, compositeObserver)
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = {
val singleGeometryObserver = new SingleGeometryObserver(partition, action, file)

observer match {
case Some(_) =>
val compositeObserver = new CompositeObserver(Seq(singleGeometryObserver, observer.get))
new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(compositeObserver))
case None => new OrcFileSystemWriter(metadata.sft, context.conf, file, Some(singleGeometryObserver))
}
}

override protected def createReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class OrcFileSystemWriter(
sft: SimpleFeatureType,
config: Configuration,
file: Path,
observer: FileSystemObserver = NoOpObserver
observer: Option[FileSystemObserver] = None
) extends FileSystemWriter {

private val schema = OrcFileSystemStorage.createTypeDescription(sft)
Expand All @@ -34,6 +34,7 @@ class OrcFileSystemWriter(
private val batch = schema.createRowBatch()

private val attributeWriter = OrcAttributeWriter(sft, batch)
private val observerVal = observer.getOrElse(NoOpObserver)

override def write(sf: SimpleFeature): Unit = {
attributeWriter.apply(sf, batch.size)
Expand All @@ -43,19 +44,19 @@ class OrcFileSystemWriter(
writer.addRowBatch(batch)
batch.reset()
}
observer.write(sf)
observerVal.write(sf)
}

override def flush(): Unit = {
flushBatch()
observer.flush()
observerVal.flush()
}

override def close(): Unit = {
try { flushBatch() } catch {
case NonFatal(e) => CloseQuietly(Seq(writer, observer)).foreach(e.addSuppressed); throw e
case NonFatal(e) => CloseQuietly(Seq(writer, observerVal)).foreach(e.addSuppressed); throw e
}
CloseQuietly.raise(Seq(writer, observer))
CloseQuietly.raise(Seq(writer, observerVal))
}

private def flushBatch(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.locationtech.jts.geom.Envelope
class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {

override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: Option[FileSystemObserver]): FileSystemWriter = {
val sftConf = new Configuration(context.conf)
StorageConfiguration.setSft(sftConf, metadata.sft)
new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new FileBasedMetadataCallback(partition, action, file))
Expand Down Expand Up @@ -76,19 +76,21 @@ object ParquetFileSystemStorage extends LazyLogging {
sft: SimpleFeatureType,
file: Path,
conf: Configuration,
observer: FileSystemObserver = NoOpObserver,
observer: Option[FileSystemObserver] = None,
callback: (Envelope, Long) => Unit = ((_, _) => {})
) extends FileSystemWriter {

private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build()
private val observerVal = observer.getOrElse(NoOpObserver)

override def write(f: SimpleFeature): Unit = {
writer.write(f)
observer.write(f)
observerVal.write(f)
}
override def flush(): Unit = observer.flush()
override def flush(): Unit = observerVal.flush()
override def close(): Unit = {
CloseQuietly(Seq(writer, observer)).foreach(e => throw e)
CloseQuietly(Seq(writer, observerVal)).foreach(e => throw e)

if (FileValidationEnabled.get.toBoolean) {
validateParquetFile(file)
}
Expand Down

0 comments on commit 830d1c6

Please sign in to comment.