Skip to content

Commit

Permalink
adding an example app with a Drell–Yan test file
Browse files Browse the repository at this point in the history
  • Loading branch information
vkhristenko committed Dec 19, 2016
1 parent 656a8ee commit dbc5657
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ organization := "org.diana-hep"

licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

version := "0.1.1"
version := "0.1.3"
//isSnapshot := true

scalaVersion := "2.11.8"
Expand Down
Binary file added src/main/resources/test_drellyan.root
Binary file not shown.
56 changes: 56 additions & 0 deletions src/main/scala/org/dianahep/sparkroot/apps/HiggsExampleApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.dianahep.sparkroot.apps

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

//import org.apache.spark.implicits._

import org.dianahep.sparkroot._

/**
* A simple example of reading a ROOT file into Spark's DataFrame.
* Define a case class to properly cast the Dataset[X]
* and print it out
*
* @author Viktor Khristenko
*/

object HiggsExampleApp {
  // Case-class schema mirroring the ROOT tree layout so the DataFrame can be
  // cast to a typed Dataset. NOTE(review): the name `Object` shadows
  // `java.lang.Object`; it is kept as-is because the field types below (and any
  // external callers) reference it, but a rename (e.g. `BaseObject`) would be safer.
  case class Object()
  case class Track(obj: Object, charge: Int, pt: Float, pterr: Float, eta: Float, phi: Float)
  case class Electron(track: Track, ids: Seq[Boolean], trackIso: Float, ecalIso: Float, hcalIso: Float, dz: Double, isPF: Boolean, convVeto: Boolean)
  case class Event(electrons: Seq[Electron])

  /** Entry point: expects the input ROOT file path as the first argument.
    * Prints a message and exits cleanly when no argument is given.
    */
  def main(args: Array[String]): Unit = {
    if (args.nonEmpty) {
      val inputFileName = args(0)
      // Local master is hard-coded for this example; configuration goes
      // through the builder directly (a separate SparkConf is not needed).
      val spark = SparkSession.builder()
        .master("local")
        .appName("Higgs Example Application")
        .getOrCreate()

      doWork(spark, inputFileName)
      spark.stop()
    }
    else {
      println("No ROOT file provided")
    }
  }

  /** Reads the ROOT file into a DataFrame, casts it to Dataset[Event],
    * and prints every event that contains at least one electron.
    */
  def doWork(spark: SparkSession, inputName: String): Unit = {
    // load the ROOT file via the sparkroot data source
    val df = spark.sqlContext.read.root(inputName)

    // implicits must be imported from the session instance,
    // see https://issues.apache.org/jira/browse/SPARK-13540
    import spark.implicits._

    // build the RDD out of the Dataset and filter out empty events right away
    val rdd = df.select("Electrons").as[Event].filter(_.electrons.nonEmpty).rdd

    // print all the events where electrons are present
    // (println runs on the executors; fine here because master is "local")
    for (x <- rdd) println(x)
  }
}
2 changes: 0 additions & 2 deletions src/main/scala/org/dianahep/sparkroot/core/TypeSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,6 @@ case class SRVector(
val size = buffer.readInt

entry += 1L;
println(s"reading vector unsplitted $name")
for (i <- 0 until size) yield t.read(buffer)
}
}
Expand Down Expand Up @@ -750,7 +749,6 @@ case class SRVector(
val data = (for (x <- composite.members)
yield {
//we own the buffer
println(s"reading Array for member ${x.name} of composite: ${composite.name}")
x.readArray(buffer, size)
// increment the entry
}).transpose.map(Row.fromSeq(_))
Expand Down

0 comments on commit dbc5657

Please sign in to comment.