Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws msk configs for hmda-platform #4914

Draft
wants to merge 31 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5cf31bb
Update build.sbt
zencircle Aug 1, 2024
30964ba
Update Dependencies.scala
zencircle Aug 1, 2024
883bf4e
Update build.sbt
zencircle Aug 1, 2024
4c959da
Update persistence.conf
zencircle Aug 1, 2024
7430876
Update KafkaUtils.scala
zencircle Aug 1, 2024
13da838
Update Dependencies.scala
zencircle Aug 1, 2024
fa91acb
Update Dependencies.scala
zencircle Aug 3, 2024
eee4aa2
Update KafkaUtils.scala
zencircle Aug 3, 2024
fd54652
Update persistence.conf
zencircle Aug 3, 2024
26f0b38
Update KafkaUtils.scala
zencircle Sep 13, 2024
c777902
Update persistence.conf
zencircle Sep 13, 2024
cd46b87
Update KafkaUtils.scala
zencircle Oct 8, 2024
f751b37
Update HmdaAnalyticsApp.scala
zencircle Oct 8, 2024
98c8d18
Update IrsPublisherApp.scala
zencircle Oct 8, 2024
960aee4
Update application.conf
zencircle Oct 8, 2024
a1f3241
Update application.conf
zencircle Oct 8, 2024
b8125b8
Update ModifiedLarApp.scala
zencircle Oct 8, 2024
504afb2
Update application.conf
zencircle Oct 9, 2024
dd2cc74
Update application.conf
zencircle Oct 9, 2024
e81328b
Update application.conf
zencircle Oct 9, 2024
3928aab
Update HmdaInstitutionApi.scala
zencircle Oct 9, 2024
7bff36b
Update build.sbt
zencircle Oct 9, 2024
0c05ae9
Update Stream.scala
zencircle Oct 9, 2024
fa534e0
Update application.conf
zencircle Oct 9, 2024
f34241e
Update application.conf
zencircle Oct 9, 2024
3eedacc
Update SubmissionErrorsApp.scala
zencircle Oct 9, 2024
f4f0c6a
Update persistence.conf
zencircle Oct 10, 2024
38d0fa8
Update KafkaUtils.scala
zencircle Oct 10, 2024
50b175f
Update application.conf
zencircle Oct 10, 2024
efa0fa7
Update application.conf
zencircle Oct 10, 2024
142e996
Update KafkaUtils.scala
zencircle Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ lazy val akkaDeps = Seq(
akkaTestkitTyped,
akkaStreamsTestKit,
akkaCors,
mskdriver,
akkaKafkaStreams,
embeddedKafka,
alpakkaS3,
Expand Down Expand Up @@ -136,6 +137,9 @@ lazy val common = (project in file("common"))
)
),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
// addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"),
// unmanagedJars in Compile ++= Seq(new java.io.File("/tmp/aws-msk-iam-auth-2.2.0-all.jar")).classpath,
// unmanagedJars in Runtime ++= Seq(new java.io.File("/tmp/aws-msk-iam-auth-2.2.0-all.jar")).classpath
)
.enablePlugins(BuildInfoPlugin)
.settings(
Expand Down Expand Up @@ -798,4 +802,4 @@ lazy val `hmda-quarterly-data-service` = (project in file ("hmda-quarterly-data-
packageSettings
)
.dependsOn(common % "compile->compile;test->test")
.dependsOn(`hmda-protocol` % "compile->compile;test->test")
.dependsOn(`hmda-protocol` % "compile->compile;test->test")
19 changes: 13 additions & 6 deletions common/src/main/resources/persistence.conf
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ kafka {
idle-timeout = ${?KAFKA_IDLE_TIMEOUT}
security.protocol=""
security.protocol=${?KAFKA_SECURITY}
ssl.truststore.location = ""
ssl.truststore.location = ${?TRUSTSTORE_PATH}
ssl.truststore.password = ""
ssl.truststore.password = ${?TRUSTSTORE_PASSWORD}
ssl.endpoint = ""
ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG}
// ssl.truststore.location = ""
// ssl.truststore.location = ${?TRUSTSTORE_PATH}
// ssl.truststore.password = ""
// ssl.truststore.password = ${?TRUSTSTORE_PASSWORD}
// ssl.endpoint = ""
// ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG}
sasl.mechanism="AWS_MSK_IAM"
//sasl.mechanism=${?KAFKA_SASL_MECHANISM}
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
//sasl.jaas.config=${?KAFKA_SASL_JAAS_CONFIG}
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
//sasl.client.callback.handler.class=${?KAFKA_SASL_CLASS}
}

37 changes: 26 additions & 11 deletions common/src/main/scala/hmda/publication/KafkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import hmda.messages.institution.InstitutionEvents.InstitutionKafkaEvent
import hmda.serialization.kafka.InstitutionKafkaEventsSerializer
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{ProducerRecord, Producer => KafkaProducer}
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.config.{SslConfigs,SaslConfigs}
//import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.Future
Expand All @@ -22,9 +22,13 @@ object KafkaUtils {

val config = ConfigFactory.load()
val kafkaHosts = config.getString("kafka.hosts")
val truststoreLocation = config.getString("kafka.ssl.truststore.location")
val truststorePassword = config.getString("kafka.ssl.truststore.password")
val endpointIdAlgo = config.getString("kafka.ssl.endpoint")
// val truststoreLocation = config.getString("kafka.ssl.truststore.location")
// val truststorePassword = config.getString("kafka.ssl.truststore.password")
// val endpointIdAlgo = config.getString("kafka.ssl.endpoint")
val securityprotocol = config.getString("kafka.security.protocol")
val saslmechanism = config.getString("kafka.sasl.mechanism")
val sasljaasconfig= config.getString("kafka.sasl.jaas.config")
val saslclientcallbackhandler= config.getString("kafka.sasl.client.callback.handler.class")

def getStringKafkaProducer(system: ActorSystem[_]): KafkaProducer[String, String] = {

Expand All @@ -45,13 +49,22 @@ object KafkaUtils {
producerSettings.createKafkaProducer()
}

private def getKafkaConfig: Map[String, String] = {
if (!truststoreLocation.isEmpty && !truststorePassword.isEmpty) {
/**
 * Extra Kafka client properties needed to authenticate against the cluster.
 *
 * When `kafka.security.protocol` is `SASL_SSL`, the returned map carries the security
 * protocol together with the SASL mechanism, JAAS config and client callback handler
 * class read from configuration. For any other value the map is empty, so the client
 * keeps its default settings.
 *
 * The truststore-based SSL entries that used to be returned here are no longer emitted.
 *
 * @return properties to pass to `ProducerSettings`/`ConsumerSettings` via `withProperties`
 */
def getKafkaConfig: Map[String, String] = {

  if (securityprotocol == "SASL_SSL") {
    Map(
      CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> securityprotocol,
      // CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL is deliberately not used as a key:
      // it is the default protocol *value* ("PLAINTEXT"), not a property name, so mapping
      // it to the SASL mechanism only injected an unknown "PLAINTEXT" client property.
      SaslConfigs.SASL_MECHANISM                     -> saslmechanism,
      SaslConfigs.SASL_JAAS_CONFIG                   -> sasljaasconfig,
      SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS -> saslclientcallbackhandler
    )
  } else {
    Map()
Expand All @@ -68,6 +81,7 @@ object KafkaUtils {
val producerSettings =
ProducerSettings(system.toClassic, new StringSerializer, new InstitutionKafkaEventsSerializer)
.withBootstrapServers(kafkaHosts)
.withProperties(getKafkaConfig)
.withProducer(kafkaProducer)

Source
Expand All @@ -84,6 +98,7 @@ object KafkaUtils {
val producerSettings =
ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer)
.withBootstrapServers(kafkaHosts)
.withProperties(getKafkaConfig)
.withProducer(producer)

Source
Expand Down
16 changes: 15 additions & 1 deletion email-service/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ kafka {
hosts = ${?KAFKA_CLUSTER_HOSTS}
idle-timeout = 5
idle-timeout = ${?KAFKA_IDLE_TIMEOUT}
security.protocol=""
security.protocol=${?KAFKA_SECURITY}
// ssl.truststore.location = ""
// ssl.truststore.location = ${?TRUSTSTORE_PATH}
// ssl.truststore.password = ""
// ssl.truststore.password = ${?TRUSTSTORE_PASSWORD}
// ssl.endpoint = ""
// ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG}
sasl.mechanism="AWS_MSK_IAM"
//sasl.mechanism=${?KAFKA_SASL_MECHANISM}
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
//sasl.jaas.config=${?KAFKA_SASL_JAAS_CONFIG}
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
//sasl.client.callback.handler.class=${?KAFKA_SASL_CLASS}

topic = "hmda-email"
topic = ${?KAFKA_EMAIL_TOPIC}
Expand Down Expand Up @@ -48,4 +62,4 @@ hmda {
filter {
bank-filter-list="BANK1LEIFORTEST12345,BANK3LEIFORTEST12345,BANK4LEIFORTEST12345,999999LE3ZOZXUS7W648,28133080042813308004,B90YWS6AFX2LGWOXJ1LD,FAKE0SWARM0BANK00000,FAKE0SWARM0BANK00001,FAKE0SWARM0BANK00002,FAKE0SWARM0BANK00003,NEWMANLEITEST1234678"
bank-filter-list=${?BANK_FILTER_LIST}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.util.TimeZone
import scala.concurrent.Future
import scala.concurrent.duration._
import org.slf4j.LoggerFactory
import hmda.publication.KafkaUtils._

object Stream {
val log = LoggerFactory.getLogger("hmda")
Expand All @@ -36,7 +37,8 @@ object Stream {
.withBootstrapServers(bootstrapServers)
.withGroupId(HmdaGroups.emailGroup)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

.withProperties(getKafkaConfig)

Consumer
.committableSource(settings, Subscriptions.topics(HmdaTopics.emailTopic))
.asSourceWithContext(_.committableOffset) // hide context
Expand Down
17 changes: 1 addition & 16 deletions hmda-analytics/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,6 @@ cassandra-snapshot-store {
authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD}
}

kafka {
hosts = "localhost:9092"
hosts = ${?KAFKA_CLUSTER_HOSTS}
idle-timeout = 5
idle-timeout = ${?KAFKA_IDLE_TIMEOUT}
ssl {
truststore.location=""
truststore.location = ${?KAFKA_SSL_LOCATION}
truststore.password=""
truststore.password = ${?KAFKA_SSL_PASSWORD}
endpoint=""
endpoint = ${?KAFKA_SSL_ENDPOINT}
}
}

hmda {
analytics {
parallelism = 1
Expand Down Expand Up @@ -178,4 +163,4 @@ hmda {
filter {
bank-filter-list=""
bank-filter-list=${?BANK_FILTER_LIST}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import hmda.model.filing.submission.SubmissionId
import hmda.model.filing.ts.TransmittalSheet
import hmda.parser.filing.lar.LarCsvParser
import hmda.parser.filing.ts.TsCsvParser
import hmda.publication.KafkaUtils.kafkaHosts
//import hmda.publication.KafkaUtils.kafkaHosts
import hmda.publication.KafkaUtils._
import hmda.query.DbConfiguration.dbConfig
import hmda.query.HmdaQuery.{readRawData, readSubmission}
import hmda.query.ts.{TransmittalSheetConverter, TransmittalSheetEntity}
Expand Down Expand Up @@ -185,6 +186,7 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo
.withBootstrapServers(kafkaHosts)
.withGroupId(HmdaGroups.analyticsGroup)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperties(getKafkaConfig)

Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.analyticsTopic))
Expand Down
16 changes: 1 addition & 15 deletions institutions-api/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ institution_db {
}
}

kafka {
hosts = "localhost:9092"
hosts = ${?KAFKA_CLUSTER_HOSTS}
idle-timeout = 5
idle-timeout = ${?KAFKA_IDLE_TIMEOUT}
ssl {
truststore.location=""
truststore.location = ${?KAFKA_SSL_LOCATION}
truststore.password=""
truststore.password = ${?KAFKA_SSL_PASSWORD}
endpoint=""
endpoint = ${?KAFKA_SSL_ENDPOINT}
}
}

filter {
bank-filter-list="BANK1LEIFORTEST12345,BANK3LEIFORTEST12345,BANK4LEIFORTEST12345,999999LE3ZOZXUS7W648,28133080042813308004,B90YWS6AFX2LGWOXJ1LD"
Expand All @@ -75,4 +61,4 @@ counts:{
2019 = "transmittalsheet2019_three_year_04122023"
2018 = "transmittalsheet2018_qpub_06082022"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object HmdaInstitutionApi extends App {
.withBootstrapServers(kafkaHosts)
.withGroupId(HmdaGroups.institutionsGroup)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

.withProperties(getKafkaConfig)
val control: DrainingControl[Done] = Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.institutionTopic))
.mapAsync(1)(msg => processData(msg.record.value()).map(_ => msg.committableOffset))
Expand Down Expand Up @@ -86,4 +86,4 @@ object HmdaInstitutionApi extends App {
.run()

}
// $COVERAGE-ON$
// $COVERAGE-ON$
15 changes: 0 additions & 15 deletions irs-publisher/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,6 @@ cassandra-snapshot-store {
authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD}
}

kafka {
hosts = "localhost:9092"
hosts = ${?KAFKA_CLUSTER_HOSTS}
idle-timeout = 5
idle-timeout = ${?KAFKA_IDLE_TIMEOUT}
ssl {
truststore.location=""
truststore.location = ${?KAFKA_SSL_LOCATION}
truststore.password=""
truststore.password = ${?KAFKA_SSL_PASSWORD}
endpoint=""
endpoint = ${?KAFKA_SSL_ENDPOINT}
}
}

aws {
access-key-id = ""
access-key-id = ${?AWS_ACCESS_KEY_ID}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object IrsPublisherApp extends App {
.withBootstrapServers(kafkaHosts)
.withGroupId(HmdaGroups.irsGroup)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperties(getKafkaConfig)

Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.irsTopic))
Expand All @@ -83,4 +84,4 @@ object IrsPublisherApp extends App {
.toMat(Sink.ignore)(Keep.right)
.run()

}
}
15 changes: 1 addition & 14 deletions modified-lar/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ cassandra-snapshot-store {
authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD}
}

kafka {
hosts = "localhost:9092"
hosts = ${?KAFKA_CLUSTER_HOSTS}
ssl {
truststore.location=""
truststore.location = ${?KAFKA_SSL_LOCATION}
truststore.password=""
truststore.password = ${?KAFKA_SSL_PASSWORD}
endpoint=""
endpoint = ${?KAFKA_SSL_ENDPOINT}
}
}

aws {
access-key-id = ""
access-key-id = ${?AWS_ACCESS_KEY_ID}
Expand All @@ -77,7 +64,7 @@ hmda {
modified {
parallelism = 1
regenerateMlar = false
regenerateMlar = ${IS_REGENERATE_MLAR}
regenerateMlar = ${?IS_REGENERATE_MLAR}
generateS3Files = true
generateS3Files = ${?IS_GENERATE_MLAR_S3_FIES}
creteDispositionRecord = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ object ModifiedLarApp extends App {
.withBootstrapServers(kafkaHosts)
.withGroupId(HmdaGroups.modifiedLarGroup)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
.withProperties(getKafkaConfig)

val (control, streamCompleted) =
Consumer
Expand Down Expand Up @@ -143,4 +144,4 @@ object ModifiedLarApp extends App {
}

}
// $COVERAGE-ON$
// $COVERAGE-ON$
6 changes: 4 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ object Dependencies {
lazy val scalacheckShapeless = "com.github.alexarchambault" %% "scalacheck-shapeless_1.14" % Version.scalacheckShapeless % Test
lazy val diffx = "com.softwaremill.diffx" %% "diffx-core" % Version.diffx % Test
lazy val kubernetesApi = "io.kubernetes" % "client-java" % Version.kubernetesApi

// https://mvnrepository.com/artifact/software.amazon.msk/aws-msk-iam-auth
lazy val mskdriver = "software.amazon.msk" % "aws-msk-iam-auth" % "2.2.0"

// overriding the log4j-slf4j bridge used by spring, transitively brought in by s3mock
// this is needed because of CVE-2021-44228 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-44228
lazy val log4jToSlf4j = "org.apache.logging.log4j" % "log4j-to-slf4j" % Version.log4j % Test
}
}
16 changes: 15 additions & 1 deletion submission-errors/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
kafka {
hosts = "localhost:9092"
hosts = ${?KAFKA_CLUSTER_HOSTS}
security.protocol=""
security.protocol=${?KAFKA_SECURITY}
// ssl.truststore.location = ""
// ssl.truststore.location = ${?TRUSTSTORE_PATH}
// ssl.truststore.password = ""
// ssl.truststore.password = ${?TRUSTSTORE_PASSWORD}
// ssl.endpoint = ""
// ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG}
sasl.mechanism="AWS_MSK_IAM"
//sasl.mechanism=${?KAFKA_SASL_MECHANISM}
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
//sasl.jaas.config=${?KAFKA_SASL_JAAS_CONFIG}
sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
//sasl.client.callback.handler.class=${?KAFKA_SASL_CLASS}

topic = "submission-errors"
topic = ${?KAFKA_TOPIC}
Expand Down Expand Up @@ -60,4 +74,4 @@ submission-errors-db {
connectionTimeout = 20000
validationTimeout = 10000
}
}
}
Loading
Loading