diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index f7f3221493..13a83efaee 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -807,6 +807,16 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { def min(implicit ord: Ordering[T]): SCollection[T] = this.reduce(ord.min) + + /** + * Return the latest of this SCollection according to its event time, or null if there are no elements. + * @return + * a new SCollection with the latest element + * @group transform + */ + def latest: SCollection[T] = + this.pApply(Latest.globally()) + /** * Compute the SCollection's data distribution using approximate `N`-tiles. * @return diff --git a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala index ba4ada0fb9..35e91a1335 100644 --- a/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala +++ b/scio-core/src/test/scala/com/spotify/scio/values/SCollectionTest.scala @@ -527,6 +527,14 @@ class SCollectionTest extends PipelineSpec { } } + it should "support latest" in { + runWithContext { sc => + def latest(elems: Long*): SCollection[Long] = + sc.parallelize(elems).timestampBy(Instant.ofEpochMilli).latest + latest(1L, 2L, 3L) should containInAnyOrder(Seq(3L)) + } + } + it should "support quantilesApprox()" in { runWithContext { sc => val p = sc.parallelize(0 to 100).quantilesApprox(5)