Skip to content

Commit

Permalink
Support pubsub epoch timestamp attribute in test
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Sep 12, 2024
1 parent d485c98 commit e592000
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.spotify.scio.pubsub

import com.google.api.client.util.DateTime
import com.google.protobuf.Message
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
Expand All @@ -31,6 +32,7 @@ import org.apache.beam.sdk.io.gcp.{pubsub => beam}
import org.apache.beam.sdk.util.CoderUtils
import org.joda.time.Instant

import scala.util.Try
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

Expand Down Expand Up @@ -318,11 +320,21 @@ final private case class PubsubIOWithAttributes[T: Coder](
}
}

// timestamp is either epoch or RFC 3339
// use same implementation as beam
private def parseTimestampAttribute(ts: String): Instant =
Try(ts.toLong)
.recover { case _: IllegalArgumentException =>
DateTime.parseRfc3339(ts).getValue
}
.map(Instant.ofEpochMilli)
.get

override def readTest(sc: ScioContext, params: ReadP): SCollection[WithAttributeMap] = {
val read = TestDataManager.getInput(sc.testId.get)(this).toSCollection(sc)

Option(timestampAttribute)
.map(att => read.timestampBy(kv => new Instant(kv._2(att))))
.map(att => read.timestampBy(kv => parseTimestampAttribute(kv._2(att))))
.getOrElse(read)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class PubsubIOTest extends PipelineSpec with ScioIOSpec {
.run()
}

it should "pass correct PubsubIO with attributes" in {
it should "pass correct PubsubIO with attributes and ISO timestamp" in {
testPubsubWithAttributesJob(
Map(PubsubWithAttributesJob.timestampAttribute -> new Instant().toString),
"aX",
Expand All @@ -130,6 +130,15 @@ class PubsubIOTest extends PipelineSpec with ScioIOSpec {
)
}

it should "pass correct PubsubIO with attributes and epoch timestamp" in {
testPubsubWithAttributesJob(
Map(PubsubWithAttributesJob.timestampAttribute -> new Instant().getMillis.toString),
"aX",
"bX",
"cX"
)
}

it should "fail incorrect PubsubIO with attributes" in {
an[AssertionError] should be thrownBy {
testPubsubWithAttributesJob(
Expand Down

0 comments on commit e592000

Please sign in to comment.