
Commit

reference
Rohan Singh authored and Rohan Singh committed Apr 12, 2021
0 parents commit 13cdcaf
Showing 94 changed files with 2,034 additions and 0 deletions.
20 changes: 20 additions & 0 deletions README.md
@@ -0,0 +1,20 @@
## README for the Demo App

#### To run on macOS
`export DOCKER_KAFKA_HOST=$(ipconfig getifaddr en0)`

`docker-compose -f docker-compose.kafka.yml up` # for Kafka

`docker-compose up` # for the API

#### Install Java and run the apps inside main/java
`mvn exec:java -Dexec.mainClass="myapps.Claim"`

#### Application Endpoints
1. Swagger: `localhost/docs`
2. Kafka Admin UI: `localhost:8080/ui`
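
The claim and telematics endpoints are defined in `app/main.py`. As a rough sketch of exercising them (assuming the API container is published on port 80 as in `docker-compose.yml`; the `requests` client below is only an illustration, not part of this repo):

```python
import requests

BASE_URL = "http://localhost"  # assumes the port 80 mapping from docker-compose.yml

# POST a single claim; claim_id is optional and is auto-generated when empty.
claim = {
    "account_id": "abc123XYZ789",
    "name": "Jane Doe",
    "address": "123 Main St, Hartford, CT",
    "insured_amount": 75000.0,
}
requests.post(f"{BASE_URL}/create_claim", json=claim)

# Produce 100 random claims and telematics records for 10 customers.
requests.get(f"{BASE_URL}/bulk_create_claims/100")
requests.get(f"{BASE_URL}/get_telematics_data", params={"customers": 10})
```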

#### Generate Kafka Streams Topology
Copy the output of `System.out.println(topology)` and paste it into the following website:

`https://zz85.github.io/kafka-streams-viz/`
10 changes: 10 additions & 0 deletions app/Dockerfile
@@ -0,0 +1,10 @@
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.8

WORKDIR /app

COPY requirements.txt /app/requirements.txt

RUN pip install -r requirements.txt

COPY . /app/

1 change: 1 addition & 0 deletions app/__init__.py
@@ -0,0 +1 @@
__author__ = 'Rohan Singh'
Binary file added app/__pycache__/__init__.cpython-37.pyc
Binary file not shown.
Binary file added app/__pycache__/main.cpython-37.pyc
Binary file not shown.
27 changes: 27 additions & 0 deletions app/claims.py
@@ -0,0 +1,27 @@
__author__ = 'Rohan Singh'
from random import choices, randint
from string import ascii_letters, digits
from faker import Faker
account_chars: str = digits + ascii_letters
fake = Faker()


def get_random_id(k=12) -> str:
    """Return a random alphanumeric id of length k."""
    return "".join(choices(account_chars, k=k))


def _random_amount() -> float:
    """Return a random insured amount for the claim."""
    return float(randint(5000, 100000))


def create_random_claim() -> dict:
    """Create a fake, random claim."""
    return {
        "account_id": get_random_id(),
        "name": fake.name(),
        "address": fake.address(),
        "claim_id": get_random_id(8),
        "insured_amount": _random_amount(),
    }
25 changes: 25 additions & 0 deletions app/generator.py
@@ -0,0 +1,25 @@
from time import sleep
from kafka import KafkaProducer
import json


class Generator:
    """Thin wrapper around KafkaProducer that JSON-serializes message values."""

    def __init__(self, broker_url, tps=10):
        self.broker_url = broker_url
        self.tps = tps
        self.sleep_time = 1 / tps
        self.producer = KafkaProducer(bootstrap_servers=broker_url,
                                      value_serializer=lambda value: json.dumps(value).encode())

    def send_message(self, topic, message):
        self.producer.send(topic, value=message)

# if __name__ == "__main__":
# producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER_URL,
# value_serializer=lambda value: json.dumps(value).encode())
# while True:
# claim: dict = create_random_claim()
# producer.send(CLAIMS_TOPIC, value=claim)
# print(claim)
# sleep(SLEEP_TIME)
26 changes: 26 additions & 0 deletions app/iot.py
@@ -0,0 +1,26 @@
__author__ = 'Rohan Singh'
from random import choices, randint
from string import ascii_letters, digits
from faker import Faker
import random

account_chars: str = digits + ascii_letters
fake = Faker()


def get_random_id(k=12) -> str:
    """Return a random alphanumeric id of length k."""
    return "".join(choices(account_chars, k=k))


def _random_amount() -> float:
    """Return a random insured amount for the claim."""
    return float(randint(5000, 100000))


def random_telematics(customers=10, records=10) -> list:
    """Generate fake telematics speed records for a set of customers."""
    names = [fake.unique.name() for _ in range(customers)]

    return [{random.choice(names): random.gauss(60, 20)}
            for _ in range(records * customers)]
62 changes: 62 additions & 0 deletions app/main.py
@@ -0,0 +1,62 @@
__author__ = 'Rohan Singh'
from fastapi import FastAPI
import os
from dotenv import load_dotenv
from generator import Generator
from pydantic import BaseModel
from typing import Optional
from claims import get_random_id, create_random_claim
from iot import random_telematics
from time import sleep

load_dotenv()


class Claim(BaseModel):
    account_id: str
    name: str
    address: str
    claim_id: Optional[str]
    insured_amount: float


app = FastAPI()

KAFKA_BROKER_URL = os.getenv('KAFKA_BROKER_URL')
CLAIMS_TOPIC = os.getenv('CLAIMS_TOPIC')
TELEMATICS_TOPIC = os.getenv("TELEMATICS_TOPIC")
LEGIT_CLAIMS_TOPIC = os.getenv('LEGIT_CLAIMS_TOPIC')
FRAUD_CLAIMS_TOPIC = os.getenv('FRAUD_CLAIMS_TOPIC')
TRANSACTIONS_PER_SECOND = float(os.getenv('TRANSACTIONS_PER_SECOND'))
SLEEP_TIME = 1 / TRANSACTIONS_PER_SECOND


@app.get("/")
async def welcome():
return {"Hello, Hartford"}


@app.post("/create_claim")
async def create_claim(claim: Claim):
    generator = Generator(KAFKA_BROKER_URL, TRANSACTIONS_PER_SECOND)
    if claim.claim_id is None or len(claim.claim_id) == 0:
        claim.claim_id = get_random_id(k=10)
    print(claim)
    generator.send_message(CLAIMS_TOPIC, claim.dict())


@app.get("/bulk_create_claims/{quantity}")
async def bulk_create_claims(quantity: int):
    generator = Generator(KAFKA_BROKER_URL)

    for _ in range(quantity):
        claim: dict = create_random_claim()
        generator.send_message(CLAIMS_TOPIC, claim)


@app.get("/get_telematics_data")
async def get_telematics_data(customers: Optional[int] = 10):
    data = random_telematics(customers=customers)
    generator = Generator(KAFKA_BROKER_URL)
    for record in data:
        generator.send_message(TELEMATICS_TOPIC, record)
60 changes: 60 additions & 0 deletions app/requirements.txt
@@ -0,0 +1,60 @@
click==7.1.2 \
--hash=sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc \
--hash=sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a
faker==7.0.1 \
--hash=sha256:08c4cfbfd498c0e90aff6741771c01803d894013df858db6a573182c6a47951f \
--hash=sha256:20c6e4253b73ef2a783d38e085e7c8d8916295fff31c7403116d2af8f908f7ca
fastapi==0.63.0 \
--hash=sha256:98d8ea9591d8512fdadf255d2a8fa56515cdd8624dca4af369da73727409508e \
--hash=sha256:63c4592f5ef3edf30afa9a44fa7c6b7ccb20e0d3f68cd9eba07b44d552058dcb
h11==0.12.0 \
--hash=sha256:36a3cb8c0a032f56e2da7084577878a035d3b61d104230d4bd49c0c6b555a9c6 \
--hash=sha256:47222cb6067e4a307d535814917cd98fd0a57b6788ce715755fa2b6c28b56042
kafka-python==2.0.2 \
--hash=sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3 \
--hash=sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e
pydantic==1.8.1 \
--hash=sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850 \
--hash=sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3 \
--hash=sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e \
--hash=sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771 \
--hash=sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f \
--hash=sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d \
--hash=sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f \
--hash=sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8 \
--hash=sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4 \
--hash=sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a \
--hash=sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683 \
--hash=sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2 \
--hash=sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125 \
--hash=sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0 \
--hash=sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99 \
--hash=sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9 \
--hash=sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c \
--hash=sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e \
--hash=sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f \
--hash=sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58 \
--hash=sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520 \
--hash=sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3
python-dateutil==2.8.1 \
--hash=sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c \
--hash=sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a
python-dotenv==0.17.0 \
--hash=sha256:471b782da0af10da1a80341e8438fca5fadeba2881c54360d5fd8d03d03a4f4a \
--hash=sha256:49782a97c9d641e8a09ae1d9af0856cc587c8d2474919342d5104d85be9890b2
six==1.15.0 \
--hash=sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced \
--hash=sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259
starlette==0.13.6 \
--hash=sha256:bd2ffe5e37fb75d014728511f8e68ebf2c80b0fa3d04ca1479f4dc752ae31ac9 \
--hash=sha256:ebe8ee08d9be96a3c9f31b2cb2a24dbdf845247b745664bd8a3f9bd0c977fdbc
text-unidecode==1.3 \
--hash=sha256:bad6603bb14d279193107714b288be206cac565dfa49aa5b105294dd5c4aab93 \
--hash=sha256:1311f10e8b895935241623731c2ba64f4c455287888b18189350b67134a822e8
typing-extensions==3.7.4.3 \
--hash=sha256:dafc7639cde7f1b6e1acc0f457842a83e722ccca8eef5270af2d74792619a89f \
--hash=sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918 \
--hash=sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c
uvicorn==0.13.4 \
--hash=sha256:7587f7b08bd1efd2b9bad809a3d333e972f1d11af8a5e52a9371ee3a5de71524 \
--hash=sha256:3292251b3c7978e8e4a7868f4baf7f7f7bb7e40c759ecc125c37e99cdea34202
1 change: 1 addition & 0 deletions app/tuba/__init__.py
@@ -0,0 +1 @@
__author__ = 'Rohan Singh'
37 changes: 37 additions & 0 deletions app/tuba/app.py
@@ -0,0 +1,37 @@
import json
import os

from dotenv import load_dotenv
from kafka import KafkaConsumer, KafkaProducer

load_dotenv()

KAFKA_BROKER_URL = os.getenv('KAFKA_BROKER_URL')
CLAIMS_TOPIC = os.getenv('CLAIMS_TOPIC')
LEGIT_CLAIMS_TOPIC = os.getenv('LEGIT_CLAIMS_TOPIC')
FRAUD_CLAIMS_TOPIC = os.getenv('FRAUD_CLAIMS_TOPIC')


def is_suspicious(claim_: dict) -> bool:
    """Return whether a claim is suspicious."""
    return "doe" in claim_["name"] or claim_["insured_amount"] > 60000


if __name__ == "__main__":
    consumer = KafkaConsumer(
        CLAIMS_TOPIC,
        bootstrap_servers=KAFKA_BROKER_URL,
        value_deserializer=lambda value: json.loads(value),
    )

    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKER_URL,
        value_serializer=lambda value: json.dumps(value).encode(),
    )

    # Route each incoming claim to the fraud or legit topic.
    for message in consumer:
        claim: dict = message.value
        topic = FRAUD_CLAIMS_TOPIC if is_suspicious(claim) else LEGIT_CLAIMS_TOPIC
        producer.send(topic, value=claim)
        print(topic, claim)
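As a quick illustration of how the rule above routes claims (hypothetical values, not from this repo); note that the `"doe"` check is case-sensitive:

```python
# Hypothetical claims illustrating is_suspicious(); the "doe" check is case-sensitive.
legit = {"name": "Alice Smith", "insured_amount": 12000}
fraud = {"name": "john doe", "insured_amount": 95000}

assert is_suspicious(legit) is False   # small amount, name does not contain "doe"
assert is_suspicious(fraud) is True    # matches "doe" and exceeds the 60000 threshold
```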
35 changes: 35 additions & 0 deletions docker-compose.kafka.yml
@@ -0,0 +1,35 @@
version: '2'
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${DOCKER_KAFKA_HOST}:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - zookeeper
      - kafka
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_KAFKA_HOST}
      KAFKA_CREATE_TOPICS: "testme:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - "zookeeper"
networks:
  default:
    external:
      name: kafka-network
17 changes: 17 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,17 @@
version: "3"
services:
tuba:
build: ./app
ports:
- "80:80"
environment:
KAFKA_BROKER_URL: ${DOCKER_KAFKA_HOST}:9092
CLAIMS_TOPIC: queueing.claims
LEGIT_CLAIMS_TOPIC: streaming.claims.legit
FRAUD_CLAIMS_TOPIC: streaming.claims.fraud
TELEMATICS_TOPIC: streaming.driving.speed
TRANSACTIONS_PER_SECOND: 10
networks:
default:
external:
name: kafka-network