Skip to content

Commit

Permalink
[matchbook-util] sequence messages within a topic
Browse files Browse the repository at this point in the history
  • Loading branch information
wbjohnston committed Mar 26, 2021
1 parent ae99a94 commit 931f7ec
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 35 deletions.
1 change: 1 addition & 0 deletions packages/matchbook-util/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/matchbook-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ bytes = "1"
tokio-util = {version = "0.6.4", features = ["codec", "net"]}
serde_json = "1.0"
fixer-upper = { path = "../fixer-upper"}
itertools = "0.10.0"
130 changes: 95 additions & 35 deletions packages/matchbook-util/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,71 +1,80 @@
use async_stream::stream;
use futures::{SinkExt, Stream, StreamExt};
use matchbook_types::*;
use std::collections::HashMap;

pub fn message_sequencer_stream(
mut stream: impl futures::Stream<Item = Message> + Unpin,
mut rerequest_sink: impl futures::Sink<usize> + Unpin,
mut rerequest_sink: impl futures::Sink<(String, usize)> + Unpin,
buf_size: usize,
// need to add param to re-request messages
) -> impl Stream<Item = Message> {
stream! {
let mut next_expected_id = 0;
let mut cursor = 0;
let mut buf: Vec<Option<Message>> = vec![None; buf_size];
let mut len = buf.len();
let mut topics = HashMap::new();

struct TopicBuffer {
buf: Vec<Option<Message>>,
next_expected_id: usize,
cursor: usize
}

// TODO(will): need to buffer by topic id
while let Some(message) = stream.next().await {
let topic = message.id.topic_id.clone();
let id = message.id.topic_sequence_n as usize;

if id == next_expected_id {
let entry = topics.entry(topic.clone()).or_insert(TopicBuffer {
buf: vec![None; buf_size],
next_expected_id: 0,
cursor: 0,
});

if id == entry.next_expected_id {
yield message;

next_expected_id += 1;
entry.next_expected_id += 1;

// emit any cached messages
loop {
if let Some(msg) = buf[cursor].take() {
cursor = (cursor + 1) % len;
next_expected_id += 1;
if let Some(msg) = entry.buf[entry.cursor].take() {
entry.cursor = (entry.cursor + 1) % entry.buf.len();
entry.next_expected_id += 1;
yield msg;
} else {
break
}
}
}
// received element out of order
else if id > next_expected_id {
let offset = id - next_expected_id - 1;
else if id > entry.next_expected_id {
let offset = id - entry.next_expected_id - 1;

// if we are going to have an entry that may overwrite an element in our buffer, we need to resize
let should_resize_buffer = offset > (len - 1);
let should_resize_buffer = offset > (entry.buf.len() - 1);
if should_resize_buffer {
let mut new_buf = vec![None; buf.len() * 2];
for (dst, x) in buf.iter().cloned().cycle().skip(cursor).take(buf.len()).enumerate() {
let mut new_buf = vec![None; entry.buf.len() * 2];
// FIXME(will): this causes unnecessary copies
for (dst, x) in entry.buf.iter().cloned().cycle().skip(entry.cursor).take(entry.buf.len()).enumerate() {
new_buf[dst] = x;
}
buf = new_buf;
len = buf.len();
cursor = 0;
entry.buf = new_buf;
entry.cursor = 0;
}
let write_idx = (offset + cursor) % buf.len();
buf[write_idx] = Some(message.clone());
let write_idx = (offset + entry.cursor) % entry.buf.len();
entry.buf[write_idx] = Some(message.clone());

// re-request everything that we're missing
let mut offset = 1;
let mut scan_cursor = cursor;
let mut id_offset = 1;
let mut scan_cursor = entry.cursor;

// TODO(will): error handling
let _ = rerequest_sink.send(next_expected_id).await;
let _ = rerequest_sink.send((topic.clone(), entry.next_expected_id)).await;
while scan_cursor != write_idx {
if buf[scan_cursor].is_none() {
let id_to_rerequest = next_expected_id + offset;
let _ = rerequest_sink.send(id_to_rerequest).await;
if entry.buf[scan_cursor].is_none() {
let id_to_rerequest = entry.next_expected_id + id_offset;
let _ = rerequest_sink.send((topic.clone(), id_to_rerequest)).await;
}

offset += 1;
scan_cursor = (scan_cursor + 1) % len;
id_offset += 1;
scan_cursor = (scan_cursor + 1) % entry.buf.len();
}
}
}
Expand All @@ -84,9 +93,9 @@ mod test {
let (mut tx, rx) = futures::channel::mpsc::unbounded();
let (r_tx, r_rx) = futures::channel::mpsc::unbounded();

let stream = message_sequencer_stream(rx, r_tx, 2);
let stream = message_sequencer_stream(rx, r_tx, 1);

let messages: Vec<_> = (0..5)
let ordered_client1_messages: Vec<_> = (0..3)
.map(|x| Message {
id: MessageId {
publisher_id: ServiceId {
Expand All @@ -105,7 +114,29 @@ mod test {
})
.collect();

let sending_order = vec![0, 3, 2, 4, 1, 2];
let ordered_client2_messages: Vec<_> = (0..3)
.map(|x| Message {
id: MessageId {
publisher_id: ServiceId {
kind: ServiceKind::Port,
number: 0,
},
topic_id: "client2".to_string(),
topic_sequence_n: x,
},
kind: MessageKind::LimitOrderSubmitRequest {
price: 100,
quantity: 100,
symbol: ['A', 'D', 'B', 'E'],
side: Side::Ask,
},
})
.collect();

let mut messages: Vec<_> = ordered_client1_messages.clone();
messages.extend(ordered_client2_messages.clone());

let sending_order = vec![0, 5, 0, 4, 3, 2, 1, 3, 4];

for i in sending_order {
tx.send(messages[i].clone()).await.unwrap();
Expand All @@ -117,12 +148,41 @@ mod test {
futures::pin_mut!(stream);
let sampled: Vec<_> = stream.take(messages.len()).collect().await;

assert_eq!(messages, sampled);
dbg!(&sampled);

let sampled_client1_messages: Vec<_> = sampled
.iter()
.cloned()
.filter(|x| x.id.topic_id == "client1")
.collect();

let sampled_client2_messages: Vec<_> = sampled
.iter()
.cloned()
.filter(|x| x.id.topic_id == "client2")
.collect();

assert_eq!(
sampled_client1_messages,
ordered_client1_messages.clone(),
"client1 messages not received in order"
);
assert_eq!(
sampled_client2_messages,
ordered_client2_messages.clone(),
"client2 messages not received in order"
);
});

let x = tokio::spawn(async move {
let expected_rerequests = vec![
("client2".to_string(), 0),
("client2".to_string(), 1),
("client2".to_string(), 0),
("client1".to_string(), 1),
];
let rerequests: Vec<_> = r_rx.collect().await;
assert_eq!(rerequests, vec![1, 2, 1, 1])
assert_eq!(rerequests, expected_rerequests)
});

tokio::try_join!(x, y).unwrap();
Expand Down
1 change: 1 addition & 0 deletions services/matching-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/port/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/retransmitter/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 931f7ec

Please sign in to comment.