More efficient feature fetching
We now make an educated guess as to how much "overfetch" to do based on
the proximity of subsequent Features.
michaelkirk committed Jul 18, 2024
1 parent b335406 commit 11e8b30
Showing 3 changed files with 60 additions and 11 deletions.
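In short: the bbox query walks the packed R-tree and yields `FeatureLocation`s; locations whose pages start within two page-size goals of the previous one are grouped into a batch, and a single ranged request is sized to cover the whole batch. A minimal standalone sketch of that grouping, assuming offsets arrive in ascending order (`group_into_batches` and the bare-offset input are illustrative, not identifiers from this commit):

```rust
// Sketch only: the real code batches FeatureLocation values inside
// SelectBbox::next_feature_location; this strips the idea down to bare offsets.
const PAGE_SIZE_GOAL: u64 = 1024 * 64; // mirrors DEFAULT_PAGE_SIZE_GOAL

fn group_into_batches(page_offsets: &[u64]) -> Vec<Vec<u64>> {
    let close_enough = PAGE_SIZE_GOAL * 2;
    let mut batches: Vec<Vec<u64>> = Vec::new();
    for &offset in page_offsets {
        // Close enough to the previous page of the current batch? Then the
        // same range request can cover it; otherwise open a new batch.
        let extends_current = batches
            .last()
            .and_then(|batch| batch.last())
            .map_or(false, |&prev| offset < prev + close_enough);
        if extends_current {
            batches.last_mut().unwrap().push(offset);
        } else {
            batches.push(vec![offset]);
        }
    }
    batches
}
```

For example, page offsets `[0, 40_000, 500_000]` group as `[[0, 40_000], [500_000]]`: the first two pages are fetched with a single range request instead of two.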
61 changes: 55 additions & 6 deletions geomedea/src/http_reader.rs
@@ -1,10 +1,12 @@
 use crate::feature::Feature;
 use crate::io::async_ruszstd::MyRuzstdDecoder;
 use crate::packed_r_tree::{Node, PackedRTree, PackedRTreeHttpReader};
-use crate::{deserialize_from, serialized_size, Bounds, Header, Result};
+use crate::{deserialize_from, serialized_size, Bounds, Header, Result, DEFAULT_PAGE_SIZE_GOAL};
 use crate::{FeatureLocation, PageHeader};
 use bytes::{Bytes, BytesMut};
 use futures_util::{Stream, StreamExt};
+use std::collections::VecDeque;
+use std::ops::Range;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use streaming_http_range_client::{HttpClient, HttpRange};
@@ -118,7 +120,9 @@ struct SelectAll {

 struct SelectBbox {
     feature_start: u64,
+    current_batch: VecDeque<FeatureLocation>,
     feature_locations: Box<dyn Stream<Item = Result<FeatureLocation>> + Unpin>,
+    first_item_of_next_batch: Option<FeatureLocation>,
 }
 
 impl SelectBbox {
@@ -129,8 +133,54 @@ impl SelectBbox {
         Self {
             feature_locations: Box::new(Box::pin(feature_locations)),
             feature_start,
+            current_batch: VecDeque::new(),
+            first_item_of_next_batch: None,
         }
     }
+
+    /// Returns the location of the Feature, as well as a suggested byte range within the feature
+    /// buffer to fetch if a request needs to be made.
+    async fn next_feature_location(&mut self) -> Result<Option<(FeatureLocation, Range<u64>)>> {
+        if self.current_batch.is_empty() {
+            // Determine the next batch.
+            let mut prev_page_starting_offset = None;
+
+            if let Some(first_item_of_next_batch) = self.first_item_of_next_batch.take() {
+                prev_page_starting_offset = Some(first_item_of_next_batch.page_starting_offset);
+                self.current_batch.push_back(first_item_of_next_batch)
+            }
+
+            while let Some(next) = self.feature_locations.next().await.transpose()? {
+                let Some(batch_starting_offset) = prev_page_starting_offset else {
+                    // Starting a new batch.
+                    assert!(self.current_batch.is_empty());
+                    prev_page_starting_offset = Some(next.page_starting_offset);
+                    self.current_batch.push_back(next);
+                    continue;
+                };
+
+                let close_enough = DEFAULT_PAGE_SIZE_GOAL * 2;
+                if next.page_starting_offset < batch_starting_offset + close_enough {
+                    // It's close enough, add it to the batch.
+                    prev_page_starting_offset = Some(next.page_starting_offset);
+                    self.current_batch.push_back(next);
+                } else {
+                    self.first_item_of_next_batch = Some(next);
+                    break;
+                }
+            }
+        }
+
+        let Some(end_of_last_in_batch) = self.current_batch.back().map(|last_in_batch| {
+            last_in_batch.page_starting_offset + (DEFAULT_PAGE_SIZE_GOAL as f64 * 1.1) as u64
+        }) else {
+            return Ok(None);
+        };
+        let next = self.current_batch.pop_front().expect("if there is a back, there is also a front");
+
+        let start_of_batch = next.page_starting_offset;
+        Ok(Some((next, start_of_batch..end_of_last_in_batch)))
+    }
 }
 
 #[derive(Debug)]
@@ -330,9 +380,9 @@ impl AsyncPageReader {
         &mut self,
         feature_start: u64,
         location: FeatureLocation,
+        batch_range: Range<u64>,
     ) -> Result<()> {
-        // TODO be smarter about this.
-        let overfetch = 512_000;
+        let overfetch = batch_range.end - batch_range.start;
 
         // First get to the right page.
         let (mut page_decoder, page_starting_offset) = match self
@@ -596,13 +646,12 @@ impl Selection {
                 select_all.features_left_in_document -= 1;
             }
             Selection::SelectBbox(select_bbox) => {
-                let Some(next_location) = select_bbox.feature_locations.next().await.transpose()?
-                else {
+                let Some((next_location, feature_batch_range)) = select_bbox.next_feature_location().await? else {
                     return Ok(None);
                 };
 
                 page_reader
-                    .ff_to_location(select_bbox.feature_start, next_location)
+                    .ff_to_location(select_bbox.feature_start, next_location, feature_batch_range)
                     .await?;
             }
         }
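To make the new `batch_range` arithmetic concrete (the two page offsets below are hypothetical; the constants are the ones in this diff): pages starting at bytes 0 and 100_000 fall inside the 2 × 64 KiB "close enough" window, so they share one batch, and the suggested range ends 1.1 × the page-size goal past the last page's start.

```rust
fn main() {
    // Hypothetical batch: two matching pages starting at bytes 0 and 100_000.
    // They batch together because 100_000 < 0 + 2 * 65_536.
    let goal: u64 = 1024 * 64; // DEFAULT_PAGE_SIZE_GOAL
    let end_pad = (goal as f64 * 1.1) as u64; // 72_089 bytes past the last page start
    let batch_range = 0u64..(100_000 + end_pad); // 0..172_089
    let overfetch = batch_range.end - batch_range.start; // 172_089 bytes, one request
    // The previous code overfetched a flat 512_000 bytes on every fetch.
    assert!(overfetch < 512_000);
    println!("suggested range: {batch_range:?} ({overfetch} bytes)");
}
```

A lone far-away page now costs only about 72,000 bytes of read-ahead instead of 512,000, while clustered pages collapse into a single request.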
4 changes: 4 additions & 0 deletions geomedea/src/lib.rs
@@ -39,6 +39,10 @@ use tokio::io as asyncio;
 pub use crate::feature::{Feature, Properties, PropertyValue};
 use serde::{Deserialize, Serialize};
 
+// How large should we make each page of feature data
+// before starting a new page.
+pub(crate) const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;
+
 pub(crate) fn serialized_size<T>(value: &T) -> Result<u64>
 where
     T: serde::Serialize + ?Sized,
6 changes: 1 addition & 5 deletions geomedea/src/writer/mod.rs
@@ -4,7 +4,7 @@ use crate::io::CountingWriter;
 use crate::packed_r_tree::{Node, PackedRTreeWriter};
 use crate::{
     deserialize_from, serialize_into, serialized_size, Feature, FeatureLocation, Header,
-    PageHeader, Result,
+    PageHeader, Result, DEFAULT_PAGE_SIZE_GOAL,
 };
 use byteorder::{LittleEndian, WriteBytesExt};
 use std::fs::File;
@@ -26,10 +26,6 @@ pub struct Writer<W: Write> {
     page_size_goal: u64,
 }
 
-// How large should we make each page of feature data
-// before starting a new page.
-const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;
-
 impl<W: Write> Writer<W> {
     pub fn new(inner: W, is_compressed: bool) -> Result<Self> {
         let header = Header {
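With the constant promoted to `pub(crate)` in lib.rs, writer and reader now share one notion of page size: the writer aims for ~64 KiB pages, and the reader's batching math above assumes pages of roughly that size. A hypothetical sketch of the write side (this commit only moves the constant; the actual `Writer` flushing code is not shown in this diff, and `PageBuffer`/`push_feature` are illustrative names):

```rust
/// Hypothetical illustration of a page-size goal on the write side; not the
/// actual geomedea Writer logic, which this diff does not show.
struct PageBuffer {
    current: Vec<u8>,
    page_size_goal: u64, // DEFAULT_PAGE_SIZE_GOAL in practice
}

impl PageBuffer {
    /// Append one serialized feature, cutting a finished page once the
    /// current one reaches the goal ("before starting a new page").
    fn push_feature(&mut self, serialized: &[u8], finished: &mut Vec<Vec<u8>>) {
        self.current.extend_from_slice(serialized);
        if self.current.len() as u64 >= self.page_size_goal {
            finished.push(std::mem::take(&mut self.current));
        }
    }
}
```

Keeping written pages near the goal is presumably what justifies the reader's 1.1 × goal read-ahead past the last page's starting offset.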
