-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmavlog2csv.py
211 lines (173 loc) · 7.14 KB
/
mavlog2csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- coding: utf-8 -*-
"""
Used https://github.com/ArduPilot/pymavlink/blob/master/tools/mavlogdump.py for reference.
"""
import argparse
import collections
import contextlib
import csv
import datetime
import logging
import operator
import re
import sys
import textwrap
from typing import IO, Any, ContextManager, Dict, Iterator, List, Optional, Set, Tuple, Union
from pymavlink import mavutil
from pymavlink.CSVReader import CSVReader
from pymavlink.DFReader import DFReader
from pymavlink.dialects.v10.ardupilotmega import MAVLink_message
from pymavlink.mavutil import mavserial
logger = logging.getLogger(__name__)
def is_message_bad(message: Optional[MAVLink_message]) -> bool:
"""Check if message is bad and work working with"""
return bool(message is None or (message and message.get_type() == "BAD_DATA"))
@contextlib.contextmanager
def mavlink_connect(device: str) -> Union[mavserial, DFReader, CSVReader]:
"""
Create mavlink connection to a device. Device can be anything from comport, to .bin or .log ardupilot telemetry log file.
Will close the connection once context is exited
"""
conn = mavutil.mavlink_connection(device)
logger.debug(f"Connecting to {device}")
yield conn
logger.debug(f"Closing connection to {device}")
conn.close()
def parse_cli_column(cli_col: str) -> Tuple[str, str]:
"""
Parse CLI provided column into message type and column name parts.
"""
match = re.match("(?P<message_type>\w+)\.(?P<column>\w+)", cli_col)
if not match:
raise ValueError(
f"""\
Specified column is not correct format:
Column "{cli_col}" must be <Message type>.<Column>.
For example: GPS.Lat
"""
)
return match.group(1), match.group(2)
def open_output(output: Optional[str] = None) -> ContextManager[IO]:
"""
Either opens a file `output` for writing or returns STDOUT stream"""
if output:
return open(output, "w", newline="")
else:
return contextlib.nullcontext(sys.stdout)
def iter_mavlink_messages(device: str, types: Set[str], skip_n_arms: int = 0) -> Iterator[MAVLink_message]:
"""
Return iterator over mavlink messages of `types` from `device`.
If skip_n_arms is not zero, will return messages only after skip_n_arms ARM messages has been seen.
"""
types = types.copy()
types.add("EV") # EV are events like ARM (id=10) or DISARM (id=11)
n_message = 0
n_armed = 0
with mavlink_connect(device) as mav_conn:
while True:
message: Optional[MAVLink_message] = mav_conn.recv_match(blocking=False, type=types)
n_message += 1
if message is None:
logger.debug(f"Stopping processing at {n_message} message")
break
if is_message_bad(message):
continue
if message.get_type() == "EV" and message.Id == 10: # arm
logger.debug(f"Found ARM event: {message}")
n_armed += 1
if n_armed < skip_n_arms or message.get_type() == "EV":
continue
yield message
def message_to_row(message: MAVLink_message, columns: List[str]) -> Dict[str, Any]:
"""Convert mavlink message to output row"""
row: Dict[str, Any] = {}
row["TimeUS"] = message.TimeUS
row["TimeS"] = round(message.TimeUS / 1_000_000, 2)
dt = datetime.datetime.fromtimestamp(message._timestamp)
row["Date"] = dt.date().isoformat()
row["Time"] = dt.time().isoformat()
for col in columns:
col_value = getattr(message, col, None) or "" # Place for improvement
row[f"{message.get_type()}.{col}"] = col_value
return row
def mavlog2csv(device: str, columns: List[str], output: Optional[str] = None, skip_n_arms: int = 0):
"""
Convert ardupilot telemetry log into csv with selected columns.
Specify the input file (.bin telemetry log), some desired telemetry columns (like GPS.Lat), and observe the magic.
You can find message types and their column reference here: https://ardupilot.org/copter/docs/logmessages.html.
"""
parsed_columns: List[Tuple[str, str]] = list(map(parse_cli_column, columns))
# Collects all required message types like {'GPS', 'ATT', 'ASPD'}
# Used to filter mavlink messages
message_type_filter: Set[str] = set(map(operator.itemgetter(0), parsed_columns))
# Collects a mapping message type -> columns
# Used to quickly extract required columns from message
message_type_columns: Dict[str, List[str]] = collections.defaultdict(list)
for message_type, column in parsed_columns:
message_type_columns[message_type].append(column)
header = [
"TimeUS", # Original time in US after boot
"TimeS", # Seconds after boot
"Date", # Calculated date of the event
"Time", # Calculated time of the event,
*columns, # User specified columns
]
with open_output(output) as output_file:
csv_writer = csv.DictWriter(
output_file,
fieldnames=header,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_ALL,
)
csv_writer.writeheader()
for message in iter_mavlink_messages(device=device, types=message_type_filter, skip_n_arms=skip_n_arms):
message_type = message.get_type()
if message_type in message_type_columns:
row = message_to_row(message, message_type_columns[message_type])
csv_writer.writerow(row)
def main():
parser = argparse.ArgumentParser(
description=textwrap.dedent(mavlog2csv.__doc__), # type: ignore
epilog=textwrap.dedent(
"""\
Example usage:
# Output GPS Longitude and latitude and airspeed sensor readings
python mavlog2csv.py -c GPS.Lng -c GPS.Lat -c ARSP.Airspeed -o output.csv "2023-09-17 13-34-16.bin"
# Redirecting stdout into a file on windows. Not recommended, use -o instead.
python mavlog2csv.py -c GPS.Lng "2023-09-17 13-34-16.bin" 1> output.csv
"""
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("input", help="Input file name.")
parser.add_argument(
"-o",
"--output",
help="Output file name. If not set, script will output into stdout.",
)
parser.add_argument(
"-c",
"--col",
action="append",
required=True,
help="Specify telemetry columns to output. You can specify this multiple times. "
"Format: <Message type>.<Column>. For example: GPS.Lng",
)
parser.add_argument(
"--skip-n-arms",
type=int,
default=0,
help="If there are multiple arm events in the log, skip this number of arms before writing any rows at all. "
"If you setup to log only after autopilot was armed, then first arm event wont be stored in the log.",
)
args = parser.parse_args()
mavlog2csv(
device=args.input,
columns=args.col,
skip_n_arms=args.skip_n_arms,
output=args.output,
)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
main()