s14_sink.py
import os
import datetime
import logging

from pyflink.common import WatermarkStrategy
from pyflink.datastream import DataStream, StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.datastream.formats.json import (
    JsonRowSerializationSchema,
    JsonRowDeserializationSchema,
)

from models import SkyoneData, FlightData

RUNTIME_ENV = os.getenv("RUNTIME_ENV", "local")
BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")

def define_workflow(source_stream: DataStream) -> DataStream:
    # Convert Skyone records to the unified FlightData shape and keep only
    # flights that have not yet arrived.
    flight_stream = source_stream.map(SkyoneData.to_flight_data).filter(
        lambda data: datetime.datetime.fromisoformat(data.arrival_time)
        > datetime.datetime.now()
    )
    return flight_stream
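
# A quick, Kafka-free way to exercise define_workflow (a sketch, assuming
# models.py is importable and a few SkyoneData rows are written by hand):
#
#   env = StreamExecutionEnvironment.get_execution_environment()
#   rows = env.from_collection(
#       [...],  # hand-written Rows matching SkyoneData.get_value_type_info()
#       type_info=SkyoneData.get_value_type_info(),
#   )
#   define_workflow(rows).print()
#   env.execute("define_workflow_smoke_test")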

if __name__ == "__main__":
    """
    ## local execution
    python src/s14_sink.py

    ## cluster execution
    docker exec jobmanager /opt/flink/bin/flink run \
        --python /tmp/src/s14_sink.py \
        --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
        -d
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    logging.info(f"RUNTIME_ENV - {RUNTIME_ENV}, BOOTSTRAP_SERVERS - {BOOTSTRAP_SERVERS}")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    if RUNTIME_ENV != "docker":
        # Outside the Docker cluster, the Kafka connector jar has to be
        # attached to the environment explicitly.
        CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
        jar_files = ["flink-sql-connector-kafka-1.17.1.jar"]
        jar_paths = tuple(
            [f"file:///{os.path.join(CURRENT_DIR, 'jars', name)}" for name in jar_files]
        )
        logging.info(f"adding local jars - {', '.join(jar_files)}")
        env.add_jars(*jar_paths)

    # Source: read JSON records from the skyone topic, starting at the latest offsets.
    skyone_source = (
        KafkaSource.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_topics("skyone")
        .set_group_id("group.skyone")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(
            JsonRowDeserializationSchema.builder()
            .type_info(SkyoneData.get_value_type_info())
            .build()
        )
        .build()
    )

    skyone_stream = env.from_source(
        skyone_source, WatermarkStrategy.no_watermarks(), "skyone_source"
    )
    # Sink: serialize FlightData rows back to JSON and write them to the
    # flightdata topic.
    flight_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic("flightdata")
            .set_key_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(FlightData.get_key_type_info())
                .build()
            )
            .set_value_serialization_schema(
                JsonRowSerializationSchema.builder()
                .with_type_info(FlightData.get_value_type_info())
                .build()
            )
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )
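
    # A possible variation, not what this job uses: AT_LEAST_ONCE can emit
    # duplicates on recovery. For exactly-once delivery the Kafka sink needs
    # checkpointing enabled (e.g. env.enable_checkpointing(60000)) plus a
    # transactional id prefix in the builder chain above:
    #
    #   .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
    #   .set_transactional_id_prefix("flight-importer")  # hypothetical prefix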

    # Map FlightData objects to Rows so the JSON serializer can encode them.
    define_workflow(skyone_stream).map(
        lambda d: d.to_row(), output_type=FlightData.get_value_type_info()
    ).sink_to(flight_sink).name("flightdata_sink")

    env.execute("flight_importer")
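
models.py is not shown on this page; the job relies only on a small surface of
it (SkyoneData.get_value_type_info and to_flight_data, plus FlightData's
key/value type infos and to_row). A minimal sketch of what that module might
look like; the field names are hypothetical stand-ins, not the repository's
actual schema:

    # models.py (sketch; hypothetical field names)
    from dataclasses import dataclass

    from pyflink.common import Row
    from pyflink.common.typeinfo import Types


    @dataclass
    class FlightData:
        email_address: str
        departure_time: str
        arrival_time: str

        @staticmethod
        def get_key_type_info():
            # The Kafka record key is assumed to be the passenger email.
            return Types.ROW_NAMED(["email_address"], [Types.STRING()])

        @staticmethod
        def get_value_type_info():
            return Types.ROW_NAMED(
                ["email_address", "departure_time", "arrival_time"],
                [Types.STRING(), Types.STRING(), Types.STRING()],
            )

        def to_row(self):
            return Row(
                email_address=self.email_address,
                departure_time=self.departure_time,
                arrival_time=self.arrival_time,
            )


    @dataclass
    class SkyoneData:
        email_address: str
        departure_time: str
        arrival_time: str

        @staticmethod
        def get_value_type_info():
            return Types.ROW_NAMED(
                ["email_address", "departure_time", "arrival_time"],
                [Types.STRING(), Types.STRING(), Types.STRING()],
            )

        @staticmethod
        def to_flight_data(row):
            # The Kafka source hands the mapper a Row (from the JSON
            # deserializer), so fields are read from it by name.
            return FlightData(row.email_address, row.departure_time, row.arrival_time)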