Skip to content

Commit

Permalink
Merge pull request #12 from michaelkirk/mkirk/batch-feature-locations
Browse files Browse the repository at this point in the history
batch feature locations
  • Loading branch information
michaelkirk authored Jul 18, 2024
2 parents cb2db7c + 246a673 commit 417d4f4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 10 deletions.
68 changes: 63 additions & 5 deletions geomedea/src/http_reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::feature::Feature;
use crate::io::async_ruszstd::MyRuzstdDecoder;
use crate::packed_r_tree::{Node, PackedRTree, PackedRTreeHttpReader};
use crate::{deserialize_from, serialized_size, Bounds, Header, Result};
use crate::{deserialize_from, serialized_size, Bounds, Header, Result, DEFAULT_PAGE_SIZE_GOAL};
use crate::{FeatureLocation, PageHeader};
use bytes::{Bytes, BytesMut};
use futures_util::{Stream, StreamExt};
use std::collections::VecDeque;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
use streaming_http_range_client::{HttpClient, HttpRange};
Expand Down Expand Up @@ -118,7 +120,9 @@ struct SelectAll {

/// Drives a bounding-box selection: streams the matching feature locations
/// from the spatial index, grouping nearby locations into batches so they
/// can be fetched together (see `next_feature_location`).
struct SelectBbox {
    // Offset of the start of the feature data section; passed through to the
    // page reader when fast-forwarding to a feature location.
    feature_start: u64,
    // Locations queued for the batch currently being served; popped from the
    // front by `next_feature_location`.
    current_batch: VecDeque<FeatureLocation>,
    // Remaining candidate locations from the index query, pulled lazily.
    feature_locations: Box<dyn Stream<Item = Result<FeatureLocation>> + Unpin>,
    // A location already pulled from `feature_locations` but too far past the
    // current batch to include; it seeds the next batch.
    first_item_of_next_batch: Option<FeatureLocation>,
}

impl SelectBbox {
Expand All @@ -129,8 +133,57 @@ impl SelectBbox {
Self {
feature_locations: Box::new(Box::pin(feature_locations)),
feature_start,
current_batch: VecDeque::new(),
first_item_of_next_batch: None,
}
}

/// Returns the location of the Feature as well as a suggested byte range within the Feature buffer
/// if a request needs to be made.
async fn next_feature_location(&mut self) -> Result<Option<(FeatureLocation, Range<u64>)>> {
if self.current_batch.is_empty() {
// Else determine next batch
let mut prev_page_starting_offset = None;

if let Some(first_item_of_next_batch) = self.first_item_of_next_batch.take() {
prev_page_starting_offset = Some(first_item_of_next_batch.page_starting_offset);
self.current_batch.push_back(first_item_of_next_batch)
}

while let Some(next) = self.feature_locations.next().await.transpose()? {
let Some(batch_starting_offset) = prev_page_starting_offset else {
// starting a new batch
assert!(self.current_batch.is_empty());
prev_page_starting_offset = Some(next.page_starting_offset);
self.current_batch.push_back(next);
continue;
};

let close_enough = DEFAULT_PAGE_SIZE_GOAL * 2;
if next.page_starting_offset < batch_starting_offset + close_enough {
// It's close enough, add it to the batch
prev_page_starting_offset = Some(next.page_starting_offset);
self.current_batch.push_back(next);
} else {
self.first_item_of_next_batch = Some(next);
break;
}
}
}

let Some(end_of_last_in_batch) = self.current_batch.back().map(|last_in_batch| {
last_in_batch.page_starting_offset + (DEFAULT_PAGE_SIZE_GOAL as f64 * 1.1) as u64
}) else {
return Ok(None);
};
let next = self
.current_batch
.pop_front()
.expect("if there is a back, there is also a front");

let start_of_batch = next.page_starting_offset;
Ok(Some((next, start_of_batch..end_of_last_in_batch)))
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -330,9 +383,9 @@ impl AsyncPageReader {
&mut self,
feature_start: u64,
location: FeatureLocation,
batch_range: Range<u64>,
) -> Result<()> {
// TODO be smarter about this.
let overfetch = 512_000;
let overfetch = batch_range.end - batch_range.start;

// First get to the right page.
let (mut page_decoder, page_starting_offset) = match self
Expand Down Expand Up @@ -596,13 +649,18 @@ impl Selection {
select_all.features_left_in_document -= 1;
}
Selection::SelectBbox(select_bbox) => {
let Some(next_location) = select_bbox.feature_locations.next().await.transpose()?
let Some((next_location, feature_batch_range)) =
select_bbox.next_feature_location().await?
else {
return Ok(None);
};

page_reader
.ff_to_location(select_bbox.feature_start, next_location)
.ff_to_location(
select_bbox.feature_start,
next_location,
feature_batch_range,
)
.await?;
}
}
Expand Down
4 changes: 4 additions & 0 deletions geomedea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ use tokio::io as asyncio;
pub use crate::feature::{Feature, Properties, PropertyValue};
use serde::{Deserialize, Serialize};

// How large should we make each page of feature data
// before starting a new page.
//
// Crate-visible (rather than private to the writer) because the HTTP reader
// also uses it to size batched range requests in `http_reader`.
pub(crate) const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;

pub(crate) fn serialized_size<T>(value: &T) -> Result<u64>
where
T: serde::Serialize + ?Sized,
Expand Down
6 changes: 1 addition & 5 deletions geomedea/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::io::CountingWriter;
use crate::packed_r_tree::{Node, PackedRTreeWriter};
use crate::{
deserialize_from, serialize_into, serialized_size, Feature, FeatureLocation, Header,
PageHeader, Result,
PageHeader, Result, DEFAULT_PAGE_SIZE_GOAL,
};
use byteorder::{LittleEndian, WriteBytesExt};
use std::fs::File;
Expand All @@ -26,10 +26,6 @@ pub struct Writer<W: Write> {
page_size_goal: u64,
}

// How large should we make each page of feature data
// before starting a new page.
const DEFAULT_PAGE_SIZE_GOAL: u64 = 1024 * 64;

impl<W: Write> Writer<W> {
pub fn new(inner: W, is_compressed: bool) -> Result<Self> {
let header = Header {
Expand Down

0 comments on commit 417d4f4

Please sign in to comment.