From e82b46c449d6892c6c2de4ad0df81ba484a1e702 Mon Sep 17 00:00:00 2001
From: Michael Kirk
Date: Thu, 18 Jul 2024 14:07:41 -0700
Subject: [PATCH] More efficient feature fetching

We now make an educated guess as to how much "overfetch" to do based on
the proximity of subsequent Features.
---
 geomedea/src/http_reader.rs | 61 +++++++++++++++++++++++++++++++++----
 geomedea/src/lib.rs         |  4 +++
 geomedea/src/writer/mod.rs  |  6 +---
 3 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/geomedea/src/http_reader.rs b/geomedea/src/http_reader.rs
index 6216f58..86b4236 100644
--- a/geomedea/src/http_reader.rs
+++ b/geomedea/src/http_reader.rs
@@ -1,10 +1,12 @@
 use crate::feature::Feature;
 use crate::io::async_ruszstd::MyRuzstdDecoder;
 use crate::packed_r_tree::{Node, PackedRTree, PackedRTreeHttpReader};
-use crate::{deserialize_from, serialized_size, Bounds, Header, Result};
+use crate::{deserialize_from, serialized_size, Bounds, Header, Result, DEFAULT_PAGE_SIZE_GOAL};
 use crate::{FeatureLocation, PageHeader};
 use bytes::{Bytes, BytesMut};
 use futures_util::{Stream, StreamExt};
+use std::collections::VecDeque;
+use std::ops::Range;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use streaming_http_range_client::{HttpClient, HttpRange};
@@ -118,7 +120,9 @@ struct SelectAll {
 
 struct SelectBbox {
     feature_start: u64,
+    current_batch: VecDeque<FeatureLocation>,
     feature_locations: Box<dyn Stream<Item = Result<FeatureLocation>> + Unpin>,
+    first_item_of_next_batch: Option<FeatureLocation>,
 }
 
 impl SelectBbox {
@@ -129,8 +133,54 @@ impl SelectBbox {
         Self {
             feature_locations: Box::new(Box::pin(feature_locations)),
             feature_start,
+            current_batch: VecDeque::new(),
+            first_item_of_next_batch: None,
         }
     }
+
+    /// Returns the location of the Feature as well as a suggested byte range within the Feature buffer
+    /// if a request needs to be made.
+    async fn next_feature_location(&mut self) -> Result<Option<(FeatureLocation, Range<u64>)>> {
+        if self.current_batch.is_empty() {
+            // Else determine next batch
+            let mut prev_page_starting_offset = None;
+
+            if let Some(first_item_of_next_batch) = self.first_item_of_next_batch.take() {
+                prev_page_starting_offset = Some(first_item_of_next_batch.page_starting_offset);
+                self.current_batch.push_back(first_item_of_next_batch)
+            }
+
+            while let Some(next) = self.feature_locations.next().await.transpose()? {
+                let Some(batch_starting_offset) = prev_page_starting_offset else {
+                    // starting a new batch
+                    assert!(self.current_batch.is_empty());
+                    prev_page_starting_offset = Some(next.page_starting_offset);
+                    self.current_batch.push_back(next);
+                    continue;
+                };
+
+                let close_enough = DEFAULT_PAGE_SIZE_GOAL * 2;
+                if next.page_starting_offset < batch_starting_offset + close_enough {
+                    // It's close enough, add it to the batch
+                    prev_page_starting_offset = Some(next.page_starting_offset);
+                    self.current_batch.push_back(next);
+                } else {
+                    self.first_item_of_next_batch = Some(next);
+                    break;
+                }
+            }
+        }
+
+        let Some(end_of_last_in_batch) = self.current_batch.back().map(|last_in_batch| {
+            last_in_batch.page_starting_offset + (DEFAULT_PAGE_SIZE_GOAL as f64 * 1.1) as u64
+        }) else {
+            return Ok(None);
+        };
+        let next = self.current_batch.pop_front().expect("if there is a back, there is also a front");
+
+        let start_of_batch = next.page_starting_offset;
+        Ok(Some((next, start_of_batch..end_of_last_in_batch)))
+    }
 }
 
 #[derive(Debug)]
@@ -330,9 +380,9 @@ impl AsyncPageReader {
         &mut self,
         feature_start: u64,
         location: FeatureLocation,
+        batch_range: Range<u64>
     ) -> Result<()> {
-        // TODO be smarter about this.
-        let overfetch = 512_000;
+        let overfetch = batch_range.end - batch_range.start;
 
         // First get to the right page.
         let (mut page_decoder, page_starting_offset) = match self
@@ -596,13 +646,12 @@ impl Selection {
                    select_all.features_left_in_document -= 1;
                }
                Selection::SelectBbox(select_bbox) => {
-                    let Some(next_location) = select_bbox.feature_locations.next().await.transpose()?
-                    else {
+                    let Some((next_location, feature_batch_range)) = select_bbox.next_feature_location().await? else {
                        return Ok(None);
                    };
                    page_reader
-                        .ff_to_location(select_bbox.feature_start, next_location)
+                        .ff_to_location(select_bbox.feature_start, next_location, feature_batch_range)
                        .await?;
                }
            }
diff --git a/geomedea/src/lib.rs b/geomedea/src/lib.rs
index c9122d3..934a7fb 100644
--- a/geomedea/src/lib.rs
+++ b/geomedea/src/lib.rs
@@ -39,6 +39,10 @@ use tokio::io as asyncio;
 pub use crate::feature::{Feature, Properties, PropertyValue};
 use serde::{Deserialize, Serialize};
 
+// How large should we make each page of feature data
+// before starting a new page.
+pub(crate) const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;
+
 pub(crate) fn serialized_size<T>(value: &T) -> Result<u64>
 where
     T: serde::Serialize + ?Sized,
diff --git a/geomedea/src/writer/mod.rs b/geomedea/src/writer/mod.rs
index 071638b..45e008d 100644
--- a/geomedea/src/writer/mod.rs
+++ b/geomedea/src/writer/mod.rs
@@ -4,7 +4,7 @@ use crate::io::CountingWriter;
 use crate::packed_r_tree::{Node, PackedRTreeWriter};
 use crate::{
     deserialize_from, serialize_into, serialized_size, Feature, FeatureLocation, Header,
-    PageHeader, Result,
+    PageHeader, Result, DEFAULT_PAGE_SIZE_GOAL,
 };
 use byteorder::{LittleEndian, WriteBytesExt};
 use std::fs::File;
@@ -26,10 +26,6 @@ pub struct Writer<W: Write> {
     page_size_goal: u64,
 }
 
-// How large should we make each page of feature data
-// before starting a new page.
-const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;
-
 impl<W: Write> Writer<W> {
     pub fn new(inner: W, is_compressed: bool) -> Result<Self> {
         let header = Header {