This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

fix read GeoJSON hdfs issue in UtilsShape.readGeoJSONMultiPolygonLongAttribute
aklink committed Jul 16, 2018
1 parent c82e9bd commit 8d260b9
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions src/main/scala/biggis/landuse/spark/examples/UtilsShape.scala
@@ -3,7 +3,7 @@ package biggis.landuse.spark.examples
import geotrellis.util.LazyLogging
import geotrellis.shapefile.ShapeFileReader
import geotrellis.shapefile.ShapeFileReader.SimpleFeatureWrapper
- import geotrellis.vector.{Extent, MultiPolygon, Feature}
+ import geotrellis.vector.{Extent, Feature, MultiPolygon}
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

import scala.collection.JavaConverters._
@@ -12,14 +12,18 @@ import geotrellis.proj4.CRS
import geotrellis.vector.io.json.JsonFeatureCollection
import geotrellis.vector.io.json._
import geotrellis.vector.io._
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.fs.{FSDataInputStream, Path}
+ import org.apache.spark.SparkContext
import spray.json._
+ import spray.json.DefaultJsonProtocol

/**
* Created by ak on 22.06.2017.
*/
object UtilsShape extends LazyLogging{

- def readShapefileMultiPolygonLongAttribute(shapefileName: String, attribName: String)(implicit targetcrs : Option[CRS] = None): List[Feature[MultiPolygon,Long]] = {
+ def readShapefileMultiPolygonLongAttribute(shapefileName: String, attribName: String)(implicit targetcrs : Option[CRS] = None, sc: SparkContext): List[Feature[MultiPolygon,Long]] = {
if (shapefileName.contains(".shp")) {
ShapeFileReader.readSimpleFeatures(shapefileName)
.filter { feat =>
@@ -35,9 +39,9 @@ object UtilsShape extends LazyLogging{
List[Feature[MultiPolygon,Long]]()
}
}
- def readGeoJSONMultiPolygonLongAttribute(geojsonName: String, attribName: String)(implicit targetcrs : Option[CRS] = None): List[Feature[MultiPolygon,Long]] = {
+ def readGeoJSONMultiPolygonLongAttribute(geojsonName: String, attribName: String)(implicit targetcrs : Option[CRS] = None, sc: SparkContext): List[Feature[MultiPolygon,Long]] = {
if(geojsonName.contains(".geojson")){
- val collection = GeoJson.fromFile[WithCrs[JsonFeatureCollection]](geojsonName) //Source.fromFile(geojsonName, "UTF-8").mkString.parseGeoJson[WithCrs[JsonFeatureCollection]]
+ val collection = fromFileHdfs[WithCrs[JsonFeatureCollection]](geojsonName) //GeoJson.fromFile[WithCrs[JsonFeatureCollection]](geojsonName) //Source.fromFile(geojsonName, "UTF-8").mkString.parseGeoJson[WithCrs[JsonFeatureCollection]]

case class Landcover(landcover: Long)
object UtilsShapeJsonProtocol extends DefaultJsonProtocol {
@@ -66,8 +70,38 @@ object UtilsShape extends LazyLogging{
List[Feature[MultiPolygon,Long]]()
}
}
+  def fromFileHdfs[T: JsonReader](path: String)(implicit sc: SparkContext) = {
+    val src = openHdfs(new Path(path)) //val src = scala.io.Source.fromFile(path)
+    val txt =
+      try {
+        scala.io.Source.fromInputStream(src).mkString
+      } finally {
+        src.close
+      }
+    GeoJson.parse[T](txt)
+  }
+  def openHdfs(path: Path)(implicit sc: SparkContext): FSDataInputStream = {
+    val conf: Configuration = sc.hadoopConfiguration
+    val fs = path.getFileSystem(conf)
+    val valid = fs.exists(path)
+    val isFile = fs.isFile(path)
+    val isDir = fs.isDirectory(path)
+    val src = if(isDir){
+      // Fix Ingest into HDFS issue (directory is created for each file with same name as file)
+      val status = fs.listStatus(path) //val status = fs.getStatus(pathHdfs)
+      val filelist = status.map( file => file.getPath ) //fs.listFiles(pathHdfs,false)
+      val file = if(filelist.length == 1) Some(fs.open(filelist(0))) else None
+      if(file.nonEmpty)
+        file.get // Open file in hdfs (contained in dir with same name)
+      else
+        fs.open(path) // Unhandled - will cause exception
+    } else{
+      fs.open(path) // Open file in hdfs
+    }
+    src
+  }

- def readShapefileMultiPolygonDoubleAttribute(shapefileName: String, attribName: String)(implicit targetcrs : Option[CRS] = None): List[Feature[MultiPolygon,Double]] = {
+ def readShapefileMultiPolygonDoubleAttribute(shapefileName: String, attribName: String)(implicit targetcrs : Option[CRS] = None, sc: SparkContext): List[Feature[MultiPolygon,Double]] = {
readShapefileMultiPolygonLongAttribute(shapefileName, attribName).map{ feature => Feature(feature.geom,feature.data.toDouble) }
}
def getExtent(mps : List[Feature[MultiPolygon,Any]]) : Extent = {

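For context, a minimal sketch of how the changed API is called after this commit: the SparkContext joins the implicit parameter list so that UtilsShape can open the GeoJSON through Hadoop's FileSystem API instead of scala.io.Source, which only understands local paths. The app name, master setting, HDFS path, and attribute name below are hypothetical, and it is assumed this class sits on the classpath next to UtilsShape:

```scala
import biggis.landuse.spark.examples.UtilsShape
import geotrellis.proj4.{CRS, WebMercator}
import geotrellis.vector.{Feature, MultiPolygon}
import org.apache.spark.{SparkConf, SparkContext}

object ReadGeoJsonFromHdfsExample {
  def main(args: Array[String]): Unit = {
    // Both implicits are picked up by readGeoJSONMultiPolygonLongAttribute:
    // the optional target CRS and the SparkContext used for HDFS access.
    implicit val sc: SparkContext = new SparkContext(
      new SparkConf().setAppName("utils-shape-example").setMaster("local[*]"))
    implicit val targetcrs: Option[CRS] = Some(WebMercator)

    // Hypothetical hdfs:// path and attribute name; before this commit the
    // GeoJSON branch read via GeoJson.fromFile and worked only on the local FS.
    val features: List[Feature[MultiPolygon, Long]] =
      UtilsShape.readGeoJSONMultiPolygonLongAttribute(
        "hdfs:///data/landcover.geojson", "landcover")

    println(s"read ${features.length} multipolygon features")
    sc.stop()
  }
}
```

Per the comments in openHdfs above, the call also tolerates the ingest quirk where HDFS ends up with a directory named like the file and the payload as the single entry inside it.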