diff --git a/build.sbt b/build.sbt index 231d05ce9a..d096674d1a 100644 --- a/build.sbt +++ b/build.sbt @@ -31,6 +31,7 @@ lazy val akkaDeps = Seq( akkaTestkitTyped, akkaStreamsTestKit, akkaCors, + mskdriver, akkaKafkaStreams, embeddedKafka, alpakkaS3, @@ -136,6 +137,9 @@ lazy val common = (project in file("common")) ) ), addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1") + // addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"), + // unmanagedJars in Compile ++= Seq(new java.io.File("/tmp/aws-msk-iam-auth-2.2.0-all.jar")).classpath, + // unmanagedJars in Runtime ++= Seq(new java.io.File("/tmp/aws-msk-iam-auth-2.2.0-all.jar")).classpath ) .enablePlugins(BuildInfoPlugin) .settings( @@ -798,4 +802,4 @@ lazy val `hmda-quarterly-data-service` = (project in file ("hmda-quarterly-data- packageSettings ) .dependsOn(common % "compile->compile;test->test") - .dependsOn(`hmda-protocol` % "compile->compile;test->test") \ No newline at end of file + .dependsOn(`hmda-protocol` % "compile->compile;test->test") diff --git a/common/src/main/resources/persistence.conf b/common/src/main/resources/persistence.conf index 6feadb5045..274efec3dd 100644 --- a/common/src/main/resources/persistence.conf +++ b/common/src/main/resources/persistence.conf @@ -82,10 +82,17 @@ kafka { idle-timeout = ${?KAFKA_IDLE_TIMEOUT} security.protocol="" security.protocol=${?KAFKA_SECURITY} - ssl.truststore.location = "" - ssl.truststore.location = ${?TRUSTSTORE_PATH} - ssl.truststore.password = "" - ssl.truststore.password = ${?TRUSTSTORE_PASSWORD} - ssl.endpoint = "" - ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG} + // ssl.truststore.location = "" + // ssl.truststore.location = ${?TRUSTSTORE_PATH} + // ssl.truststore.password = "" + // ssl.truststore.password = ${?TRUSTSTORE_PASSWORD} + // ssl.endpoint = "" + // ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG} + sasl.mechanism="AWS_MSK_IAM" + //sasl.mechanism=${?KAFKA_SASL_MECHANISM} + 
sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" + //sasl.jaas.config="{?KAFKA_SASL_JAAS_CONFIG}" + sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" + //sasl.client.callback.handler.class="{?KAFKA_SASL_CLASS}" } + diff --git a/common/src/main/scala/hmda/publication/KafkaUtils.scala b/common/src/main/scala/hmda/publication/KafkaUtils.scala index b3fa725cab..d62612a481 100644 --- a/common/src/main/scala/hmda/publication/KafkaUtils.scala +++ b/common/src/main/scala/hmda/publication/KafkaUtils.scala @@ -12,8 +12,8 @@ import hmda.messages.institution.InstitutionEvents.InstitutionKafkaEvent import hmda.serialization.kafka.InstitutionKafkaEventsSerializer import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.producer.{ProducerRecord, Producer => KafkaProducer} -import org.apache.kafka.common.config.SslConfigs -import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.config.{SslConfigs,SaslConfigs} +//import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.StringSerializer import scala.concurrent.Future @@ -22,9 +22,13 @@ object KafkaUtils { val config = ConfigFactory.load() val kafkaHosts = config.getString("kafka.hosts") - val truststoreLocation = config.getString("kafka.ssl.truststore.location") - val truststorePassword = config.getString("kafka.ssl.truststore.password") - val endpointIdAlgo = config.getString("kafka.ssl.endpoint") + // val truststoreLocation = config.getString("kafka.ssl.truststore.location") + // val truststorePassword = config.getString("kafka.ssl.truststore.password") + // val endpointIdAlgo = config.getString("kafka.ssl.endpoint") + val securityprotocol = config.getString("kafka.security.protocol") + val saslmechanism = config.getString("kafka.sasl.mechanism") + val sasljaasconfig= config.getString("kafka.sasl.jaas.config") + val saslclientcallbackhandler= 
config.getString("kafka.sasl.client.callback.handler.class") def getStringKafkaProducer(system: ActorSystem[_]): KafkaProducer[String, String] = { @@ -45,13 +49,22 @@ object KafkaUtils { producerSettings.createKafkaProducer() } - private def getKafkaConfig: Map[String, String] = { - if (!truststoreLocation.isEmpty && !truststorePassword.isEmpty) { + def getKafkaConfig: Map[String, String] = { + + if( securityprotocol=="SASL_SSL") { + //if (!truststoreLocation.isEmpty && !truststorePassword.isEmpty) { Map( - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> SecurityProtocol.SSL.name, - SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> truststoreLocation, - SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> truststorePassword, - SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> endpointIdAlgo + //CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> SecurityProtocol.SSL.name, + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> securityprotocol, + // NOTE(review): removed "CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL -> saslmechanism" — DEFAULT_SECURITY_PROTOCOL is the default security-protocol *value* ("PLAINTEXT"), not a config key; using it as a Map key injected a bogus property named "PLAINTEXT" + + //SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> truststoreLocation, + //SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> truststorePassword, + //SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> endpointIdAlgo, + + SaslConfigs.SASL_MECHANISM -> saslmechanism, + SaslConfigs.SASL_JAAS_CONFIG -> sasljaasconfig, + SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS -> saslclientcallbackhandler ) } else { Map() @@ -68,6 +81,7 @@ object KafkaUtils { val producerSettings = ProducerSettings(system.toClassic, new StringSerializer, new InstitutionKafkaEventsSerializer) .withBootstrapServers(kafkaHosts) + .withProperties(getKafkaConfig) .withProducer(kafkaProducer) Source @@ -84,6 +98,7 @@ object KafkaUtils { val producerSettings = ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer) .withBootstrapServers(kafkaHosts) + .withProperties(getKafkaConfig) .withProducer(producer) Source diff --git a/email-service/src/main/resources/application.conf 
b/email-service/src/main/resources/application.conf index 1c6c64f7a8..edcef95c6c 100644 --- a/email-service/src/main/resources/application.conf +++ b/email-service/src/main/resources/application.conf @@ -9,6 +9,20 @@ kafka { hosts = ${?KAFKA_CLUSTER_HOSTS} idle-timeout = 5 idle-timeout = ${?KAFKA_IDLE_TIMEOUT} + security.protocol="" + security.protocol=${?KAFKA_SECURITY} + // ssl.truststore.location = "" + // ssl.truststore.location = ${?TRUSTSTORE_PATH} + // ssl.truststore.password = "" + // ssl.truststore.password = ${?TRUSTSTORE_PASSWORD} + // ssl.endpoint = "" + // ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG} + sasl.mechanism="AWS_MSK_IAM" + //sasl.mechanism=${?KAFKA_SASL_MECHANISM} + sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" + //sasl.jaas.config="{?KAFKA_SASL_JAAS_CONFIG}" + sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" + //sasl.client.callback.handler.class="{?KAFKA_SASL_CLASS}" topic = "hmda-email" topic = ${?KAFKA_EMAIL_TOPIC} @@ -48,4 +62,4 @@ hmda { filter { bank-filter-list="BANK1LEIFORTEST12345,BANK3LEIFORTEST12345,BANK4LEIFORTEST12345,999999LE3ZOZXUS7W648,28133080042813308004,B90YWS6AFX2LGWOXJ1LD,FAKE0SWARM0BANK00000,FAKE0SWARM0BANK00001,FAKE0SWARM0BANK00002,FAKE0SWARM0BANK00003,NEWMANLEITEST1234678" bank-filter-list=${?BANK_FILTER_LIST} -} \ No newline at end of file +} diff --git a/email-service/src/main/scala/hmda/publication/lar/streams/Stream.scala b/email-service/src/main/scala/hmda/publication/lar/streams/Stream.scala index c38f2a5863..b2a9d96b9d 100644 --- a/email-service/src/main/scala/hmda/publication/lar/streams/Stream.scala +++ b/email-service/src/main/scala/hmda/publication/lar/streams/Stream.scala @@ -23,6 +23,7 @@ import java.util.TimeZone import scala.concurrent.Future import scala.concurrent.duration._ import org.slf4j.LoggerFactory +import hmda.publication.KafkaUtils._ object Stream { val log = LoggerFactory.getLogger("hmda") @@ -36,7 
+37,8 @@ object Stream { .withBootstrapServers(bootstrapServers) .withGroupId(HmdaGroups.emailGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - + .withProperties(getKafkaConfig) + Consumer .committableSource(settings, Subscriptions.topics(HmdaTopics.emailTopic)) .asSourceWithContext(_.committableOffset) // hide context diff --git a/hmda-analytics/src/main/resources/application.conf b/hmda-analytics/src/main/resources/application.conf index 3cc1f5c774..316bbe824e 100644 --- a/hmda-analytics/src/main/resources/application.conf +++ b/hmda-analytics/src/main/resources/application.conf @@ -38,21 +38,6 @@ cassandra-snapshot-store { authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD} } -kafka { - hosts = "localhost:9092" - hosts = ${?KAFKA_CLUSTER_HOSTS} - idle-timeout = 5 - idle-timeout = ${?KAFKA_IDLE_TIMEOUT} - ssl { - truststore.location="" - truststore.location = ${?KAFKA_SSL_LOCATION} - truststore.password="" - truststore.password = ${?KAFKA_SSL_PASSWORD} - endpoint="" - endpoint = ${?KAFKA_SSL_ENDPOINT} - } -} - hmda { analytics { parallelism = 1 @@ -178,4 +163,4 @@ hmda { filter { bank-filter-list="" bank-filter-list=${?BANK_FILTER_LIST} -} \ No newline at end of file +} diff --git a/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala b/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala index bfcb29ca54..4640c3f8b4 100644 --- a/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala +++ b/hmda-analytics/src/main/scala/hmda/analytics/HmdaAnalyticsApp.scala @@ -18,7 +18,8 @@ import hmda.model.filing.submission.SubmissionId import hmda.model.filing.ts.TransmittalSheet import hmda.parser.filing.lar.LarCsvParser import hmda.parser.filing.ts.TsCsvParser -import hmda.publication.KafkaUtils.kafkaHosts +//import hmda.publication.KafkaUtils.kafkaHosts +import hmda.publication.KafkaUtils._ import hmda.query.DbConfiguration.dbConfig import hmda.query.HmdaQuery.{readRawData, readSubmission} import 
hmda.query.ts.{TransmittalSheetConverter, TransmittalSheetEntity} @@ -185,6 +186,7 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo .withBootstrapServers(kafkaHosts) .withGroupId(HmdaGroups.analyticsGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperties(getKafkaConfig) Consumer .committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.analyticsTopic)) diff --git a/institutions-api/src/main/resources/application.conf b/institutions-api/src/main/resources/application.conf index ed4a1a600c..c3772f3548 100644 --- a/institutions-api/src/main/resources/application.conf +++ b/institutions-api/src/main/resources/application.conf @@ -45,20 +45,6 @@ institution_db { } } -kafka { - hosts = "localhost:9092" - hosts = ${?KAFKA_CLUSTER_HOSTS} - idle-timeout = 5 - idle-timeout = ${?KAFKA_IDLE_TIMEOUT} - ssl { - truststore.location="" - truststore.location = ${?KAFKA_SSL_LOCATION} - truststore.password="" - truststore.password = ${?KAFKA_SSL_PASSWORD} - endpoint="" - endpoint = ${?KAFKA_SSL_ENDPOINT} - } -} filter { bank-filter-list="BANK1LEIFORTEST12345,BANK3LEIFORTEST12345,BANK4LEIFORTEST12345,999999LE3ZOZXUS7W648,28133080042813308004,B90YWS6AFX2LGWOXJ1LD" @@ -75,4 +61,4 @@ counts:{ 2019 = "transmittalsheet2019_three_year_04122023" 2018 = "transmittalsheet2018_qpub_06082022" } -} \ No newline at end of file +} diff --git a/institutions-api/src/main/scala/hmda/institution/HmdaInstitutionApi.scala b/institutions-api/src/main/scala/hmda/institution/HmdaInstitutionApi.scala index c311480f4e..55e9ed7981 100644 --- a/institutions-api/src/main/scala/hmda/institution/HmdaInstitutionApi.scala +++ b/institutions-api/src/main/scala/hmda/institution/HmdaInstitutionApi.scala @@ -56,7 +56,7 @@ object HmdaInstitutionApi extends App { .withBootstrapServers(kafkaHosts) .withGroupId(HmdaGroups.institutionsGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - + 
.withProperties(getKafkaConfig) val control: DrainingControl[Done] = Consumer .committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.institutionTopic)) .mapAsync(1)(msg => processData(msg.record.value()).map(_ => msg.committableOffset)) @@ -86,4 +86,4 @@ object HmdaInstitutionApi extends App { .run() } -// $COVERAGE-ON$ \ No newline at end of file +// $COVERAGE-ON$ diff --git a/irs-publisher/src/main/resources/application.conf b/irs-publisher/src/main/resources/application.conf index 3970bb53e6..a0314d4b4c 100644 --- a/irs-publisher/src/main/resources/application.conf +++ b/irs-publisher/src/main/resources/application.conf @@ -49,21 +49,6 @@ cassandra-snapshot-store { authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD} } -kafka { - hosts = "localhost:9092" - hosts = ${?KAFKA_CLUSTER_HOSTS} - idle-timeout = 5 - idle-timeout = ${?KAFKA_IDLE_TIMEOUT} - ssl { - truststore.location="" - truststore.location = ${?KAFKA_SSL_LOCATION} - truststore.password="" - truststore.password = ${?KAFKA_SSL_PASSWORD} - endpoint="" - endpoint = ${?KAFKA_SSL_ENDPOINT} - } -} - aws { access-key-id = "" access-key-id = ${?AWS_ACCESS_KEY_ID} diff --git a/irs-publisher/src/main/scala/hmda/publication/lar/IrsPublisherApp.scala b/irs-publisher/src/main/scala/hmda/publication/lar/IrsPublisherApp.scala index 8233f3a5a3..e9dde4bb8b 100644 --- a/irs-publisher/src/main/scala/hmda/publication/lar/IrsPublisherApp.scala +++ b/irs-publisher/src/main/scala/hmda/publication/lar/IrsPublisherApp.scala @@ -60,6 +60,7 @@ object IrsPublisherApp extends App { .withBootstrapServers(kafkaHosts) .withGroupId(HmdaGroups.irsGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperties(getKafkaConfig) Consumer .committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.irsTopic)) @@ -83,4 +84,4 @@ object IrsPublisherApp extends App { .toMat(Sink.ignore)(Keep.right) .run() -} \ No newline at end of file +} diff --git 
a/modified-lar/src/main/resources/application.conf b/modified-lar/src/main/resources/application.conf index 10fbceb1a1..9525b561dd 100644 --- a/modified-lar/src/main/resources/application.conf +++ b/modified-lar/src/main/resources/application.conf @@ -46,19 +46,6 @@ cassandra-snapshot-store { authentication.password = ${?CASSANDRA_CLUSTER_PASSWORD} } -kafka { - hosts = "localhost:9092" - hosts = ${?KAFKA_CLUSTER_HOSTS} - ssl { - truststore.location="" - truststore.location = ${?KAFKA_SSL_LOCATION} - truststore.password="" - truststore.password = ${?KAFKA_SSL_PASSWORD} - endpoint="" - endpoint = ${?KAFKA_SSL_ENDPOINT} - } -} - aws { access-key-id = "" access-key-id = ${?AWS_ACCESS_KEY_ID} @@ -77,7 +64,7 @@ hmda { modified { parallelism = 1 regenerateMlar = false - regenerateMlar = ${IS_REGENERATE_MLAR} + regenerateMlar = ${?IS_REGENERATE_MLAR} generateS3Files = true generateS3Files = ${?IS_GENERATE_MLAR_S3_FIES} creteDispositionRecord = false diff --git a/modified-lar/src/main/scala/hmda/publication/lar/ModifiedLarApp.scala b/modified-lar/src/main/scala/hmda/publication/lar/ModifiedLarApp.scala index 3c020975e6..cf8ce6a428 100644 --- a/modified-lar/src/main/scala/hmda/publication/lar/ModifiedLarApp.scala +++ b/modified-lar/src/main/scala/hmda/publication/lar/ModifiedLarApp.scala @@ -107,6 +107,7 @@ object ModifiedLarApp extends App { .withBootstrapServers(kafkaHosts) .withGroupId(HmdaGroups.modifiedLarGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") + .withProperties(getKafkaConfig) val (control, streamCompleted) = Consumer @@ -143,4 +144,4 @@ object ModifiedLarApp extends App { } } -// $COVERAGE-ON$ \ No newline at end of file +// $COVERAGE-ON$ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 452dbaf0f0..84457d8c3a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -84,8 +84,10 @@ object Dependencies { lazy val scalacheckShapeless = "com.github.alexarchambault" %% "scalacheck-shapeless_1.14" 
% Version.scalacheckShapeless % Test lazy val diffx = "com.softwaremill.diffx" %% "diffx-core" % Version.diffx % Test lazy val kubernetesApi = "io.kubernetes" % "client-java" % Version.kubernetesApi - + // https://mvnrepository.com/artifact/software.amazon.msk/aws-msk-iam-auth + lazy val mskdriver = "software.amazon.msk" % "aws-msk-iam-auth" % "2.2.0" + // overriding the log4j-slf4j bridge used by spring, transitively brought in by s3mock // this is needed because of CVE-2021-44228 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-44228 lazy val log4jToSlf4j = "org.apache.logging.log4j" % "log4j-to-slf4j" % Version.log4j % Test -} \ No newline at end of file +} diff --git a/submission-errors/src/main/resources/application.conf b/submission-errors/src/main/resources/application.conf index 05a3311d51..d34e473c70 100644 --- a/submission-errors/src/main/resources/application.conf +++ b/submission-errors/src/main/resources/application.conf @@ -1,6 +1,20 @@ kafka { hosts = "localhost:9092" hosts = ${?KAFKA_CLUSTER_HOSTS} + security.protocol="" + security.protocol=${?KAFKA_SECURITY} + // ssl.truststore.location = "" + // ssl.truststore.location = ${?TRUSTSTORE_PATH} + // ssl.truststore.password = "" + // ssl.truststore.password = ${?TRUSTSTORE_PASSWORD} + // ssl.endpoint = "" + // ssl.endpoint = ${?KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG} + sasl.mechanism="AWS_MSK_IAM" + //sasl.mechanism=${?KAFKA_SASL_MECHANISM} + sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;" + //sasl.jaas.config="{?KAFKA_SASL_JAAS_CONFIG}" + sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler" + //sasl.client.callback.handler.class="{?KAFKA_SASL_CLASS}" topic = "submission-errors" topic = ${?KAFKA_TOPIC} @@ -60,4 +74,4 @@ submission-errors-db { connectionTimeout = 20000 validationTimeout = 10000 } -} \ No newline at end of file +} diff --git a/submission-errors/src/main/scala/hmda/submissionerrors/SubmissionErrorsApp.scala 
b/submission-errors/src/main/scala/hmda/submissionerrors/SubmissionErrorsApp.scala index 34e3b5b5b2..1f83ce50f7 100644 --- a/submission-errors/src/main/scala/hmda/submissionerrors/SubmissionErrorsApp.scala +++ b/submission-errors/src/main/scala/hmda/submissionerrors/SubmissionErrorsApp.scala @@ -14,6 +14,8 @@ import hmda.submissionerrors.streams.SubmissionProcessor.{ handleMessages, proce import monix.execution.Scheduler import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer +import hmda.publication.KafkaUtils._ + // $COVERAGE-OFF$ object SubmissionErrorsApp extends App { val config = ConfigFactory.load() @@ -36,6 +38,7 @@ object SubmissionErrorsApp extends App { ).withBootstrapServers(kafkaHosts) .withGroupId(HmdaGroups.submissionErrorsGroup) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withProperties(getKafkaConfig) val kafkaCommitterSettings: CommitterSettings = CommitterSettings(classicSystem) @@ -56,4 +59,4 @@ object SubmissionErrorsApp extends App { CoordinatedShutdown(classicSystem) .addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "stop-kafka-consumer")(() => killSwitch.shutdown()) } -// $COVERAGE-ON$ \ No newline at end of file +// $COVERAGE-ON$