-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py.deprecated
executable file
·122 lines (96 loc) · 3.47 KB
/
server.py.deprecated
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
#
# See also
# https://github.com/grpc/grpc/blob/v1.13.x/examples/python/route_guide/route_guide_server.py
from concurrent import futures
from proto_to_dict import proto_to_dict
from telemetry_pb2 import Telemetry
from threading import Lock
from walk_fields import walk_fields
import grpc
import json
import logging
import mdt_grpc_dialout_pb2
import mdt_grpc_dialout_pb2_grpc
import time
BUSY_WAIT = 24 * 60 * 60
SERVER_PORT = "0.0.0.0:57850"
logger = logging.getLogger(__name__)
# logger_grpc = logging.getLogger("grpc._server")
# logger_p2d = logging.getLogger("proto_to_dict")
logging_lock = Lock()
class TestServicer(mdt_grpc_dialout_pb2_grpc.gRPCMdtDialoutServicer):
def __init__(self):
super(TestServicer, self).__init__()
self.msgs_recvd = 0
def MdtDialout(self, request_iterator, context):
for req in request_iterator:
logger.debug('starting processing a req')
self.msgs_recvd += 1
t = Telemetry()
t.ParseFromString(req.data)
#
# Convert telemetry message as a whole into a Python dict
#
# d = proto_to_dict(t)
#
# Walk data_gpbkv fields per YANG Suite code
#
# lines = []
# for gpb in t.data_gpbkv:
# lines += self.walk_fields(gpb.fields)
with logging_lock:
logger.debug('--> CALLBACK START')
logger.debug('Messages Received = %d', self.msgs_recvd)
logger.debug('Node Id = "{}"'.format(t.node_id_str))
logger.debug('Msg Timestamp = "{}"'.format(t.msg_timestamp))
logger.debug('Subscription Id = "{}"'.format(t.subscription_id_str))
logger.debug('Encoding Path = "{}"'.format(t.encoding_path))
# yang suite lines
# for l in lines:
# logger.debug(l)
# json dict
# if d:
# for l in json.dumps(d, indent=2).splitlines():
# logger.debug(l)
print(t)
logger.debug('<-- CALLBACK END')
#
# Need to keep the stream going!!
#
retval = mdt_grpc_dialout_pb2.MdtDialoutArgs()
retval.ReqId = req.ReqId
yield retval
#
# Really simple gRPC server dor dialout telemetry. No TLS, plain TCP.
#
def serve():
logger.debug('Create gRPC server')
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
logger.debug('Add MDT Dialout Service to gRPC server')
mdt_grpc_dialout_pb2_grpc.add_gRPCMdtDialoutServicer_to_server(
TestServicer(), server)
logger.debug('Adding insecure port %s', SERVER_PORT)
server.add_insecure_port(SERVER_PORT)
logger.debug('Starting server')
server.start()
logger.debug('Entering infinite loop')
try:
while True:
time.sleep(BUSY_WAIT)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
#
# setup logging to have a wauy to see what's happening
#
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
# logger_grpc.addHandler(handler)
# logger_grpc.setLevel(logging.DEBUG)
# logger_p2d.addHandler(handler)
# logger_p2d.setLevel(logging.DEBUG)
serve()