From e592000052759f5ec00e394e11371c12a7742fe7 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Thu, 12 Sep 2024 18:11:16 +0200 Subject: [PATCH] Support pubsub epoch timestamp attribute in test --- .../scala/com/spotify/scio/pubsub/PubsubIO.scala | 14 +++++++++++++- .../com/spotify/scio/pubsub/PubsubIOTest.scala | 11 ++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala index 55c5933567..d5f58271a2 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala @@ -17,6 +17,7 @@ package com.spotify.scio.pubsub +import com.google.api.client.util.DateTime import com.google.protobuf.Message import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} @@ -31,6 +32,7 @@ import org.apache.beam.sdk.io.gcp.{pubsub => beam} import org.apache.beam.sdk.util.CoderUtils import org.joda.time.Instant +import scala.util.Try import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -318,11 +320,21 @@ final private case class PubsubIOWithAttributes[T: Coder]( } } + // timestamp is either epoch or RFC 3339 + // use same implementation as beam + private def parseTimestampAttribute(ts: String): Instant = + Try(ts.toLong) + .recover { case _: IllegalArgumentException => + DateTime.parseRfc3339(ts).getValue + } + .map(Instant.ofEpochMilli) + .get + override def readTest(sc: ScioContext, params: ReadP): SCollection[WithAttributeMap] = { val read = TestDataManager.getInput(sc.testId.get)(this).toSCollection(sc) Option(timestampAttribute) - .map(att => read.timestampBy(kv => new Instant(kv._2(att)))) + .map(att => read.timestampBy(kv => parseTimestampAttribute(kv._2(att)))) .getOrElse(read) } diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/pubsub/PubsubIOTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/pubsub/PubsubIOTest.scala index f35c916418..aec76679c6 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/pubsub/PubsubIOTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/pubsub/PubsubIOTest.scala @@ -121,7 +121,7 @@ class PubsubIOTest extends PipelineSpec with ScioIOSpec { .run() } - it should "pass correct PubsubIO with attributes" in { + it should "pass correct PubsubIO with attributes and ISO timestamp" in { testPubsubWithAttributesJob( Map(PubsubWithAttributesJob.timestampAttribute -> new Instant().toString), "aX", @@ -130,6 +130,15 @@ class PubsubIOTest extends PipelineSpec with ScioIOSpec { ) } + it should "pass correct PubsubIO with attributes and epoch timestamp" in { + testPubsubWithAttributesJob( + Map(PubsubWithAttributesJob.timestampAttribute -> new Instant().getMillis.toString), + "aX", + "bX", + "cX" + ) + } + it should "fail incorrect PubsubIO with attributes" in { an[AssertionError] should be thrownBy { testPubsubWithAttributesJob(