Skip to content

Commit

Permalink
working and not working example
Browse files Browse the repository at this point in the history
  • Loading branch information
Anaisdg committed Jul 2, 2024
1 parent e75b559 commit 77da4db
Show file tree
Hide file tree
Showing 25 changed files with 1,499 additions and 1 deletion.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,25 @@
# CSTR_InfluxDB
# CSTR_InfluxDB
This project aims to use Kafka, Faust, and InfluxDB to monitor a digital twin of a CSTR (continous stirred tank reactor) and a PID controller for a cooling jacket for the CSTR.



`dockerized` contains a working example and integration with InfluxDB but the CSTR and controller logic is wrong.

`dockerized2` contains the correct CSTR logic but there are issues with

Both repos contain this structure:
├── docker-compose.yml
├── faust_app
│   ├── Dockerfile
│   ├── faust_app.py
│   └── requirements.txt
├── python-script
│   ├── Dockerfile
│   ├── cstr_controller.py
│   ├── kafka_consumer_test.py
│   ├── kafka_producer_test.py
│   └── requirements.txt
├── telegraf
│   └── telegraf.conf

You can also run the CSTR logic sperately with `python3 pid_control.py`
Binary file added __pycache__/cstr_reactor.cpython-311.pyc
Binary file not shown.
40 changes: 40 additions & 0 deletions cstr_reactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import numpy as np
from scipy.integrate import odeint

# Define CSTR model
def cstr(x, t, u, Tf, Caf):
Ca = x[0]
T = x[1]

q = 100
V = 100
rho = 1000
Cp = 0.239
mdelH = 5e4
EoverR = 8750
k0 = 7.2e10
UA = 5e4
rA = k0 * np.exp(-EoverR / T) * Ca

dCadt = q / V * (Caf - Ca) - rA
dTdt = q / V * (Tf - T) \
+ mdelH / (rho * Cp) * rA \
+ UA / V / rho / Cp * (u - T)

xdot = np.zeros(2)
xdot[0] = dCadt
xdot[1] = dTdt
return xdot

# Simulation function
def simulate_cstr(u, Tf, Caf, x0, t):
Ca = np.ones(len(t)) * x0[0]
T = np.ones(len(t)) * x0[1]
for i in range(len(t) - 1):
ts = [t[i], t[i + 1]]
y = odeint(cstr, x0, ts, args=(u[i+1], Tf, Caf))
Ca[i + 1] = y[-1][0]
T[i + 1] = y[-1][1]
x0[0] = Ca[i + 1]
x0[1] = T[i + 1]
return Ca, T
301 changes: 301 additions & 0 deletions data_doublet_steps.txt

Large diffs are not rendered by default.

93 changes: 93 additions & 0 deletions dockerized/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- kafka-net

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka-net
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "9092"]
interval: 10s
timeout: 10s
retries: 5

kafka-connect:
image: confluentinc/cp-kafka-connect:latest
depends_on:
- kafka
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "quickstart-avro"
CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
networks:
- kafka-net

telegraf:
image: telegraf:latest
volumes:
- ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
depends_on:
kafka:
condition: service_healthy
networks:
- kafka-net

python-script:
build:
context: ./python-script
dockerfile: Dockerfile
depends_on:
kafka:
condition: service_healthy
environment:
KAFKA_BROKER_ADDRESS: kafka:9092
networks:
- kafka-net
healthcheck:
test: ["CMD-SHELL", "test -f /healthcheck"]
interval: 10s
timeout: 5s
retries: 5

faust-app:
build:
context: ./faust_app
dockerfile: Dockerfile
depends_on:
python-script:
condition: service_healthy
environment:
KAFKA_BROKER_ADDRESS: kafka:9092
networks:
- kafka-net

networks:
kafka-net:
driver: bridge
10 changes: 10 additions & 0 deletions dockerized/faust_app/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["faust", "-A", "faust_app", "worker", "-l", "info"]
47 changes: 47 additions & 0 deletions dockerized/faust_app/faust_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import faust
import json
import logging

class CSTRData(faust.Record, serializer='json'):
Ca: float
Reactor_Temperature: float

app = faust.App('cstr-controller', broker='kafka://kafka:9092')
data_topic = app.topic('cstr_data', value_type=CSTRData)
tc_topic = app.topic('cstr_control')

# Maintain integral of error (ie) globally
ie = 0

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@app.agent(data_topic)
async def process(stream):
global ie
async for event in stream:
ca = event.Ca
temp_reactor = event.Reactor_Temperature

# Compute Tc value based on your control logic
tc = compute_tc(ca, temp_reactor)

# Produce the result to the control topic
await tc_topic.send(value=json.dumps({"Tc": tc}))

def compute_tc(ca, temp_reactor):
global ie
Kc = 4.61730615181 * 2.0
tauI = 0.913444964569 / 4.0
e = temp_reactor - 324.475443431599
ie += e # Update integral of error
tc = 300.0 + Kc * e + Kc / tauI * ie

# Log the PID components and Tc value
logger.info(f"e: {e}, ie: {ie}, P: {Kc * e}, I: {Kc / tauI * ie}, tc: {tc}")

return tc

if __name__ == '__main__':
app.main()
2 changes: 2 additions & 0 deletions dockerized/faust_app/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
faust
kafka-python
18 changes: 18 additions & 0 deletions dockerized/python-script/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Debugging: List the contents of the /app directory
RUN ls -al /app

# CMD ["python", "kafka_producer_test.py"] # Change this to test the producer
# CMD ["python", "kafka_consumer_test.py"] # Change this to test the consumer
CMD ["python", "cstr_controller.py"]

# Keep the container running for debugging
# CMD ["tail", "-f", "/dev/null"]
Loading

0 comments on commit 77da4db

Please sign in to comment.