Skip to content

Commit

Permalink
Read previous logs if latest log does not include server start
Browse files Browse the repository at this point in the history
  • Loading branch information
fenhl committed Jan 6, 2024
1 parent c35278b commit 320c756
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crate/wurstminebot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ serde_json = "1" #TODO make sure enabling the arbitrary_precision feature doesn'
serenity = "0.12"
thiserror = "1"

[dependencies.async-compression]
version = "0.4"
features = ["gzip", "tokio"]

[dependencies.async_zip]
version = "0.0.16"
features = ["full"]
Expand Down
36 changes: 29 additions & 7 deletions crate/wurstminebot/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use {
io::{
self,
AsyncBufReadExt as _,
AsyncReadExt as _,
BufReader,
},
sync::RwLock,
Expand All @@ -55,7 +56,10 @@ use {
File,
},
io_error_from_reqwest,
traits::ReqwestResponseExt as _,
traits::{
IoResultExt as _,
ReqwestResponseExt as _,
},
},
};

Expand Down Expand Up @@ -285,14 +289,32 @@ impl Line {
}
}

fn history(http_client: reqwest::Client, world: &World) -> impl Stream<Item = Result<Line, Error>> {
stream::once(fs::read_to_string(world.dir().join("logs/latest.log")))
.err_into::<Error>()
.and_then(move |contents| {
async fn history_paths(world: &World) -> Result<Vec<PathBuf>, Error> {
let mut logs = fs::read_dir(world.dir().join("logs")).map_ok(|entry| entry.path()).try_collect::<Vec<_>>().await?;
logs.sort_unstable_by(|a, b| b.cmp(a));
logs.reserve_exact(1);
logs.push(world.dir().join("server.log"));
Ok(logs)
}

fn history(http_client: reqwest::Client, world: &World) -> impl Stream<Item = Result<Line, Error>> + '_ {
stream::once(history_paths(world))
.and_then(move |paths| {
let http_client = http_client.clone();
future::ok(
stream::iter(contents.lines().rev().map(|line| line.to_owned()).collect_vec())
.then(move |line| {
stream::iter(paths)
.then(|path| async move {
if path.extension().is_some_and(|ext| ext == "gz") {
let mut buf = String::default();
async_compression::tokio::bufread::GzipDecoder::new(BufReader::new(File::open(&path).await?)).read_to_string(&mut buf).await.at(path)?;
Ok(buf)
} else {
fs::read_to_string(path).await
}
})
.and_then(|contents| future::ok(stream::iter(contents.lines().rev().map(|line| line.to_owned()).collect_vec()).map(Ok)))
.try_flatten()
.and_then(move |line| {
let http_client = http_client.clone();
async move {
Line::parse(Arc::new(RwLock::new(FollowerState { // reset state for each line since we're going backwards
Expand Down

0 comments on commit 320c756

Please sign in to comment.