diff --git a/build.sbt b/build.sbt
index c0af1c9..5d31a04 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
name := "data-tc"
organization in ThisBuild := "io.malcolmgreaves"
-version in ThisBuild := {
+version in ThisBuild := {
val major: Int = 0
val minor: Int = 0
val patch: Int = 0
@@ -8,9 +8,7 @@ version in ThisBuild := {
}
import SharedBuild._
-com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
-ScalariformKeys.preferences := sharedCodeFmt
-
+
lazy val root = project
.in(file("."))
.aggregate(
@@ -21,35 +19,26 @@ lazy val root = project
)
.settings {
publishArtifact := false
- publishLocal := {}
- publish := {}
+ publishLocal := {}
+ publish := {}
}
-lazy val `data-tc-scala` = project
- .in(file("data-tc-scala"))
- .settings {
- publishArtifact := true
- }
+lazy val `data-tc-scala` = project.in(file("data-tc-scala")).settings {
+ publishArtifact := true
+}
-lazy val `data-tc-spark` = project
- .in(file("data-tc-spark"))
- .dependsOn(`data-tc-scala`)
- .settings {
+lazy val `data-tc-spark` =
+ project.in(file("data-tc-spark")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}
-lazy val `data-tc-flink` = project
- .in(file("data-tc-flink"))
- .dependsOn(`data-tc-scala`)
- .settings {
+lazy val `data-tc-flink` =
+ project.in(file("data-tc-flink")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}
-
-lazy val `data-tc-extra` = project
- .in(file("data-tc-extra"))
- .dependsOn(`data-tc-scala`)
- .settings {
+lazy val `data-tc-extra` =
+ project.in(file("data-tc-extra")).dependsOn(`data-tc-scala`).settings {
publishArtifact := true
}
@@ -58,22 +47,20 @@ lazy val publishTasks = subprojects.map { publish.in }
resolvers in ThisBuild := Seq(
// sonatype, maven central
- "Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/",
+ "Sonatype Releases" at "https://oss.sonatype.org/content/repositories/releases/",
"Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/",
-
// bintray
"Scalaz Bintray" at "http://dl.bintray.com/scalaz/releases",
Resolver.bintrayRepo("mfglabs", "maven"),
Resolver.bintrayRepo("dwhjames", "maven"),
-
// etc.
"Confluent" at "http://packages.confluent.io/maven/"
)
// runtime & compilation
-lazy val javaV = "1.8"
-scalaVersion in ThisBuild := "2.11.8"
+lazy val javaV = "1.8"
+scalaVersion in ThisBuild := "2.11.8"
scalacOptions in ThisBuild := Seq(
"-optimize",
"-deprecation",
@@ -96,12 +83,13 @@ scalacOptions in ThisBuild := Seq(
"-Xfatal-warnings" // Every warning is esclated to an error.
)
javacOptions in ThisBuild := Seq("-source", javaV, "-target", javaV)
-javaOptions in ThisBuild := Seq(
- "-server",
- "-XX:+AggressiveOpts",
+javaOptions in ThisBuild := Seq(
+ "-server",
+ "-XX:+AggressiveOpts",
"-XX:+TieredCompilation",
"-XX:CompileThreshold=100",
"-Xmx3000M",
"-XX:+UseG1GC"
)
+publishArtifact := false
diff --git a/data-tc-extra/build.sbt b/data-tc-extra/build.sbt
index 3412059..7c9abd6 100644
--- a/data-tc-extra/build.sbt
+++ b/data-tc-extra/build.sbt
@@ -2,15 +2,11 @@ name := "data-tc-extra"
import SharedBuild._
-com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
-ScalariformKeys.preferences := sharedCodeFmt
-
addCompilerPlugin(scalaMacros)
-libraryDependencies ++=
+libraryDependencies ++=
extraDeps ++
- testDeps
-
+ testDeps
// doc hacks
@@ -21,7 +17,8 @@ sources in (Compile, doc) ~= (_ filter (_.getName endsWith "DataOps.scala"))
//
// test, runtime settings
//
-fork in run := true
-fork in Test := true
+fork in run := true
+fork in Test := true
parallelExecution in Test := true
+pomExtra := pomExtraInfo
diff --git a/data-tc-extra/src/main/scala/fif/ops/ToMap.scala b/data-tc-extra/src/main/scala/fif/ops/ToMap.scala
index c78be49..6adb595 100644
--- a/data-tc-extra/src/main/scala/fif/ops/ToMap.scala
+++ b/data-tc-extra/src/main/scala/fif/ops/ToMap.scala
@@ -29,12 +29,14 @@ object ToMap extends Serializable {
}
}
- def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[(T, U)]): Map[T, U] = {
+ def apply[T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](
+ data: D[(T, U)]): Map[T, U] = {
implicit val _ = identity[(T, U)] _
apply[(T, U), T, U, D](data)
}
- def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])(implicit ev: A <:< (T, U)): Map[T, U] = {
+ def apply[A, T: ClassTag, U: ClassTag: Semigroup, D[_]: Data](data: D[A])(
+ implicit ev: A <:< (T, U)): Map[T, U] = {
val sg = implicitly[Semigroup[U]]
diff --git a/data-tc-extra/src/test/scala/fif/TestHelpers.scala b/data-tc-extra/src/test/scala/fif/TestHelpers.scala
index 3dc4382..df9ee5f 100644
--- a/data-tc-extra/src/test/scala/fif/TestHelpers.scala
+++ b/data-tc-extra/src/test/scala/fif/TestHelpers.scala
@@ -8,4 +8,4 @@ object TestHelpers {
override def combine(a: Int, b: Int) = a + b
}
-}
\ No newline at end of file
+}
diff --git a/data-tc-extra/src/test/scala/fif/ops/SumTest.scala b/data-tc-extra/src/test/scala/fif/ops/SumTest.scala
index 3f04b2c..fdfe1f9 100644
--- a/data-tc-extra/src/test/scala/fif/ops/SumTest.scala
+++ b/data-tc-extra/src/test/scala/fif/ops/SumTest.scala
@@ -27,4 +27,3 @@ class SumTest extends FunSuite {
}
}
-
diff --git a/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala b/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala
index 513baaa..d5a73a1 100644
--- a/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala
+++ b/data-tc-extra/src/test/scala/fif/ops/ToMapTest.scala
@@ -19,7 +19,8 @@ class ToMapTest extends FunSuite {
test("ToMap list of many elements") {
val l = Traversable(
- ("hello", 10), ("hello", 20),
+ ("hello", 10),
+ ("hello", 20),
("world", 30),
("sunday funday", 40),
("sunday funday", 50),
@@ -56,4 +57,4 @@ class ToMapTest extends FunSuite {
assert(ToMap.combine(first, second) == expected)
}
-}
\ No newline at end of file
+}
diff --git a/data-tc-flink/build.sbt b/data-tc-flink/build.sbt
index 29cd07e..182047d 100644
--- a/data-tc-flink/build.sbt
+++ b/data-tc-flink/build.sbt
@@ -2,16 +2,15 @@ name := "data-tc-flink"
import SharedBuild._
-com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
-ScalariformKeys.preferences := sharedCodeFmt
-
addCompilerPlugin(scalaMacros)
-libraryDependencies ++=
+libraryDependencies ++=
flinkTcDeps ++
- testDeps
+ testDeps
testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v")
testOptions in Test += Tests.Argument("-oF")
-fork in Test := true
+fork in Test := true
parallelExecution in Test := true
+
+pomExtra := pomExtraInfo
diff --git a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala
index e8d8614..9c8d4cd 100644
--- a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala
+++ b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkData.scala
@@ -10,11 +10,11 @@ import scala.util.Try
import fif.Data
/**
- * Implementation of the Data typeclass with the Flink DataSet type.
- *
- * Since DataSet instances are lazy, this typeclass implementation's methods
- * have lazy (vs. eager) semantics.
- */
+ * Implementation of the Data typeclass with the Flink DataSet type.
+ *
+ * Since DataSet instances are lazy, this typeclass implementation's methods
+ * have lazy (vs. eager) semantics.
+ */
case object FlinkData extends Data[DataSet] {
import Data.ops._
@@ -24,7 +24,8 @@ case object FlinkData extends Data[DataSet] {
data.map(f)
}
- override def mapParition[A, B: ClassTag](d: DataSet[A])(f: Iterable[A] => Iterable[B]): DataSet[B] = {
+ override def mapParition[A, B: ClassTag](d: DataSet[A])(
+ f: Iterable[A] => Iterable[B]): DataSet[B] = {
implicit val ti = FlinkHelper.typeInfo[B]
d.mapPartition { partition =>
f(partition.toIterable)
@@ -41,7 +42,8 @@ case object FlinkData extends Data[DataSet] {
()
}
- override def foreachPartition[A](d: DataSet[A])(f: Iterable[A] => Any): Unit = {
+ override def foreachPartition[A](d: DataSet[A])(
+ f: Iterable[A] => Any): Unit = {
// ignore this map operation's return type
implicit val ti = FlinkHelper.unitTypeInformation
d.mapPartition { partition =>
@@ -54,24 +56,22 @@ case object FlinkData extends Data[DataSet] {
override def filter[A](d: DataSet[A])(f: A => Boolean): DataSet[A] =
d.filter(f)
- override def aggregate[A, B: ClassTag](data: DataSet[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = {
+ override def aggregate[A, B: ClassTag](data: DataSet[A])(
+ zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B = {
implicit val ti = FlinkHelper.typeInfo[B]
- data
- .mapPartition { partition =>
- Seq(partition.foldLeft(zero)(seqOp))
- }
- .reduce(combOp)
- .collect()
- .reduce(combOp)
+ data.mapPartition { partition =>
+ Seq(partition.foldLeft(zero)(seqOp))
+ }.reduce(combOp).collect().reduce(combOp)
}
/**
- * Unimplemented!
- *
- * Flink doesn't support an API for total sorting. Must determine a correct
- * implementation using the lower-level API.
- */
- override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)(implicit ord: math.Ordering[B]): DataSet[A] =
+ * Unimplemented!
+ *
+ * Flink doesn't support an API for total sorting. Must determine a correct
+ * implementation using the lower-level API.
+ */
+ override def sortBy[A, B: ClassTag](d: DataSet[A])(f: (A) ⇒ B)(
+ implicit ord: math.Ordering[B]): DataSet[A] =
???
override def take[A](d: DataSet[A])(k: Int): Traversable[A] =
@@ -83,40 +83,38 @@ case object FlinkData extends Data[DataSet] {
override def toSeq[A](d: DataSet[A]): Seq[A] =
d.collect()
- override def flatMap[A, B: ClassTag](d: DataSet[A])(f: A => TraversableOnce[B]): DataSet[B] = {
+ override def flatMap[A, B: ClassTag](d: DataSet[A])(
+ f: A => TraversableOnce[B]): DataSet[B] = {
implicit val ti = FlinkHelper.typeInfo[B]
d.flatMap(f)
}
- override def flatten[A, B: ClassTag](d: DataSet[A])(implicit asDataSet: A => TraversableOnce[B]): DataSet[B] =
+ override def flatten[A, B: ClassTag](d: DataSet[A])(
+ implicit asDataSet: A => TraversableOnce[B]): DataSet[B] =
flatMap(d)(asDataSet)
- override def groupBy[A, B: ClassTag](data: DataSet[A])(f: A => B): DataSet[(B, Iterable[A])] = {
+ override def groupBy[A, B: ClassTag](data: DataSet[A])(
+ f: A => B): DataSet[(B, Iterable[A])] = {
val reducedToMaps = {
- implicit val ti: TypeInformation[scala.collection.immutable.Map[B, Iterable[A]]] =
+ implicit val ti: TypeInformation[
+ scala.collection.immutable.Map[B, Iterable[A]]] =
FlinkHelper.typeInfo(ClassTag(classOf[Map[B, Iterable[A]]]))
- data
- .mapPartition { partition =>
- Seq(
- partition
- .toIterable
- .groupBy(f)
- .map {
- case (key, values) => (key, values.toIterable)
- }
- )
- }
- .reduce(FlinkHelper.mapCombine[B, A] _)
+ data.mapPartition { partition =>
+ Seq(
+ partition.toIterable.groupBy(f).map {
+ case (key, values) => (key, values.toIterable)
+ }
+ )
+ }.reduce(FlinkHelper.mapCombine[B, A] _)
}
implicit val ti: TypeInformation[(B, Iterable[A])] =
FlinkHelper.typeInfo(ClassTag(classOf[(B, Iterable[A])]))
- reducedToMaps
- .flatMap(_.toSeq)
+ reducedToMaps.flatMap(_.toSeq)
}
override def reduce[A](d: DataSet[A])(op: (A, A) => A): A =
@@ -129,19 +127,20 @@ case object FlinkData extends Data[DataSet] {
size(d) == 0
/**
- * Unimplemented!
- *
- * Waiting on support coming in Flink 0.10 !
- */
- override def zip[A, B: ClassTag](d: DataSet[A])(that: DataSet[B]): DataSet[(A, B)] =
+ * Unimplemented!
+ *
+ * Waiting on support coming in Flink 0.10 !
+ */
+ override def zip[A, B: ClassTag](d: DataSet[A])(
+ that: DataSet[B]): DataSet[(A, B)] =
???
/**
- * Unimplemented!
- *
- * Waiting on support coming in Flink 0.10 !
- */
+ * Unimplemented!
+ *
+ * Waiting on support coming in Flink 0.10 !
+ */
override def zipWithIndex[A](d: DataSet[A]): DataSet[(A, Long)] =
???
-}
\ No newline at end of file
+}
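
For orientation on the lazy semantics called out in the docstring above: a sketch of driving FlinkData end to end, assuming a local Flink ExecutionEnvironment (the setup here is illustrative, not part of this change):

    import org.apache.flink.api.scala._
    import com.nitro.absnlp.FlinkData

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data: DataSet[Int] = env.fromCollection(Seq(1, 2, 3))

    // map only extends the Flink plan; no job runs here.
    val shifted: DataSet[Int] = FlinkData.map(data)(_ + 10)

    // toSeq is eager: it calls collect() and forces execution.
    assert(FlinkData.toSeq(shifted) == Seq(11, 12, 13))
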
diff --git a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala
index 92ebe17..7edfd9e 100644
--- a/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala
+++ b/data-tc-flink/src/main/scala/com/nitro/absnlp/FlinkHelper.scala
@@ -9,11 +9,11 @@ import scala.language.higherKinds
import scala.reflect.ClassTag
/**
- * Methods, values, and functions that provide some common functionality
- * necessary for interacting with Flink DataSet objects. Most importantly
- * is the typeInfo method that generates a TypeInformation instance from
- * ClassTag evidence.
- */
+ * Methods, values, and functions that provide some common functionality
+ * necessary for interacting with Flink DataSet objects. Most important
+ * is the typeInfo method that generates a TypeInformation instance from
+ * ClassTag evidence.
+ */
object FlinkHelper extends Serializable {
private[absnlp] val productClass: Class[Product] =
@@ -24,7 +24,6 @@ object FlinkHelper extends Serializable {
val fields = c.getFields
if (fields.isEmpty)
1
-
else
fields.foldLeft(0) {
case (result, field) =>
@@ -32,9 +31,9 @@ object FlinkHelper extends Serializable {
}
}
- private type _M[B,A]=Map[B, Iterable[A]]
+ private type _M[B, A] = Map[B, Iterable[A]]
- def mapCombine[B, A](m1: _M[B,A], m2: _M[B,A]): _M[B,A] = {
+ def mapCombine[B, A](m1: _M[B, A], m2: _M[B, A]): _M[B, A] = {
val (larger, smaller) =
if (m1.size > m2.size)
@@ -66,7 +65,7 @@ object FlinkHelper extends Serializable {
new TypeInformation[A] {
- override def canEqual(x: Any): Boolean =
+ override def canEqual(x: Any): Boolean =
x.getClass.isAssignableFrom(ct.runtimeClass)
override lazy val isBasicType: Boolean =
@@ -84,21 +83,19 @@ object FlinkHelper extends Serializable {
override lazy val getTypeClass: Class[A] =
ct.runtimeClass.asInstanceOf[Class[A]]
- override lazy val getGenericParameters: java.util.List[TypeInformation[_]] = {
+ override lazy val getGenericParameters: java.util.List[TypeInformation[
+ _]] = {
import scala.collection.JavaConversions._
val tVars = ct.getClass.getTypeParameters
if (tVars.isEmpty)
emptyTypeInfoList
-
else
- tVars
- .map { typeVariable =>
- val genericClass = typeVariable.getGenericDeclaration
- typeInfo(ClassTag(genericClass))
- }
- .toList
+ tVars.map { typeVariable =>
+ val genericClass = typeVariable.getGenericDeclaration
+ typeInfo(ClassTag(genericClass))
+ }.toList
}
override lazy val isKeyType: Boolean =
@@ -107,19 +104,19 @@ object FlinkHelper extends Serializable {
override lazy val isSortKeyType: Boolean =
isKeyType
- override def createSerializer(config: ExecutionConfig): TypeSerializer[A] =
+ override def createSerializer(
+ config: ExecutionConfig): TypeSerializer[A] =
new KryoSerializer[A](getTypeClass, config)
- override val toString: String =
+ override val toString: String =
s"TypeInformation for ${ct.runtimeClass.toString}"
- override def equals(x: Any): Boolean =
+ override def equals(x: Any): Boolean =
x != null && x.isInstanceOf[TypeInformation[_]] && this == x
- override val hashCode: Int =
+ override val hashCode: Int =
ct.hashCode
}
}
}
-
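
The pattern repeated throughout FlinkData above is to bring a TypeInformation into implicit scope from ClassTag evidence before each transformation. Both call shapes of FlinkHelper.typeInfo that appear in the diff are sketched here with concrete types, purely for illustration:

    import scala.reflect.ClassTag
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import com.nitro.absnlp.FlinkHelper

    // Summon via an implicit ClassTag at the call site ...
    implicit val tiString: TypeInformation[String] = FlinkHelper.typeInfo[String]

    // ... or hand over a ClassTag built from a runtime Class, as groupBy does.
    val tiPair: TypeInformation[(Int, String)] =
      FlinkHelper.typeInfo(ClassTag(classOf[(Int, String)]))
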
diff --git a/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala b/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala
index 5619d6d..12eb090 100644
--- a/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala
+++ b/data-tc-flink/src/test/scala/com/nitro/absnlp/FlinkDataTest.scala
@@ -24,11 +24,11 @@ class FlinkDataTest extends FunSuite {
test("test map") {
- def addElementwise10[D[_]: Data](data: D[Int]): D[Int] =
- data.map(_ + 10)
+ def addElementwise10[D[_]: Data](data: D[Int]): D[Int] =
+ data.map(_ + 10)
- def addElementwise10_tc[D[_]](data: D[Int])(implicit ev: Data[D]): D[Int] =
- ev.map(data)(_ + 10)
+ def addElementwise10_tc[D[_]](data: D[Int])(implicit ev: Data[D]): D[Int] =
+ ev.map(data)(_ + 10)
{
val changed = addElementwise10(data)
@@ -47,8 +47,10 @@ class FlinkDataTest extends FunSuite {
test("mapPartition") {
- def mapParition10[D[_]: Data](data: D[Int]): D[Int] =
- data.mapParition { elements => elements.map(_ + 10) }
+ def mapParition10[D[_]: Data](data: D[Int]): D[Int] =
+ data.mapParition { elements =>
+ elements.map(_ + 10)
+ }
val changed = mapParition10(data)
assert(changed != data)
@@ -57,64 +59,67 @@ class FlinkDataTest extends FunSuite {
test("foreach") {
- def testForeach[D[_]: Data](data: D[Int]): Unit =
- data.foreach { x =>
- val res = x >= 1 && x <= 3
- if (!res) throw new RuntimeException
- }
+ def testForeach[D[_]: Data](data: D[Int]): Unit =
+ data.foreach { x =>
+ val res = x >= 1 && x <= 3
+ if (!res) throw new RuntimeException
+ }
testForeach(data)
}
test("foreachPartition") {
- def testForeachPart[D[_]: Data](data: D[Int]): Unit =
- data.foreachPartition(_.foreach { x =>
- val res = x >= 1 && x <= 3
- if (!res) throw new RuntimeException
- })
+ def testForeachPart[D[_]: Data](data: D[Int]): Unit =
+ data.foreachPartition(_.foreach { x =>
+ val res = x >= 1 && x <= 3
+ if (!res) throw new RuntimeException
+ })
testForeachPart(data)
}
test("aggregate") {
- def aggregateTest[D[_]: Data](data: D[Int]): Int =
- data.aggregate(0)(_ + _, _ + _)
+ def aggregateTest[D[_]: Data](data: D[Int]): Int =
+ data.aggregate(0)(_ + _, _ + _)
assert(aggregateTest(data) == 6)
}
ignore("sortBy") {
- def reverseSort[D[_]: Data](data: D[Int]): D[Int] =
- data.sortBy(x => -x)
+ def reverseSort[D[_]: Data](data: D[Int]): D[Int] =
+ data.sortBy(x => -x)
assert(reverseSort(data).collect() == Seq(3, 2, 1))
}
test("take") {
- def testTake[D[_]: Data](data: D[Int]): Boolean =
- data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3)
+ def testTake[D[_]: Data](data: D[Int]): Boolean =
+ data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(
+ 1,
+ 2,
+ 3)
assert(testTake(data))
}
test("toSeq") {
- def testToSeqIs123[D[_]: Data](data: D[Int]): Boolean =
- data.toSeq == Seq(1, 2, 3)
+ def testToSeqIs123[D[_]: Data](data: D[Int]): Boolean =
+ data.toSeq == Seq(1, 2, 3)
assert(testToSeqIs123(data))
}
test("flatMap") {
- def testFlat[D[_]: Data](data: D[Int]): D[Int] =
- data.flatMap { number =>
- (0 until number).map(_ => number)
- }
+ def testFlat[D[_]: Data](data: D[Int]): D[Int] =
+ data.flatMap { number =>
+ (0 until number).map(_ => number)
+ }
val result = testFlat(data)
assert(result.collect() == Seq(1, 2, 2, 3, 3, 3))
@@ -122,8 +127,8 @@ class FlinkDataTest extends FunSuite {
test("flatten") {
- def flattenTest[D[_]: Data](data: D[Seq[Int]]): D[Int] =
- data.flatten
+ def flattenTest[D[_]: Data](data: D[Seq[Int]]): D[Int] =
+ data.flatten
val expanded = data.map(x => Seq(x))
val flattened = flattenTest(expanded)
@@ -132,8 +137,8 @@ class FlinkDataTest extends FunSuite {
test("groupBy") {
- def groupIt[D[_]: Data](data: D[Int]) =
- data.groupBy { _ % 2 == 0 }
+ def groupIt[D[_]: Data](data: D[Int]) =
+ data.groupBy { _ % 2 == 0 }
val evenGroup = groupIt(data).toSeq.toMap
@@ -148,42 +153,41 @@ class FlinkDataTest extends FunSuite {
test("size") {
- def sizeIs3[D[_]: Data](data: D[Int]): Boolean =
- data.size == 3
+ def sizeIs3[D[_]: Data](data: D[Int]): Boolean =
+ data.size == 3
assert(sizeIs3(data))
}
test("reduce") {
- def foo[D[_]: Data](data: D[Int]): Int =
- data.reduce {
- case (a, b) => 1 + a + b
- }
+ def foo[D[_]: Data](data: D[Int]): Int =
+ data.reduce {
+ case (a, b) => 1 + a + b
+ }
val result = foo(data)
assert(result == 8)
}
-
test("filter") {
- def f[D[_]: Data](data: D[Int]): D[Int] =
- data.filter(_ % 2 == 0)
+ def f[D[_]: Data](data: D[Int]): D[Int] =
+ data.filter(_ % 2 == 0)
assert(f(data).collect() == Seq(2))
}
test("headOption") {
- def h[D[_]: Data](data: D[Int]): Option[Int] =
- data.headOption
+ def h[D[_]: Data](data: D[Int]): Option[Int] =
+ data.headOption
assert(h(data) == Some(1))
assert(h(empty[Int]) == None)
}
test("isEmpty") {
- def e[D[_]: Data](data: D[_]): Boolean =
- data.isEmpty
+ def e[D[_]: Data](data: D[_]): Boolean =
+ data.isEmpty
assert(!e(data))
assert(e(empty[Int]))
@@ -196,7 +200,6 @@ class FlinkDataTest extends FunSuite {
// assert(toM(data) == Map(1 -> 1, 2 -> 2, 3 -> 3))
// }
-
// test("sum") {
// def s[D[_]: Data](data: D[Int]): Int =
// fif.ops.Sum(data)
@@ -205,17 +208,17 @@ class FlinkDataTest extends FunSuite {
// }
ignore("zipWithIndex") {
- def foo[D[_]: Data](data: D[Int]): Unit =
- assert(data.zipWithIndex == Seq((1, 0), (2, 1), (3, 2)))
+ def foo[D[_]: Data](data: D[Int]): Unit =
+ assert(data.zipWithIndex == Seq((1, 0), (2, 1), (3, 2)))
foo(data)
}
ignore("zip") {
- def foo[D[_]: Data](data: D[Int]): D[(Int, Int)] =
- data.zip(data)
+ def foo[D[_]: Data](data: D[Int]): D[(Int, Int)] =
+ data.zip(data)
assert(foo(data).collect() == Seq((1, 1), (2, 2), (3, 3)))
}
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/build.sbt b/data-tc-scala/build.sbt
index 16f946e..8ecb913 100644
--- a/data-tc-scala/build.sbt
+++ b/data-tc-scala/build.sbt
@@ -2,19 +2,17 @@ name := "data-tc-scala"
import SharedBuild._
-com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
-ScalariformKeys.preferences := sharedCodeFmt
-
addCompilerPlugin(scalaMacros)
-libraryDependencies ++=
+libraryDependencies ++=
scalaTcDeps ++
- testDeps
+ testDeps
//
// test, runtime settings
//
-fork in run := true
-fork in Test := true
+fork in run := true
+fork in Test := true
parallelExecution in Test := true
+pomExtra := pomExtraInfo
diff --git a/data-tc-scala/src/main/scala/fif/ArrayData.scala b/data-tc-scala/src/main/scala/fif/ArrayData.scala
index 5aa18c5..ecf24d4 100644
--- a/data-tc-scala/src/main/scala/fif/ArrayData.scala
+++ b/data-tc-scala/src/main/scala/fif/ArrayData.scala
@@ -1,6 +1,6 @@
package fif
-import scala.language.{ higherKinds, implicitConversions }
+import scala.language.{higherKinds, implicitConversions}
import scala.reflect.ClassTag
object ArrayData extends Data[Array] {
@@ -9,7 +9,8 @@ object ArrayData extends Data[Array] {
override def map[A, B: ClassTag](data: Array[A])(f: (A) => B): Array[B] =
data.map(f)
- override def mapParition[A, B: ClassTag](d: Array[A])(f: Iterable[A] => Iterable[B]): Array[B] =
+ override def mapParition[A, B: ClassTag](d: Array[A])(
+ f: Iterable[A] => Iterable[B]): Array[B] =
f(d.toIterable).toArray
/** Apply a side-effecting function to each element. */
@@ -23,11 +24,13 @@ object ArrayData extends Data[Array] {
override def filter[A](d: Array[A])(f: A => Boolean): Array[A] =
d.filter(f)
- override def aggregate[A, B: ClassTag](d: Array[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
+ override def aggregate[A, B: ClassTag](d: Array[A])(
+ zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
d.aggregate(zero)(seqOp, combOp)
/** Sort the dataset using a function f that evaluates each element to an orderable type */
- override def sortBy[A, B: ClassTag](d: Array[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Array[A] =
+ override def sortBy[A, B: ClassTag](d: Array[A])(f: (A) => B)(
+ implicit ord: math.Ordering[B]): Array[A] =
d.sortBy(f)
/** Construct a traversable for the first k elements of a dataset. Will load into main mem. */
@@ -41,13 +44,16 @@ object ArrayData extends Data[Array] {
override def toSeq[A](d: Array[A]): Seq[A] =
d.toSeq
- override def flatMap[A, B: ClassTag](d: Array[A])(f: A => TraversableOnce[B]): Array[B] =
+ override def flatMap[A, B: ClassTag](d: Array[A])(
+ f: A => TraversableOnce[B]): Array[B] =
d.flatMap(f)
- override def flatten[A, B: ClassTag](d: Array[A])(implicit asTraversable: A => TraversableOnce[B]): Array[B] =
+ override def flatten[A, B: ClassTag](d: Array[A])(
+ implicit asTraversable: A => TraversableOnce[B]): Array[B] =
d.flatMap(asTraversable)
- override def groupBy[A, B: ClassTag](d: Array[A])(f: A => B): Array[(B, Iterable[A])] =
+ override def groupBy[A, B: ClassTag](d: Array[A])(
+ f: A => B): Array[(B, Iterable[A])] =
d.groupBy(f).map { case (a, b) => (a, b.toIterable) }.toArray
/** This has type A as opposed to B >: A due to the RDD limitations */
@@ -60,10 +66,11 @@ object ArrayData extends Data[Array] {
override def isEmpty[A](d: Array[A]): Boolean =
d.isEmpty
- override def zip[A, B: ClassTag](d: Array[A])(that: Array[B]): Array[(A, B)] =
+ override def zip[A, B: ClassTag](d: Array[A])(
+ that: Array[B]): Array[(A, B)] =
d.zip(that)
override def zipWithIndex[A](d: Array[A]): Array[(A, Long)] =
d.zipWithIndex.map { case (a, i) => (a, i.toLong) }
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/src/main/scala/fif/Data.scala b/data-tc-scala/src/main/scala/fif/Data.scala
index 03a3e35..d88e4bd 100644
--- a/data-tc-scala/src/main/scala/fif/Data.scala
+++ b/data-tc-scala/src/main/scala/fif/Data.scala
@@ -6,11 +6,12 @@ import scala.reflect.ClassTag
import simulacrum._
/**
- * Trait that abstractly represents operations that can be performed on a dataset.
- * The implementation of Data is suitable for both large-scale, distributed data
- * or in-memory structures.
- */
-@typeclass trait Data[D[_]] extends Serializable {
+ * Trait that abstractly represents operations that can be performed on a dataset.
+ * The implementation of Data is suitable for both large-scale, distributed data
+ * and in-memory structures.
+ */
+@typeclass
+trait Data[D[_]] extends Serializable {
/** Transform a dataset by applying f to each element. */
def map[A, B: ClassTag](d: D[A])(f: A => B): D[B]
@@ -25,13 +26,15 @@ import simulacrum._
def filter[A](d: D[A])(f: A => Boolean): D[A]
/**
- * Starting from a defined zero value, perform an operation seqOp on each element
- * of a dataset. Combine results of seqOp using combOp for a final value.
- */
- def aggregate[A, B: ClassTag](d: D[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B
+ * Starting from a defined zero value, perform an operation seqOp on each element
+ * of a dataset. Combine results of seqOp using combOp for a final value.
+ */
+ def aggregate[A, B: ClassTag](d: D[A])(zero: B)(seqOp: (B, A) => B,
+ combOp: (B, B) => B): B
/** Sort the dataset using a function f that evaluates each element to an orderable type */
- def sortBy[A, B: ClassTag](d: D[A])(f: (A) => B)(implicit ord: math.Ordering[B]): D[A]
+ def sortBy[A, B: ClassTag](d: D[A])(f: (A) => B)(
+ implicit ord: math.Ordering[B]): D[A]
/** Construct a traversable for the first k elements of a dataset. Will load into main mem. */
def take[A](d: D[A])(k: Int): Traversable[A]
@@ -43,7 +46,8 @@ import simulacrum._
def flatMap[A, B: ClassTag](d: D[A])(f: A => TraversableOnce[B]): D[B]
- def flatten[A, B: ClassTag](d: D[A])(implicit asTraversable: A => TraversableOnce[B]): D[B]
+ def flatten[A, B: ClassTag](d: D[A])(
+ implicit asTraversable: A => TraversableOnce[B]): D[B]
def groupBy[A, B: ClassTag](d: D[A])(f: A => B): D[(B, Iterable[A])]
@@ -58,4 +62,3 @@ import simulacrum._
def zipWithIndex[A](d: D[A]): D[(A, Long)]
}
-
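
The @typeclass annotation (simulacrum) generates the Data.ops._ syntax used by the implementations and tests; this reformat does not change the calling convention. The two equivalent styles, taken from FlinkDataTest above:

    import scala.language.higherKinds
    import fif.Data
    import fif.Data.ops._

    // Generic over any container with a Data instance, via the generated syntax:
    def addElementwise10[D[_]: Data](data: D[Int]): D[Int] =
      data.map(_ + 10)

    // The same function written against the evidence parameter directly:
    def addElementwise10_tc[D[_]](data: D[Int])(implicit ev: Data[D]): D[Int] =
      ev.map(data)(_ + 10)
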
diff --git a/data-tc-scala/src/main/scala/fif/SeqData.scala b/data-tc-scala/src/main/scala/fif/SeqData.scala
index 9618b24..508cd94 100644
--- a/data-tc-scala/src/main/scala/fif/SeqData.scala
+++ b/data-tc-scala/src/main/scala/fif/SeqData.scala
@@ -1,6 +1,6 @@
package fif
-import scala.language.{ higherKinds, implicitConversions }
+import scala.language.{higherKinds, implicitConversions}
import scala.reflect.ClassTag
object SeqData extends Data[Seq] {
@@ -9,7 +9,8 @@ object SeqData extends Data[Seq] {
override def map[A, B: ClassTag](data: Seq[A])(f: (A) => B): Seq[B] =
data.map(f)
- override def mapParition[A, B: ClassTag](d: Seq[A])(f: Iterable[A] => Iterable[B]): Seq[B] =
+ override def mapParition[A, B: ClassTag](d: Seq[A])(
+ f: Iterable[A] => Iterable[B]): Seq[B] =
f(d.toIterable).toSeq
/** Apply a side-effecting function to each element. */
@@ -23,11 +24,13 @@ object SeqData extends Data[Seq] {
override def filter[A](d: Seq[A])(f: A => Boolean): Seq[A] =
d.filter(f)
- override def aggregate[A, B: ClassTag](d: Seq[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
+ override def aggregate[A, B: ClassTag](d: Seq[A])(
+ zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
d.aggregate(zero)(seqOp, combOp)
/** Sort the dataset using a function f that evaluates each element to an orderable type */
- override def sortBy[A, B: ClassTag](d: Seq[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Seq[A] =
+ override def sortBy[A, B: ClassTag](d: Seq[A])(f: (A) => B)(
+ implicit ord: math.Ordering[B]): Seq[A] =
d.toSeq.sortBy(f)
/** Construct a traversable for the first k elements of a dataset. Will load into main mem. */
@@ -41,13 +44,16 @@ object SeqData extends Data[Seq] {
override def toSeq[A](d: Seq[A]): Seq[A] =
d.toSeq
- override def flatMap[A, B: ClassTag](d: Seq[A])(f: A => TraversableOnce[B]): Seq[B] =
+ override def flatMap[A, B: ClassTag](d: Seq[A])(
+ f: A => TraversableOnce[B]): Seq[B] =
d.flatMap(f)
- override def flatten[A, B: ClassTag](d: Seq[A])(implicit asTraversable: A => TraversableOnce[B]): Seq[B] =
+ override def flatten[A, B: ClassTag](d: Seq[A])(
+ implicit asTraversable: A => TraversableOnce[B]): Seq[B] =
d.flatten
- override def groupBy[A, B: ClassTag](d: Seq[A])(f: A => B): Seq[(B, Iterable[A])] =
+ override def groupBy[A, B: ClassTag](d: Seq[A])(
+ f: A => B): Seq[(B, Iterable[A])] =
d.groupBy(f).toSeq.map { case (a, b) => (a, b.toIterable) }
/** This has type A as opposed to B >: A due to the RDD limitations */
diff --git a/data-tc-scala/src/main/scala/fif/TravData.scala b/data-tc-scala/src/main/scala/fif/TravData.scala
index 4e50cf3..a073513 100644
--- a/data-tc-scala/src/main/scala/fif/TravData.scala
+++ b/data-tc-scala/src/main/scala/fif/TravData.scala
@@ -1,33 +1,38 @@
package fif
-import scala.language.{ implicitConversions, higherKinds }
+import scala.language.{implicitConversions, higherKinds}
import scala.reflect.ClassTag
object TravData extends Data[Traversable] {
/** Transform a dataset by applying f to each element. */
- override def map[A, B: ClassTag](data: Traversable[A])(f: (A) => B): Traversable[B] =
+ override def map[A, B: ClassTag](data: Traversable[A])(
+ f: (A) => B): Traversable[B] =
data.map(f)
- override def mapParition[A, B: ClassTag](d: Traversable[A])(f: Iterable[A] => Iterable[B]): Traversable[B] =
+ override def mapParition[A, B: ClassTag](d: Traversable[A])(
+ f: Iterable[A] => Iterable[B]): Traversable[B] =
f(d.toIterable).toTraversable
/** Apply a side-effecting function to each element. */
override def foreach[A](d: Traversable[A])(f: A => Any): Unit =
d.foreach(f)
- override def foreachPartition[A](d: Traversable[A])(f: Iterable[A] => Any): Unit = {
+ override def foreachPartition[A](d: Traversable[A])(
+ f: Iterable[A] => Any): Unit = {
val _ = f(d.toIterable)
}
override def filter[A](d: Traversable[A])(f: A => Boolean): Traversable[A] =
d.filter(f)
- override def aggregate[A, B: ClassTag](d: Traversable[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
+ override def aggregate[A, B: ClassTag](d: Traversable[A])(
+ zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
d.aggregate(zero)(seqOp, combOp)
/** Sort the dataset using a function f that evaluates each element to an orderable type */
- override def sortBy[A, B: ClassTag](d: Traversable[A])(f: (A) => B)(implicit ord: math.Ordering[B]): Traversable[A] =
+ override def sortBy[A, B: ClassTag](d: Traversable[A])(f: (A) => B)(
+ implicit ord: math.Ordering[B]): Traversable[A] =
d.toSeq.sortBy(f)
/** Construct a traversable for the first k elements of a dataset. Will load into main mem. */
@@ -41,13 +46,16 @@ object TravData extends Data[Traversable] {
override def toSeq[A](d: Traversable[A]): Seq[A] =
d.toSeq
- override def flatMap[A, B: ClassTag](d: Traversable[A])(f: A => TraversableOnce[B]): Traversable[B] =
+ override def flatMap[A, B: ClassTag](d: Traversable[A])(
+ f: A => TraversableOnce[B]): Traversable[B] =
d.flatMap(f)
- override def flatten[A, B: ClassTag](d: Traversable[A])(implicit asTraversable: A => TraversableOnce[B]): Traversable[B] =
+ override def flatten[A, B: ClassTag](d: Traversable[A])(
+ implicit asTraversable: A => TraversableOnce[B]): Traversable[B] =
d.flatten
- override def groupBy[A, B: ClassTag](d: Traversable[A])(f: A => B): Traversable[(B, Iterable[A])] =
+ override def groupBy[A, B: ClassTag](d: Traversable[A])(
+ f: A => B): Traversable[(B, Iterable[A])] =
d.groupBy(f).toTraversable.map { case (a, b) => (a, b.toIterable) }
/** This has type A as opposed to B >: A due to the RDD limitations */
@@ -60,7 +68,8 @@ object TravData extends Data[Traversable] {
override def isEmpty[A](d: Traversable[A]): Boolean =
d.isEmpty
- override def zip[A, B: ClassTag](d: Traversable[A])(that: Traversable[B]): Traversable[(A, B)] =
+ override def zip[A, B: ClassTag](d: Traversable[A])(
+ that: Traversable[B]): Traversable[(A, B)] =
d.toSeq.zip(that.toSeq)
override def zipWithIndex[A](d: Traversable[A]): Traversable[(A, Long)] =
diff --git a/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala b/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala
index e65a914..a723b66 100644
--- a/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala
+++ b/data-tc-scala/src/test/scala/fif/ArrayDataTest.scala
@@ -11,4 +11,4 @@ class ArrayDataTest extends CollectionDataTest[Array] {
override val empty = Array.empty[Int]
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala b/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala
index 4b27529..a6a5ccd 100644
--- a/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala
+++ b/data-tc-scala/src/test/scala/fif/CollectionDataTest.scala
@@ -27,7 +27,9 @@ protected trait CollectionDataTest[D[_]] extends FunSuite {
test("mapPartition") {
def mapParition10(data: D[Int]): D[Int] =
- data.mapParition { elements => elements.map(_ + 10) }
+ data.mapParition { elements =>
+ elements.map(_ + 10)
+ }
val changed = mapParition10(data)
assert(changed.toSeq !== data)
@@ -69,7 +71,10 @@ protected trait CollectionDataTest[D[_]] extends FunSuite {
test("take") {
def testTake(data: D[Int]): Boolean =
- data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3)
+ data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(
+ 1,
+ 2,
+ 3)
assert(testTake(data))
assert(data.take(0).toSeq === Seq.empty[Int])
@@ -176,4 +181,4 @@ protected trait CollectionDataTest[D[_]] extends FunSuite {
assert(foo(data).toSeq === Seq((1, 1), (2, 2), (3, 3)))
}
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala b/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala
index 4f52876..f41df15 100644
--- a/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala
+++ b/data-tc-scala/src/test/scala/fif/ImplicitCollectionsDataTest.scala
@@ -24,4 +24,4 @@ class ImplicitCollectionsDataTest extends FunSuite {
def simpleTest[D[_]: Data](data: D[Int]): Unit =
data.foreach(x => assert(x > 0 && x < 10))
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/src/test/scala/fif/SeqDataTest.scala b/data-tc-scala/src/test/scala/fif/SeqDataTest.scala
index 3ffeb19..7a2d528 100644
--- a/data-tc-scala/src/test/scala/fif/SeqDataTest.scala
+++ b/data-tc-scala/src/test/scala/fif/SeqDataTest.scala
@@ -11,4 +11,4 @@ class SeqDataTest extends CollectionDataTest[Seq] {
override val empty = Seq.empty[Int]
-}
\ No newline at end of file
+}
diff --git a/data-tc-scala/src/test/scala/fif/TravDataTest.scala b/data-tc-scala/src/test/scala/fif/TravDataTest.scala
index faa20cc..41a622a 100644
--- a/data-tc-scala/src/test/scala/fif/TravDataTest.scala
+++ b/data-tc-scala/src/test/scala/fif/TravDataTest.scala
@@ -11,4 +11,4 @@ class TravDataTest extends CollectionDataTest[Traversable] {
override val empty = Traversable.empty[Int]
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/build.sbt b/data-tc-spark/build.sbt
index 260bcfb..aff17f5 100644
--- a/data-tc-spark/build.sbt
+++ b/data-tc-spark/build.sbt
@@ -2,23 +2,24 @@ name := "data-tc-spark"
import SharedBuild._
-com.typesafe.sbt.SbtScalariform.defaultScalariformSettings
-ScalariformKeys.preferences := sharedCodeFmt
-
// >>=
scalacOptions := {
val badOptionsWhenUsingSpark151 = Set("-Yopt:_")
- scalacOptions.value.filter { opt => !badOptionsWhenUsingSpark151.contains(opt) }
+ scalacOptions.value.filter { opt =>
+ !badOptionsWhenUsingSpark151.contains(opt)
+ }
}
addCompilerPlugin(scalaMacros)
-libraryDependencies ++=
+libraryDependencies ++=
sparkTcDeps ++
- testDeps
+ testDeps
// test & misc. configuration
//
-fork in Test := false
+fork in Test := false
parallelExecution in Test := false
-fork in run := false
+fork in run := false
+
+pomExtra := pomExtraInfo
diff --git a/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala b/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala
index ae35cbf..b923c63 100644
--- a/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala
+++ b/data-tc-spark/src/main/scala/fif/ImplicitRddData.scala
@@ -6,4 +6,4 @@ object ImplicitRddData extends Serializable {
implicit val rddIsData: Data[RDD] = RddData
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/main/scala/fif/RddData.scala b/data-tc-spark/src/main/scala/fif/RddData.scala
index 0d8cc1a..4d7ee5f 100644
--- a/data-tc-spark/src/main/scala/fif/RddData.scala
+++ b/data-tc-spark/src/main/scala/fif/RddData.scala
@@ -2,7 +2,7 @@ package fif
import org.apache.spark.rdd.RDD
-import scala.language.{ higherKinds, implicitConversions }
+import scala.language.{higherKinds, implicitConversions}
import scala.reflect.ClassTag
import scala.util.Try
@@ -12,7 +12,8 @@ object RddData extends Data[RDD] with Serializable {
override def map[A, B: ClassTag](data: RDD[A])(f: (A) => B): RDD[B] =
data.map(f)
- override def mapParition[A, B: ClassTag](d: RDD[A])(f: Iterable[A] => Iterable[B]): RDD[B] =
+ override def mapParition[A, B: ClassTag](d: RDD[A])(
+ f: Iterable[A] => Iterable[B]): RDD[B] =
d.mapPartitions { partition =>
f(partition.toIterable).toIterator
}
@@ -33,11 +34,13 @@ object RddData extends Data[RDD] with Serializable {
override def filter[A](d: RDD[A])(f: A => Boolean): RDD[A] =
d.filter(f)
- override def aggregate[A, B: ClassTag](d: RDD[A])(zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
+ override def aggregate[A, B: ClassTag](d: RDD[A])(
+ zero: B)(seqOp: (B, A) => B, combOp: (B, B) => B): B =
d.aggregate(zero)(seqOp, combOp)
/** Sort the dataset using a function f that evaluates each element to an orderable type */
- override def sortBy[A, B: ClassTag](d: RDD[A])(f: (A) ⇒ B)(implicit ord: math.Ordering[B]): RDD[A] =
+ override def sortBy[A, B: ClassTag](d: RDD[A])(f: (A) ⇒ B)(
+ implicit ord: math.Ordering[B]): RDD[A] =
d.sortBy(f)
/** Construct a traversable for the first k elements of a dataset. Will load into main mem. */
@@ -51,13 +54,16 @@ object RddData extends Data[RDD] with Serializable {
override def toSeq[A](d: RDD[A]): Seq[A] =
d.collect().toSeq
- override def flatMap[A, B: ClassTag](d: RDD[A])(f: A => TraversableOnce[B]): RDD[B] =
+ override def flatMap[A, B: ClassTag](d: RDD[A])(
+ f: A => TraversableOnce[B]): RDD[B] =
d.flatMap(f)
- override def flatten[A, B: ClassTag](d: RDD[A])(implicit asRDD: A => TraversableOnce[B]): RDD[B] =
+ override def flatten[A, B: ClassTag](d: RDD[A])(
+ implicit asRDD: A => TraversableOnce[B]): RDD[B] =
d.flatMap(asRDD)
- override def groupBy[A, B: ClassTag](d: RDD[A])(f: A => B): RDD[(B, Iterable[A])] =
+ override def groupBy[A, B: ClassTag](d: RDD[A])(
+ f: A => B): RDD[(B, Iterable[A])] =
d.groupBy(f).map { case (a, b) => (a, b) }
/** This has type A as opposed to B >: A due to the RDD limitations */
@@ -76,4 +82,4 @@ object RddData extends Data[RDD] with Serializable {
override def zipWithIndex[A](d: RDD[A]): RDD[(A, Long)] =
d.zipWithIndex()
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala b/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala
index 02f7ef4..f78d4c8 100644
--- a/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala
+++ b/data-tc-spark/src/main/scala/fif/spark/KryoSerializationWrapper.scala
@@ -3,18 +3,18 @@ package fif.spark
import scala.reflect.ClassTag
/**
- * Wraps a value of an unserialized type T in a KryoSerializationWrapper[T],
- * which gives one a way to serialize T.
- *
- *
- * NOTE:
- * The vast majority of this code is copied / based off of the classes with the same
- * name in the Apache Shark project.
- *
- * Original file is here (accessed on April 20, 2015):
- * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
- *
- */
+ * Wraps a value of an unserialized type T in a KryoSerializationWrapper[T],
+ * which gives one a way to serialize T.
+ *
+ *
+ * NOTE:
+ * The vast majority of this code is copied / based off of the classes with the same
+ * name in the Apache Shark project.
+ *
+ * Original file is here (accessed on April 20, 2015):
+ * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
+ *
+ */
object KryoSerializationWrapper extends Serializable {
def apply[T: ClassTag](value: T): KryoSerializationWrapper[T] =
@@ -22,19 +22,20 @@ object KryoSerializationWrapper extends Serializable {
}
/**
- * A wrapper around some unserializable objects that make them both Java
- * serializable. Internally, Kryo is used for serialization.
- *
- * Use KryoSerializationWrapper(value) to create a wrapper.
- *
- * Note that the value contained in the wrapper is mutable. It must be
- * initialized using Java Serialization (which calls a private readObject
- * method that does the byte-by-byte deserialization).
- *
- * Also note that this class is both abstract and sealed. The only valid place
- * to create such a wrapper is the companion object's apply method.
- */
-sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable {
+ * A wrapper around some unserializable objects that makes them Java
+ * serializable. Internally, Kryo is used for serialization.
+ *
+ * Use KryoSerializationWrapper(value) to create a wrapper.
+ *
+ * Note that the value contained in the wrapper is mutable. It must be
+ * initialized using Java Serialization (which calls a private readObject
+ * method that does the byte-by-byte deserialization).
+ *
+ * Also note that this class is both abstract and sealed. The only valid place
+ * to create such a wrapper is the companion object's apply method.
+ */
+sealed abstract class KryoSerializationWrapper[T: ClassTag]
+ extends Serializable {
// the wrapped value
// MUST BE TRANSIENT SO THAT IT IS NOT SERIALIZED
@@ -45,8 +46,8 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable
private var valueSerialized: Array[Byte] = _
/**
- * The only valid constructor. For safety, do not use the no-arg constructor.
- */
+ * The only valid constructor. For safety, do not use the no-arg constructor.
+ */
def this(initialValue: T) = {
this()
this.value = initialValue
@@ -57,11 +58,11 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable
value
/**
- * Gets the currently serialized value as a Sequence of bytes.
- *
- * If the sequence is empty, then it means that one has not called doSerializeValue().
- * Or the internal value may be null.
- */
+ * Gets the currently serialized value as a Sequence of bytes.
+ *
+ * If the sequence is empty, then it means that one has not called doSerializeValue().
+ * Or the internal value may be null.
+ */
def getValueSerialized: Seq[Byte] =
valueSerialized.toSeq
@@ -74,4 +75,4 @@ sealed abstract class KryoSerializationWrapper[T: ClassTag] extends Serializable
in.defaultReadObject()
this.value = KryoSerializer.deserialize[T](this.valueSerialized)
}
-}
\ No newline at end of file
+}
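
A usage reminder for the wrapper (the same pattern KryoSerializationTest exercises below): wrap a value whose class need not implement Serializable, let the wrapper cross a Java-serialization boundary, and read the value back with getValue. The function value here is illustrative only:

    import fif.spark.KryoSerializationWrapper

    // Stand-in for a dependency that does not implement Serializable.
    val f: String => Int = _.length

    val wrapped: KryoSerializationWrapper[String => Int] =
      KryoSerializationWrapper(f)

    // Recover the original value (lazily deserialized after a Java round trip).
    val g: String => Int = wrapped.getValue
    assert(g("hello") == 5)
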
diff --git a/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala b/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala
index 723c769..9128772 100644
--- a/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala
+++ b/data-tc-spark/src/main/scala/fif/spark/KryoSerializer.scala
@@ -2,31 +2,29 @@ package fif.spark
import java.nio.ByteBuffer
-import org.apache.spark.serializer.{ KryoSerializer => SparkKryoSerializer }
-import org.apache.spark.{ SparkConf, SparkEnv }
+import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer}
+import org.apache.spark.{SparkConf, SparkEnv}
import scala.reflect.ClassTag
/**
- * JVM object serialization using Kryo. This is much more efficient, but Kryo
- * sometimes is buggy to use. We use this mainly to serialize the object
- * inspectors.
- *
- *
- * NOTE:
- * The vast majority of this code is copied / based off of the classes with the same
- * name in the Apache Shark project.
- *
- * Original file is here:
- * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
- */
+ * JVM object serialization using Kryo. This is much more efficient, but Kryo
+ * sometimes is buggy to use. We use this mainly to serialize the object
+ * inspectors.
+ *
+ *
+ * NOTE:
+ * The vast majority of this code is copied / based off of the classes with the same
+ * name in the Apache Shark project.
+ *
+ * Original file is here:
+ * https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
+ */
object KryoSerializer extends Serializable {
@transient private[this] lazy val s =
new SparkKryoSerializer(
- Option(SparkEnv.get)
- .map(_.conf)
- .getOrElse(new SparkConf())
+ Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
)
def serialize[T: ClassTag](o: T): Array[Byte] =
@@ -35,4 +33,4 @@ object KryoSerializer extends Serializable {
def deserialize[T: ClassTag](bytes: Array[Byte]): T =
s.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala b/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala
index 4fd5e7f..112e428 100644
--- a/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala
+++ b/data-tc-spark/src/main/scala/fif/spark/RddSerializedOps.scala
@@ -5,29 +5,29 @@ import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
/**
- * Methods that either wrap, or operate on wrapped, values so that
- * common RDD operations are available with a natural, functional syntax.
- *
- * Let's look at Map as an example:
- *
- * {{{
- * // implemented using librray that is not extendable and doesn't implement Serialzable
- * val f: A => B = ...
- *
- * // can be anywhere, error will occur even if in local mode
- * val data: RDD[A] = ...
- *
- * // cannot do
- * data.map(f)
- * // runtime exception :(
- * // as f does not implement Serializable
- *
- * // instead do
- * Map(f)(data)
- * // will serialize it using Kryo and safely
- * // deserialize to perform map on the data RDD
- * }}}
- */
+ * Methods that either wrap, or operate on wrapped, values so that
+ * common RDD operations are available with a natural, functional syntax.
+ *
+ * Let's look at Map as an example:
+ *
+ * {{{
+ * // implemented using a library that is not extendable and doesn't implement Serializable
+ * val f: A => B = ...
+ *
+ * // can be anywhere, error will occur even if in local mode
+ * val data: RDD[A] = ...
+ *
+ * // cannot do
+ * data.map(f)
+ * // runtime exception :(
+ * // as f does not implement Serializable
+ *
+ * // instead do
+ * Map(f)(data)
+ * // will serialize it using Kryo and safely
+ * // deserialize to perform map on the data RDD
+ * }}}
+ */
object RddSerializedOps extends Serializable {
object Map extends Serializable {
@@ -35,7 +35,8 @@ object RddSerializedOps extends Serializable {
def apply[A, B: ClassTag](f: A => B): (RDD[A] => RDD[B]) =
apply(KryoSerializationWrapper(f))
- def apply[A, B: ClassTag](fnSerialized: KryoSerializationWrapper[A => B]): (RDD[A] => RDD[B]) =
+ def apply[A, B: ClassTag](
+ fnSerialized: KryoSerializationWrapper[A => B]): (RDD[A] => RDD[B]) =
(data: RDD[A]) =>
data.mapPartitions(partition => {
val f = fnSerialized.getValue
@@ -48,7 +49,9 @@ object RddSerializedOps extends Serializable {
def apply[A, B: ClassTag](f: A => TraversableOnce[B]): (RDD[A] => RDD[B]) =
apply(KryoSerializationWrapper(f))
- def apply[A, B: ClassTag](fnSerialized: KryoSerializationWrapper[A => TraversableOnce[B]]): (RDD[A] => RDD[B]) =
+ def apply[A, B: ClassTag](
+ fnSerialized: KryoSerializationWrapper[A => TraversableOnce[B]])
+ : (RDD[A] => RDD[B]) =
(data: RDD[A]) =>
data.mapPartitions(partition => {
val f = fnSerialized.getValue
@@ -61,7 +64,8 @@ object RddSerializedOps extends Serializable {
def apply[A](f: A => Any): (RDD[A] => Unit) =
apply(KryoSerializationWrapper(f))
- def apply[A](fnSerialized: KryoSerializationWrapper[A => Any]): (RDD[A] => Unit) =
+ def apply[A](
+ fnSerialized: KryoSerializationWrapper[A => Any]): (RDD[A] => Unit) =
(data: RDD[A]) =>
data.foreachPartition(partition => {
val f = fnSerialized.getValue
@@ -71,27 +75,29 @@ object RddSerializedOps extends Serializable {
object Aggregate extends Serializable {
- def apply[A, B: ClassTag](zero: B, seqOp: (B, A) => B, combOp: (B, B) => B): (RDD[A] => B) =
- apply(zero, KryoSerializationWrapper(seqOp), KryoSerializationWrapper(combOp))
+ def apply[A, B: ClassTag](zero: B,
+ seqOp: (B, A) => B,
+ combOp: (B, B) => B): (RDD[A] => B) =
+ apply(zero,
+ KryoSerializationWrapper(seqOp),
+ KryoSerializationWrapper(combOp))
def apply[A, B: ClassTag](
- zero: B,
- serSeqOp: KryoSerializationWrapper[(B, A) => B],
- serCombOp: KryoSerializationWrapper[(B, B) => B]): (RDD[A] => B) =
-
+ zero: B,
+ serSeqOp: KryoSerializationWrapper[(B, A) => B],
+ serCombOp: KryoSerializationWrapper[(B, B) => B]): (RDD[A] => B) =
(data: RDD[A]) =>
data.aggregate(zero)(
{
case (b, a) =>
val f = serSeqOp.getValue
f(b, a)
- },
- {
+ }, {
case (b1, b2) =>
val f = serCombOp.getValue
f(b1, b2)
}
- )
+ )
}
-}
\ No newline at end of file
+}
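
To make the scaladoc example above concrete, the three reformatted entry points compose as below; sc is an assumed SparkContext and the function values mirror RddSerializedOpsTest further down:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import fif.spark.RddSerializedOps

    def sketch(sc: SparkContext): Unit = {
      val f: String => String = s => s"${s}_$s"
      val data: RDD[String] = sc.parallelize(Seq("hello", "world"))

      // Map wraps f in a KryoSerializationWrapper and applies it via mapPartitions.
      val doubled: RDD[String] = RddSerializedOps.Map(f)(data)

      // Aggregate takes (zero, seqOp, combOp) and yields an RDD[A] => B.
      val totalChars: Int =
        RddSerializedOps.Aggregate(0,
                                   (b: Int, s: String) => b + s.length,
                                   (x: Int, y: Int) => x + y)(doubled)

      // Foreach is the side-effecting variant used in the test suite.
      RddSerializedOps.Foreach((s: String) => println(s))(data)
    }
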
diff --git a/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala b/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala
index 06a00b0..de08004 100644
--- a/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala
+++ b/data-tc-spark/src/main/scala/fif/spark/avroparquet/RddHelpers.scala
@@ -1,6 +1,10 @@
package fif.spark.avroparquet
-import com.nitro.scalaAvro.runtime.{ GeneratedMessage, Message, GeneratedMessageCompanion }
+import com.nitro.scalaAvro.runtime.{
+ GeneratedMessage,
+ Message,
+ GeneratedMessageCompanion
+}
import fif.spark.RddSerializedOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
@@ -8,7 +12,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import parquet.avro._
-import parquet.hadoop.{ ParquetOutputFormat, ParquetInputFormat }
+import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
import scala.language.postfixOps
import scala.reflect.ClassTag
@@ -29,30 +33,32 @@ object RddHelpers extends Serializable {
(null, x)
/**
- * A unique path (either local or network) to a file or resource.
- */
+ * A unique path (either local or network) to a file or resource.
+ */
type Path = String
def getRdd[K: ClassTag, W: ClassTag, F <: FileInputFormat[K, W]: ClassTag](
- sc: SparkContext
+ sc: SparkContext
)(
- p: Path
+ p: Path
): RDD[(K, W)] =
sc.newAPIHadoopFile[K, W, F](p)
/**
- * Sets ParquetInputFormat's read support for a type of AvroReadSupport[V].
- * Evaluates to an RDD containg Vs, using Parquet + Avro for reading.
- */
- def rddFromParquet[V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion](
- sc: SparkContext
+ * Sets ParquetInputFormat's read support for a type of AvroReadSupport[V].
+ * Evaluates to an RDD containing Vs, using Parquet + Avro for reading.
+ */
+ def rddFromParquet[
+ V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion](
+ sc: SparkContext
)(
- p: Path
+ p: Path
): RDD[V] = {
// protect against deprecated error...can't get around not using Job
val job = Job.getInstance(new Configuration())
- ParquetInputFormat.setReadSupportClass(job, classOf[GenericAvroReadSupport[V]])
+ ParquetInputFormat
+ .setReadSupportClass(job, classOf[GenericAvroReadSupport[V]])
job.getConfiguration.set(
GenericAvroReadSupport.HAS_GENERIC_RECORD_KEY,
@@ -60,8 +66,7 @@ object RddHelpers extends Serializable {
)
// load up that RDD!
- sc
- .newAPIHadoopFile(
+ sc.newAPIHadoopFile(
p,
classOf[ParquetInputFormat[V]],
classOf[Void],
@@ -75,7 +80,9 @@ object RddHelpers extends Serializable {
.map { case (_, value) => value }
}
- def saveRddAsParquet[V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion](sc: SparkContext)(p: Path)(data: RDD[V]): Unit = {
+ def saveRddAsParquet[
+ V <: GeneratedMessage with Message[V]: ClassTag: GeneratedMessageCompanion](
+ sc: SparkContext)(p: Path)(data: RDD[V]): Unit = {
// protect against deprecated error...can't get around not using Job
val job = Job.getInstance(new Configuration())
@@ -85,7 +92,8 @@ object RddHelpers extends Serializable {
implicitly[GeneratedMessageCompanion[V]].schema
)
- val mapF = RddSerializedOps.Map(implicitly[GeneratedMessageCompanion[V]].toMutable _)
+ val mapF = RddSerializedOps.Map(
+ implicitly[GeneratedMessageCompanion[V]].toMutable _)
mapF(data)
.map(asVoidTuple)
@@ -98,4 +106,4 @@ object RddHelpers extends Serializable {
)
}
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/test/scala/fif/RddDataTest.scala b/data-tc-spark/src/test/scala/fif/RddDataTest.scala
index 7d87132..eb7ae58 100644
--- a/data-tc-spark/src/test/scala/fif/RddDataTest.scala
+++ b/data-tc-spark/src/test/scala/fif/RddDataTest.scala
@@ -1,9 +1,9 @@
package fif
import com.holdenkarau.spark.testing.SharedSparkContext
-import org.apache.spark.{ SparkConf, SparkContext }
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
-import org.scalatest.{ BeforeAndAfterAll, FunSuite }
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
import scala.language.higherKinds
import scala.reflect.ClassTag
@@ -47,7 +47,9 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable {
test("mapPartition") {
def mapParition10[D[_]: Data](data: D[Int]): D[Int] =
- data.mapParition { elements => elements.map(_ + 10) }
+ data.mapParition { elements =>
+ elements.map(_ + 10)
+ }
val changed = mapParition10(data)
assert(changed !== data)
@@ -95,7 +97,10 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable {
test("take") {
def testTake[D[_]: Data](data: D[Int]): Boolean =
- data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(1, 2, 3)
+ data.take(1) == Seq(1) && data.take(2) == Seq(1, 2) && data.take(3) == Seq(
+ 1,
+ 2,
+ 3)
assert(testTake(data))
}
@@ -132,7 +137,9 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable {
test("groupBy") {
def groupIt[D[_]: Data](data: D[Int]): D[(Boolean, Iterable[Int])] =
- data.groupBy { n => n % 2 == 0 }
+ data.groupBy { n =>
+ n % 2 == 0
+ }
val evenGroup = groupIt(data).toSeq.toMap
@@ -209,4 +216,4 @@ class RddDataTest extends FunSuite with SharedSparkContext with Serializable {
// assert(ToMap(data.map(x => (x, x))) === Map(1 -> 1, 2 -> 2, 3 -> 3))
// }
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala b/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala
index 4ee70ac..c31face 100644
--- a/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala
+++ b/data-tc-spark/src/test/scala/fif/spark/KryoSerializationTest.scala
@@ -3,21 +3,20 @@ package fif.spark
import org.scalatest.FunSuite
/**
- * Tests KryoSerializationWrapper's correctness and attempts
- * to test for presence of race conditions.
- */
+ * Tests KryoSerializationWrapper's correctness and attempts
+ * to test for presence of race conditions.
+ */
class KryoSerializationTest extends FunSuite {
test("test simple Kryo serialization with wrapper class") {
val serialized = KryoSerializationWrapper(KryoSerializationTest)
- serialized.getValue.dumbData
- .foreach(x => {
- val ba = serialized.getValue.foo(x)
- val serBa = KryoSerializationWrapper(ba)
- assert(ba == serBa.getValue)
- })
+ serialized.getValue.dumbData.foreach(x => {
+ val ba = serialized.getValue.foo(x)
+ val serBa = KryoSerializationWrapper(ba)
+ assert(ba == serBa.getValue)
+ })
}
}
@@ -30,4 +29,4 @@ object KryoSerializationTest {
def foo(x: String) =
new BadApple("foo foo!")
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala b/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala
index ba02af9..2eabef2 100644
--- a/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala
+++ b/data-tc-spark/src/test/scala/fif/spark/RddSerializedOpsTest.scala
@@ -4,21 +4,20 @@ import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite
/**
- * Tests SparkModule higher-order-functions.
- */
+ * Tests SparkModule higher-order-functions.
+ */
class RddSerializedOpsTest extends FunSuite with SharedSparkContext {
import RddSerializedOpsTest._
/**
- * In class (vs. companion object) so that we have access to `assert` from `FunSuite`.
- */
+ * In class (vs. companion object) so that we have access to `assert` from `FunSuite`.
+ */
private def checkResults[A](correct: Seq[A], results: Seq[A]): Unit =
- correct.zip(results)
- .foreach {
- case (c, r) =>
- assert(c === r)
- }
+ correct.zip(results).foreach {
+ case (c, r) =>
+ assert(c === r)
+ }
test("Map") {
val f = (s: String) => s"${s}_$s"
@@ -43,7 +42,7 @@ class RddSerializedOpsTest extends FunSuite with SharedSparkContext {
if (!(dumbDataSet contains s))
throw new RuntimeException(s"unexpected input: $s")
else
- Unit
+ Unit
val foreacher = RddSerializedOps.Foreach(f)
foreacher(sc.parallelize(dumbData))
@@ -69,4 +68,4 @@ object RddSerializedOpsTest {
val dumbDataSet = dumbData.toSet
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala b/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala
index b370172..283a3e9 100644
--- a/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala
+++ b/data-tc-spark/src/test/scala/fif/spark/avroparquet/RddHelpersTest.scala
@@ -23,12 +23,13 @@ class RddHelpersTest extends FunSuite with SharedSparkContext {
test("serialize and deserialize same RDD, ensure contents do not change") {
runWithTemp(deleteBefore = true) { temp =>
-
val rddEntities = sc.parallelize(entities)
- RddHelpers.saveRddAsParquet[SampleEntity](sc)(temp.getAbsolutePath)(rddEntities)
+ RddHelpers.saveRddAsParquet[SampleEntity](sc)(temp.getAbsolutePath)(
+ rddEntities)
val loadedRddEntities =
- RddHelpers.rddFromParquet[SampleEntity](sc)(temp.getAbsolutePath)
+ RddHelpers
+ .rddFromParquet[SampleEntity](sc)(temp.getAbsolutePath)
.sortBy(sortFn)
rddEntities
@@ -45,11 +46,13 @@ class RddHelpersTest extends FunSuite with SharedSparkContext {
object RddHelpersTest {
- val preSerializedRddDocumentEntitiesPath = "data-tc-spark/src/test/resources/avroparquet_sample_entities_rdd/"
+ val preSerializedRddDocumentEntitiesPath =
+ "data-tc-spark/src/test/resources/avroparquet_sample_entities_rdd/"
def runWithTemp(deleteBefore: Boolean)(f: File => Unit): Unit =
synchronized {
- val temp = File.createTempFile("sparkmod-RddHelpersTest", UUID.randomUUID().toString)
+ val temp = File.createTempFile("sparkmod-RddHelpersTest",
+ UUID.randomUUID().toString)
try {
if (deleteBefore)
temp.delete()
@@ -99,7 +102,6 @@ object RddHelpersTest {
"MONEY",
Vector(42)
)
- )
- .sortBy(sortFn)
+ ).sortBy(sortFn)
-}
\ No newline at end of file
+}
diff --git a/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala b/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala
index 1f49c21..ac33bee 100644
--- a/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala
+++ b/data-tc-spark/src/test/scala/fif/spark/avroparquet/SampleEntity.scala
@@ -1,44 +1,60 @@
package fif.spark.avroparquet
-/**
- * Code generated from avro schemas by scalaAvro. Do not modify.
- * "ALL THESE FILES ARE YOURS—EXCEPT SAMPLEENTITY.SCALA / ATTEMPT NO MODIFICATIONS THERE"
- */
-final case class SampleEntity(
- entityName: String,
- entityType: String,
- pages: Vector[Int]) extends com.nitro.scalaAvro.runtime.GeneratedMessage with com.nitro.scalaAvro.runtime.Message[SampleEntity] {
+/**
+ * Code generated from avro schemas by scalaAvro. Do not modify.
+ * "ALL THESE FILES ARE YOURS—EXCEPT SAMPLEENTITY.SCALA / ATTEMPT NO MODIFICATIONS THERE"
+ */
+final case class SampleEntity(entityName: String,
+ entityType: String,
+ pages: Vector[Int])
+ extends com.nitro.scalaAvro.runtime.GeneratedMessage
+ with com.nitro.scalaAvro.runtime.Message[SampleEntity] {
def withEntityName(__v: String): SampleEntity = copy(entityName = __v)
def withEntityType(__v: String): SampleEntity = copy(entityType = __v)
def withPages(__v: Vector[Int]): SampleEntity = copy(pages = __v)
def toMutable: org.apache.avro.generic.GenericRecord = {
- val __out__ = new org.apache.avro.generic.GenericData.Record(SampleEntity.schema)
+ val __out__ =
+ new org.apache.avro.generic.GenericData.Record(SampleEntity.schema)
__out__.put("entityName", entityName)
__out__.put("entityType", entityType)
- __out__.put("pages", scala.collection.JavaConversions.asJavaCollection(pages.map(_e => _e)))
+ __out__.put(
+ "pages",
+ scala.collection.JavaConversions.asJavaCollection(pages.map(_e => _e)))
__out__
}
def companion = SampleEntity
}
-object SampleEntity extends com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] {
- implicit def messageCompanion: com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] = this
+object SampleEntity
+ extends com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[SampleEntity] {
+ implicit def messageCompanion: com.nitro.scalaAvro.runtime.GeneratedMessageCompanion[
+ SampleEntity] = this
def schema: org.apache.avro.Schema =
- new org.apache.avro.Schema.Parser().parse("""{"type":"record","name":"SampleEntity","namespace":"fif.spark.avroparquet","fields":[{"name":"entityName","type":"string"},{"name":"entityType","type":"string"},{"name":"pages","type":{"type":"array","items":"int"}}]}""")
+ new org.apache.avro.Schema.Parser().parse(
+ """{"type":"record","name":"SampleEntity","namespace":"fif.spark.avroparquet","fields":[{"name":"entityName","type":"string"},{"name":"entityType","type":"string"},{"name":"pages","type":{"type":"array","items":"int"}}]}""")
val _arbitrary: org.scalacheck.Gen[SampleEntity] = for {
entityName <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroString
entityType <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroString
pages <- com.nitro.scalaAvro.runtime.AvroGenUtils.genAvroArray(
org.scalacheck.Arbitrary.arbInt.arbitrary
)
- } yield SampleEntity(
- entityName = entityName,
- entityType = entityType,
- pages = pages
- )
- def fromMutable(generic: org.apache.avro.generic.GenericRecord): SampleEntity =
+ } yield
+ SampleEntity(
+ entityName = entityName,
+ entityType = entityType,
+ pages = pages
+ )
+ def fromMutable(
+ generic: org.apache.avro.generic.GenericRecord): SampleEntity =
SampleEntity(
entityName = convertString(generic.get("entityName")),
entityType = convertString(generic.get("entityType")),
- pages = scala.collection.JavaConversions.asScalaIterator(generic.get("pages").asInstanceOf[org.apache.avro.generic.GenericArray[Any]].iterator()).map(_elem => _elem.asInstanceOf[Int]).toVector
+ pages = scala.collection.JavaConversions
+ .asScalaIterator(
+ generic
+ .get("pages")
+ .asInstanceOf[org.apache.avro.generic.GenericArray[Any]]
+ .iterator())
+ .map(_elem => _elem.asInstanceOf[Int])
+ .toVector
)
}
diff --git a/project/SharedBuild.scala b/project/SharedBuild.scala
index 28cf432..5a3af15 100644
--- a/project/SharedBuild.scala
+++ b/project/SharedBuild.scala
@@ -52,29 +52,28 @@ object SharedBuild {
lazy val scalaMacros =
"org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full
- //////////////////////////////////////////////////
- // Code formatting settings for scalariform //
- //////////////////////////////////////////////////
- lazy val sharedCodeFmt = {
- import scalariform.formatter.preferences._
- FormattingPreferences()
- .setPreference(AlignParameters, true )
- .setPreference(AlignSingleLineCaseStatements, true )
- .setPreference(CompactControlReadability, false )
- .setPreference(CompactStringConcatenation, true )
- .setPreference(DoubleIndentClassDeclaration, true )
- .setPreference(FormatXml, true )
- .setPreference(IndentLocalDefs, true )
- .setPreference(IndentPackageBlocks, true )
- .setPreference(IndentSpaces, 2 )
- .setPreference(MultilineScaladocCommentsStartOnFirstLine, false )
- .setPreference(PreserveDanglingCloseParenthesis, true )
- .setPreference(PreserveSpaceBeforeArguments, false )
- .setPreference(RewriteArrowSymbols, false )
- .setPreference(SpaceBeforeColon, false )
- .setPreference(SpaceInsideBrackets, false )
- .setPreference(SpacesWithinPatternBinders, true )
- }
+ lazy val pomExtraInfo = {
+   <url>https://github.com/malcolmgreaves/data-tc</url>
+   <licenses>
+     <license>
+       <name>Apache 2.0</name>
+       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+       <distribution>repo</distribution>
+     </license>
+   </licenses>
+   <scm>
+     <url>git@github.com:malcolmgreaves/data-tc.git</url>
+     <connection>scm:git@github.com:malcolmgreaves/data-tc.git</connection>
+   </scm>
+   <developers>
+     <developer>
+       <id>malcolmgreaves</id>
+       <name>Malcolm Greaves</name>
+       <email>greaves.malcolm@gmail.com</email>
+       <url>https://malcolmgreaves.io/</url>
+     </developer>
+   </developers>
+ }
-}
+}
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index ad130bb..7f05603 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,8 +1,8 @@
logLevel := Level.Warn
-addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
-addSbtPlugin("com.gonitro" % "avro-codegen-compiler" % "0.3.4")
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.4.10")
+
+addSbtPlugin("com.gonitro" % "avro-codegen-compiler" % "0.3.4")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.0.0")
-
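
Note on the new SharedBuild.pomExtraInfo value: the patch defines the POM metadata XML in project/SharedBuild.scala but does not show where it is consumed. In sbt, a NodeSeq like this is normally attached to published artifacts through the standard pomExtra key. A minimal, hypothetical hookup (not part of this patch) could go in build.sbt, assuming the pomExtraInfo shown above:

    import SharedBuild._

    // Attach the extra POM metadata (url, license, scm, developers) to every
    // published module. pomExtra is sbt's standard key for this; the adjacent
    // XML literals in pomExtraInfo form a scala.xml.NodeBuffer, which converts
    // implicitly to the NodeSeq that pomExtra expects.
    pomExtra in ThisBuild := pomExtraInfo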