From 9f8143935bad0cf81c73c509e77121ccde426d21 Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Thu, 25 Jan 2024 14:20:46 -0500 Subject: [PATCH] feat: Handle loading > 1000 Segment Info files --- influxdb3_write/src/persister.rs | 105 ++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 31 deletions(-) diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index 442256261ea..7a374cadff5 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -139,40 +139,60 @@ impl Persister for PersisterImpl { } } - async fn load_segments(&self, most_recent_n: usize) -> Result> { - let segment_list = self - .object_store - .list(Some(&SegmentInfoFilePath::dir())) - .await? - .collect::>() - .await; - - // Why not collect into a Result, object_store::Error>> - // like we could with Iterators? Well because it's a stream it ends up - // using different traits and can't really do that. So we need to loop - // through to return any errors that might have occurred, then do an - // unstable sort (which is faster and we know won't have any - // duplicates) since these can arrive out of order, and then issue gets - // on the n most recent segments that we want and is returned in order - // of the moste recent to least. - let mut list = Vec::new(); - for segment in segment_list { - list.push(segment?); - } + async fn load_segments(&self, mut most_recent_n: usize) -> Result> { + let mut output = Vec::new(); + let mut offset: Option = None; + while most_recent_n > 0 { + let count = if most_recent_n > 1000 { + most_recent_n -= 1000; + 1000 + } else { + let count = most_recent_n; + most_recent_n = 0; + count + }; + + let segment_list = if let Some(offset) = offset { + self.object_store + .list_with_offset(Some(&SegmentInfoFilePath::dir()), &offset) + .await? + .collect::>() + .await + } else { + self.object_store + .list(Some(&SegmentInfoFilePath::dir())) + .await? 
+ .collect::>() + .await + }; - list.sort_unstable_by(|a, b| a.location.cmp(&b.location)); + // Why not collect into a Result, object_store::Error>> + // like we could with Iterators? Well because it's a stream it ends up + // using different traits and can't really do that. So we need to loop + // through to return any errors that might have occurred, then do an + // unstable sort (which is faster and we know won't have any + // duplicates) since these can arrive out of order, and then issue gets + // on the n most recent segments that we want and is returned in order + // of the most recent to least. + let mut list = Vec::new(); + for segment in segment_list { + list.push(segment?); + } - let len = list.len(); - let range = if len <= most_recent_n { - 0..len - } else { - 0..most_recent_n - }; + list.sort_unstable_by(|a, b| a.location.cmp(&b.location)); - let mut output = Vec::new(); - for item in &list[range] { - let bytes = self.object_store.get(&item.location).await?.bytes().await?; - output.push(serde_json::from_slice(&bytes)?); + let len = list.len(); + let end = if len <= count { len } else { count }; + + for item in &list[0..end] { + let bytes = self.object_store.get(&item.location).await?.bytes().await?; + output.push(serde_json::from_slice(&bytes)?); + } + + // Get the last path in the array to use as an offset. This assumes + // we sorted the list as we can't guarantee otherwise the order of + // the list call to the object store. 
+ offset = Some(list[end - 1].location.clone()); } Ok(output) @@ -405,6 +425,29 @@ async fn persist_and_load_segment_info_files_with_fewer_than_requested() { assert_eq!(segments[0].segment_id.0, 0); } +#[tokio::test] +/// This test makes sure that the logic for offset lists works +async fn persist_and_load_over_9000_segment_info_files() { + let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap(); + let persister = PersisterImpl::new(Arc::new(local_disk)); + for id in 0..9001 { + let info_file = PersistedSegment { + segment_id: SegmentId::new(id), + segment_wal_size_bytes: 0, + databases: HashMap::new(), + segment_min_time: 0, + segment_max_time: 1, + segment_row_count: 0, + segment_parquet_size_bytes: 0, + }; + persister.persist_segment(info_file).await.unwrap(); + } + let segments = persister.load_segments(9500).await.unwrap(); + // We asked for the most recent 9500 so there should be 9001 of them + assert_eq!(segments.len(), 9001); + assert_eq!(segments[0].segment_id.0, 9000); +} + #[tokio::test] async fn get_parquet_bytes() { let local_disk = LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();