diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala index 27356a8395a..af9fe87e6cb 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala @@ -9,22 +9,22 @@ package org.locationtech.geomesa.fs.storage.parquet import com.typesafe.scalalogging.LazyLogging +import org.apache.parquet.hadoop.ParquetReader +import org.apache.parquet.hadoop.example.GroupReadSupport import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.hadoop.ParquetReader -import org.apache.parquet.hadoop.example.GroupReadSupport import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.factory.FastFilterFactory import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api._ +import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled} import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver -import 
org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled} import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter import org.locationtech.geomesa.utils.io.CloseQuietly import org.locationtech.jts.geom.Envelope diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala new file mode 100644 index 00000000000..88528378e98 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetMetadataBuilder.scala @@ -0,0 +1,81 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/ +package org.locationtech.geomesa.fs.storage.parquet.io + +import org.geotools.api.feature.`type`.GeometryDescriptor +import org.geotools.api.feature.simple.SimpleFeatureType +import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration +import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.{Encoding, GeoParquetSchemaKey, SchemaVersionKey} +import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType +import org.locationtech.geomesa.utils.text.StringSerialization.alphaNumericSafeString +import org.locationtech.jts.geom.Envelope + +import scala.collection.JavaConverters._ + +class SimpleFeatureParquetMetadataBuilder(sft: SimpleFeatureType, schemaVersion: Integer) { + private var geoParquetMetadata: String = null; + + /** + * Sets the GeoParquet metadata JSON (left unchanged if the feature type has no geometry field); see https://geoparquet.org/releases/v1.0.0/schema.json + * + * @param envs bounding boxes, indexed in the same order as the geometry attributes of the feature type + * @return this builder + */ + def withGeoParquetMetadata(envs: Array[Envelope]): SimpleFeatureParquetMetadataBuilder = { + val geomField = sft.getGeomField + + if (geomField != null) { + val primaryColumn = alphaNumericSafeString(geomField) + val columns = { + val geometryDescriptors = sft.getAttributeDescriptors.toArray.collect {case gd: GeometryDescriptor => gd} + geometryDescriptors.indices.map(i => columnMetadata(geometryDescriptors(i), envs(i))).mkString(",") + } + + geoParquetMetadata = s"""{"version":"1.0.0","primary_column":"$primaryColumn","columns":{$columns}}""" + } + + this + } + + private def columnMetadata(geom: GeometryDescriptor, bbox: Envelope): String = { + // TODO "Z" for 3d, minz/maxz for bbox + val geomTypes = { + val types = ObjectType.selectType(geom).last match { + case ObjectType.POINT => """"Point"""" + case ObjectType.LINESTRING => """"LineString"""" + case ObjectType.POLYGON => """"Polygon"""" + case 
ObjectType.MULTILINESTRING => """"MultiLineString"""" + case ObjectType.MULTIPOLYGON => """"MultiPolygon"""" + case ObjectType.MULTIPOINT => """"MultiPoint"""" + case ObjectType.GEOMETRY_COLLECTION => """"GeometryCollection"""" + case ObjectType.GEOMETRY => null + } + Seq(types).filter(_ != null) + } + // note: don't provide crs, as default is EPSG:4326 with longitude first, which is our default/only crs + + def stringify(geomName: String, encoding: String, geometryTypes: Seq[String], bbox: Envelope): String = { + val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]" + s""""$geomName":{"encoding":"$encoding","geometry_types":[${geometryTypes.mkString(",")}],"bbox":$bboxString}""" + } + + val geomName = alphaNumericSafeString(geom.getLocalName) + stringify(geomName, Encoding, geomTypes, bbox) + } + + def build(): java.util.Map[String, String] = { + Map( + StorageConfiguration.SftNameKey -> sft.getTypeName, + StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true), + SchemaVersionKey -> schemaVersion.toString, + GeoParquetSchemaKey -> geoParquetMetadata + ).asJava + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala index f742c8de03f..db5b7c920c5 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala @@ -16,14 +16,13 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import 
org.apache.parquet.schema.Types.BasePrimitiveBuilder import org.apache.parquet.schema._ -import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor} +import org.geotools.api.feature.`type`.AttributeDescriptor import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.CurrentSchemaVersion import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes} import org.locationtech.geomesa.utils.text.StringSerialization -import org.locationtech.jts.geom.Envelope /** * A paired simple feature type and parquet schema @@ -33,19 +32,10 @@ import org.locationtech.jts.geom.Envelope */ case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageType, version: Integer = CurrentSchemaVersion) { - import SimpleFeatureParquetSchema.{GeoParquetSchemaKey, SchemaVersionKey} - - import scala.collection.JavaConverters._ - /** * Parquet file metadata */ - lazy val metadata: java.util.Map[String, String] = Map( - StorageConfiguration.SftNameKey -> sft.getTypeName, - StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true), - SchemaVersionKey -> version.toString, - GeoParquetSchemaKey -> null - ).asJava + val metadata = new SimpleFeatureParquetMetadataBuilder(sft, version) /** * Gets the name of the parquet field for the given simple feature type attribute @@ -59,7 +49,6 @@ case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageTyp object SimpleFeatureParquetSchema { import StringSerialization.alphaNumericSafeString - import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType import scala.collection.JavaConverters._ @@ -72,55 +61,6 @@ object SimpleFeatureParquetSchema { val Encoding = "WKB" val GeoParquetSchemaKey = "geo" - /** - 
* See https://geoparquet.org/releases/v1.0.0/schema.json - * - * @param sft simple feature type - * @return - */ - def geoParquetMetadata(sft: SimpleFeatureType, bboxes: Array[Envelope]): String = { - val geomField = sft.getGeomField - - // If the sft has no geometry field, then omit the GeoParquet metadata entirely - if (geomField == null) { - "" - } else { - val primaryColumn = alphaNumericSafeString(geomField) - val columns = { - val geometryDescriptors = sft.getAttributeDescriptors.toArray.collect {case gd: GeometryDescriptor => gd} - geometryDescriptors.indices.map(i => geoParquetMetadata(geometryDescriptors(i), bboxes(i))).mkString(",") - } - - s"""{"version":"1.0.0","primary_column":"$primaryColumn","columns":{$columns}}""" - } - } - - def geoParquetMetadata(geom: GeometryDescriptor, bbox: Envelope): String = { - // TODO "Z" for 3d, minz/maxz for bbox - val geomTypes = { - val types = ObjectType.selectType(geom).last match { - case ObjectType.POINT => """"Point"""" - case ObjectType.LINESTRING => """"LineString"""" - case ObjectType.POLYGON => """"Polygon"""" - case ObjectType.MULTILINESTRING => """"MultiLineString"""" - case ObjectType.MULTIPOLYGON => """"MultiPolygon"""" - case ObjectType.MULTIPOINT => """"MultiPoint"""" - case ObjectType.GEOMETRY_COLLECTION => """"GeometryCollection"""" - case ObjectType.GEOMETRY => null - } - Seq(types).filter(_ != null) - } - // note: don't provide crs, as default is EPSG:4326 with longitude first, which is our default/only crs - - def stringify(geomName: String, encoding: String, geometryTypes: Seq[String], bbox: Envelope): String = { - val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]" - s""""$geomName":{"encoding":"$encoding","geometry_types":[${geometryTypes.mkString(",")}],"bbox":$bboxString}""" - } - - val geomName = alphaNumericSafeString(geom.getLocalName) - stringify(geomName, Encoding, geomTypes, bbox) - } - /** * Extract the simple feature type from a parquet read 
context. The read context * contains both file metadata and the provided read conf diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala index d855721dbde..c2a67cb1a84 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureReadSupport.scala @@ -17,7 +17,6 @@ import org.apache.parquet.schema.MessageType import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.geometry.jts.JTSFactoryFinder import org.locationtech.geomesa.features.ScalaSimpleFeature -import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.SchemaVersionKey import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureReadSupport.SimpleFeatureRecordMaterializer import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType @@ -31,16 +30,14 @@ import scala.collection.mutable.ArrayBuffer class SimpleFeatureReadSupport extends ReadSupport[SimpleFeature] { private var schema: SimpleFeatureParquetSchema = null - private var schemaVersion: Integer = null override def init(context: InitContext): ReadContext = { schema = SimpleFeatureParquetSchema.read(context).getOrElse { throw new IllegalArgumentException("Could not extract SimpleFeatureType from read context") } - schemaVersion = schema.metadata.get(SchemaVersionKey).toInt // ensure that our read schema matches the geomesa parquet version - new ReadContext(schema.schema, schema.metadata) + new ReadContext(schema.schema, 
schema.metadata.build()) } override def prepareForRead( @@ -88,7 +85,7 @@ object SimpleFeatureReadSupport { class SimpleFeatureRecordMaterializer(schema: SimpleFeatureParquetSchema) extends RecordMaterializer[SimpleFeature] { - private val converter = new SimpleFeatureGroupConverter(schema.sft, schema.metadata.get(SchemaVersionKey).toInt) + private val converter = new SimpleFeatureGroupConverter(schema.sft, schema.version.toInt) override def getRootConverter: GroupConverter = converter override def getCurrentRecord: SimpleFeature = converter.materialize } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala index 2cadb42af81..794b0e65721 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala @@ -15,8 +15,6 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.MetadataObserver -import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration -import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.{GeoParquetSchemaKey, SchemaVersionKey} import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType import org.locationtech.geomesa.utils.text.WKBUtils @@ -24,7 +22,6 @@ import org.locationtech.jts.geom._ import 
java.nio.ByteBuffer import java.util.{Date, UUID} -import scala.collection.JavaConverters._ class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => {})) extends WriteSupport[SimpleFeature] { @@ -87,30 +84,21 @@ class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => } this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft) - new WriteContext(schema.schema, schema.metadata) + new WriteContext(schema.schema, schema.metadata.build()) } - // called once at the end after all SimpleFeatures are written + // called once at the end after all SimpleFeatures are written to the file override def finalizeWrite(): FinalizedWriteContext = { // Get the bounding boxes that span each geometry type val bboxes = observer.getBoundingBoxes observer.close() - // If the SFT has no geometries, then there's no need to create GeoParquet metadata + // Omit GeoParquet metadata if the SFT has no geometries if (bboxes.isEmpty) { - return new FinalizedWriteContext(schema.metadata) + new FinalizedWriteContext(schema.metadata.build()) + } else { + new FinalizedWriteContext(schema.metadata.withGeoParquetMetadata(bboxes).build()) } - - // TODO: not an elegant way to do it - // somehow trying to mutate the map, e.g. 
by calling metadata.put(GeoParquetSchemaKey, result), causes empty parquet files to be written - val newMetadata: java.util.Map[String, String] = Map( - StorageConfiguration.SftNameKey -> schema.metadata.get(StorageConfiguration.SftNameKey), - StorageConfiguration.SftSpecKey -> schema.metadata.get(StorageConfiguration.SftSpecKey), - SchemaVersionKey -> schema.metadata.get(SchemaVersionKey), - GeoParquetSchemaKey -> SimpleFeatureParquetSchema.geoParquetMetadata(schema.sft, bboxes) - ).asJava - - new FinalizedWriteContext(newMetadata) } // called per block diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala index 63faeec862d..55e76cc5ff7 100644 --- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala +++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala @@ -8,10 +8,8 @@ package org.locationtech.geomesa.fs.tools.ingest -import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.ParquetFileReader import org.geotools.api.data.{DataStoreFinder, Query, Transaction}