Skip to content

Commit

Permalink
Refactor runner destruction and logging messages
Browse files Browse the repository at this point in the history
  • Loading branch information
trheyi committed Dec 28, 2023
1 parent 85b6b69 commit 0d975c4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 48 deletions.
18 changes: 9 additions & 9 deletions runtime/v8/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (dispatcher *Dispatcher) Start() error {
// Stop stop the v8 mannager
func (dispatcher *Dispatcher) Stop() {
for _, runner := range dispatcher.availables.data {
dispatcher.destory(runner)
dispatcher.destroy(runner)
runner.signal <- RunnerCommandDestroy
}
}
Expand All @@ -75,7 +75,7 @@ func (dispatcher *Dispatcher) offline(runner *Runner) {

func (dispatcher *Dispatcher) _offline(runner *Runner) {
delete(dispatcher.availables.data, runner.id)
log.Trace("[dispatcher] runner %d offline (%d/%d) %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
log.Trace("[dispatcher] [%d] runner offline. availables:%d, total:%d, %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
// create a new runner if the total runners are less than max
// if dispatcher.total < dispatcher.max {
// go dispatcher.create()
Expand All @@ -86,7 +86,7 @@ func (dispatcher *Dispatcher) online(runner *Runner) {
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
dispatcher.availables.data[runner.id] = runner
log.Trace("[dispatcher] runner %d online (%d/%d) %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
log.Trace("[dispatcher] [%d] runner online. availables:%d, total:%d, %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
}

func (dispatcher *Dispatcher) create() {
Expand All @@ -95,15 +95,15 @@ func (dispatcher *Dispatcher) create() {
go runner.Start(ready)
<-ready
dispatcher.total++
log.Trace("[dispatcher] runner %d create (%d/%d) %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
log.Trace("[dispatcher] [%d] runner create. availables:%d, total:%d, %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
}

func (dispatcher *Dispatcher) destory(runner *Runner) {
func (dispatcher *Dispatcher) destroy(runner *Runner) {
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
delete(dispatcher.availables.data, runner.id)
dispatcher.total--
log.Trace("[dispatcher] runner %d destory (%d,%d) %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
log.Trace("[dispatcher] [%d] runner destroy. availables:%d, total:%d, %s", runner.id, len(dispatcher.availables.data), dispatcher.total, dispatcher.health)
}

// Select select a free v8 runner
Expand All @@ -114,8 +114,8 @@ func (dispatcher *Dispatcher) Select(timeout time.Duration) (*Runner, error) {

for _, runner := range dispatcher.availables.data {
dispatcher._offline(runner)
log.Debug(fmt.Sprintf("--- %d -----------------\n", runner.id))
log.Debug(fmt.Sprintln("1. Select a free v8 runner id", runner.id, "count", len(dispatcher.availables.data)))
log.Debug(fmt.Sprintf("--- [%d] -----------------", runner.id))
log.Debug(fmt.Sprintf("1. [%d] Select a free v8 runner. availables=%d", runner.id, len(dispatcher.availables.data)))
return runner, nil
}

Expand Down Expand Up @@ -162,7 +162,7 @@ func (dispatcher *Dispatcher) totalCount() {
}

func (health *Health) String() string {
return fmt.Sprintf("missing: %d, total: %d", health.missing, health.total)
return fmt.Sprintf("missing:%d, total:%d", health.missing, health.total)
}

// Reset reset the health
Expand Down
55 changes: 16 additions & 39 deletions runtime/v8/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ const (
// RunnerCommandExec is the runner command exec
RunnerCommandExec

// RunnerCommandClose is the runner command close
RunnerCommandClose

// RunnerCommandStatus is the runner command status
RunnerCommandStatus
)
Expand Down Expand Up @@ -97,6 +94,7 @@ func (runner *Runner) Start(ready chan bool) error {
}

ticker := time.NewTicker(time.Millisecond * 50)

ready <- true

// Command loop
Expand All @@ -120,12 +118,8 @@ func (runner *Runner) Start(ready chan bool) error {
runner.exec()
break

case RunnerCommandClose:
runner.close()
break

case RunnerCommandDestroy:
runner.destory()
runner.destroy()
return nil

default:
Expand All @@ -152,7 +146,7 @@ func (runner *Runner) Exec(script *Script) interface{} {
runner.status = RunnerStatusRunning
runner.script = script
runner.chResp = make(chan interface{})
log.Debug(fmt.Sprintln("2. Exec a script to the v8 runner to execute", "runner.id:", runner.id, "status:", runner.status, "keepalive:", runner.keepalive, len(runner.signal)))
log.Debug(fmt.Sprintf("2. [%d] Exec script %s.%s. status:%d, keepalive:%v, signal:%d", runner.id, script.ID, runner.method, runner.status, runner.keepalive, len(runner.signal)))

runner.signal <- RunnerCommandExec
select {
Expand All @@ -171,13 +165,15 @@ func (runner *Runner) exec() {
defer func() {
go func() {
if !runner.keepalive {
log.Debug(fmt.Sprintln("3.1 Send a destory signal to the v8 runner", "runner.id:", runner.id, "status:", runner.status, runner.keepalive))
log.Debug(fmt.Sprintf("3.1 [%d] Send a destroy signal to the v8 runner. status:%d, keepalive:%v", runner.id, runner.status, runner.keepalive))
runner.signal <- RunnerCommandDestroy
log.Debug(fmt.Sprintln("3.2 Send a destory signal to the v8 runner done", "runner.id:", runner.id, "status:", runner.status))
log.Debug(fmt.Sprintf("3.2 [%d] Send a destroy signal to the v8 runner. sstatus:%d, keepalive:%v (done)", runner.id, runner.status, runner.keepalive))
return
}
runner.signal <- RunnerCommandClose
log.Debug(fmt.Sprintln("3. Send a close signal to the v8 runner done", "runner.id:", runner.id, "status:", runner.status, runner.keepalive))

log.Debug(fmt.Sprintf("3.1 [%d] Send a reset signal to the v8 runner. status:%d, keepalive:%v", runner.id, runner.status, runner.keepalive))
runner.signal <- RunnerCommandReset
log.Debug(fmt.Sprintf("3.2 [%d] Send a reset signal to the v8 runner. status:%d, keepalive:%v (done)", runner.id, runner.status, runner.keepalive))
}()
}()

Expand Down Expand Up @@ -235,32 +231,11 @@ func (runner *Runner) _exec() {
runner.chResp <- goRes
}

func (runner *Runner) close() {
// destroy the runner
func (runner *Runner) destroy() {

log.Debug(fmt.Sprintln("4. close the runner", "runner.id:", runner.id, "status:", runner.status))
log.Debug(fmt.Sprintf("--- %d end -----------------\n\n", runner.id))

if runner.keepalive {
runner.reset()
return
}

// Clean the runner
if runner.signal != nil {
close(runner.signal)
}
runner.ctx.Close()
runner.iso.Dispose()
runner.iso = nil
runner.ctx = nil
runner.args = nil
}

// destory the runner
func (runner *Runner) destory() {

log.Debug(fmt.Sprintln("4. destory the runner", "runner.id:", runner.id, "status:", runner.status))
log.Debug(fmt.Sprintf("--- %d end -----------------\n\n", runner.id))
log.Debug(fmt.Sprintf("4. [%d] destroy the runner. status:%d, keepalive:%v ", runner.id, runner.status, runner.keepalive))
log.Debug(fmt.Sprintf("--- [%d] end -----------------", runner.id))

runner.status = RunnerStatusDestroy
if runner.signal != nil {
Expand All @@ -277,8 +252,10 @@ func (runner *Runner) destory() {
// reset the runner
func (runner *Runner) reset() {

log.Debug(fmt.Sprintf("4. [%d] destroy the runner. status:%d, keepalive:%v ", runner.id, runner.status, runner.keepalive))
log.Debug(fmt.Sprintf("--- [%d] end -----------------", runner.id))

runner.status = RunnerStatusDestroy
// dispatcher.UpdateStatus(runner, RunnerStatusDestroy)

runner.ctx.Close()
runner.iso.Dispose()
Expand Down

0 comments on commit 0d975c4

Please sign in to comment.