Skip to content

Commit

Permalink
feat: Optimized use of internal time-wheel memory
Browse files Browse the repository at this point in the history
  • Loading branch information
BinChengZhao committed Nov 19, 2021
1 parent 948dc49 commit 73ddde3
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 16 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
# Version 0.10.0

## Changed
Optimized the use of internal `time-wheel` memory. ([#28](https://github.com/BinChengZhao/delay-timer/issues/28)), thanks `elderbig` !

### Details
There is a `time-wheel` in `delay-timer`, which is the carrier of all tasks.

The time wheel uses slots (time scales) as units, each slot corresponds to a hash table, when a slot is rotated to it will execute the task that is already ready internally, when the task is executed it will move from one slot to another. In order to have enough capacity to store the tasks, there may be a memory allocation here, so that by the time the whole time wheel is traversed, each internal time wheel-slot will have rich memory capacity, and when there are many tasks the memory occupied by the whole time wheel will be very large. So it will be necessary to shrink the memory in time.

This change is to shrink the memory in time after each round of training slots and executing tasks to ensure that the slots have a basic and compact capacity.

# Version 0.9.2

## Changed
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "delay_timer"
version = "0.9.2"
version = "0.10.0"
authors = ["binchengZhao <[email protected]>"]
edition = "2018"
repository = "https://github.com/BinChengZhao/delay-timer"
Expand Down Expand Up @@ -49,6 +49,8 @@ tokio = { version = "^1.3.0", features = ["full"] , optional = true }
[dev-dependencies]
rand = "0.8.4"
surf = "^2.1.0"
tracing = "0.1.29"
tracing-subscriber = "0.2.0"
tokio = { version = "^1.3.0", features = ["full"] }
hyper= {version = "^0.14.2" , features = ["full"] }
pretty_env_logger = "^0.4"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub fn generate_closure_template(

let future = async move {
future_inner.await;
context.finishe_task().await;
context.finish_task().await;
};
create_delay_task_handler(async_spawn(future))
}
Expand Down
2 changes: 1 addition & 1 deletion examples/cycle_tokio_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn generate_closure_template(

let future = async move {
future_inner.await.ok();
context.finishe_task(None).await;
context.finish_task(None).await;
};

create_delay_task_handler(async_spawn_by_tokio(future))
Expand Down
2 changes: 1 addition & 1 deletion examples/demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn generate_closure_template(

let future = async move {
future_inner.await;
context.finishe_task(None).await;
context.finish_task(None).await;
};
create_delay_task_handler(async_spawn(future))
}
Expand Down
42 changes: 42 additions & 0 deletions examples/demo_async_std_and_tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#![allow(deprecated)]

use anyhow::Result;
use delay_timer::prelude::*;
use smol::Timer;
use std::time::Duration;
use tracing::{info, Level};
use tracing_subscriber::FmtSubscriber;

#[async_std::main]
async fn main() -> Result<()> {
// a builder for `FmtSubscriber`.
FmtSubscriber::builder()
// all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.)
// will be written to stdout.
.with_max_level(Level::DEBUG)
// completes the builder.
.init();

let delay_timer = DelayTimerBuilder::default().build();
for i in 0..1000 {
delay_timer.add_task(build_task_async_execute_process(i)?)?;
}

info!("==== All job is be init! ====");
for _ in 0..120 {
Timer::after(Duration::from_secs(60)).await;
}
Ok(delay_timer.stop_delay_timer()?)
}

fn build_task_async_execute_process(task_id: u64) -> Result<Task, TaskError> {
let mut task_builder = TaskBuilder::default();

let body = unblock_process_task_fn("echo hello".into());
task_builder
.set_frequency_by_candy(CandyFrequency::Repeated(CandyCron::Secondly))
.set_task_id(task_id)
.set_maximum_running_time(10)
.set_maximum_parallel_runnable_num(1)
.spawn(body)
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
//!
//! let future = async move {
//! future_inner.await;
//! context.finishe_task(None).await;
//! context.finish_task(None).await;
//! };
//!
//! create_delay_task_handler(async_spawn(future))
Expand Down
8 changes: 4 additions & 4 deletions src/macros/generate_fn_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ macro_rules! create_async_fn_body {
let future_inner = async move { $async_body };
future_inner.await;

context.finishe_task(None).await;
context.finish_task(None).await;
};
let handle = async_spawn(f);
create_delay_task_handler(handle)
Expand All @@ -34,7 +34,7 @@ macro_rules! create_async_fn_body {
let future_inner = async move { $async_body };
future_inner.await;

context.finishe_task(None).await;
context.finish_task(None).await;
};
let handle = async_spawn(f);
create_delay_task_handler(handle)
Expand All @@ -53,7 +53,7 @@ cfg_tokio_support!(
let future_inner = async move { $async_body };
future_inner.await;

context.finishe_task().await;
context.finish_task().await;
});
create_delay_task_handler(handle)
}
Expand All @@ -71,7 +71,7 @@ cfg_tokio_support!(
let future_inner = async move { $async_body };
future_inner.await;

context.finishe_task().await;
context.finish_task().await;
};
let handle = async_spawn(f);
create_delay_task_handler(handle)
Expand Down
11 changes: 9 additions & 2 deletions src/timer/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl Slot {
self.task_map.remove(&task_id)
}

//Check and reduce cylinder_line,
//Returns a Vec. containing all task ids to be executed.(cylinder_line == 0)
// Check and reduce cylinder_line,
// Returns a Vec. containing all task ids to be executed.(cylinder_line == 0)
pub(crate) fn arrival_time_tasks(&mut self) -> Vec<u64> {
let mut task_id_vec = vec![];

Expand All @@ -51,4 +51,11 @@ impl Slot {

task_id_vec
}

// When the operation is finished with the task, shrink the container in time
// To avoid the overall time-wheel from occupying too much memory.
// FIX: https://github.com/BinChengZhao/delay-timer/issues/28
pub(crate) fn shrink(&mut self) {
self.task_map.shrink_to(64);
}
}
2 changes: 1 addition & 1 deletion src/timer/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ impl TaskContext {
}

/// Send a task-Finish signal to EventHandle.
pub async fn finishe_task(self, finish_output: Option<FinishOutput>) {
pub async fn finish_task(self, finish_output: Option<FinishOutput>) {
if let Some(timer_event_sender) = self.timer_event_sender {
timer_event_sender
.send(TimerEvent::FinishTask(FinishTaskBody {
Expand Down
9 changes: 9 additions & 0 deletions src/timer/timer_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub enum TimerEvent {
pub struct Timer {
/// Event sender that provides events to `EventHandle` processing.
pub(crate) timer_event_sender: TimerEventSender,
#[allow(dead_code)]
status_report_sender: Option<AsyncSender<i32>>,
pub(crate) shared_header: SharedHeader,
}
Expand Down Expand Up @@ -237,6 +238,14 @@ impl Timer {
}
}

{
// When the operation is finished with the task, shrink the container in time
// To avoid the overall time-wheel from occupying too much memory.
if let Some(mut slot_mut) = self.shared_header.wheel_queue.get_mut(&second_hand) {
slot_mut.shrink();
}
}

clock.tick().await;
}
}
Expand Down
26 changes: 22 additions & 4 deletions src/utils/convenience.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,25 @@ pub mod functions {
) -> impl Fn(TaskContext) -> Box<dyn DelayTaskHandler> + 'static + Send + Sync {
use smol::process::{Child, Command};
move |context: TaskContext| {
debug!(
"Unblock-Process Task: {}, Record: {} Start",
context.task_id, context.record_id
);

let shell_command_clone = shell_command.clone();
create_delay_task_handler(async_spawn(async move {
let childs = parse_and_run::<Child, Command>(&shell_command_clone).await;

if let Err(err) = childs {
debug!(
"Unblock-Process Task: {}, Record: {} fail for: {}",
context.task_id,
context.record_id,
err.to_string()
);

context
.finishe_task(Some(FinishOutput::ExceptionOutput(err.to_string())))
.finish_task(Some(FinishOutput::ExceptionOutput(err.to_string())))
.await;
return Err(anyhow!(err.to_string()));
}
Expand All @@ -40,8 +52,14 @@ pub mod functions {

let last_child = childs.pop_back().ok_or_else(|| anyhow!("Without child."))?;
let output = last_child.wait_with_output().await?;

debug!(
"Unblock-Process Task: {}, Record: {} finished",
context.task_id, context.record_id
);

context
.finishe_task(Some(FinishOutput::ProcessOutput(output)))
.finish_task(Some(FinishOutput::ProcessOutput(output)))
.await;

Ok(())
Expand All @@ -63,7 +81,7 @@ pub mod functions {

if let Err(err) = childs {
context
.finishe_task(Some(FinishOutput::ExceptionOutput(err.to_string())))
.finish_task(Some(FinishOutput::ExceptionOutput(err.to_string())))
.await;
return Err(anyhow!(err.to_string()));
}
Expand All @@ -73,7 +91,7 @@ pub mod functions {
let last_child = childs.pop_back().ok_or_else(|| anyhow!("Without child."))?;
let output = last_child.wait_with_output().await?;
context
.finishe_task(Some(FinishOutput::ProcessOutput(output)))
.finish_task(Some(FinishOutput::ProcessOutput(output)))
.await;

Ok(())
Expand Down

0 comments on commit 73ddde3

Please sign in to comment.