s12_transformation.py
import os
import datetime
import logging
from pyflink.common import WatermarkStrategy
from pyflink.datastream import DataStream, StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from models import SkyoneData

RUNTIME_ENV = os.getenv("RUNTIME_ENV", "local")  # "local" or "docker"
BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:29092")


def define_workflow(source_stream: DataStream) -> DataStream:
    """Map raw Skyone records to flight data and keep only flights
    that have not yet arrived."""
    flight_stream = source_stream.map(SkyoneData.to_flight_data).filter(
        lambda data: datetime.datetime.fromisoformat(data.arrival_time)
        > datetime.datetime.now()
    )
    return flight_stream
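

# models.py is not shown on this page; the sketch below is only what this
# script appears to assume from it (method shapes are illustrative, not the
# actual definitions):
#
#     class SkyoneData:
#         def to_flight_data(self): ...          # -> record exposing an
#                                                #    ISO-8601 `arrival_time` string
#         def is_after(self): ...                # predicate used by the
#                                                #    commented-out variant below
#         @staticmethod
#         def get_value_type_info(): ...         # row type info for the JSON
#                                                #    value deserializer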


if __name__ == "__main__":
    """
    ## local execution
    python src/s12_transformation.py

    ## cluster execution
    docker exec jobmanager /opt/flink/bin/flink run \
        --python /tmp/src/s12_transformation.py \
        --pyFiles file:///tmp/src/models.py,file:///tmp/src/utils.py \
        -d
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    logging.info(f"RUNTIME_ENV - {RUNTIME_ENV}, BOOTSTRAP_SERVERS - {BOOTSTRAP_SERVERS}")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    if RUNTIME_ENV != "docker":
        # Outside the Flink Docker image, the Kafka connector jar has to be
        # registered manually from the local ./jars directory.
        CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
        jar_files = ["flink-sql-connector-kafka-1.17.1.jar"]
        jar_paths = tuple(
            f"file:///{os.path.join(CURRENT_DIR, 'jars', name)}" for name in jar_files
        )
        logging.info(f"adding local jars - {', '.join(jar_files)}")
        env.add_jars(*jar_paths)

    skyone_source = (
        KafkaSource.builder()
        .set_bootstrap_servers(BOOTSTRAP_SERVERS)
        .set_topics("skyone")
        .set_group_id("group.skyone")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(
            JsonRowDeserializationSchema.builder()
            .type_info(SkyoneData.get_value_type_info())
            .build()
        )
        .build()
    )
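    # latest() starts from new records only; to replay the topic from the
    # beginning, KafkaOffsetsInitializer.earliest() can be used instead, and
    # KafkaOffsetsInitializer.committed_offsets() resumes from the consumer
    # group's last committed position.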

    skyone_stream = env.from_source(
        skyone_source, WatermarkStrategy.no_watermarks(), "skyone_source"
    )
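    # No event-time operations follow, so watermarks are disabled. A sketch of
    # what an event-time variant could pass instead (the 5-second bound is an
    # arbitrary choice, not from this project):
    #
    #     from pyflink.common import Duration
    #     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))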

    # skyone_stream.print()
    define_workflow(skyone_stream).print()
    # skyone_stream.map(SkyoneData.to_flight_data).filter(SkyoneData.is_after).print()

    env.execute("flight_importer")
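

# A Kafka-free way to exercise define_workflow() (a sketch; `raw_rows` is a
# hypothetical in-process list of records matching SkyoneData's type info):
#
#     env = StreamExecutionEnvironment.get_execution_environment()
#     rows = env.from_collection(raw_rows, type_info=SkyoneData.get_value_type_info())
#     define_workflow(rows).print()
#     env.execute("smoke_test")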