Skip to content

Commit

Permalink
Add config builder for DownstreamTarget to avoid ambiguity
Browse files Browse the repository at this point in the history
  • Loading branch information
kenoir committed Jan 9, 2025
1 parent 26dc81a commit 46427de
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 23 deletions.
25 changes: 25 additions & 0 deletions common/lambda/src/main/scala/weco/lambda/Downstream.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package weco.lambda

import com.typesafe.config.Config
import grizzled.slf4j.Logging
import io.circe.Encoder
import software.amazon.awssdk.services.sns.SnsClient
import weco.messaging.sns.{SNSConfig, SNSMessageSender}
import weco.json.JsonUtil.toJson
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig

import scala.util.Try

Expand Down Expand Up @@ -44,3 +47,25 @@ object Downstream {
}
def apply(): Downstream = STDIODownstream
}

// Typesafe specific configuration builder
object DownstreamBuilder extends Logging {
import weco.typesafe.config.builders.EnrichConfig._

def buildDownstreamTarget(config: Config): DownstreamTarget = {
config.getStringOption("downstream.target") match {
case Some("sns") =>
val snsConfig = buildSNSConfig(config)
info(s"Building SNS downstream with config: $snsConfig")
SNS(snsConfig)
case Some("stdio") =>
info("Building StdOut downstream")
StdOut
case Some(unknownTarget) =>
throw new IllegalArgumentException(s"Invalid downstream target: $unknownTarget")
case None =>
warn("No downstream target specified, defaulting to StdOut")
StdOut
}
}
}
55 changes: 55 additions & 0 deletions common/lambda/src/test/scala/weco/lambda/DownstreamTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package weco.lambda

import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.lambda.helpers.ConfigurationTestHelpers
import weco.messaging.sns.SNSConfig

class DownstreamTest extends AnyFunSpec
with ConfigurationTestHelpers
with Matchers {

describe("DownstreamBuilder") {
it("builds a StdOut downstream target") {
val config =
"""
|downstream.target = "stdio"
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut
}

it("builds an SNS downstream target") {
val config =
"""
|downstream.target = "sns"
|aws.sns.topic.arn = "arn:aws:sns:eu-west-1:123456789012:my-topic"
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe SNS(
config = SNSConfig(
topicArn = "arn:aws:sns:eu-west-1:123456789012:my-topic"
)
)
}

it("builds a StdOut downstream target if no downstream target is specified") {
val config =
"""
|""".asConfig

DownstreamBuilder.buildDownstreamTarget(config) shouldBe StdOut
}

it("throws an exception if the downstream target is not recognised") {
val config =
"""
|downstream.target = "invalid"
|""".asConfig

intercept[IllegalArgumentException] {
DownstreamBuilder.buildDownstreamTarget(config)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,4 @@ aws.sns.topic.arn=${?output_topic_arn}
batcher.flush_interval_minutes=${?flush_interval_minutes}
batcher.max_processed_paths=${?max_processed_paths}
batcher.max_batch_size=${?max_batch_size}
batcher.use_downstream=sns
batcher.use_downstream=${?use_downstream}
downstream.target=${?use_downstream}
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package weco.pipeline.batcher.lib

import com.typesafe.config.Config
import weco.lambda.{
ApplicationConfig,
DownstreamTarget,
LambdaConfigurable,
SNS,
StdOut
}
import weco.lambda.DownstreamBuilder.buildDownstreamTarget
import weco.lambda.{ApplicationConfig, DownstreamTarget, LambdaConfigurable, SNS, StdOut}
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig

case class BatcherConfig(
Expand All @@ -21,11 +16,6 @@ trait BatcherConfigurable extends LambdaConfigurable[BatcherConfig] {
def build(rawConfig: Config): BatcherConfig =
BatcherConfig(
maxBatchSize = rawConfig.requireInt("batcher.max_batch_size"),
downstreamTarget = {
rawConfig.requireString("batcher.use_downstream") match {
case "sns" => SNS(buildSNSConfig(rawConfig))
case "stdio" => StdOut
}
}
downstreamTarget = buildDownstreamTarget(rawConfig)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ es.host=${?es_host}
es.port=${?es_port}
es.protocol=${?es_protocol}
es.apikey=${?es_apikey}
relation_embedder.use_downstream=${?use_downstream}
downstream.target=${?use_downstream}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package weco.pipeline.relation_embedder.lib

import com.typesafe.config.Config
import weco.elasticsearch.typesafe.ElasticBuilder.buildElasticClientConfig
import weco.lambda.DownstreamBuilder.buildDownstreamTarget
import weco.elasticsearch.typesafe.ElasticConfig
import weco.lambda._
import weco.messaging.typesafe.SNSBuilder.buildSNSConfig

case class RelationEmbedderConfig(
mergedWorkIndex: String,
Expand All @@ -31,11 +31,6 @@ trait RelationEmbedderConfigurable
affectedWorksScroll =
rawConfig.requireInt("es.works.scroll.affected_works"),
elasticConfig = buildElasticClientConfig(rawConfig),
downstreamTarget = {
rawConfig.requireString("relation_embedder.use_downstream") match {
case "sns" => SNS(buildSNSConfig(rawConfig))
case "stdio" => StdOut
}
}
downstreamTarget = buildDownstreamTarget(rawConfig)
)
}

0 comments on commit 46427de

Please sign in to comment.