Skip to content

Commit

Permalink
Merge branch 'refactor-pack-streaming'
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Jul 17, 2023
2 parents 9fbed4b + 33f95ba commit 8a46a7e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 37 deletions.
17 changes: 0 additions & 17 deletions gix-features/src/zlib/stream/inflate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,6 @@ use std::{io, io::BufRead};

use flate2::{Decompress, FlushDecompress, Status};

/// Pairs a reader with a boxed [`Decompress`] state; the `io::Read` implementation of this type
/// yields the decompressed form of the bytes obtained from `inner`.
///
/// The boxed variant is faster for what we do (moving the decompressor in and out a lot)
pub struct ReadBoxed<R> {
/// The reader from which bytes should be decompressed.
pub inner: R,
/// The decompressor doing all the work. It is heap-allocated, so moving it in and out of
/// this struct only moves the `Box`, not the decompressor state itself.
pub decompressor: Box<Decompress>,
}

impl<R: BufRead> io::Read for ReadBoxed<R> {
    /// Decompress bytes taken from `inner` into `into` by delegating to the free function
    /// [`read()`], returning whatever it returns.
    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
        // Split the borrow so both fields can be handed out mutably at once.
        let Self { inner, decompressor } = self;
        read(inner, decompressor.as_mut(), into)
    }
}

/// Read bytes from `rd` and decompress them using `state` into a pre-allocated fitting buffer `dst`, returning the amount of bytes written.
pub fn read(rd: &mut impl BufRead, state: &mut Decompress, mut dst: &mut [u8]) -> io::Result<usize> {
let mut total_written = 0;
Expand Down
69 changes: 49 additions & 20 deletions gix-pack/src/data/input/bytes_to_entries.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::{fs, io};

use gix_features::{
hash,
hash::Sha1,
zlib::{stream::inflate::ReadBoxed, Decompress},
};
use gix_features::{hash::Sha1, zlib::Decompress};
use gix_hash::ObjectId;

use crate::data::input;
Expand All @@ -14,7 +10,7 @@ use crate::data::input;
/// The iterator used as part of [`Bundle::write_to_directory(…)`][crate::Bundle::write_to_directory()].
pub struct BytesToEntriesIter<BR> {
read: BR,
decompressor: Option<Box<Decompress>>,
decompressor: Decompress,
offset: u64,
had_error: bool,
version: crate::data::Version,
Expand Down Expand Up @@ -66,7 +62,7 @@ where
);
Ok(BytesToEntriesIter {
read,
decompressor: None,
decompressor: Decompress::new(true),
compressed,
offset: 12,
had_error: false,
Expand All @@ -88,31 +84,25 @@ where
self.objects_left -= 1; // even an error counts as objects

// Read header
let entry = match self.hash.take() {
let entry = match self.hash.as_mut() {
Some(hash) => {
let mut read = read_and_pass_to(
&mut self.read,
hash::Write {
HashWrite {
inner: io::sink(),
hash,
},
);
let res = crate::data::Entry::from_read(&mut read, self.offset, self.hash_len);
self.hash = Some(read.write.hash);
res
crate::data::Entry::from_read(&mut read, self.offset, self.hash_len)
}
None => crate::data::Entry::from_read(&mut self.read, self.offset, self.hash_len),
}
.map_err(input::Error::from)?;

// Decompress object to learn its compressed bytes
let mut decompressor = self
.decompressor
.take()
.unwrap_or_else(|| Box::new(Decompress::new(true)));
let compressed_buf = self.compressed_buf.take().unwrap_or_else(|| Vec::with_capacity(4096));
decompressor.reset(true);
let mut decompressed_reader = ReadBoxed {
self.decompressor.reset(true);
let mut decompressed_reader = DecompressRead {
inner: read_and_pass_to(
&mut self.read,
if self.compressed.keep() {
Expand All @@ -121,7 +111,7 @@ where
compressed_buf
},
),
decompressor,
decompressor: &mut self.decompressor,
};

let bytes_copied = io::copy(&mut decompressed_reader, &mut io::sink())?;
Expand All @@ -135,7 +125,6 @@ where
let pack_offset = self.offset;
let compressed_size = decompressed_reader.decompressor.total_in();
self.offset += entry.header_size() as u64 + compressed_size;
self.decompressor = Some(decompressed_reader.decompressor);

let mut compressed = decompressed_reader.inner.write;
debug_assert_eq!(
Expand Down Expand Up @@ -293,3 +282,43 @@ impl crate::data::File {
)
}
}

/// A reader that decompresses the bytes of `inner` using a mutably borrowed [`Decompress`] state.
///
/// The decompressor is only borrowed for the lifetime `'a`, so whoever owns it keeps ownership
/// and can use it again once this reader is gone.
pub struct DecompressRead<'a, R> {
/// The reader from which bytes should be decompressed.
pub inner: R,
/// The decompressor doing all the work.
pub decompressor: &'a mut Decompress,
}

impl<'a, R: io::BufRead> io::Read for DecompressRead<'a, R> {
    /// Decompress bytes taken from `inner` into `into` by delegating to
    /// `gix_features::zlib::stream::inflate::read()`, returning whatever it returns.
    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
        use gix_features::zlib::stream::inflate;

        // Reborrow the decompressor explicitly; it stays owned by whoever lent it to us.
        let state = &mut *self.decompressor;
        inflate::read(&mut self.inner, state, into)
    }
}

/// A utility to automatically generate a hash while writing into an inner writer.
///
/// The hasher is mutably borrowed rather than owned, and is fed exactly the bytes that the
/// inner writer reported as written.
pub struct HashWrite<'a, T> {
/// The hash implementation.
pub hash: &'a mut Sha1,
/// The inner writer.
pub inner: T,
}

impl<'a, T: std::io::Write> std::io::Write for HashWrite<'a, T> {
    /// Pass `buf` on to the inner writer and, if that succeeds, feed the hasher with only the
    /// prefix of `buf` that the inner writer actually accepted.
    ///
    /// An error of the inner writer is returned as is, leaving the hasher untouched.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.inner.write(buf).map(|accepted| {
            // A short write must not hash the bytes the caller still has to retry.
            self.hash.update(&buf[..accepted]);
            accepted
        })
    }

    /// Flush the inner writer; the hasher has nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        std::io::Write::flush(&mut self.inner)
    }
}

0 comments on commit 8a46a7e

Please sign in to comment.