Skip to content

Commit

Permalink
Merge pull request #2 from reugn/develop
Browse files Browse the repository at this point in the history
v0.2.0
  • Loading branch information
reugn authored Oct 23, 2021
2 parents 00a6ebc + 6b0bb9a commit 908f713
Show file tree
Hide file tree
Showing 18 changed files with 151 additions and 101 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Build
on:
pull_request:
push:

jobs:
build:
strategy:
fail-fast: false
matrix:
os: [ ubuntu-latest ]
scala: [ 2.12.15, 2.13.6 ]
java:
- [email protected]
platform: [ jvm ]

runs-on: ${{ matrix.os }}

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup Java and Scala
uses: olafurpg/setup-scala@v12
with:
java-version: ${{ matrix.java }}

- name: Cache sbt
uses: actions/cache@v2
with:
path: |
~/.sbt
~/.ivy2/cache
~/.coursier/cache/v1
~/.cache/coursier/v1
~/AppData/Local/Coursier/Cache/v1
~/Library/Caches/Coursier/v1
key: ${{ runner.os }}-sbt-cache-v2-${{ hashFiles('**/*.sbt') }}-${{ hashFiles('project/build.properties') }}

- name: Build and test
run: sbt ++${{ matrix.scala }} test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ build
.classpath
.project
.settings
.bsp
12 changes: 0 additions & 12 deletions .travis.yml

This file was deleted.

20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# memento
[![Build Status](https://travis-ci.com/reugn/memento.svg?branch=master)](https://travis-ci.com/reugn/memento)
[![Build](https://github.com/reugn/memento/actions/workflows/build.yml/badge.svg)](https://github.com/reugn/memento/actions/workflows/build.yml)

With [Kafka](https://kafka.apache.org/), we can't keep on retry a message without blocking the whole partition. But what if we would like
to delay/reprocess the current message and go ahead.
Kafka lacks a delayed message consumption mechanism as well.
What if we could hack the Kafka Streams and turn the flow to a delayed Kafka producer...
Using [Apache Kafka](https://kafka.apache.org/), we can't keep on retrying a message without blocking the entire partition. But what if we would like
to delay/reprocess the current message and move on.
Kafka does not have a delayed message reception mechanism out of the box.
What if we could hack Kafka Streams and turn the flow into a delayed Kafka producer...

## Introduction
`memento` is a Kafka Streams application that could come in handy when you want to:
* reprocess particular Kafka messages without blocking the partition
* submit a delayed Kafka message
`memento` is a Kafka Streams application that could come in handy when you need to:
* Reprocess particular Kafka messages without blocking the partition
* Submit a delayed Kafka message

The message should contain the following headers:
* `origin` - a target topic name
* `ts` - a timestamp to emit the message

Delayed message submission via HTTP is also available.
Delayed message submission over HTTP is also supported.
```
curl --location --request POST 'localhost:8080/store' \
--header 'Content-Type: application/json' \
Expand All @@ -27,7 +27,7 @@ curl --location --request POST 'localhost:8080/store' \
}'
```

The project utilizes the `KeyValueStore` as a messages storage. Inject your own implementation using `Guice` module.
The project utilizes the `KeyValueStore` as message storage. Inject your own implementation using the `Guice` module.
More persistent KeyValueStores:
* [kafka-aerospike-state-store](https://github.com/reugn/kafka-aerospike-state-store)

Expand Down
40 changes: 21 additions & 19 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,29 @@ import sbtassembly.AssemblyPlugin.autoImport.assemblyMergeStrategy

name := "memento"
organization := "com.github.reugn"
scalaVersion := "2.12.11"
crossScalaVersions := Seq(scalaVersion.value, "2.13.2")
scalaVersion := "2.12.15"
crossScalaVersions := Seq(scalaVersion.value, "2.13.6")

val kafkaVersion = "2.5.0"
val kafkaVersion = "2.8.1"
val akkaVersion = "2.6.15"
val akkaHttpVersion = "10.2.6"

libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.4.0",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"com.typesafe" % "config" % "1.4.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.4",
"javax.inject" % "javax.inject" % "1",
"com.google.inject" % "guice" % "4.2.3",
"com.typesafe.akka" %% "akka-actor" % "2.6.5",
"com.typesafe.akka" %% "akka-stream" % "2.6.5",
"com.typesafe.akka" %% "akka-http" % "10.1.12",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.12",
"com.google.inject" % "guice" % "5.0.1",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.apache.kafka" %% "kafka" % kafkaVersion,
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion,
"net.logstash.logback" % "logstash-logback-encoder" % "3.5",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.scalatest" %% "scalatest" % "3.1.2" % Test
"net.logstash.logback" % "logstash-logback-encoder" % "6.6",
"ch.qos.logback" % "logback-classic" % "1.2.6",
"org.scalatest" %% "scalatest" % "3.2.10" % Test
)

excludeDependencies ++= Seq(
Expand All @@ -40,18 +42,18 @@ scalacOptions := Seq(
"-Xlint:-missing-interpolator"
)

mainClass in(Compile, run) := Some("com.github.reugn.memento.MementoApp")
Compile / run / mainClass := Some("com.github.reugn.memento.MementoApp")

licenses += ("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0.html"))

assemblyOutputPath in assembly := baseDirectory.value /
assembly / assemblyOutputPath := baseDirectory.value /
"assembly" / (name.value + "-" + version.value + ".jar")

assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs@_*) => MergeStrategy.discard
assembly / assemblyMergeStrategy := {
case PathList("META-INF", _*) => MergeStrategy.discard
case PathList("reference.conf") => MergeStrategy.concat
case _ => MergeStrategy.first
}

unmanagedResourceDirectories in Compile += baseDirectory.value / "conf"
cancelable in Global := true
Compile / unmanagedResourceDirectories += baseDirectory.value / "conf"
Global / cancelable := true
15 changes: 11 additions & 4 deletions conf/application.conf
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
kafka {
consumer {
topic = "test"
topic = "test"
group = "memento-app"
}
bootstrap.servers = "127.0.0.1:9092"
parallelism.factor = 16
}

emit_interval_millis = 1000
http {
server {
host = "0.0.0.0"
port = 8080
}
routes {
store = true
}
}

http_server_host = "0.0.0.0"
http_server_port = 8080
emit_interval_millis = 1000
1 change: 0 additions & 1 deletion project/assembly.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
resolvers += Resolver.url("bintray-sbt-plugins", url("https://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.3.10
sbt.version = 1.5.5
3 changes: 1 addition & 2 deletions src/main/scala/com/github/reugn/memento/Module.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.reugn.memento

import java.util

import akka.actor.ActorSystem
import com.github.reugn.memento.state.{DelayRegulator, LocalRegulator}
import com.github.reugn.memento.utils.StateStoreProxy
Expand All @@ -10,6 +8,7 @@ import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder, Stores}

import java.util
import scala.concurrent.ExecutionContext

class Module extends AbstractModule {
Expand Down
54 changes: 32 additions & 22 deletions src/main/scala/com/github/reugn/memento/http/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import com.github.reugn.memento.utils
import com.github.reugn.memento.utils.{Serde, StateStoreProxy}
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import javax.inject.{Inject, Singleton}

import javax.inject.{Inject, Singleton}
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

Expand All @@ -23,17 +23,25 @@ final class HttpServer @Inject()(config: Config,
private implicit val materializer: Materializer = Materializer.createMaterializer(actorSystem)
private implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher

private val http_server_host = config.getString("http_server_host")
private val http_server_port = config.getInt("http_server_port")
private val http_server_host = config.getString("http.server.host")
private val http_server_port = config.getInt("http.server.port")
private val http_store_route_enable = config.getBoolean("http.routes.store")

def init(): Unit = {
Http().bindAndHandle(route, http_server_host, http_server_port) onComplete {
Http().newServerAt(http_server_host, http_server_port).bindFlow(getRoutes) onComplete {
case Success(serverBinding) => logger.info(s"Server bound to ${serverBinding.localAddress}")
case Failure(_) => logger.error(s"Failed to bind to $http_server_host:$http_server_port")
}
}

private val route: Route =
private def getRoutes: Route = {
if (http_store_route_enable)
stdRoutes ~ storeRoute
else
stdRoutes
}

private val stdRoutes: Route =
path("") {
get {
logger.trace("/ route call")
Expand All @@ -51,24 +59,26 @@ final class HttpServer @Inject()(config: Config,
logger.trace("/ready route call")
complete("Ok")
}
} ~
path("store") {
post {
entity(as[HttpMessage]) { msg =>
logger.trace("/store route call")
val ctx = RecordContext(
msg.ts,
0,
"http",
0,
Array(
RawRecordHeader(utils.ORIGIN_TOPIC_HEADER, msg.origin.getBytes),
RawRecordHeader(utils.TIMESTAMP_HEADER, String.valueOf(msg.ts).getBytes)
)
}

private val storeRoute: Route =
path("store") {
post {
entity(as[HttpMessage]) { msg =>
logger.trace("/store route call")
val ctx = RecordContext(
msg.ts,
0,
"http",
0,
Array(
RawRecordHeader(utils.ORIGIN_TOPIC_HEADER, msg.origin.getBytes),
RawRecordHeader(utils.TIMESTAMP_HEADER, String.valueOf(msg.ts).getBytes)
)
proxy.put(msg.ts, Serde.serialize(Message(msg.key.orNull, msg.value, ctx)))
complete("Message successfully settled")
}
)
proxy.put(msg.ts, Serde.serialize(Message(msg.key.orNull, msg.value, ctx)))
complete("Message successfully stored")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class EmitTopicNameExtractor extends TopicNameExtractor[String, String] {
/**
* Extracts the topic name to send to from the 'origin' header. The topic name must already exist,
* since the Kafka Streams library will not try to automatically create the topic with the extracted name.
**/
*/
override def extract(key: String, value: String, recordContext: RecordContext): String = {
new String(recordContext.headers().toArray.filter(_.key() == utils.ORIGIN_TOPIC_HEADER).apply(0).value())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package com.github.reugn.memento.kafka

import java.util.Properties

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.processor.ProcessorSupplier
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.processor.api.ProcessorSupplier
import org.apache.kafka.streams.state.{KeyValueStore, StoreBuilder}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

import java.util.Properties
import javax.inject.{Inject, Provider}
import scala.concurrent.ExecutionContext

class MementoConsumer @Inject()(
config: Config,
processor: SuspendingProcessor,
processorProvider: Provider[SuspendingProcessor],
storeBuilder: StoreBuilder[KeyValueStore[java.lang.Long, Array[Byte]]],
implicit val ec: ExecutionContext
) extends LazyLogging {
Expand All @@ -39,7 +39,7 @@ class MementoConsumer @Inject()(
}

protected val inputTopic: String = config.getString(s"kafka.consumer.topic")
protected val collectSupplier: ProcessorSupplier[String, String] = () => processor
protected val collectSupplier: ProcessorSupplier[String, String, String, String] = () => processorProvider.get()

protected def createTopology(): KafkaStreams = {
logger.info(s"Creating a topology for the topic $inputTopic")
Expand All @@ -54,10 +54,11 @@ class MementoConsumer @Inject()(

def runFlow(): Unit = {
val stream = createTopology()
stream.setUncaughtExceptionHandler((_: Thread, e: Throwable) => {
stream.setUncaughtExceptionHandler((e: Throwable) => {
logger.error(s"Error in ${getClass.getSimpleName}", e)
stream.close()
runFlow()
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
})
logger.info(s"Starting ${getClass.getSimpleName} flow")
stream.cleanUp()
Expand Down
Loading

0 comments on commit 908f713

Please sign in to comment.