From 77da4dbd11afe74390b79e36270b302b493ae394 Mon Sep 17 00:00:00 2001 From: Anaisdg Date: Tue, 2 Jul 2024 13:37:13 -0500 Subject: [PATCH] working and not working example --- README.md | 26 +- __pycache__/cstr_reactor.cpython-311.pyc | Bin 0 -> 1983 bytes cstr_reactor.py | 40 +++ data_doublet_steps.txt | 301 ++++++++++++++++++ dockerized/docker-compose.yml | 93 ++++++ dockerized/faust_app/Dockerfile | 10 + dockerized/faust_app/faust_app.py | 47 +++ dockerized/faust_app/requirements.txt | 2 + dockerized/python-script/Dockerfile | 18 ++ dockerized/python-script/cstr_controller.py | 217 +++++++++++++ .../python-script/kafka_consumer_test.py | 35 ++ .../python-script/kafka_producer_test.py | 21 ++ dockerized/python-script/requirements.txt | 5 + dockerized/telegraf/telegraf.conf | 29 ++ dockerized2/docker-compose.yml | 93 ++++++ dockerized2/faust_app/Dockerfile | 10 + dockerized2/faust_app/faust_app.py | 200 ++++++++++++ dockerized2/faust_app/requirements.txt | 2 + dockerized2/python-script/Dockerfile | 18 ++ dockerized2/python-script/cstr_controller.py | 130 ++++++++ .../python-script/kafka_consumer_test.py | 35 ++ .../python-script/kafka_producer_test.py | 21 ++ dockerized2/python-script/requirements.txt | 5 + dockerized2/telegraf/telegraf.conf | 29 ++ pid_control.py | 113 +++++++ 25 files changed, 1499 insertions(+), 1 deletion(-) create mode 100644 __pycache__/cstr_reactor.cpython-311.pyc create mode 100644 cstr_reactor.py create mode 100644 data_doublet_steps.txt create mode 100644 dockerized/docker-compose.yml create mode 100644 dockerized/faust_app/Dockerfile create mode 100644 dockerized/faust_app/faust_app.py create mode 100644 dockerized/faust_app/requirements.txt create mode 100644 dockerized/python-script/Dockerfile create mode 100644 dockerized/python-script/cstr_controller.py create mode 100644 dockerized/python-script/kafka_consumer_test.py create mode 100644 dockerized/python-script/kafka_producer_test.py create mode 100644 
dockerized/python-script/requirements.txt create mode 100644 dockerized/telegraf/telegraf.conf create mode 100644 dockerized2/docker-compose.yml create mode 100644 dockerized2/faust_app/Dockerfile create mode 100644 dockerized2/faust_app/faust_app.py create mode 100644 dockerized2/faust_app/requirements.txt create mode 100644 dockerized2/python-script/Dockerfile create mode 100644 dockerized2/python-script/cstr_controller.py create mode 100644 dockerized2/python-script/kafka_consumer_test.py create mode 100644 dockerized2/python-script/kafka_producer_test.py create mode 100644 dockerized2/python-script/requirements.txt create mode 100644 dockerized2/telegraf/telegraf.conf create mode 100644 pid_control.py diff --git a/README.md b/README.md index 5e5476f..4455784 100644 --- a/README.md +++ b/README.md @@ -1 +1,25 @@ -# CSTR_InfluxDB \ No newline at end of file +# CSTR_InfluxDB +This project aims to use Kafka, Faust, and InfluxDB to monitor a digital twin of a CSTR (continous stirred tank reactor) and a PID controller for a cooling jacket for the CSTR. + + + +`dockerized` contains a working example and integration with InfluxDB but the CSTR and controller logic is wrong. 
`dockerized2` contains the correct CSTR logic, but there are still unresolved issues with its Kafka/Faust integration.
* delta_t + if op[i] < op_lo: + op[i] = op_lo + ie[i] = ie[i] - e[i] * delta_t + ts = [t[i], t[i + 1]] + logger.info("Waiting to receive message from Kafka...") + try: + u[i + 1] = receive_tc_from_kafka() + if u[i + 1] is None: + logger.error("No valid Tc value received, breaking loop.") + break + except Exception as e: + logger.error(f"Error receiving Tc: {e}") + break + y = odeint(cstr, x0, ts, args=(u[i + 1], Tf, Caf)) + Ca[i + 1] = y[-1][0] + T[i + 1] = y[-1][1] + if not np.isnan(Ca[i + 1]) and not np.isnan(T[i + 1]): + x0[0] = Ca[i + 1] + x0[1] = T[i + 1] + pv[i + 1] = T[i + 1] + send_data_to_kafka(Ca[i + 1], T[i + 1]) + else: + logger.error(f"Encountered NaN values in iteration {i}: Ca={Ca[i + 1]}, T={T[i + 1]}") + break + +op[len(t) - 1] = op[len(t) - 2] +ie[len(t) - 1] = ie[len(t) - 2] +P[len(t) - 1] = P[len(t) - 2] +I[len(t) - 1] = I[len(t) - 2] +D[len(t) - 1] = D[len(t) - 2] + +data = np.vstack((t, u, T)).T +np.savetxt('data_doublet_steps.txt', data, delimiter=',') + +producer.close() +consumer.close() + +# Add a simple test run to check message reception +if __name__ == "__main__": + logger.info("Starting test run...") + try: + tc = receive_tc_from_kafka() + logger.info(f"Received Tc: {tc}") + except Exception as e: + logger.error(f"Error receiving message from Kafka: {e}") diff --git a/dockerized/python-script/kafka_consumer_test.py b/dockerized/python-script/kafka_consumer_test.py new file mode 100644 index 0000000..b51a1fb --- /dev/null +++ b/dockerized/python-script/kafka_consumer_test.py @@ -0,0 +1,35 @@ +# kafka_consumer_test.py +import logging +import json +from kafka import KafkaConsumer +import os + +# Configure logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Kafka setup +consumer = KafkaConsumer( + 'cstr_control', + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + auto_offset_reset='earliest', + enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else 
None, + consumer_timeout_ms=10000, # Increased timeout +) + +# Function to receive a test message from Kafka +def receive_test_message_from_kafka(): + logger.info("Waiting to receive test message from Kafka...") + for message in consumer: + logger.debug(f"Raw message from Kafka: {message}") + if message.value is not None: + logger.info(f"Received test message from Kafka: {message.value}") + return message.value + else: + logger.warning("Received an empty message or invalid JSON") + +# Receive the test message +test_message = receive_test_message_from_kafka() +logger.info(f"Test message received: {test_message}") + diff --git a/dockerized/python-script/kafka_producer_test.py b/dockerized/python-script/kafka_producer_test.py new file mode 100644 index 0000000..9137d48 --- /dev/null +++ b/dockerized/python-script/kafka_producer_test.py @@ -0,0 +1,21 @@ +# kafka_producer_test.py +import logging +import json +from kafka import KafkaProducer +import os + +# Configure logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Kafka setup +producer = KafkaProducer( + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + value_serializer=lambda v: json.dumps(v).encode('utf-8') +) + +# Send a test message to Kafka +test_data = {"Tc": 300.0} +producer.send('cstr_control', value=test_data) +producer.flush() +logger.info(f"Sent test data to Kafka: {test_data}") diff --git a/dockerized/python-script/requirements.txt b/dockerized/python-script/requirements.txt new file mode 100644 index 0000000..487c357 --- /dev/null +++ b/dockerized/python-script/requirements.txt @@ -0,0 +1,5 @@ +numpy +matplotlib +scipy +kafka-python +faust diff --git a/dockerized/telegraf/telegraf.conf b/dockerized/telegraf/telegraf.conf new file mode 100644 index 0000000..f037013 --- /dev/null +++ b/dockerized/telegraf/telegraf.conf @@ -0,0 +1,29 @@ +[agent] + interval = "10s" + round_interval = true + debug = true + +[[inputs.kafka_consumer]] + brokers = ["kafka:9092"] + 
topics = ["cstr_data"] + data_format = "json_v2" + + [[inputs.kafka_consumer.json_v2]] + measurement_name = "cstr_data" + [[inputs.kafka_consumer.json_v2.field]] + path = "Ca" + type = "float" + [[inputs.kafka_consumer.json_v2.field]] + path = "Reactor_Temperature" + type = "float" + +[[outputs.file]] + files = ["stdout"] + + +[[outputs.influxdb_v2]] + urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"] + token = "d-17JiPCEX66F0t7F3NNnfdZjPAF4tp6DmFyC8VFmizmBOi874Ao_bdwW_7wklicSqMqZCcNXzaOffU5bXFj9Q==" + organization = "89711f17730122e0" + bucket = "CSTR" + diff --git a/dockerized2/docker-compose.yml b/dockerized2/docker-compose.yml new file mode 100644 index 0000000..848741c --- /dev/null +++ b/dockerized2/docker-compose.yml @@ -0,0 +1,93 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + networks: + - kafka-net + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + networks: + - kafka-net + healthcheck: + test: ["CMD", "nc", "-z", "localhost", "9092"] + interval: 10s + timeout: 10s + retries: 5 + + kafka-connect: + image: confluentinc/cp-kafka-connect:latest + depends_on: + - kafka + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka:9092 + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: "quickstart-avro" + CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config" + CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets" + CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status" + CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" + CONNECT_PLUGIN_PATH: 
"/usr/share/java,/etc/kafka-connect/jars" + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + networks: + - kafka-net + + telegraf: + image: telegraf:latest + volumes: + - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro + depends_on: + kafka: + condition: service_healthy + networks: + - kafka-net + + python-script: + build: + context: ./python-script + dockerfile: Dockerfile + depends_on: + kafka: + condition: service_healthy + environment: + KAFKA_BROKER_ADDRESS: kafka:9092 + networks: + - kafka-net + healthcheck: + test: ["CMD-SHELL", "test -f /healthcheck"] + interval: 10s + timeout: 5s + retries: 5 + + faust-app: + build: + context: ./faust_app + dockerfile: Dockerfile + depends_on: + python-script: + condition: service_healthy + environment: + KAFKA_BROKER_ADDRESS: kafka:9092 + networks: + - kafka-net + +networks: + kafka-net: + driver: bridge diff --git a/dockerized2/faust_app/Dockerfile b/dockerized2/faust_app/Dockerfile new file mode 100644 index 0000000..05a2846 --- /dev/null +++ b/dockerized2/faust_app/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9-slim + +WORKDIR /app + +COPY requirements.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . 
+ +CMD ["faust", "-A", "faust_app", "worker", "-l", "info"] diff --git a/dockerized2/faust_app/faust_app.py b/dockerized2/faust_app/faust_app.py new file mode 100644 index 0000000..cbd6fbd --- /dev/null +++ b/dockerized2/faust_app/faust_app.py @@ -0,0 +1,200 @@ +import faust +import logging +import json +import numpy as np +from scipy.integrate import odeint + +class CSTRData(faust.Record, serializer='json'): + Ca: float + Reactor_Temperature: float + +app = faust.App('cstr-controller', broker='kafka://kafka:9092') +data_topic = app.topic('cstr_data', value_type=CSTRData) +tc_topic = app.topic('cstr_control') + +# PID parameters +Kc = 4.61730615181 * 2.0 +tauI = 0.913444964569 / 4.0 +tauD = 0.0 + +# Control loop function +def pid_control(T_ss, u_ss, t, Tf, Caf, x0): + u = np.ones(len(t)) * u_ss + op = np.ones(len(t)) * u_ss + pv = np.zeros(len(t)) + e = np.zeros(len(t)) + ie = np.zeros(len(t)) + dpv = np.zeros(len(t)) + P = np.zeros(len(t)) + I = np.zeros(len(t)) + D = np.zeros(len(t)) + + # Initialize Ca and T arrays + Ca = np.ones(len(t)) * x0[0] + T = np.ones(len(t)) * x0[1] + + # Upper and Lower limits on OP + op_hi = 350.0 + op_lo = 250.0 + + pv[0] = T_ss + + # Define the setpoint ramp or steps + sp = np.ones(len(t)) * T_ss + for i in range(15): + sp[i * 20:(i + 1) * 20] = 300 + i * 7.0 + sp[300] = sp[299] + + for i in range(len(t) - 1): + delta_t = t[i + 1] - t[i] + e[i] = sp[i] - pv[i] + if i >= 1: + dpv[i] = (pv[i] - pv[i - 1]) / delta_t + ie[i] = ie[i - 1] + e[i] * delta_t + P[i] = Kc * e[i] + I[i] = Kc / tauI * ie[i] + D[i] = -Kc * tauD * dpv[i] + op[i] = op[0] + P[i] + I[i] + D[i] + if op[i] > op_hi: + op[i] = op_hi + ie[i] = ie[i] - e[i] * delta_t + if op[i] < op_lo: + op[i] = op_lo + ie[i] = ie[i] - e[i] * delta_t + u[i + 1] = op[i] + + # Use the current Ca and T for the initial condition of the next step + x0 = [Ca[i], T[i]] + ts = [t[i], t[i + 1]] + y = odeint(cstr, x0, ts, args=(u[i + 1], Tf, Caf)) # Use u[i + 1] + Ca[i + 1] = y[-1][0] + T[i + 1] = 
y[-1][1] + pv[i + 1] = T[i + 1] + + # Send Tc to Kafka + send_tc_to_kafka(op[i]) + + # Debugging information + if i % 50 == 0: + print(f"Time: {t[i]:.2f}, Setpoint: {sp[i]:.2f}, PV: {pv[i]:.2f}, OP: {op[i]:.2f}, Ca: {Ca[i]:.2f}, T: {T[i]:.2f}") + + op[len(t) - 1] = op[len(t) - 2] + ie[len(t) - 1] = ie[len(t) - 2] + P[len(t) - 1] = P[len(t) - 2] + I[len(t) - 1] = I[len(t) - 2] + D[len(t) - 1] = D[len(t) - 2] + + return u, T + +# Define CSTR model +def cstr(x, t, u, Tf, Caf): + Ca = x[0] + T = x[1] + + q = 100 + V = 100 + rho = 1000 + Cp = 0.239 + mdelH = 5e4 + EoverR = 8750 + k0 = 7.2e10 + UA = 5e4 + rA = k0 * np.exp(-EoverR / T) * Ca + + dCadt = q / V (Caf - Ca) - rA + dTdt = q / V (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T) + + xdot = np.zeros(2) + xdot[0] = dCadt + xdot[1] = dTdt + return xdot + +# Maintain integral of error (ie) globally +ie = 0 + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Function to send Tc value to Kafka +def send_tc_to_kafka(tc_value): + producer = KafkaProducer(bootstrap_servers='kafka:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8')) + producer.send('cstr_control', value={"Tc": tc_value}) + producer.flush() + +@app.agent(data_topic) +async def process(stream): + async for event in stream: + ca = event.Ca + temp_reactor = event.Reactor_Temperature + + # Define the PID control parameters and initial conditions + T_ss = 300 # Steady-state temperature + u_ss = 300 # Steady-state control input + t = np.linspace(0, 20, 300) # Time vector + Tf = 300 # Feed temperature + Caf = 1 # Feed concentration + x0 = [ca, temp_reactor] # Initial conditions + + # Compute Tc value based on your control logic + _, T = pid_control(T_ss, u_ss, t, Tf, Caf, x0) + tc = T[-1] # Use the last temperature value as the control signal + + # Produce the result to the control topic + await tc_topic.send(value={"Tc": tc}) + +if __name__ == '__main__': + app.main() + + +# 
Function to send Tc to Kafka +def send_tc_to_kafka(tc): + if not np.isnan(tc): + data = { + "Tc": tc + } + producer.send('cstr_control', value=data) + producer.flush() + logger.info(f"Sent Tc to Kafka: {data}") + else: + logger.error(f"Attempted to send NaN value to Kafka: Tc={tc}") + +# Function to receive data from Kafka +def receive_data_from_kafka(): + logger.info("Waiting to receive message from Kafka...") + for attempt in range(5): # Retry up to 5 times + for message in consumer: + logger.debug(f"Raw message from Kafka: {message}") + if message.value is not None: + logger.info(f"Received message from Kafka: {message.value}") + try: + value = message.value + if isinstance(value, str): + value = json.loads(value) + if not np.isnan(value["Ca"]) and not np.isnan(value["Reactor_Temperature"]): + return value["Ca"], value["Reactor_Temperature"] + else: + logger.error(f"Received NaN value for Ca or Reactor_Temperature: {value}") + except (KeyError, json.JSONDecodeError) as e: + logger.error(f"Error processing message: {e}") + else: + logger.warning("Received an empty message or invalid JSON") + logger.info(f"Attempt {attempt + 1} failed, retrying...") + consumer.poll(timeout_ms=5000) + logger.info("Exiting receive_data_from_kafka after 5 attempts") + return None, None + +# Main execution +if __name__ == "__main__": + t = np.linspace(0, 10, 301) + u_ss = 300.0 + T_ss = 324.475443431599 + + while True: + Ca, T = receive_data_from_kafka() + if Ca is not None and T is not None: + x0 = [Ca, T] + u, T = pid_control(T_ss, u_ss, t, 350, 1, x0) + else: + logger.error("No valid Ca and T values received, terminating loop") + break diff --git a/dockerized2/faust_app/requirements.txt b/dockerized2/faust_app/requirements.txt new file mode 100644 index 0000000..73f1221 --- /dev/null +++ b/dockerized2/faust_app/requirements.txt @@ -0,0 +1,2 @@ +faust +kafka-python diff --git a/dockerized2/python-script/Dockerfile b/dockerized2/python-script/Dockerfile new file mode 100644 index 
0000000..607672a --- /dev/null +++ b/dockerized2/python-script/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.9-slim + +WORKDIR /app + +COPY requirements.txt requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +# Debugging: List the contents of the /app directory +RUN ls -al /app + +# CMD ["python", "kafka_producer_test.py"] # Change this to test the producer +# CMD ["python", "kafka_consumer_test.py"] # Change this to test the consumer +CMD ["python", "cstr_controller.py"] + +# Keep the container running for debugging +# CMD ["tail", "-f", "/dev/null"] diff --git a/dockerized2/python-script/cstr_controller.py b/dockerized2/python-script/cstr_controller.py new file mode 100644 index 0000000..b54eafd --- /dev/null +++ b/dockerized2/python-script/cstr_controller.py @@ -0,0 +1,130 @@ +import numpy as np +from scipy.integrate import odeint +from kafka import KafkaProducer, KafkaConsumer +import json +import logging +import os + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Kafka setup +producer = KafkaProducer( + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + value_serializer=lambda v: json.dumps(v).encode('utf-8') +) + +consumer = KafkaConsumer( + 'cstr_control', + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + auto_offset_reset='earliest', + enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None, + consumer_timeout_ms=10000, # Increased timeout +) + +# Define CSTR model +def cstr(x, t, u, Tf, Caf): + Ca = x[0] + T = x[1] + + q = 100 + V = 100 + rho = 1000 + Cp = 0.239 + mdelH = 5e4 + EoverR = 8750 + k0 = 7.2e10 + UA = 5e4 + rA = k0 * np.exp(-EoverR / T) * Ca + + dCadt = q / V * (Caf - Ca) - rA + dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T) + + xdot = np.zeros(2) + xdot[0] = dCadt + xdot[1] = dTdt + return xdot + +# Function to send data to Kafka +def send_data_to_kafka(ca, 
temp_reactor): + if not np.isnan(ca) and not np.isnan(temp_reactor): + data = { + "Ca": float(ca), + "Reactor_Temperature": float(temp_reactor) + } + producer.send('cstr_data', value=data) + producer.flush() + logger.info(f"Sent data to Kafka: {data}") + else: + logger.error(f"Attempted to send NaN values to Kafka: Ca={ca}, Reactor_Temperature={temp_reactor}") + +# Function to receive Tc from Kafka +def receive_tc_from_kafka(): + logger.info("Waiting to receive message from Kafka...") + for attempt in range(5): # Retry up to 5 times + for message in consumer: + logger.debug(f"Raw message from Kafka: {message}") + if message.value is not None: + logger.info(f"Received message from Kafka: {message.value}") + try: + value = message.value + if isinstance(value, str): + value = json.loads(value) + if not np.isnan(value["Tc"]): + return value["Tc"] + else: + logger.error(f"Received NaN value for Tc: {value}") + except (KeyError, json.JSONDecodeError) as e: + logger.error(f"Error processing message: {e}") + else: + logger.warning("Received an empty message or invalid JSON") + logger.info(f"Attempt {attempt + 1} failed, retrying...") + consumer.poll(timeout_ms=5000) + logger.info("Exiting receive_tc_from_kafka after 5 attempts") + return None + +# Simulation function +def simulate_cstr(u, Tf, Caf, x0, t): + Ca = np.ones(len(t)) * x0[0] + T = np.ones(len(t)) * x0[1] + for i in range(len(t) - 1): + ts = [t[i], t[i + 1]] + y = odeint(cstr, x0, ts, args=(u[i+1], Tf, Caf)) + Ca[i + 1] = y[-1][0] + T[i + 1] = y[-1][1] + x0[0] = Ca[i + 1] + x0[1] = T[i + 1] + + logger.debug(f"Iteration {i}: Ca={Ca[i + 1]}, T={T[i + 1]}") + + # Send data to Kafka + send_data_to_kafka(Ca[i + 1], T[i + 1]) + + # Receive Tc from Kafka + tc = receive_tc_from_kafka() + if tc is not None: + u[i + 1] = tc + else: + logger.error("No valid Tc value received, using previous value") + u[i + 1] = u[i] + + return Ca, T + +# Main loop to continuously run the simulation +if __name__ == "__main__": + t = 
np.linspace(0, 10, 301) + x0 = [0.87725294608097, 324.475443431599] + u_ss = 300.0 + + while True: + # Initial Tc value + initial_tc = 300.0 + u = np.ones(301) * initial_tc + + # Run simulation + Ca, T = simulate_cstr(u, 350, 1, x0, t) + + # Update x0 for the next iteration + x0 = [Ca[-1], T[-1]] diff --git a/dockerized2/python-script/kafka_consumer_test.py b/dockerized2/python-script/kafka_consumer_test.py new file mode 100644 index 0000000..b51a1fb --- /dev/null +++ b/dockerized2/python-script/kafka_consumer_test.py @@ -0,0 +1,35 @@ +# kafka_consumer_test.py +import logging +import json +from kafka import KafkaConsumer +import os + +# Configure logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Kafka setup +consumer = KafkaConsumer( + 'cstr_control', + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + auto_offset_reset='earliest', + enable_auto_commit=False, + value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None, + consumer_timeout_ms=10000, # Increased timeout +) + +# Function to receive a test message from Kafka +def receive_test_message_from_kafka(): + logger.info("Waiting to receive test message from Kafka...") + for message in consumer: + logger.debug(f"Raw message from Kafka: {message}") + if message.value is not None: + logger.info(f"Received test message from Kafka: {message.value}") + return message.value + else: + logger.warning("Received an empty message or invalid JSON") + +# Receive the test message +test_message = receive_test_message_from_kafka() +logger.info(f"Test message received: {test_message}") + diff --git a/dockerized2/python-script/kafka_producer_test.py b/dockerized2/python-script/kafka_producer_test.py new file mode 100644 index 0000000..9137d48 --- /dev/null +++ b/dockerized2/python-script/kafka_producer_test.py @@ -0,0 +1,21 @@ +# kafka_producer_test.py +import logging +import json +from kafka import KafkaProducer +import os + +# Configure logging 
+logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Kafka setup +producer = KafkaProducer( + bootstrap_servers=os.getenv('KAFKA_BROKER_ADDRESS'), + value_serializer=lambda v: json.dumps(v).encode('utf-8') +) + +# Send a test message to Kafka +test_data = {"Tc": 300.0} +producer.send('cstr_control', value=test_data) +producer.flush() +logger.info(f"Sent test data to Kafka: {test_data}") diff --git a/dockerized2/python-script/requirements.txt b/dockerized2/python-script/requirements.txt new file mode 100644 index 0000000..487c357 --- /dev/null +++ b/dockerized2/python-script/requirements.txt @@ -0,0 +1,5 @@ +numpy +matplotlib +scipy +kafka-python +faust diff --git a/dockerized2/telegraf/telegraf.conf b/dockerized2/telegraf/telegraf.conf new file mode 100644 index 0000000..f037013 --- /dev/null +++ b/dockerized2/telegraf/telegraf.conf @@ -0,0 +1,29 @@ +[agent] + interval = "10s" + round_interval = true + debug = true + +[[inputs.kafka_consumer]] + brokers = ["kafka:9092"] + topics = ["cstr_data"] + data_format = "json_v2" + + [[inputs.kafka_consumer.json_v2]] + measurement_name = "cstr_data" + [[inputs.kafka_consumer.json_v2.field]] + path = "Ca" + type = "float" + [[inputs.kafka_consumer.json_v2.field]] + path = "Reactor_Temperature" + type = "float" + +[[outputs.file]] + files = ["stdout"] + + +[[outputs.influxdb_v2]] + urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"] + token = "d-17JiPCEX66F0t7F3NNnfdZjPAF4tp6DmFyC8VFmizmBOi874Ao_bdwW_7wklicSqMqZCcNXzaOffU5bXFj9Q==" + organization = "89711f17730122e0" + bucket = "CSTR" + diff --git a/pid_control.py b/pid_control.py new file mode 100644 index 0000000..fa04f61 --- /dev/null +++ b/pid_control.py @@ -0,0 +1,113 @@ +import numpy as np +import matplotlib.pyplot as plt +from cstr_reactor import simulate_cstr, cstr # Ensure cstr is imported +from scipy.integrate import odeint + + +# PID parameters and initial conditions +Kc = 4.61730615181 * 2.0 +tauI = 0.913444964569 / 4.0 +tauD = 
0.0 + +# Control loop +def pid_control(T_ss, u_ss, t, Tf, Caf, x0): + u = np.ones(len(t)) * u_ss + op = np.ones(len(t)) * u_ss + pv = np.zeros(len(t)) + e = np.zeros(len(t)) + ie = np.zeros(len(t)) + dpv = np.zeros(len(t)) + P = np.zeros(len(t)) + I = np.zeros(len(t)) + D = np.zeros(len(t)) + + # Initialize Ca and T arrays + Ca = np.ones(len(t)) * x0[0] + T = np.ones(len(t)) * x0[1] + + # Upper and Lower limits on OP + op_hi = 350.0 + op_lo = 250.0 + + pv[0] = T_ss + + # Define the setpoint ramp or steps + sp = np.ones(len(t)) * T_ss + for i in range(15): + sp[i * 20:(i + 1) * 20] = 300 + i * 7.0 + sp[300] = sp[299] + + # Create plot + plt.figure(figsize=(10, 7)) + plt.ion() + plt.show() + + for i in range(len(t) - 1): + delta_t = t[i + 1] - t[i] + e[i] = sp[i] - pv[i] + if i >= 1: + dpv[i] = (pv[i] - pv[i - 1]) / delta_t + ie[i] = ie[i - 1] + e[i] * delta_t + P[i] = Kc * e[i] + I[i] = Kc / tauI * ie[i] + D[i] = -Kc * tauD * dpv[i] + op[i] = op[0] + P[i] + I[i] + D[i] + if op[i] > op_hi: + op[i] = op_hi + ie[i] = ie[i] - e[i] * delta_t + if op[i] < op_lo: + op[i] = op_lo + ie[i] = ie[i] - e[i] * delta_t + u[i + 1] = op[i] + + # Use the current Ca and T for the initial condition of the next step + x0 = [Ca[i], T[i]] + ts = [t[i], t[i+1]] + y = odeint(cstr, x0, ts, args=(u[i + 1], Tf, Caf)) # Use u[i + 1] + Ca[i + 1] = y[-1][0] + T[i + 1] = y[-1][1] + pv[i + 1] = T[i + 1] + + # Debugging information + if i % 50 == 0: + print(f"Time: {t[i]:.2f}, Setpoint: {sp[i]:.2f}, PV: {pv[i]:.2f}, OP: {op[i]:.2f}, Ca: {Ca[i]:.2f}, T: {T[i]:.2f}") + + # Plotting + plt.clf() + plt.subplot(2, 1, 1) + plt.plot(t[:i + 1], sp[:i + 1], 'r--', label='Setpoint') + plt.plot(t[:i + 1], pv[:i + 1], 'b-', label='Process Variable') + plt.ylabel('Temperature CSTR') + plt.legend(loc='best') + + plt.subplot(2, 1, 2) + plt.plot(t[:i + 1], op[:i + 1], 'k-', label='Control Output') + plt.ylabel('Temperature Reactor') + plt.xlabel('Time (sec)') + plt.legend(loc='best') + + plt.pause(0.01) + + 
op[len(t) - 1] = op[len(t) - 2] + ie[len(t) - 1] = ie[len(t) - 2] + P[len(t) - 1] = P[len(t) - 2] + I[len(t) - 1] = I[len(t) - 2] + D[len(t) - 1] = D[len(t) - 2] + + # Save data to file + data = np.vstack((t, u, T, Ca, op)).T + np.savetxt('data_doublet_steps.txt', data, delimiter=',') + + plt.ioff() + plt.show() + + return u, T + +# Main execution +if __name__ == "__main__": + t = np.linspace(0, 10, 301) + x0 = [0.87725294608097, 324.475443431599] + u_ss = 300.0 + T_ss = 324.475443431599 + + u, T = pid_control(T_ss, u_ss, t, 350, 1, x0)