
Commit

[add] performance mode support (dev)
trheyi committed Dec 27, 2023
1 parent 207616c commit 06857d8
Showing 6 changed files with 224 additions and 374 deletions.
125 changes: 48 additions & 77 deletions runtime/v8/dispatcher.go
@@ -10,135 +10,106 @@ import (

// RunnerMap is a runner map
type RunnerMap struct {
data map[*Runner]uint8
mutex *sync.RWMutex
data map[uint]*Runner
length uint
mutex *sync.RWMutex
}

// Dispatcher is a runner dispatcher
type Dispatcher struct {
availables RunnerMap
runners RunnerMap
count int
min int
max int
stoped bool
total uint
min uint
max uint
}

// the global dispatcher instance
// initialize when the v8 start
var dispatcher *Dispatcher = nil

// NewDispatcher is a runner dispatcher
func NewDispatcher(min, max int) *Dispatcher {
func NewDispatcher(min, max uint) *Dispatcher {

// Test the min and max
min = 50
max = 150
min = 200
max = 200
return &Dispatcher{
availables: RunnerMap{data: make(map[*Runner]uint8), mutex: &sync.RWMutex{}},
runners: RunnerMap{data: make(map[*Runner]uint8), mutex: &sync.RWMutex{}},
availables: RunnerMap{data: make(map[uint]*Runner), mutex: &sync.RWMutex{}},
total: 0,
min: min,
max: max,
stoped: false,
}
}
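
For orientation: the new RunnerMap keys runners by a numeric id instead of by pointer and guards the map with a sync.RWMutex. Below is a minimal, self-contained sketch of that pattern; it is not part of this commit, and the runner/runnerMap names are simplified stand-ins for the real types.

package main

import (
	"fmt"
	"sync"
)

// runner is a stand-in for the real *Runner type in this commit.
type runner struct {
	id uint
}

// runnerMap mirrors the new id-keyed layout of RunnerMap.
type runnerMap struct {
	data  map[uint]*runner
	mutex *sync.RWMutex
}

// put registers a runner under its id (the "online" path).
func (m *runnerMap) put(r *runner) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.data[r.id] = r
}

// take removes and returns any available runner, or nil when the map
// is empty (the "offline" path used by Select).
func (m *runnerMap) take() *runner {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	for id, r := range m.data {
		delete(m.data, id)
		return r
	}
	return nil
}

func main() {
	m := &runnerMap{data: make(map[uint]*runner), mutex: &sync.RWMutex{}}
	m.put(&runner{id: 1})
	fmt.Println(m.take().id) // 1
	fmt.Println(m.take())    // <nil>
}
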

// Start start the v8 mannager
func (dispatcher *Dispatcher) Start() error {
for i := 0; i < dispatcher.min; i++ {
runner := NewRunner()
go runner.Start(nil)
for i := uint(0); i < dispatcher.min; i++ {
dispatcher.create()
}
log.Info("[dispatcher] the dispatcher is started. runners %d", dispatcher.count)
log.Info("[dispatcher] the dispatcher is started. runners %d", dispatcher.total)
return nil
}

// Stop stop the v8 mannager
func (dispatcher *Dispatcher) Stop() {
dispatcher.stoped = true
defer func() { dispatcher.stoped = false }()
for runner := range dispatcher.runners.data {
for _, runner := range dispatcher.availables.data {
dispatcher.destory(runner)
runner.signal <- RunnerCommandDestroy
}
}

// Register register a new v8 runner
func (dispatcher *Dispatcher) Register(runner *Runner) {
dispatcher.runners.mutex.Lock()
defer dispatcher.runners.mutex.Unlock()
dispatcher.runners.data[runner] = runner.status
dispatcher.count = len(dispatcher.runners.data)
if runner.status == RunnerStatusFree {
dispatcher.online(runner)
}
}

// Unregister unregister a v8 runner
func (dispatcher *Dispatcher) Unregister(runner *Runner) {
if runner.status == RunnerStatusRunning {
// TODO: send a command to the runner
log.Error("[dispatcher] you can't unregister a running runner")
return
}

dispatcher.runners.mutex.Lock()
defer dispatcher.runners.mutex.Unlock()
delete(dispatcher.runners.data, runner)
dispatcher.count = len(dispatcher.runners.data)
func (dispatcher *Dispatcher) offline(runner *Runner) {
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
dispatcher._offline(runner)
}

// UpdateStatus update the v8 runner status
func (dispatcher *Dispatcher) UpdateStatus(runner *Runner, status uint8) {
dispatcher.runners.mutex.Lock()
defer dispatcher.runners.mutex.Unlock()
dispatcher.runners.data[runner] = status

// log.Info("[dispatcher] update runner %p status %d", runner, status)

if status == RunnerStatusFree {
dispatcher.online(runner)
return
}
dispatcher.offline(runner)
func (dispatcher *Dispatcher) _offline(runner *Runner) {
delete(dispatcher.availables.data, runner.id)
log.Info("[dispatcher] runner %p offline (%d/%d) (%d/%d)", runner, len(dispatcher.availables.data), dispatcher.total, tempCount, cacheCount)
// create a new runner if the total runners are less than max
// if dispatcher.total < dispatcher.max {
// go dispatcher.create()
// }
}

func (dispatcher *Dispatcher) offline(runner *Runner) {
func (dispatcher *Dispatcher) online(runner *Runner) {
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
delete(dispatcher.availables.data, runner)
log.Info("[dispatcher] runner %p offline %d (%d/%d)", runner, len(dispatcher.availables.data), tempCount, cacheCount)
dispatcher.availables.data[runner.id] = runner
log.Info("[dispatcher] runner %p online (%d/%d) (%d/%d)", runner, len(dispatcher.availables.data), dispatcher.total, tempCount, cacheCount)
}

// Create a new runner if the free runners are less than max size
if dispatcher.count < dispatcher.max {
go NewRunner().Start(nil)
}
func (dispatcher *Dispatcher) create() {
runner := NewRunner(true)
go runner.Start()
dispatcher.total++
log.Info("[dispatcher] runner %p create (%d/%d) (%d/%d)", runner, len(dispatcher.availables.data), dispatcher.total, tempCount, cacheCount)
}

func (dispatcher *Dispatcher) online(runner *Runner) {
func (dispatcher *Dispatcher) destory(runner *Runner) {

Check failure on line 90 in runtime/v8/dispatcher.go (GitHub Actions / unit-test): "destory" is a misspelling of "destroy"
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
dispatcher.availables.data[runner] = runner.status
log.Info("[dispatcher] runner %p online %d (%d/%d)", runner, len(dispatcher.availables.data), tempCount, cacheCount)
delete(dispatcher.availables.data, runner.id)
dispatcher.total--
log.Info("[dispatcher] runner %p destory (%d,%d) (%d/%d)", runner, len(dispatcher.availables.data), dispatcher.total, tempCount, cacheCount)

Check failure on line 95 in runtime/v8/dispatcher.go (GitHub Actions / unit-test): "destory" is a misspelling of "destroy"
}

// Select select a free v8 runner
func (dispatcher *Dispatcher) Select(timeout time.Duration) (*Runner, error) {
if dispatcher.stoped {
return nil, fmt.Errorf("[dispatcher] the dispatcher is stoped")
}

dispatcher.availables.mutex.RLock()
defer dispatcher.availables.mutex.RUnlock()
for runner := range dispatcher.availables.data {
// log.Info("[dispatcher] select a free runner %p", runner)
dispatcher.availables.mutex.Lock()
defer dispatcher.availables.mutex.Unlock()
for _, runner := range dispatcher.availables.data {
cacheCount++
dispatcher._offline(runner)
fmt.Println("--------------------", runner.id)
fmt.Println("Select a free v8 runner id", runner.id, "count", len(dispatcher.availables.data))
return runner, nil
}

tempCount++
runner := NewRunner()
runner.status = RunnerStatusFree
runner.kind = "temp"
runner := NewRunner(false)
go runner.Start()
return runner, nil
}
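
The reworked Select pops the first available runner off the map and, when none is free, falls back to creating a throwaway temporary runner. A rough sketch of that select-or-create fallback follows; it is illustrative only, and the pool/selectRunner names are not from the commit.

package main

import (
	"fmt"
	"sync"
)

type runner struct {
	id   uint
	temp bool // true when created outside the managed pool
}

type pool struct {
	mu         sync.Mutex
	availables map[uint]*runner
	nextID     uint
}

// selectRunner hands out a pooled runner when one is available;
// otherwise it creates a temporary runner on the spot, mirroring
// the fallback branch at the end of Select.
func (p *pool) selectRunner() *runner {
	p.mu.Lock()
	defer p.mu.Unlock()
	for id, r := range p.availables {
		delete(p.availables, id) // take it offline while it is in use
		return r
	}
	p.nextID++
	return &runner{id: p.nextID, temp: true}
}

func main() {
	p := &pool{availables: map[uint]*runner{1: {id: 1}}, nextID: 1}
	fmt.Println(p.selectRunner().temp) // false: served from the pool
	fmt.Println(p.selectRunner().temp) // true: pool empty, temp runner created
}
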

126 changes: 8 additions & 118 deletions runtime/v8/isolate.go
@@ -2,7 +2,6 @@ package v8

import (
"fmt"
"sync"
"time"

atobT "github.com/yaoapp/gou/runtime/v8/functions/atob"
@@ -25,41 +24,27 @@ import (
"rogchap.com/v8go"
)

var isolates = &Isolates{Data: &sync.Map{}, Len: 0}
var contextCache = map[*Isolate]map[*Script]*Context{}
var isoReady chan *store.Isolate

var chIsoReady chan *Isolate
var newIsolateLock = &sync.RWMutex{}

var chCtxReady chan *Context
var newContextLock = &sync.RWMutex{}

// initialize create a new Isolate
// in performance mode, the minSize isolates will be created
func initialize() {

v8go.YaoInit(uint(runtimeOption.HeapSizeLimit / 1024 / 1024))
fmt.Println("initialize mode:", runtimeOption.Mode)

// Make a global Isolate
// makeGlobalIsolate()
v8go.YaoInit(uint(runtimeOption.HeapSizeLimit / 1024 / 1024))

// Performance mode
if runtimeOption.Mode == "performance" {
dispatcher = NewDispatcher(runtimeOption.MinSize, runtimeOption.MaxSize)
dispatcher.Start()
return
}

isoReady = make(chan *store.Isolate, runtimeOption.MaxSize)
store.Isolates = store.New()
log.Trace(
"[V8] VM is initializing MinSize=%d MaxSize=%d HeapLimit=%d",
runtimeOption.MinSize, runtimeOption.MaxSize, runtimeOption.HeapSizeLimit,
)
if runtimeOption.Mode == "performance" {
for store.Isolates.Len() < runtimeOption.MinSize {
addIsolate()
}
}
// Standard mode
makeGlobalIsolate()
isoReady = make(chan *store.Isolate, runtimeOption.MinSize)

}
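
The rewritten initialize branches on the runtime mode: "performance" hands pool management to the dispatcher, while the standard path keeps the global isolate and the buffered ready channel. A hedged sketch of that branch is below; the option struct and the two start helpers are placeholders, not the commit's API.

package main

import "fmt"

type option struct {
	Mode    string
	MinSize int
	MaxSize int
}

// initialize mirrors the mode switch in the diff: performance mode
// delegates to a dispatcher-managed runner pool, anything else keeps
// the channel-based isolate pool. Both start helpers are placeholders.
func initialize(opt option) {
	fmt.Println("initialize mode:", opt.Mode)

	if opt.Mode == "performance" {
		startDispatcher(opt.MinSize, opt.MaxSize)
		return
	}
	startStandardPool(opt.MinSize)
}

func startDispatcher(min, max int) { fmt.Println("dispatcher pool:", min, max) }
func startStandardPool(min int)    { fmt.Println("standard pool:", min) }

func main() {
	initialize(option{Mode: "performance", MinSize: 50, MaxSize: 100})
	initialize(option{Mode: "standard", MinSize: 2})
}
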

func release() {
Expand All @@ -69,45 +54,6 @@ func release() {
}
}

// addIsolate create a new and add to the isolates
func addIsolate() (*store.Isolate, error) {

if store.Isolates.Len() >= runtimeOption.MaxSize {
log.Warn("[V8] The maximum number of v8 vm has been reached (%d)", runtimeOption.MaxSize)
return nil, fmt.Errorf("The maximum number of v8 vm has been reached (%d)", runtimeOption.MaxSize)
}

iso := makeIsolate()
if runtimeOption.Precompile {
precompile(iso)
}

store.Isolates.Add(iso)
// store.MakeIsolateCache(iso.Key())
isoReady <- iso
log.Trace("[V8] VM %s is ready (%d)", iso.Key(), len(isoReady))
return iso, nil
}

// replaceIsolate
// remove a isolate
// create a new one append to the isolates if the isolates is less than minSize
func replaceIsolate(iso *store.Isolate) {
removeIsolate(iso)
if store.Isolates.Len() < runtimeOption.MinSize {
addIsolate()
}
}

// removeIsolate remove a isolate
func removeIsolate(iso *store.Isolate) {
key := iso.Key()
// store.CleanIsolateCache(key)
// store.Isolates.Remove(key)
iso.Dispose()
log.Trace("[V8] VM %s is removed", key)
}

// precompile compile the loaded scirpts
// it cost too much time and memory to compile all scripts
// ignore the error
@@ -161,25 +107,6 @@ func makeIsolate() *store.Isolate {
}
}

// SelectIsoPerformance one ready isolate
func SelectIsoPerformance(timeout time.Duration) (*store.Isolate, error) {

// make a timer
timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case iso := <-isoReady:
Lock(iso)
return iso, nil

case <-timer.C:
log.Error("[V8] Select isolate timeout %v", timeout)
return nil, fmt.Errorf("Select isolate timeout %v", timeout)
}

}

// SelectIsoStandard one ready isolate ( the max size is 2 )
func SelectIsoStandard(timeout time.Duration) (*store.Isolate, error) {

Expand All @@ -202,40 +129,3 @@ func SelectIsoStandard(timeout time.Duration) (*store.Isolate, error) {
return iso, nil
}
}
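
SelectIsoStandard (and the removed SelectIsoPerformance) share one shape: wait on a buffered channel of ready isolates and give up when a timer fires first. Here is a self-contained sketch of that acquire-with-timeout pattern, with a simplified isolate type that is not the commit's store.Isolate.

package main

import (
	"errors"
	"fmt"
	"time"
)

type isolate struct{ key string }

// acquire waits for a ready isolate or fails after timeout,
// following the select/timer shape used by SelectIsoStandard.
func acquire(ready chan *isolate, timeout time.Duration) (*isolate, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case iso := <-ready:
		return iso, nil
	case <-timer.C:
		return nil, errors.New("select isolate timeout")
	}
}

func main() {
	ready := make(chan *isolate, 1)
	ready <- &isolate{key: "iso-1"}

	iso, err := acquire(ready, 100*time.Millisecond)
	fmt.Println(iso.key, err) // iso-1 <nil>

	_, err = acquire(ready, 100*time.Millisecond)
	fmt.Println(err) // select isolate timeout
}
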

// Lock the isolate
func Lock(iso *store.Isolate) {
iso.Lock()
}

// Unlock the isolate
// Recycle the isolate if the isolate is not health
func Unlock(iso *store.Isolate) {

health := iso.Health(runtimeOption.HeapSizeRelease, runtimeOption.HeapAvailableSize)
available := len(isoReady)
log.Trace("[V8] VM %s is health %v available %d", iso.Key(), health, available)

// add the isolate if the available isolates are less than min size
if available < runtimeOption.MinSize {
defer addIsolate()
}

// remove the isolate if the available isolates are more than min size
if available > runtimeOption.MinSize {
go removeIsolate(iso)
return
}

// unlock the isolate if the isolate is health
if health {
iso.Unlock()
isoReady <- iso
return
}

// remove the isolate if the isolate is not health
// then create a new one
go replaceIsolate(iso)

}