Skip to content

Commit

Permalink
Merge pull request #10 from michaelkirk/mkirk/cleanup
Browse files Browse the repository at this point in the history
code cleanup, no behavioral differences
  • Loading branch information
michaelkirk authored Jul 18, 2024
2 parents 127b96c + d7147d7 commit 0adec24
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 25 deletions.
3 changes: 1 addition & 2 deletions geomedea-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ impl HttpReader {
}

struct FeatureCollection(geojson::FeatureCollection);
#[wasm_bindgen]
impl FeatureCollection {
async fn new(mut feature_stream: FeatureStream<'_>) -> geomedea::Result<Self> {
async fn new(mut feature_stream: FeatureStream) -> geomedea::Result<Self> {
let mut geojson_feature_collection = geojson::FeatureCollection {
bbox: None,
features: vec![],
Expand Down
25 changes: 11 additions & 14 deletions geomedea/src/http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ impl HttpReader {

let select_all = SelectAll::new(features_count);
let stream = Selection::SelectAll(select_all)
.into_feature_buffer_stream(self.header.is_compressed, http_client)
.await?;
.into_feature_buffer_stream(self.header.is_compressed, http_client);
Ok(FeatureStream::new(stream))
}

Expand All @@ -101,8 +100,7 @@ impl HttpReader {

let select_bbox = SelectBbox::new(feature_start, Box::new(feature_locations.into_iter()));
let stream = Selection::SelectBbox(select_bbox)
.into_feature_buffer_stream(self.header.is_compressed, http_client)
.await?;
.into_feature_buffer_stream(self.header.is_compressed, http_client);
Ok(FeatureStream::new(stream))
}

Expand Down Expand Up @@ -559,11 +557,11 @@ enum Selection {
}

impl Selection {
pub async fn into_feature_buffer_stream(
pub fn into_feature_buffer_stream(
mut self,
is_compressed: bool,
http_client: HttpClient,
) -> Result<impl Stream<Item = Result<Bytes>>> {
) -> impl Stream<Item = Result<Bytes>> {
let mut page_reader = AsyncPageReader::new(is_compressed, http_client);
let stream = async_stream::try_stream! {
loop {
Expand All @@ -575,7 +573,7 @@ impl Selection {
}
}
};
Ok(Box::pin(stream))
stream
}

async fn next_feature_buffer(
Expand Down Expand Up @@ -614,31 +612,30 @@ impl Selection {
let feature_len = u64::from_le_bytes(len_bytes);

let mut feature_buffer = BytesMut::zeroed(feature_len as usize);
// Error is on this next line:
page_reader.read_exact(&mut feature_buffer).await?;

Ok(Some(feature_buffer.freeze()))
}
}

pub struct FeatureStream<'a> {
inner: Box<dyn Stream<Item = Result<Feature>> + Unpin + 'a>,
pub struct FeatureStream {
inner: Box<dyn Stream<Item = Result<Feature>> + Unpin>,
}

impl<'a> FeatureStream<'a> {
fn new(stream: impl Stream<Item = Result<Bytes>> + Unpin + 'a) -> Self {
impl FeatureStream {
fn new(stream: impl Stream<Item = Result<Bytes>> + 'static) -> Self {
let inner = stream.map(move |feature_buffer| {
let feature = deserialize_from::<_, Feature>(feature_buffer?.as_ref())?;
// trace!("yielding feature: {feature:?}");
Ok(feature)
});
Self {
inner: Box::new(inner),
inner: Box::new(Box::pin(inner)),
}
}
}

impl Stream for FeatureStream<'_> {
impl Stream for FeatureStream {
type Item = Result<Feature>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
2 changes: 0 additions & 2 deletions geomedea/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ mod bounds;
mod error;
mod feature;
mod geometry;
#[cfg(feature = "writer")]
mod hilbert;
mod http_reader;
pub use http_reader::{FeatureStream, HttpReader};
mod format;
Expand Down
14 changes: 9 additions & 5 deletions geomedea/src/packed_r_tree/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,12 @@ pub(crate) mod http {
while let Some(node_range) = queue.pop_front() {
let level = self.tree.level_for_node_idx(node_range.start);
trace!("next node_range {node_range:?} (level {level})");
// REVIEW: why return node_idx? Can't we infer it from node_range?
for (node_idx, node) in self.read_node_range(node_range).await? {
for (node, node_idx) in self
.read_node_range(node_range.clone())
.await?
.into_iter()
.zip(node_range)
{
if !node.bounds.intersects(bbox) {
continue;
}
Expand Down Expand Up @@ -175,7 +179,7 @@ pub(crate) mod http {
self.http_client
}

async fn read_node_range(&mut self, node_range: Range<u64>) -> Result<Vec<(u64, Node)>> {
async fn read_node_range(&mut self, node_range: Range<u64>) -> Result<Vec<Node>> {
let start_byte =
self.index_starting_byte + node_range.start * Node::serialized_size() as u64;
let end_byte =
Expand All @@ -185,11 +189,11 @@ pub(crate) mod http {

let node_range_len = (node_range.end - node_range.start) as usize;
let mut nodes = Vec::with_capacity(node_range_len);
for node_id in node_range {
for _node_id in node_range {
let mut node_bytes = vec![0u8; Node::serialized_size()];
self.http_client.read_exact(&mut node_bytes).await?;
let node: Node = deserialize_from(&*node_bytes)?;
nodes.push((node_id, node))
nodes.push(node)
}

Ok(nodes)
Expand Down
File renamed without changes.
4 changes: 3 additions & 1 deletion geomedea/src/writer.rs → geomedea/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::geometry::Bounded;
use crate::io::CountingWriter;
use crate::packed_r_tree::{Node, PackedRTreeWriter};
use crate::{
deserialize_from, hilbert, serialize_into, serialized_size, Feature, FeatureLocation, Header,
deserialize_from, serialize_into, serialized_size, Feature, FeatureLocation, Header,
PageHeader, Result,
};
use byteorder::{LittleEndian, WriteBytesExt};
Expand All @@ -12,6 +12,8 @@ use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::marker::PhantomData;
use tempfile::tempfile;

mod hilbert;

#[derive(Debug)]
pub struct Writer<W: Write> {
inner: W,
Expand Down
2 changes: 1 addition & 1 deletion geomedea_geozero/src/geozero_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ mod processing {
/// # }
/// ```
pub async fn process_features<W: FeatureProcessor>(
stream: &mut FeatureStream<'_>,
stream: &mut FeatureStream,
out: &mut W,
) -> GeozeroResult<()> {
out.dataset_begin(None)?;
Expand Down

0 comments on commit 0adec24

Please sign in to comment.