Implement the applications of the Building Apache Flink Applications in Java course by Confluent using Pyflink.
Unlike the course, the source data is sent by a Kafka producer application because the DataGen DataStream connector is not available in Pyflink. The other apps are implemented gradually while performing course exercises. See below for details.
The following describes the course contents. ✅ and ☑️ indicate exercises and course materials respectively. Lesson 3 covers how to set up the Kafka and Flink clusters using Docker Compose. The Kafka producer app is created in the lesson 5 exercise. The final versions of the flight importer job and the usage statistics calculator can be found in the exercises of lessons 16 and 20 respectively.
- Apache Flink with Java - An Introduction
- Datastream Programming
- ✅ How to Start Flink and Get Setup (Exercise)
  - Built Kafka and Flink clusters using Docker.
  - Bitnami images are used for the Kafka cluster - see this page for details.
  - A custom Docker image (building-pyflink-apps:1.17.1) is created to install Python and the Pyflink package as well as to save dependent Jar files.
    - See the Dockerfile; the image can be built by `docker build -t=building-pyflink-apps:1.17.1 .`
  - See the docker-compose.yml; the clusters can be started by `docker-compose up -d`
- ☑️ The Flink Job Lifecycle
  - A minimal example of executing a Pyflink app is added - see the sketch below.
  - See course content(s) below
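
  A minimal sketch of what such an app boils down to (illustrative only, not the repo's actual script):

  ```python
  # a minimal Pyflink app: double a handful of numbers, print them and
  # submit the job via env.execute()
  from pyflink.datastream import StreamExecutionEnvironment

  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_parallelism(1)
  env.from_collection([1, 2, 3]).map(lambda x: x * 2).print()
  env.execute("minimal-pyflink-app")
  ```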
- ✅ Running a Flink Job (Exercise)
  - Pyflink doesn't have the DataGen DataStream connector, so a Kafka producer application is used instead to create topics and send messages - see the sketch below.
  - 4 topics are created (skyone, sunset, flightdata and userstatistics), and messages are sent to the first two topics.
  - See course content(s) below
    - s05_data_gen.py
      - Topics are created when a flag argument is added, so include it the first time the script runs, i.e. `python src/s05_data_gen.py --create`. Basically it deletes the topics if they exist and creates them.
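
  The gist of the producer can be sketched as below, assuming kafka-python and a broker at localhost:9092; the topic settings and record schema are simplified illustrations, not the actual script:

  ```python
  # a simplified sketch of the producer idea, assuming kafka-python is
  # installed and a broker listens on localhost:9092
  import json
  import time

  from kafka import KafkaProducer
  from kafka.admin import KafkaAdminClient, NewTopic

  # create the topic on the first run (the actual script deletes existing
  # topics before recreating them when the --create flag is passed)
  admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
  admin.create_topics([NewTopic("skyone", num_partitions=3, replication_factor=1)])

  producer = KafkaProducer(
      bootstrap_servers="localhost:9092",
      value_serializer=lambda v: json.dumps(v).encode("utf-8"),
  )
  while True:
      producer.send("skyone", value={"email_address": "user@example.com"})
      time.sleep(1)
  ```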
- Anatomy of a Stream
- Flink Data Sources
- ✅ Creating a Flink Data Source (Exercise)
  - It reads from the skyone topic and prints the values. The values are deserialized as strings in this exercise - see the sketch below.
  - This and all the other Pyflink applications can be executed locally or run in the Flink cluster. See the script for details.
  - See course content(s) below
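
  A rough sketch of the source, assuming the Kafka connector jar is available to the runtime and a broker at localhost:9092 (the group id and source name are made up):

  ```python
  # read the skyone topic as plain strings and print the values
  from pyflink.common import WatermarkStrategy
  from pyflink.common.serialization import SimpleStringSchema
  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

  env = StreamExecutionEnvironment.get_execution_environment()

  source = (
      KafkaSource.builder()
      .set_bootstrap_servers("localhost:9092")
      .set_topics("skyone")
      .set_group_id("skyone-reader")  # made-up group id
      .set_starting_offsets(KafkaOffsetsInitializer.earliest())
      .set_value_only_deserializer(SimpleStringSchema())
      .build()
  )

  env.from_source(source, WatermarkStrategy.no_watermarks(), "skyone-source").print()
  env.execute("read-skyone")
  ```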
- Serializers & Deserializers
- ✅ Deserializing Messages in Flink (Exercise)
  - The skyone message values are deserialized as JSON strings, and they are returned as the named Row type. As the Flink type is not convenient for processing, it is converted into a Python object, specifically a data class - see the sketch below.
  - See course content(s) below
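
  A sketch of the idea; the field names and the data class below are illustrative assumptions, not the actual flight schema:

  ```python
  # deserialize JSON into a named Row via its type info, then map each Row
  # into a Python data class for convenient processing
  from dataclasses import dataclass

  from pyflink.common import Types
  from pyflink.datastream.formats.json import JsonRowDeserializationSchema

  @dataclass
  class SkyOneData:
      email_address: str
      flight_duration: int

  row_type = Types.ROW_NAMED(
      ["email_address", "flight_duration"], [Types.STRING(), Types.INT()]
  )
  deserializer = JsonRowDeserializationSchema.builder().type_info(row_type).build()

  # plugged into the Kafka source via set_value_only_deserializer(deserializer),
  # each record arrives as a named Row; convert it afterwards, e.g.
  # stream.map(lambda r: SkyOneData(r.email_address, r.flight_duration))
  ```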
- ☑️ Transforming Data in Flink
  - Map, FlatMap, Filter and Reduce transformations are illustrated using built-in operators and process functions - see the sketch below.
  - See course content(s) below
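
  For instance, the built-in operators can be chained as follows (values are illustrative):

  ```python
  # Map, FlatMap, Filter and (keyed) Reduce with built-in operators
  from pyflink.common import Types
  from pyflink.datastream import StreamExecutionEnvironment

  env = StreamExecutionEnvironment.get_execution_environment()
  ds = env.from_collection([1, 2, 3, 4], type_info=Types.INT())
  (
      ds.map(lambda x: x * 2, output_type=Types.INT())          # Map
      .flat_map(lambda x: [x, x + 1], output_type=Types.INT())  # FlatMap
      .filter(lambda x: x % 2 == 0)                             # Filter
      .key_by(lambda x: x % 4, key_type=Types.INT())
      .reduce(lambda a, b: a + b)                               # Reduce per key
      .print()
  )
  env.execute("transformations")
  ```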
- ✅ Flink Data Transformations (Exercise)
  - The source data is transformed into the flight data schema. Later, data from skyone and sunset will be converted into this schema so that they can be merged.
  - The transformation is performed in a function called define_workflow so that it can be tested. This function will be updated gradually.
  - See course content(s) below
    - s12_transformation.py
    - test_s12_transformation.py
      - Testing scripts are expected to be run individually, e.g. `pytest src/test_s12_transformation.py -svv` - a sketch of the pattern follows below.
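
  A hypothetical sketch of the testing pattern; the real define_workflow is more involved:

  ```python
  # unit-testing a workflow function by collecting results from a bounded
  # stream; define_workflow here is a stand-in, not the repo's implementation
  from pyflink.common import Types
  from pyflink.datastream import StreamExecutionEnvironment

  def define_workflow(stream):
      return stream.map(lambda s: s.upper(), output_type=Types.STRING())

  def test_define_workflow():
      env = StreamExecutionEnvironment.get_execution_environment()
      ds = env.from_collection(["skyone", "sunset"], type_info=Types.STRING())
      results = list(define_workflow(ds).execute_and_collect())
      assert sorted(results) == ["SKYONE", "SUNSET"]
  ```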
- Flink Data Sinks
- ✅ Creating a Flink Data Sink (Exercise)
  - The converted data from skyone will be pushed into a Kafka topic (flightdata).
  - Note that, as Python data classes cannot be serialized, records are converted into the named Row type before being sent - see the sketch below.
  - See course content(s) below
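
  The sink side can be sketched as below; the broker address and row schema are assumptions:

  ```python
  # serialize named Rows as JSON and push them into the flightdata topic
  from pyflink.common import Types
  from pyflink.datastream.connectors.base import DeliveryGuarantee
  from pyflink.datastream.connectors.kafka import (
      KafkaRecordSerializationSchema,
      KafkaSink,
  )
  from pyflink.datastream.formats.json import JsonRowSerializationSchema

  row_type = Types.ROW_NAMED(
      ["email_address", "flight_duration"], [Types.STRING(), Types.INT()]
  )
  serializer = JsonRowSerializationSchema.builder().with_type_info(row_type).build()

  sink = (
      KafkaSink.builder()
      .set_bootstrap_servers("localhost:9092")
      .set_record_serializer(
          KafkaRecordSerializationSchema.builder()
          .set_topic("flightdata")
          .set_value_serialization_schema(serializer)
          .build()
      )
      .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .build()
  )
  # once records are converted back into Rows: stream.sink_to(sink)
  ```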
- ☑️ Creating Branching Data Streams in Flink
  - Various branching methods are illustrated, covering Union, CoProcessFunction, CoMapFunction, CoFlatMapFunction, and Side Outputs - a side output sketch follows below.
  - See course content(s) below
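
  Side outputs, for example, can be sketched like this (the tag name and split condition are made up):

  ```python
  # split one stream into a main output and a side output with OutputTag
  from pyflink.common import Types
  from pyflink.datastream import OutputTag, ProcessFunction, StreamExecutionEnvironment

  env = StreamExecutionEnvironment.get_execution_environment()
  numbers = env.from_collection([1, 2, 3, 4], type_info=Types.INT())

  big_tag = OutputTag("big", Types.INT())

  class Splitter(ProcessFunction):
      def process_element(self, value, ctx):
          if value > 2:
              yield big_tag, value  # routed to the side output
          else:
              yield value           # stays on the main stream

  main = numbers.process(Splitter(), output_type=Types.INT())
  main.get_side_output(big_tag).print()
  main.print()
  env.execute("side-outputs")
  ```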
- ✅ Merging Flink Data Streams (Exercise)
  - Records from the skyone and sunset topics are converted into the flight data schema, merged, and sent into the flightdata topic - see the sketch below.
  - See course content(s) below
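
  Conceptually, the merge boils down to a union of the two converted streams (to_flight_data below is a hypothetical stand-in for the real conversion):

  ```python
  # union two streams after mapping both onto the common flight data schema
  from pyflink.common import Types
  from pyflink.datastream import StreamExecutionEnvironment

  def to_flight_data(value):
      return value.upper()  # stand-in for the real schema conversion

  env = StreamExecutionEnvironment.get_execution_environment()
  skyone = env.from_collection(["skyone-1"], type_info=Types.STRING())
  sunset = env.from_collection(["sunset-1"], type_info=Types.STRING())

  merged = skyone.map(to_flight_data, output_type=Types.STRING()).union(
      sunset.map(to_flight_data, output_type=Types.STRING())
  )
  merged.print()
  env.execute("merge-flights")
  ```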
- Windowing and Watermarks in Flink
- ✅ Aggregating Flink Data using Windowing (Exercise)
  - Usage statistics (total flight duration and number of flights) are calculated by email address, and they are sent into the userstatistics topic.
  - Note that the transformation is stateless in the sense that the aggregation is performed entirely within a one-minute tumbling window - see the sketch below.
  - See course content(s) below
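
  The aggregation can be sketched as below, using processing time and a summed duration for brevity (the exercise itself works with event time and watermarks, and the record shape is assumed):

  ```python
  # total flight duration per email address over one-minute tumbling windows
  from pyflink.common import Types
  from pyflink.common.time import Time
  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.datastream.window import TumblingProcessingTimeWindows

  env = StreamExecutionEnvironment.get_execution_environment()
  flights = env.from_collection(
      [("a@x.com", 90), ("a@x.com", 30), ("b@x.com", 60)],
      type_info=Types.TUPLE([Types.STRING(), Types.INT()]),
  )
  (
      flights.key_by(lambda f: f[0], key_type=Types.STRING())
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .reduce(lambda a, b: (a[0], a[1] + b[1]))  # sum durations per key
      .print()
  )
  env.execute("usage-statistics")
  ```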
- Working with Keyed State in Flink
- ✅ Managing State in Flink (Exercise)
  - The transformation becomes stateful so that usage statistics are continuously updated by accessing the state values.
  - The reduce function includes a window function that allows access to the global state. The window function is responsible for keeping the global state updated and for returning the updated values - see the sketch below.
  - See course content(s) below
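
  A sketch of the global state idea, using a ProcessWindowFunction on its own for brevity (the exercise pairs a reduce function with the window function; the names and record shape are made up):

  ```python
  # keep a running total per key in global keyed state across window firings
  from pyflink.common import Types
  from pyflink.common.time import Time
  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.datastream.functions import ProcessWindowFunction
  from pyflink.datastream.state import ValueStateDescriptor
  from pyflink.datastream.window import TumblingProcessingTimeWindows

  class RunningTotal(ProcessWindowFunction):
      def process(self, key, context, elements):
          state = context.global_state().get_state(
              ValueStateDescriptor("total_duration", Types.INT())
          )
          total = (state.value() or 0) + sum(e[1] for e in elements)
          state.update(total)  # survives beyond the current window
          yield key, total

  env = StreamExecutionEnvironment.get_execution_environment()
  flights = env.from_collection(
      [("a@x.com", 90), ("b@x.com", 60)],
      type_info=Types.TUPLE([Types.STRING(), Types.INT()]),
  )
  (
      flights.key_by(lambda f: f[0], key_type=Types.STRING())
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .process(RunningTotal(), output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
      .print()
  )
  env.execute("manage-state")
  ```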
- Closing Remarks

```bash
#### build docker image for Pyflink
docker build -t=building-pyflink-apps:1.17.1 .

#### create kafka and flink clusters and kafka-ui
docker-compose up -d

#### start kafka producer in one terminal
python -m venv venv
source venv/bin/activate
# upgrade pip (optional)
pip install pip --upgrade
# install required packages
pip install -r requirements-dev.txt
## start with --create flag to create topics before sending messages
python src/s05_data_gen.py --create

#### submit pyflink apps in another terminal
## flight importer
docker exec jobmanager /opt/flink/bin/flink run \
  --python /tmp/src/s16_merge.py \
  --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
  -d

## usage calculator
docker exec jobmanager /opt/flink/bin/flink run \
  --python /tmp/src/s20_manage_state.py \
  --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
  -d
```