From 931f7ec7ec65bbb656f97c097d1d0f6b9c0a4d2d Mon Sep 17 00:00:00 2001
From: Will Johnston
Date: Fri, 26 Mar 2021 13:49:36 -0700
Subject: [PATCH] [matchbook-util] sequence messages within a topic

---
 packages/matchbook-util/Cargo.lock    |   1 +
 packages/matchbook-util/Cargo.toml    |   1 +
 packages/matchbook-util/src/stream.rs | 130 +++++++++++++++++++-------
 services/matching-engine/Cargo.lock   |   1 +
 services/port/Cargo.lock              |   1 +
 services/retransmitter/Cargo.lock     |   1 +
 6 files changed, 100 insertions(+), 35 deletions(-)

diff --git a/packages/matchbook-util/Cargo.lock b/packages/matchbook-util/Cargo.lock
index ee8b208..d6d24bd 100644
--- a/packages/matchbook-util/Cargo.lock
+++ b/packages/matchbook-util/Cargo.lock
@@ -327,6 +327,7 @@ dependencies = [
  "chrono",
  "fixer-upper",
  "futures",
+ "itertools",
  "matchbook-types",
  "pretty_assertions",
  "rand",
diff --git a/packages/matchbook-util/Cargo.toml b/packages/matchbook-util/Cargo.toml
index 2bfd8f9..07884c3 100644
--- a/packages/matchbook-util/Cargo.toml
+++ b/packages/matchbook-util/Cargo.toml
@@ -21,3 +21,4 @@ bytes = "1"
 tokio-util = {version = "0.6.4", features = ["codec", "net"]}
 serde_json = "1.0"
 fixer-upper = { path = "../fixer-upper"}
+itertools = "0.10.0"
diff --git a/packages/matchbook-util/src/stream.rs b/packages/matchbook-util/src/stream.rs
index e94c35a..84c5afd 100644
--- a/packages/matchbook-util/src/stream.rs
+++ b/packages/matchbook-util/src/stream.rs
@@ -1,33 +1,43 @@
 use async_stream::stream;
 use futures::{SinkExt, Stream, StreamExt};
 use matchbook_types::*;
+use std::collections::HashMap;
 
 pub fn message_sequencer_stream(
     mut stream: impl futures::Stream<Item = Message> + Unpin,
-    mut rerequest_sink: impl futures::Sink<usize> + Unpin,
+    mut rerequest_sink: impl futures::Sink<(String, usize)> + Unpin,
     buf_size: usize,
     // need to add param to re-request messages
 ) -> impl Stream<Item = Message> {
     stream! {
-        let mut next_expected_id = 0;
-        let mut cursor = 0;
-        let mut buf: Vec<Option<Message>> = vec![None; buf_size];
-        let mut len = buf.len();
+        let mut topics = HashMap::new();
+
+        struct TopicBuffer {
+            buf: Vec<Option<Message>>,
+            next_expected_id: usize,
+            cursor: usize
+        }
 
-        // TODO(will): need to buffer by topic id
         while let Some(message) = stream.next().await {
+            let topic = message.id.topic_id.clone();
             let id = message.id.topic_sequence_n as usize;
 
-            if id == next_expected_id {
+            let entry = topics.entry(topic.clone()).or_insert(TopicBuffer {
+                buf: vec![None; buf_size],
+                next_expected_id: 0,
+                cursor: 0,
+            });
+
+            if id == entry.next_expected_id {
                 yield message;
-                next_expected_id += 1;
+                entry.next_expected_id += 1;
 
                 // emit any cached messages
                 loop {
-                    if let Some(msg) = buf[cursor].take() {
-                        cursor = (cursor + 1) % len;
-                        next_expected_id += 1;
+                    if let Some(msg) = entry.buf[entry.cursor].take() {
+                        entry.cursor = (entry.cursor + 1) % entry.buf.len();
+                        entry.next_expected_id += 1;
                         yield msg;
                     } else {
                         break
@@ -35,37 +45,36 @@ pub fn message_sequencer_stream(
                     }
                 }
             }
             // received element out of order
-            else if id > next_expected_id {
-                let offset = id - next_expected_id - 1;
+            else if id > entry.next_expected_id {
+                let offset = id - entry.next_expected_id - 1;
 
                 // if we are going to have an entry that may overwrite an element in our buffer, we need to resize
-                let should_resize_buffer = offset > (len - 1);
+                let should_resize_buffer = offset > (entry.buf.len() - 1);
 
                 if should_resize_buffer {
-                    let mut new_buf = vec![None; buf.len() * 2];
-                    for (dst, x) in buf.iter().cloned().cycle().skip(cursor).take(buf.len()).enumerate() {
+                    let mut new_buf = vec![None; entry.buf.len() * 2];
+                    // FIXME(will): this causes unnecessary copies
+                    for (dst, x) in entry.buf.iter().cloned().cycle().skip(entry.cursor).take(entry.buf.len()).enumerate() {
                         new_buf[dst] = x;
                     }
-                    buf = new_buf;
-                    len = buf.len();
-                    cursor = 0;
+                    entry.buf = new_buf;
+                    entry.cursor = 0;
                 }
 
-                let write_idx = (offset + cursor) % buf.len();
-                buf[write_idx] = Some(message.clone());
+                let write_idx = (offset + entry.cursor) % entry.buf.len();
+                entry.buf[write_idx] = Some(message.clone());
 
                 // re-request everything that we're missing
-                let mut offset = 1;
-                let mut scan_cursor = cursor;
+                let mut id_offset = 1;
+                let mut scan_cursor = entry.cursor;
 
-                // TODO(will): error handling
-                let _ = rerequest_sink.send(next_expected_id).await;
+                let _ = rerequest_sink.send((topic.clone(), entry.next_expected_id)).await;
 
                 while scan_cursor != write_idx {
-                    if buf[scan_cursor].is_none() {
-                        let id_to_rerequest = next_expected_id + offset;
-                        let _ = rerequest_sink.send(id_to_rerequest).await;
+                    if entry.buf[scan_cursor].is_none() {
+                        let id_to_rerequest = entry.next_expected_id + id_offset;
+                        let _ = rerequest_sink.send((topic.clone(), id_to_rerequest)).await;
                     }
-                    offset += 1;
-                    scan_cursor = (scan_cursor + 1) % len;
+                    id_offset += 1;
+                    scan_cursor = (scan_cursor + 1) % entry.buf.len();
                 }
             }
         }
     }
@@ -84,9 +93,9 @@ mod test {
         let (mut tx, rx) = futures::channel::mpsc::unbounded();
         let (r_tx, r_rx) = futures::channel::mpsc::unbounded();
 
-        let stream = message_sequencer_stream(rx, r_tx, 2);
+        let stream = message_sequencer_stream(rx, r_tx, 1);
 
-        let messages: Vec<_> = (0..5)
+        let ordered_client1_messages: Vec<_> = (0..3)
             .map(|x| Message {
                 id: MessageId {
                     publisher_id: ServiceId {
                         kind: ServiceKind::Port,
                         number: 0,
                     },
                     topic_id: "client1".to_string(),
                     topic_sequence_n: x,
                 },
                 kind: MessageKind::LimitOrderSubmitRequest {
                     price: 100,
                     quantity: 100,
                     symbol: ['A', 'D', 'B', 'E'],
                     side: Side::Ask,
                 },
             })
             .collect();
@@ -105,7 +114,29 @@
             })
             .collect();
 
-        let sending_order = vec![0, 3, 2, 4, 1, 2];
+        let ordered_client2_messages: Vec<_> = (0..3)
+            .map(|x| Message {
+                id: MessageId {
+                    publisher_id: ServiceId {
+                        kind: ServiceKind::Port,
+                        number: 0,
+                    },
+                    topic_id: "client2".to_string(),
+                    topic_sequence_n: x,
+                },
+                kind: MessageKind::LimitOrderSubmitRequest {
+                    price: 100,
+                    quantity: 100,
+                    symbol: ['A', 'D', 'B', 'E'],
+                    side: Side::Ask,
+                },
+            })
+            .collect();
+
+        let mut messages: Vec<_> = ordered_client1_messages.clone();
+        messages.extend(ordered_client2_messages.clone());
+
+        let sending_order = vec![0, 5, 0, 4, 3, 2, 1, 3, 4];
 
         for i in sending_order {
             tx.send(messages[i].clone()).await.unwrap();
@@ -117,12 +148,41 @@
         futures::pin_mut!(stream);
 
         let sampled: Vec<_> = stream.take(messages.len()).collect().await;
-        assert_eq!(messages, sampled);
+        dbg!(&sampled);
+
+        let sampled_client1_messages: Vec<_> = sampled
+            .iter()
+            .cloned()
+            .filter(|x| x.id.topic_id == "client1")
+            .collect();
+
+        let sampled_client2_messages: Vec<_> = sampled
+            .iter()
+            .cloned()
+            .filter(|x| x.id.topic_id == "client2")
+            .collect();
+
+        assert_eq!(
+            sampled_client1_messages,
+            ordered_client1_messages.clone(),
+            "client1 messages not received in order"
+        );
+        assert_eq!(
+            sampled_client2_messages,
+            ordered_client2_messages.clone(),
+            "client2 messages not received in order"
+        );
     });
 
     let x = tokio::spawn(async move {
+        let expected_rerequests = vec![
+            ("client2".to_string(), 0),
+            ("client2".to_string(), 1),
+            ("client2".to_string(), 0),
+            ("client1".to_string(), 1),
+        ];
        let rerequests: Vec<_> = r_rx.collect().await;
-        assert_eq!(rerequests, vec![1, 2, 1, 1])
+        assert_eq!(rerequests, expected_rerequests)
     });
 
     tokio::try_join!(x, y).unwrap();
diff --git a/services/matching-engine/Cargo.lock b/services/matching-engine/Cargo.lock
index 4d33786..295169b 100644
--- a/services/matching-engine/Cargo.lock
+++ b/services/matching-engine/Cargo.lock
@@ -311,6 +311,7 @@ dependencies = [
  "bytes",
  "fixer-upper",
  "futures",
+ "itertools",
  "matchbook-types",
  "serde_json",
  "socket2",
diff --git a/services/port/Cargo.lock b/services/port/Cargo.lock
index ee27ae1..000d02f 100644
--- a/services/port/Cargo.lock
+++ b/services/port/Cargo.lock
@@ -338,6 +338,7 @@ dependencies = [
  "bytes",
  "fixer-upper",
  "futures",
+ "itertools",
  "matchbook-types",
  "serde_json",
  "socket2",
diff --git a/services/retransmitter/Cargo.lock b/services/retransmitter/Cargo.lock
index b55e3e7..5950804 100644
--- a/services/retransmitter/Cargo.lock
+++ b/services/retransmitter/Cargo.lock
@@ -325,6 +325,7 @@ dependencies = [
  "bytes",
  "fixer-upper",
  "futures",
+ "itertools",
  "matchbook-types",
  "serde_json",
  "socket2",
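
For reviewers who want to exercise the new behavior by hand, here is a minimal usage sketch (not part of the patch): it drives the per-topic sequencer with a single topic, delivers one message early to open a gap, and checks both the re-ordered output and the new (topic, sequence) re-request tuples. It assumes the function is reachable at matchbook_util::stream::message_sequencer_stream and that the matchbook-types fields are exactly as used in the test above; everything else is plain futures.

use futures::{SinkExt, StreamExt};
use matchbook_types::*;
// assumed public path; adjust if the crate exports the function elsewhere
use matchbook_util::stream::message_sequencer_stream;

fn main() {
    futures::executor::block_on(async {
        let (mut tx, rx) = futures::channel::mpsc::unbounded();
        let (rerequest_tx, rerequest_rx) = futures::channel::mpsc::unbounded();

        // Three in-order messages for one topic, same shape as the unit test.
        let messages: Vec<_> = (0..3)
            .map(|x| Message {
                id: MessageId {
                    publisher_id: ServiceId {
                        kind: ServiceKind::Port,
                        number: 0,
                    },
                    topic_id: "client1".to_string(),
                    topic_sequence_n: x,
                },
                kind: MessageKind::LimitOrderSubmitRequest {
                    price: 100,
                    quantity: 100,
                    symbol: ['A', 'D', 'B', 'E'],
                    side: Side::Ask,
                },
            })
            .collect();

        // Deliver 0, then 2, then 1. Sequence 2 arrives early, so it is parked
        // in the topic's ring buffer and ("client1", 1) goes out the re-request
        // sink; once 1 arrives, 1 and the buffered 2 are emitted back to back.
        for i in vec![0, 2, 1] {
            tx.send(messages[i].clone()).await.unwrap();
        }
        drop(tx); // close the input so the sequenced stream terminates

        let sequenced: Vec<_> = message_sequencer_stream(rx, rerequest_tx, 1)
            .collect()
            .await;
        assert_eq!(sequenced, messages);

        // The sequencer owned the only re-request sender and has been dropped
        // by the collect above, so this collect terminates as well.
        let rerequests: Vec<_> = rerequest_rx.collect().await;
        assert_eq!(rerequests, vec![("client1".to_string(), 1)]);
    });
}

Note that buf_size is 1 here: the stream doubles a topic's buffer on demand, so the parameter only sets the initial ring size per topic, which is exactly what the updated test exercises by dropping the capacity from 2 to 1.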