-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.rs
80 lines (68 loc) · 2.49 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#![forbid(unsafe_code)]
//! An example app that subscribes to analytics scene description data using [`mdb`].
use std::{ffi::CStr, process::abort, thread::sleep, time::Duration};
use log::{error, info};
use mdb::{Connection, Error, Subscriber, SubscriberConfig};
const TOPIC: &CStr = c"com.axis.analytics_scene_description.v0.beta";
const SOURCE: &CStr = c"1";
fn main() {
acap_logging::init_logger();
let connection = Connection::try_new(Some(|e: &Error| {
error!("Not connected because {e:?}");
abort();
}))
.unwrap();
let config = SubscriberConfig::try_new(
TOPIC,
SOURCE,
|message| {
let payload = String::from_utf8_lossy(message.payload());
let libc::timespec{ tv_sec, tv_nsec } = message.timestamp();
info!("message received from topic: {TOPIC:?} on source: {SOURCE:?}: Monotonic time - {tv_sec}.{tv_nsec:0>9}. Data - {payload}");
},
)
.unwrap();
let _subscriber = Subscriber::try_new(&connection, config, |e| match e {
None => info!("Subscribed"),
Some(e) => {
error!("Not subscribed because {e:?}");
abort();
}
})
.unwrap();
loop {
sleep(Duration::from_secs(1));
}
}
#[cfg(test)]
mod tests {
use log::{debug, warn};
use super::*;
#[test]
fn receives_analytics_scene_description_promptly() {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut droppable_tx = Some(tx);
let connection =
Connection::try_new(Some(|e: &Error| println!("Not connected because {e:?}"))).unwrap();
let config = SubscriberConfig::try_new(TOPIC, SOURCE, move |message| {
let payload = String::from_utf8(message.payload().to_vec());
let Some(tx) = &droppable_tx else {
debug!("Dropping message because sender was previously dropped");
return;
};
if tx.try_send(payload).is_err() {
warn!("Dropping sender because receiver has been deallocated");
droppable_tx = None;
}
})
.unwrap();
let _subscriber = Subscriber::try_new(&connection, config, |e| match e {
None => println!("Subscribed"),
Some(e) => println!("Not subscribed because {e:?}"),
})
.unwrap();
let payload = rx.recv_timeout(Duration::from_secs(10)).unwrap().unwrap();
assert!(!payload.is_empty());
println!("{payload}");
}
}