-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmessage_handler.py
70 lines (59 loc) · 1.14 KB
/
message_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
class MsgStack:
    """
    Last-in, first-out container for messages, backed by a Python list.
    """

    def __init__(self):
        # The end of the list is the top of the stack.
        self.messages = []

    def push(self, msg):
        """
        Place ``msg`` on top of the stack.
        """
        self.messages.append(msg)

    def top(self):
        """
        Return the most recently pushed message without removing it.

        Raises IndexError if the stack is empty.
        """
        return self.messages[-1]

    def pop(self):
        """
        Remove the most recently pushed message and return it.

        Raises IndexError if the stack is empty.
        """
        return self.messages.pop()

    def empty(self):
        """
        Return True when the stack holds no messages, False otherwise.
        """
        return not self.messages
class MsgHandler:
    """
    Collect '/scan' and '/odom' messages and hand them out in pairs.

    Each topic has its own MsgStack; a pair consists of the most recently
    accepted message of each topic.
    """

    def __init__(self):
        self.scanMsgStack = MsgStack()
        self.odomMsgStack = MsgStack()

    def accept_message(self, topic, msg):
        """
        Store ``msg`` on the stack that matches ``topic``.

        Messages from '/scan' are pushed onto ``scanMsgStack`` and messages
        from '/odom' onto ``odomMsgStack``. Any other topic is ignored.
        """
        if topic == '/scan':
            self.scanMsgStack.push(msg)
        elif topic == '/odom':
            self.odomMsgStack.push(msg)

    def pair_check(self):
        """
        Return True when both a '/scan' and an '/odom' message are waiting.

        Returns False as soon as either stack is empty.
        """
        return not (self.scanMsgStack.empty() or self.odomMsgStack.empty())

    def process_paired_messages(self):
        """
        Pop the newest '/odom' and '/scan' messages and return them.

        Returns a tuple ``(msgOdom, msgScan)``. Only the top message of each
        stack is removed; anything pushed earlier stays on the stacks.

        Raises IndexError if either stack is empty. The check happens before
        anything is popped, so a failed call leaves both stacks untouched
        instead of discarding a message from the non-empty one.
        """
        if not self.pair_check():
            raise IndexError('no paired /odom and /scan messages to process')
        msgOdom = self.odomMsgStack.pop()
        msgScan = self.scanMsgStack.pop()
        return (msgOdom, msgScan)