Skip to content

Commit

Permalink
Refactor logic for creating Parquet file metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
adeet1 committed Jun 7, 2024
1 parent 48f7bd5 commit 2040c4e
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
package org.locationtech.geomesa.fs.storage.parquet

import com.typesafe.scalalogging.LazyLogging
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.io.CloseQuietly
import org.locationtech.jts.geom.Envelope
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/***********************************************************************
* Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.parquet.io

import org.geotools.api.feature.`type`.GeometryDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.{Encoding, GeoParquetSchemaKey, SchemaVersionKey}
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
import org.locationtech.geomesa.utils.text.StringSerialization.alphaNumericSafeString
import org.locationtech.jts.geom.Envelope

import scala.collection.JavaConverters._

class SimpleFeatureParquetMetadataBuilder(sft: SimpleFeatureType, schemaVersion: Integer) {
private var geoParquetMetadata: String = null;

/**
* See https://geoparquet.org/releases/v1.0.0/schema.json
*
* @param sft simple feature type
* @return
*/
def withGeoParquetMetadata(envs: Array[Envelope]): SimpleFeatureParquetMetadataBuilder = {
val geomField = sft.getGeomField

if (geomField != null) {
val primaryColumn = alphaNumericSafeString(geomField)
val columns = {
val geometryDescriptors = sft.getAttributeDescriptors.toArray.collect {case gd: GeometryDescriptor => gd}
geometryDescriptors.indices.map(i => columnMetadata(geometryDescriptors(i), envs(i))).mkString(",")
}

geoParquetMetadata = s"""{"version":"1.0.0","primary_column":"$primaryColumn","columns":{$columns}}"""
}

this
}

private def columnMetadata(geom: GeometryDescriptor, bbox: Envelope): String = {
// TODO "Z" for 3d, minz/maxz for bbox
val geomTypes = {
val types = ObjectType.selectType(geom).last match {
case ObjectType.POINT => """"Point""""
case ObjectType.LINESTRING => """"LineString""""
case ObjectType.POLYGON => """"Polygon""""
case ObjectType.MULTILINESTRING => """"MultiLineString""""
case ObjectType.MULTIPOLYGON => """"MultiPolygon""""
case ObjectType.MULTIPOINT => """"MultiPoint""""
case ObjectType.GEOMETRY_COLLECTION => """"GeometryCollection""""
case ObjectType.GEOMETRY => null
}
Seq(types).filter(_ != null)
}
// note: don't provide crs, as default is EPSG:4326 with longitude first, which is our default/only crs

def stringify(geomName: String, encoding: String, geometryTypes: Seq[String], bbox: Envelope): String = {
val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]"
s""""$geomName":{"encoding":"$encoding","geometry_types":[${geometryTypes.mkString(",")}],"bbox":$bboxString}"""
}

val geomName = alphaNumericSafeString(geom.getLocalName)
stringify(geomName, Encoding, geomTypes, bbox)
}

def build(): java.util.Map[String, String] = {
Map(
StorageConfiguration.SftNameKey -> sft.getTypeName,
StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true),
SchemaVersionKey -> schemaVersion.toString,
GeoParquetSchemaKey -> geoParquetMetadata
).asJava
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.Types.BasePrimitiveBuilder
import org.apache.parquet.schema._
import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.CurrentSchemaVersion
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.text.StringSerialization
import org.locationtech.jts.geom.Envelope

/**
* A paired simple feature type and parquet schema
Expand All @@ -33,19 +32,10 @@ import org.locationtech.jts.geom.Envelope
*/
case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageType, version: Integer = CurrentSchemaVersion) {

import SimpleFeatureParquetSchema.{GeoParquetSchemaKey, SchemaVersionKey}

import scala.collection.JavaConverters._

/**
* Parquet file metadata
*/
lazy val metadata: java.util.Map[String, String] = Map(
StorageConfiguration.SftNameKey -> sft.getTypeName,
StorageConfiguration.SftSpecKey -> SimpleFeatureTypes.encodeType(sft, includeUserData = true),
SchemaVersionKey -> version.toString,
GeoParquetSchemaKey -> null
).asJava
val metadata = new SimpleFeatureParquetMetadataBuilder(sft, version)

/**
* Gets the name of the parquet field for the given simple feature type attribute
Expand All @@ -59,7 +49,6 @@ case class SimpleFeatureParquetSchema(sft: SimpleFeatureType, schema: MessageTyp
object SimpleFeatureParquetSchema {

import StringSerialization.alphaNumericSafeString
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

import scala.collection.JavaConverters._

Expand All @@ -72,55 +61,6 @@ object SimpleFeatureParquetSchema {
val Encoding = "WKB"
val GeoParquetSchemaKey = "geo"

/**
* See https://geoparquet.org/releases/v1.0.0/schema.json
*
* @param sft simple feature type
* @return
*/
def geoParquetMetadata(sft: SimpleFeatureType, bboxes: Array[Envelope]): String = {
val geomField = sft.getGeomField

// If the sft has no geometry field, then omit the GeoParquet metadata entirely
if (geomField == null) {
""
} else {
val primaryColumn = alphaNumericSafeString(geomField)
val columns = {
val geometryDescriptors = sft.getAttributeDescriptors.toArray.collect {case gd: GeometryDescriptor => gd}
geometryDescriptors.indices.map(i => geoParquetMetadata(geometryDescriptors(i), bboxes(i))).mkString(",")
}

s"""{"version":"1.0.0","primary_column":"$primaryColumn","columns":{$columns}}"""
}
}

def geoParquetMetadata(geom: GeometryDescriptor, bbox: Envelope): String = {
// TODO "Z" for 3d, minz/maxz for bbox
val geomTypes = {
val types = ObjectType.selectType(geom).last match {
case ObjectType.POINT => """"Point""""
case ObjectType.LINESTRING => """"LineString""""
case ObjectType.POLYGON => """"Polygon""""
case ObjectType.MULTILINESTRING => """"MultiLineString""""
case ObjectType.MULTIPOLYGON => """"MultiPolygon""""
case ObjectType.MULTIPOINT => """"MultiPoint""""
case ObjectType.GEOMETRY_COLLECTION => """"GeometryCollection""""
case ObjectType.GEOMETRY => null
}
Seq(types).filter(_ != null)
}
// note: don't provide crs, as default is EPSG:4326 with longitude first, which is our default/only crs

def stringify(geomName: String, encoding: String, geometryTypes: Seq[String], bbox: Envelope): String = {
val bboxString = s"[${bbox.getMinX}, ${bbox.getMinY}, ${bbox.getMaxX}, ${bbox.getMaxY}]"
s""""$geomName":{"encoding":"$encoding","geometry_types":[${geometryTypes.mkString(",")}],"bbox":$bboxString}"""
}

val geomName = alphaNumericSafeString(geom.getLocalName)
stringify(geomName, Encoding, geomTypes, bbox)
}

/**
* Extract the simple feature type from a parquet read context. The read context
* contains both file metadata and the provided read conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.apache.parquet.schema.MessageType
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.geometry.jts.JTSFactoryFinder
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.SchemaVersionKey
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureReadSupport.SimpleFeatureRecordMaterializer
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
Expand All @@ -31,16 +30,14 @@ import scala.collection.mutable.ArrayBuffer
class SimpleFeatureReadSupport extends ReadSupport[SimpleFeature] {

private var schema: SimpleFeatureParquetSchema = null
private var schemaVersion: Integer = null

override def init(context: InitContext): ReadContext = {
schema = SimpleFeatureParquetSchema.read(context).getOrElse {
throw new IllegalArgumentException("Could not extract SimpleFeatureType from read context")
}
schemaVersion = schema.metadata.get(SchemaVersionKey).toInt

// ensure that our read schema matches the geomesa parquet version
new ReadContext(schema.schema, schema.metadata)
new ReadContext(schema.schema, schema.metadata.build())
}

override def prepareForRead(
Expand Down Expand Up @@ -88,7 +85,7 @@ object SimpleFeatureReadSupport {

class SimpleFeatureRecordMaterializer(schema: SimpleFeatureParquetSchema)
extends RecordMaterializer[SimpleFeature] {
private val converter = new SimpleFeatureGroupConverter(schema.sft, schema.metadata.get(SchemaVersionKey).toInt)
private val converter = new SimpleFeatureGroupConverter(schema.sft, schema.version.toInt)
override def getRootConverter: GroupConverter = converter
override def getCurrentRecord: SimpleFeature = converter.materialize
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer}
import org.geotools.api.feature.`type`.{AttributeDescriptor, GeometryDescriptor}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.MetadataObserver
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema.{GeoParquetSchemaKey, SchemaVersionKey}
import org.locationtech.geomesa.utils.geotools.ObjectType
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
import org.locationtech.geomesa.utils.text.WKBUtils
import org.locationtech.jts.geom._

import java.nio.ByteBuffer
import java.util.{Date, UUID}
import scala.collection.JavaConverters._

class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => {})) extends WriteSupport[SimpleFeature] {

Expand Down Expand Up @@ -87,30 +84,21 @@ class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) =>
}
this.writer = SimpleFeatureWriteSupport.SimpleFeatureWriter(schema.sft)

new WriteContext(schema.schema, schema.metadata)
new WriteContext(schema.schema, schema.metadata.build())
}

// called once at the end after all SimpleFeatures are written
// called once at the end after all SimpleFeatures are written to the file
override def finalizeWrite(): FinalizedWriteContext = {
// Get the bounding boxes that span each geometry type
val bboxes = observer.getBoundingBoxes
observer.close()

// If the SFT has no geometries, then there's no need to create GeoParquet metadata
// Omit GeoParquet metadata if the SFT has no geometries
if (bboxes.isEmpty) {
return new FinalizedWriteContext(schema.metadata)
new FinalizedWriteContext(schema.metadata.build())
} else {
new FinalizedWriteContext(schema.metadata.withGeoParquetMetadata(bboxes).build())
}

// TODO: not an elegant way to do it
// somehow trying to mutate the map, e.g. by calling metadata.put(GeoParquetSchemaKey, result), causes empty parquet files to be written
val newMetadata: java.util.Map[String, String] = Map(
StorageConfiguration.SftNameKey -> schema.metadata.get(StorageConfiguration.SftNameKey),
StorageConfiguration.SftSpecKey -> schema.metadata.get(StorageConfiguration.SftSpecKey),
SchemaVersionKey -> schema.metadata.get(SchemaVersionKey),
GeoParquetSchemaKey -> SimpleFeatureParquetSchema.geoParquetMetadata(schema.sft, bboxes)
).asJava

new FinalizedWriteContext(newMetadata)
}

// called per block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@

package org.locationtech.geomesa.fs.tools.ingest

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.HdfsConfiguration
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.geotools.api.data.{DataStoreFinder, Query, Transaction}
Expand Down

0 comments on commit 2040c4e

Please sign in to comment.