From fd99c4d73e01b0e80993411bac97158a909ee571 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Wed, 18 Oct 2023 16:28:17 +0800 Subject: [PATCH 01/15] [Bug fixed]: fixed a bug in timing/cu/issuearbiter.go; this bug causes dead lock when (1,2,3) are satisfied. 1) wavefonts in wfpools[next] are complted but not scheduled out; 2)wfpools[next+1].wfs has activate wfs; 3)other wavefronts are completed --- timing/cu/issuearbiter.go | 42 +++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index ac594acd..46c9bf98 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -23,24 +23,36 @@ func (a *IssueArbiter) Arbitrate( return []*wavefront.Wavefront{} } - a.moveToNextSIMD(wfPools) - for len(wfPools[a.lastSIMDID].wfs) == 0 { - a.moveToNextSIMD(wfPools) - } + originalSIMDID := a.lastSIMDID - typeMask := make([]bool, 7) - wfPool := wfPools[a.lastSIMDID] list := make([]*wavefront.Wavefront, 0) - for _, wf := range wfPool.wfs { - if wf.State != wavefront.WfReady || wf.InstToIssue == nil { - continue - } + for len(list) == 0 { + a.moveToNextSIMD(wfPools) + for len(wfPools[a.lastSIMDID].wfs) == 0 { + a.moveToNextSIMD(wfPools) + if a.lastSIMDID == originalSIMDID { + break + } + } + if len(wfPools[a.lastSIMDID].wfs) != 0 { - if typeMask[wf.InstToIssue.ExeUnit] == false { - list = append(list, wf) - typeMask[wf.InstToIssue.ExeUnit] = true - } - } + typeMask := make([]bool, 7) + wfPool := wfPools[a.lastSIMDID] + for _, wf := range wfPool.wfs { + if wf.State != wavefront.WfReady || wf.InstToIssue == nil { + continue + } + + if typeMask[wf.InstToIssue.ExeUnit] == false { + list = append(list, wf) + typeMask[wf.InstToIssue.ExeUnit] = true + } + } + } + if a.lastSIMDID == originalSIMDID { + break + } + } return list } From d4b0438182c51a1933d45a92a341522eb95f51a4 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Wed, 18 Oct 2023 20:18:29 +0800 Subject: [PATCH 02/15] [Format]: how to replace 4 space with a tab --- timing/cu/issuearbiter.go | 52 +++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index 46c9bf98..f3cb4e28 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -23,36 +23,36 @@ func (a *IssueArbiter) Arbitrate( return []*wavefront.Wavefront{} } - originalSIMDID := a.lastSIMDID + originalSIMDID := a.lastSIMDID list := make([]*wavefront.Wavefront, 0) - for len(list) == 0 { - a.moveToNextSIMD(wfPools) - for len(wfPools[a.lastSIMDID].wfs) == 0 { - a.moveToNextSIMD(wfPools) - if a.lastSIMDID == originalSIMDID { - break - } - } - if len(wfPools[a.lastSIMDID].wfs) != 0 { + for len(list) == 0 { + a.moveToNextSIMD(wfPools) + for len(wfPools[a.lastSIMDID].wfs) == 0 { + a.moveToNextSIMD(wfPools) + if a.lastSIMDID == originalSIMDID { + break + } + } + if len(wfPools[a.lastSIMDID].wfs) != 0 { - typeMask := make([]bool, 7) - wfPool := wfPools[a.lastSIMDID] - for _, wf := range wfPool.wfs { - if wf.State != wavefront.WfReady || wf.InstToIssue == nil { - continue - } + typeMask := make([]bool, 7) + wfPool := wfPools[a.lastSIMDID] + for _, wf := range wfPool.wfs { + if wf.State != wavefront.WfReady || wf.InstToIssue == nil { + continue + } - if typeMask[wf.InstToIssue.ExeUnit] == false { - list = append(list, wf) - typeMask[wf.InstToIssue.ExeUnit] = true - } - } - } - if a.lastSIMDID == originalSIMDID { - break - } - } + if typeMask[wf.InstToIssue.ExeUnit] == false { + list = append(list, wf) + typeMask[wf.InstToIssue.ExeUnit] = true + } + } + } + if a.lastSIMDID == originalSIMDID { + break + } + } return list } From 18782ca0061d1ae74244386582bc42151b966992 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Oct 2023 13:01:51 +0800 Subject: [PATCH 03/15] [Issue Arbiter]: fixed a minimal bug; could cause dead loop issue --- timing/cu/issuearbiter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index f3cb4e28..76cb734b 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -29,10 +29,10 @@ func (a *IssueArbiter) Arbitrate( for len(list) == 0 { a.moveToNextSIMD(wfPools) for len(wfPools[a.lastSIMDID].wfs) == 0 { - a.moveToNextSIMD(wfPools) if a.lastSIMDID == originalSIMDID { break } + a.moveToNextSIMD(wfPools) } if len(wfPools[a.lastSIMDID].wfs) != 0 { From ae39027d53e0af7ac5e9cc327f4d11802bcd8d9c Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Mon, 27 Nov 2023 21:56:35 +0800 Subject: [PATCH 04/15] [Bug Fixed]: fixed a bug in vector memory unit --- timing/cu/vectormemoryunit.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/timing/cu/vectormemoryunit.go b/timing/cu/vectormemoryunit.go index c0b98528..ba93b0ea 100644 --- a/timing/cu/vectormemoryunit.go +++ b/timing/cu/vectormemoryunit.go @@ -109,8 +109,9 @@ func (u *VectorMemoryUnit) insertTransactionToPipeline( } func (u *VectorMemoryUnit) execute(now sim.VTimeInSec) (madeProgress bool) { - item := u.postInstructionPipelineBuffer.Pop() + item := u.postInstructionPipelineBuffer.Peek() if item == nil { + u.postInstructionPipelineBuffer.Pop() return false } @@ -126,6 +127,7 @@ func (u *VectorMemoryUnit) execute(now sim.VTimeInSec) (madeProgress bool) { log.Panicf("running inst %s in vector memory unit is not supported", inst.String(nil)) } + u.postInstructionPipelineBuffer.Pop() u.cu.UpdatePCAndSetReady(wave) u.numInstInFlight-- From f3b0b8cc5cf6253eecc6e00c09a8213555062023 Mon Sep 17 00:00:00 2001 From: Yifan Sun Date: Mon, 27 Nov 2023 07:43:21 -0700 Subject: [PATCH 05/15] Apply suggestions from code review Minor changes in formats --- timing/cu/issuearbiter.go | 3 ++- timing/cu/vectormemoryunit.go | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index 76cb734b..7171bb96 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -34,8 +34,8 @@ func (a *IssueArbiter) Arbitrate( } a.moveToNextSIMD(wfPools) } - if len(wfPools[a.lastSIMDID].wfs) != 0 { + if len(wfPools[a.lastSIMDID].wfs) != 0 { typeMask := make([]bool, 7) wfPool := wfPools[a.lastSIMDID] for _, wf := range wfPool.wfs { @@ -49,6 +49,7 @@ func (a *IssueArbiter) Arbitrate( } } } + if a.lastSIMDID == originalSIMDID { break } diff --git a/timing/cu/vectormemoryunit.go b/timing/cu/vectormemoryunit.go index ba93b0ea..8b571336 100644 --- a/timing/cu/vectormemoryunit.go +++ b/timing/cu/vectormemoryunit.go @@ -111,7 +111,6 @@ func (u *VectorMemoryUnit) insertTransactionToPipeline( func (u *VectorMemoryUnit) execute(now sim.VTimeInSec) (madeProgress bool) { item := u.postInstructionPipelineBuffer.Peek() if item == nil { - u.postInstructionPipelineBuffer.Pop() return false } From ea939c3e35a21a115950fa4689acb6f74ae50876 Mon Sep 17 00:00:00 2001 From: Yifan Sun Date: Wed, 13 Dec 2023 10:19:07 -0500 Subject: [PATCH 06/15] update logic --- timing/cu/issuearbiter.go | 67 +++++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index 7171bb96..b1e251d9 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -10,7 +10,7 @@ type IssueArbiter struct { // NewIssueArbiter returns a newly created IssueArbiter func NewIssueArbiter() *IssueArbiter { a := new(IssueArbiter) - a.lastSIMDID = -1 + a.lastSIMDID = 0 return a } @@ -23,38 +23,57 @@ func (a *IssueArbiter) Arbitrate( return []*wavefront.Wavefront{} } - originalSIMDID := a.lastSIMDID + wfToIssue := make([]*wavefront.Wavefront, 0) + for i := 0; i < len(wfPools); i++ { + simdID := (a.lastSIMDID + i) % len(wfPools) - list := make([]*wavefront.Wavefront, 0) - for len(list) == 0 { - a.moveToNextSIMD(wfPools) - for len(wfPools[a.lastSIMDID].wfs) == 0 { - if a.lastSIMDID == originalSIMDID { - break + typeMask := make([]bool, 7) + wfPool := wfPools[simdID] + for _, wf := range wfPool.wfs { + if wf.State != wavefront.WfReady || wf.InstToIssue == nil { + continue } - a.moveToNextSIMD(wfPools) - } - - if len(wfPools[a.lastSIMDID].wfs) != 0 { - typeMask := make([]bool, 7) - wfPool := wfPools[a.lastSIMDID] - for _, wf := range wfPool.wfs { - if wf.State != wavefront.WfReady || wf.InstToIssue == nil { - continue - } - if typeMask[wf.InstToIssue.ExeUnit] == false { - list = append(list, wf) - typeMask[wf.InstToIssue.ExeUnit] = true - } + if typeMask[wf.InstToIssue.ExeUnit] == false { + wfToIssue = append(wfToIssue, wf) + typeMask[wf.InstToIssue.ExeUnit] = true } } - if a.lastSIMDID == originalSIMDID { + if len(wfToIssue) != 0 { + a.lastSIMDID = simdID break } } - return list + + // for len(wfToIssue) == 0 { + // a.moveToNextSIMD(wfPools) + // for len(wfPools[a.lastSIMDID].wfs) == 0 { + // if a.lastSIMDID == originalSIMDID { + // break + // } + // a.moveToNextSIMD(wfPools) + // } + + // typeMask := make([]bool, 7) + // wfPool := wfPools[a.lastSIMDID] + // for _, wf := range wfPool.wfs { + // if wf.State != wavefront.WfReady || wf.InstToIssue == nil { + // continue + // } + + // if typeMask[wf.InstToIssue.ExeUnit] == false { + // wfToIssue = append(wfToIssue, wf) + // typeMask[wf.InstToIssue.ExeUnit] = true + // } + // } + + // if a.lastSIMDID == originalSIMDID { + // break + // } + // } + + return wfToIssue } func (a *IssueArbiter) moveToNextSIMD(wfPools []*WavefrontPool) { From e0717d9aab967c5ffefd15575e74a23351ebd226 Mon Sep 17 00:00:00 2001 From: Yifan Sun Date: Mon, 18 Dec 2023 10:25:54 -0500 Subject: [PATCH 07/15] Fix unit test --- timing/cu/issuearbiter.go | 27 --------------------------- timing/cu/vectormemoryunit_test.go | 2 ++ 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/timing/cu/issuearbiter.go b/timing/cu/issuearbiter.go index b1e251d9..2992dbe8 100644 --- a/timing/cu/issuearbiter.go +++ b/timing/cu/issuearbiter.go @@ -46,33 +46,6 @@ func (a *IssueArbiter) Arbitrate( } } - // for len(wfToIssue) == 0 { - // a.moveToNextSIMD(wfPools) - // for len(wfPools[a.lastSIMDID].wfs) == 0 { - // if a.lastSIMDID == originalSIMDID { - // break - // } - // a.moveToNextSIMD(wfPools) - // } - - // typeMask := make([]bool, 7) - // wfPool := wfPools[a.lastSIMDID] - // for _, wf := range wfPool.wfs { - // if wf.State != wavefront.WfReady || wf.InstToIssue == nil { - // continue - // } - - // if typeMask[wf.InstToIssue.ExeUnit] == false { - // wfToIssue = append(wfToIssue, wf) - // typeMask[wf.InstToIssue.ExeUnit] = true - // } - // } - - // if a.lastSIMDID == originalSIMDID { - // break - // } - // } - return wfToIssue } diff --git a/timing/cu/vectormemoryunit_test.go b/timing/cu/vectormemoryunit_test.go index 4c5cc62a..0e067254 100644 --- a/timing/cu/vectormemoryunit_test.go +++ b/timing/cu/vectormemoryunit_test.go @@ -91,6 +91,7 @@ var _ = Describe("Vector Memory Unit", func() { transactions[i].Read = read } coalescer.EXPECT().generateMemTransactions(wave).Return(transactions) + instBuffer.EXPECT().Peek().Return(vectorMemInst{wavefront: wave}) instBuffer.EXPECT().Pop().Return(vectorMemInst{wavefront: wave}) madeProgress := vecMemUnit.instToTransaction(10) @@ -122,6 +123,7 @@ var _ = Describe("Vector Memory Unit", func() { transactions[i].Write = write } coalescer.EXPECT().generateMemTransactions(wave).Return(transactions) + instBuffer.EXPECT().Peek().Return(vectorMemInst{wavefront: wave}) instBuffer.EXPECT().Pop().Return(vectorMemInst{wavefront: wave}) madeProgress := vecMemUnit.instToTransaction(10) From 57df83794285e6206b8db30faf390ba63f976dde Mon Sep 17 00:00:00 2001 From: Yifan Sun Date: Mon, 18 Dec 2023 10:41:19 -0500 Subject: [PATCH 08/15] Remove concurrent kernel in test --- tests/acceptance/cases.go | 56 +++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/tests/acceptance/cases.go b/tests/acceptance/cases.go index 8a7a23e0..c7d37a6f 100644 --- a/tests/acceptance/cases.go +++ b/tests/acceptance/cases.go @@ -801,32 +801,32 @@ var benchmarks = []benchmark{ {gpus: []int{1, 2, 3, 4}, timing: true, parallel: true, unifiedGPU: true, unifiedMemory: true}, }, }, - { - benchmarkPath: "", - executablePath: "../../samples/concurrentkernel", - executable: "concurrentkernel", - sizeArgs: []string{}, - cases: []benchmarkCase{ - {gpus: []int{1}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: false}, - }, - }, - { - benchmarkPath: "", - executablePath: "../../samples/concurrentworkload", - executable: "concurrentworkload", - sizeArgs: []string{}, - cases: []benchmarkCase{ - {gpus: []int{1, 2, 3, 4}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1, 2, 3, 4}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1, 2, 3, 4}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: false}, - {gpus: []int{1, 2, 3, 4}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: false}, - // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: true}, - // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: true}, - // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: true}, - // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: true}, - }, - }, + // { + // benchmarkPath: "", + // executablePath: "../../samples/concurrentkernel", + // executable: "concurrentkernel", + // sizeArgs: []string{}, + // cases: []benchmarkCase{ + // {gpus: []int{1}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: false}, + // }, + // }, + // { + // benchmarkPath: "", + // executablePath: "../../samples/concurrentworkload", + // executable: "concurrentworkload", + // sizeArgs: []string{}, + // cases: []benchmarkCase{ + // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: false}, + // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: false, unifiedGPU: false, unifiedMemory: true}, + // {gpus: []int{1, 2, 3, 4}, timing: false, parallel: true, unifiedGPU: false, unifiedMemory: true}, + // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: false, unifiedGPU: false, unifiedMemory: true}, + // {gpus: []int{1, 2, 3, 4}, timing: true, parallel: true, unifiedGPU: false, unifiedMemory: true}, + // }, + // }, } From 4814ca21adccdde54c3df8ca22d86e5f5aa0d0fb Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Dec 2024 12:09:21 +0100 Subject: [PATCH 09/15] [Sampling]: add wavefront sampling into MGPUSim. To merge to v3 branch --- kernels/grid.go | 3 + samples/runner/runner.go | 3 +- samplinglib/stableengine.go | 75 +++++++++++ samplinglib/wfsampling.go | 125 +++++++++++++++++++ timing/cp/commandprocessor.go | 5 +- timing/cp/internal/dispatching/dispatcher.go | 12 +- timing/cu/computeunit.go | 108 +++++++++++++--- timing/cu/scheduler.go | 12 +- timing/wavefront/wavefront.go | 11 +- timing/wavefront/wfcompletionevent.go | 24 ++++ 10 files changed, 355 insertions(+), 23 deletions(-) create mode 100644 samplinglib/stableengine.go create mode 100644 samplinglib/wfsampling.go create mode 100644 timing/wavefront/wfcompletionevent.go diff --git a/kernels/grid.go b/kernels/grid.go index 416faf01..fceb08e5 100644 --- a/kernels/grid.go +++ b/kernels/grid.go @@ -57,6 +57,9 @@ type Wavefront struct { InitExecMask uint64 WorkItems []*WorkItem + //for sampling + Finishtime sim.VTimeInSec + Issuetime sim.VTimeInSec } // NewWavefront returns a new Wavefront. diff --git a/samples/runner/runner.go b/samples/runner/runner.go index 77839b01..9866e04a 100644 --- a/samples/runner/runner.go +++ b/samples/runner/runner.go @@ -15,6 +15,7 @@ import ( "github.com/sarchlab/akita/v3/tracing" "github.com/sarchlab/mgpusim/v3/benchmarks" "github.com/sarchlab/mgpusim/v3/driver" + "github.com/sarchlab/mgpusim/v3/samplinglib" "github.com/tebeka/atexit" ) @@ -64,7 +65,7 @@ func (r *Runner) Init() *Runner { r.parseGPUFlag() log.SetFlags(log.Llongfile | log.Ldate | log.Ltime) - + samplinglib.InitSampledEngine() if r.Timing { r.buildTimingPlatform() } else { diff --git a/samplinglib/stableengine.go b/samplinglib/stableengine.go new file mode 100644 index 00000000..173383e0 --- /dev/null +++ b/samplinglib/stableengine.go @@ -0,0 +1,75 @@ +package samplinglib + +import ( + "github.com/sarchlab/akita/v3/sim" +) + +type WFFeature struct { + Issuetime sim.VTimeInSec + Finishtime sim.VTimeInSec +} + +type StableEngine struct { + issuetime_sum sim.VTimeInSec + finishtime_sum sim.VTimeInSec + intervaltime_sum sim.VTimeInSec + mix_sum sim.VTimeInSec + issuetime_square_sum sim.VTimeInSec + rate float64 + granulary int + Wffeatures []WFFeature + boundary float64 + enableSampled bool + predTime sim.VTimeInSec +} + +func (stable_engine *StableEngine) Analysis() { + + rate_bottom := sim.VTimeInSec(stable_engine.granulary)*stable_engine.issuetime_square_sum - stable_engine.issuetime_sum*stable_engine.issuetime_sum + rate_top := sim.VTimeInSec(stable_engine.granulary)*stable_engine.mix_sum - stable_engine.issuetime_sum*stable_engine.finishtime_sum + rate := float64(rate_top / rate_bottom) + stable_engine.rate = rate + boundary := stable_engine.boundary + stable_engine.predTime = stable_engine.intervaltime_sum / sim.VTimeInSec(stable_engine.granulary) + if rate >= (1-boundary) && rate <= (1+boundary) { + stable_engine.enableSampled = true + } else { + stable_engine.enableSampled = false + } +} +func (stable_engine *StableEngine) Reset() { + stable_engine.Wffeatures = nil + stable_engine.issuetime_sum = 0 + stable_engine.finishtime_sum = 0 + stable_engine.intervaltime_sum = 0 + stable_engine.mix_sum = 0 + stable_engine.issuetime_square_sum = 0 + stable_engine.predTime = 0 + stable_engine.enableSampled = false +} +func (stable_engine *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) { + wffeature := WFFeature{ + Issuetime: issuetime, + Finishtime: finishtime, + } + + stable_engine.Wffeatures = append(stable_engine.Wffeatures, wffeature) + stable_engine.issuetime_sum += issuetime + stable_engine.finishtime_sum += finishtime + stable_engine.mix_sum += finishtime * issuetime + stable_engine.issuetime_square_sum += issuetime * issuetime + stable_engine.intervaltime_sum += (finishtime - issuetime) + if len(stable_engine.Wffeatures) == stable_engine.granulary { + stable_engine.Analysis() + ///delete old data + wffeature2 := stable_engine.Wffeatures[0] + stable_engine.Wffeatures = stable_engine.Wffeatures[1:] + issuetime = wffeature2.Issuetime + finishtime = wffeature2.Finishtime + stable_engine.issuetime_sum -= issuetime + stable_engine.finishtime_sum -= finishtime + stable_engine.mix_sum -= finishtime * issuetime + stable_engine.issuetime_square_sum -= issuetime * issuetime + stable_engine.intervaltime_sum -= (finishtime - issuetime) + } +} diff --git a/samplinglib/wfsampling.go b/samplinglib/wfsampling.go new file mode 100644 index 00000000..ab9c82f0 --- /dev/null +++ b/samplinglib/wfsampling.go @@ -0,0 +1,125 @@ +package samplinglib + +import ( + "flag" + "log" + "time" + + "github.com/sarchlab/akita/v3/sim" +) + +var SampledRunnerFlag = flag.Bool("wf-sampling", false, "enable wavefront-level sampled simulation.") +var SampledRunnerThresholdFlag = flag.Float64("sampled-threshold", 0.03, "the threshold of the sampled execution to enable sampling simulation.") +var SampledRunnerGranularyFlag = flag.Int("sampled-granulary", 1024, "the granulary of the sampled execution to collect and analyze data.") + +type SampledEngine struct { + predTime sim.VTimeInSec + enableSampled bool + disableEngine bool + Simtime float64 `json:"simtime"` + Walltime float64 `json:"walltime"` + FullSimWalltime float64 `json:"fullsimwalltime"` + FullSimWalltimeStart time.Time + dataidx uint64 + stable_engine *StableEngine + short_stable_engine *StableEngine + predTimeSum sim.VTimeInSec + predTimeNum uint64 + granulary int +} + +func (sampled_engine *SampledEngine) Reset() { + sampled_engine.FullSimWalltimeStart = time.Now() + sampled_engine.stable_engine.Reset() + sampled_engine.short_stable_engine.Reset() + sampled_engine.predTime = 0 + sampled_engine.predTimeNum = 0 + sampled_engine.predTimeSum = 0 + sampled_engine.dataidx = 0 + sampled_engine.enableSampled = false +} + +// const granulary = 512 +func NewSampledEngine(granulary int, boundary float64, control bool) *SampledEngine { + + stable_engine := &StableEngine{ + granulary: granulary, + boundary: boundary, + } + short_stable_engine := &StableEngine{ + granulary: granulary / 2, + boundary: boundary, + } + ret := &SampledEngine{ + stable_engine: stable_engine, + short_stable_engine: short_stable_engine, + granulary: granulary / 2, + } + ret.Reset() + if control { + ret.disableEngine = false + } + return ret +} + +var Sampledengine *SampledEngine + +func InitSampledEngine() { + Sampledengine = NewSampledEngine(*SampledRunnerGranularyFlag, *SampledRunnerThresholdFlag, false) + if *SampledRunnerFlag { + Sampledengine.Enable() + } else { + Sampledengine.Disabled() + } +} + +func (sampled_engine *SampledEngine) Disabled() { + sampled_engine.disableEngine = true +} +func (sampled_engine *SampledEngine) Enable() { + sampled_engine.disableEngine = false +} +func (sampled_engine *SampledEngine) IfDisable() bool { + return sampled_engine.disableEngine +} +func (sampled_engine *SampledEngine) Collect(issuetime sim.VTimeInSec, finishtime sim.VTimeInSec) { + if sampled_engine.enableSampled || sampled_engine.disableEngine { //we do not need to collect data if sampling is enabled + return + } + + sampled_engine.dataidx++ + if sampled_engine.dataidx < 1024 { // discard the first 1024 data + return + } + + sampled_engine.stable_engine.Collect(issuetime, finishtime) + sampled_engine.short_stable_engine.Collect(issuetime, finishtime) + stable_engine := sampled_engine.stable_engine + short_stable_engine := sampled_engine.short_stable_engine + + if stable_engine.enableSampled { + + long_time := stable_engine.predTime + short_time := short_stable_engine.predTime + sampled_engine.predTime = short_stable_engine.predTime + diff := float64((long_time - short_time) / (long_time + short_time)) + + diff_boundary := *SampledRunnerThresholdFlag + if diff <= diff_boundary && diff >= -diff_boundary { + sampled_engine.enableSampled = true + sampled_engine.predTime = short_time + sampled_engine.predTimeSum = short_time * sim.VTimeInSec(sampled_engine.granulary) + sampled_engine.predTimeNum = uint64(sampled_engine.granulary) + } + + } else if short_stable_engine.enableSampled { + sampled_engine.predTime = stable_engine.predTime + } + if sampled_engine.enableSampled { + log.Printf("Warp Sampling is enabled") + } +} + +func (sampled_engine *SampledEngine) Predict() (sim.VTimeInSec, bool) { + return sampled_engine.predTime, sampled_engine.enableSampled +} diff --git a/timing/cp/commandprocessor.go b/timing/cp/commandprocessor.go index eee71d94..064c2e7d 100644 --- a/timing/cp/commandprocessor.go +++ b/timing/cp/commandprocessor.go @@ -8,6 +8,7 @@ import ( "github.com/sarchlab/akita/v3/sim" "github.com/sarchlab/akita/v3/tracing" "github.com/sarchlab/mgpusim/v3/protocol" + "github.com/sarchlab/mgpusim/v3/samplinglib" "github.com/sarchlab/mgpusim/v3/timing/cp/internal/dispatching" "github.com/sarchlab/mgpusim/v3/timing/cp/internal/resource" "github.com/sarchlab/mgpusim/v3/timing/pagemigrationcontroller" @@ -297,7 +298,9 @@ func (p *CommandProcessor) processLaunchKernelReq( if d == nil { return false } - + if *samplinglib.SampledRunnerFlag { + samplinglib.Sampledengine.Reset() + } d.StartDispatching(req) p.ToDriver.Retrieve(now) diff --git a/timing/cp/internal/dispatching/dispatcher.go b/timing/cp/internal/dispatching/dispatcher.go index bb666f7e..4d333629 100644 --- a/timing/cp/internal/dispatching/dispatcher.go +++ b/timing/cp/internal/dispatching/dispatcher.go @@ -9,6 +9,7 @@ import ( "github.com/sarchlab/akita/v3/tracing" "github.com/sarchlab/mgpusim/v3/kernels" "github.com/sarchlab/mgpusim/v3/protocol" + "github.com/sarchlab/mgpusim/v3/samplinglib" "github.com/sarchlab/mgpusim/v3/timing/cp/internal/resource" ) @@ -123,9 +124,18 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { case *protocol.WGCompletionMsg: count := 0 for _, rspToID := range msg.RspTo { - _, ok := d.inflightWGs[rspToID] + location, ok := d.inflightWGs[rspToID] if ok { count += 1 + ///sampling + if *samplinglib.SampledRunnerFlag { + + for _, l := range location.locations { + wavefront := l.Wavefront + samplinglib.Sampledengine.Collect(wavefront.Issuetime, wavefront.Finishtime) + + } + } } } diff --git a/timing/cu/computeunit.go b/timing/cu/computeunit.go index d6d8f2a1..b3956c21 100644 --- a/timing/cu/computeunit.go +++ b/timing/cu/computeunit.go @@ -12,6 +12,7 @@ import ( "github.com/sarchlab/mgpusim/v3/insts" "github.com/sarchlab/mgpusim/v3/kernels" "github.com/sarchlab/mgpusim/v3/protocol" + "github.com/sarchlab/mgpusim/v3/samplinglib" "github.com/sarchlab/mgpusim/v3/timing/wavefront" ) @@ -71,6 +72,8 @@ type ComputeUnit struct { currentFlushReq *protocol.CUPipelineFlushReq currentRestartReq *protocol.CUPipelineRestartReq + //for sampling + wftime map[string]sim.VTimeInSec } // ControlPort returns the port that can receive controlling messages from the @@ -308,7 +311,55 @@ func (cu *ComputeUnit) processInputFromACE(now sim.VTimeInSec) bool { panic("unknown req type") } } +func (cu *ComputeUnit) Handle(evt sim.Event) error { + ctx := sim.HookCtx{ + Domain: cu, + Pos: sim.HookPosBeforeEvent, + Item: evt, + } + cu.InvokeHook(ctx) + + cu.Lock() + + defer cu.Unlock() + + switch evt := evt.(type) { + case *wavefront.WfCompletionEvent: + cu.handleWfCompletionEvent(evt) + default: + log.Panicf("Unable to process evevt of type %s", + reflect.TypeOf(evt)) + } + + ctx.Pos = sim.HookPosAfterEvent + cu.InvokeHook(ctx) + return nil +} +func (cu *ComputeUnit) handleWfCompletionEvent(evt *wavefront.WfCompletionEvent) error { + wf := evt.Wf + wf.State = wavefront.WfCompleted + s_ := cu.Scheduler + s := s_.(*SchedulerImpl) + if s.areAllOtherWfsInWGCompleted(wf.WG, wf) { + now := evt.Time() + + done := s.sendWGCompletionMessage(now, wf.WG) + if !done { + newEvent := wavefront.NewWfCompletionEvent(cu.Freq.NextTick(now), cu, wf) + cu.Engine.Schedule(newEvent) + return nil + } + + s.resetRegisterValue(wf) + cu.clearWGResource(wf.WG) + tracing.EndTask(wf.UID, cu) + tracing.TraceReqComplete(wf.WG.MapReq, cu) + + return nil + } + return nil +} func (cu *ComputeUnit) handleMapWGReq( now sim.VTimeInSec, req *protocol.MapWGReq, @@ -317,20 +368,49 @@ func (cu *ComputeUnit) handleMapWGReq( tracing.TraceReqReceive(req, cu) - for i, wf := range wg.Wfs { - location := req.Wavefronts[i] - cu.WfPools[location.SIMDID].AddWf(wf) - cu.WfDispatcher.DispatchWf(now, wf, req.Wavefronts[i]) - wf.State = wavefront.WfReady + //sampling + skip_simulate := false + if *samplinglib.SampledRunnerFlag { + for _, wf := range wg.Wfs { + cu.wftime[wf.UID] = now + } + wfpredicttime, wfsampled := samplinglib.Sampledengine.Predict() + predtime := wfpredicttime + skip_simulate = wfsampled + for _, wf := range wg.Wfs { + + if skip_simulate { + predicted_time := predtime + now + wf.State = wavefront.WfSampledCompleted + newEvent := wavefront.NewWfCompletionEvent(predicted_time, cu, wf) + cu.Engine.Schedule(newEvent) + tracing.StartTask(wf.UID, + tracing.MsgIDAtReceiver(req, cu), + cu, + "wavefront", + "wavefront", + nil, + ) + } + } - tracing.StartTaskWithSpecificLocation(wf.UID, - tracing.MsgIDAtReceiver(req, cu), - cu, - "wavefront", - "wavefront", - cu.Name()+".WFPool", - nil, - ) + } + if !skip_simulate { + for i, wf := range wg.Wfs { + location := req.Wavefronts[i] + cu.WfPools[location.SIMDID].AddWf(wf) + cu.WfDispatcher.DispatchWf(now, wf, req.Wavefronts[i]) + wf.State = wavefront.WfReady + + tracing.StartTaskWithSpecificLocation(wf.UID, + tracing.MsgIDAtReceiver(req, cu), + cu, + "wavefront", + "wavefront", + cu.Name()+".WFPool", + nil, + ) + } } cu.running = true @@ -809,6 +889,6 @@ func NewComputeUnit( cu.ToScalarMem = sim.NewLimitNumMsgPort(cu, 4, name+".ToScalarMem") cu.ToVectorMem = sim.NewLimitNumMsgPort(cu, 4, name+".ToVectorMem") cu.ToCP = sim.NewLimitNumMsgPort(cu, 4, name+".ToCP") - + cu.wftime = make(map[string]sim.VTimeInSec) return cu } diff --git a/timing/cu/scheduler.go b/timing/cu/scheduler.go index e8edc542..157ea2a2 100644 --- a/timing/cu/scheduler.go +++ b/timing/cu/scheduler.go @@ -8,6 +8,7 @@ import ( "github.com/sarchlab/akita/v3/tracing" "github.com/sarchlab/mgpusim/v3/insts" "github.com/sarchlab/mgpusim/v3/protocol" + "github.com/sarchlab/mgpusim/v3/samplinglib" "github.com/sarchlab/mgpusim/v3/timing/wavefront" ) @@ -270,7 +271,16 @@ func (s *SchedulerImpl) evalSEndPgm( wf.OutstandingScalarMemAccess > 0 { return false, false } - + ////sampling + if *samplinglib.SampledRunnerFlag { + issuetime, found := s.cu.wftime[wf.UID] + if found { + finishtime := now + wf.Finishtime = finishtime + wf.Issuetime = issuetime + delete(s.cu.wftime, wf.UID) + } + } if s.areAllOtherWfsInWGCompleted(wf.WG, wf) { done := s.sendWGCompletionMessage(now, wf.WG) if !done { diff --git a/timing/wavefront/wavefront.go b/timing/wavefront/wavefront.go index b95ae2ad..618331e9 100644 --- a/timing/wavefront/wavefront.go +++ b/timing/wavefront/wavefront.go @@ -15,11 +15,12 @@ type WfState int // A list of all possible WfState const ( - WfDispatching WfState = iota // Dispatching in progress, not ready to run - WfReady // Allow the scheduler to schedule instruction - WfRunning // Instruction in fight - WfCompleted // Wavefront completed - WfAtBarrier // Wavefront at barrier + WfDispatching WfState = iota // Dispatching in progress, not ready to run + WfReady // Allow the scheduler to schedule instruction + WfRunning // Instruction in fight + WfCompleted // Wavefront completed + WfAtBarrier // Wavefront at barrier + WfSampledCompleted // Wavefront completed at Sampling ) // A Wavefront in the timing package contains the information of the progress diff --git a/timing/wavefront/wfcompletionevent.go b/timing/wavefront/wfcompletionevent.go new file mode 100644 index 00000000..dec10f35 --- /dev/null +++ b/timing/wavefront/wfcompletionevent.go @@ -0,0 +1,24 @@ +package wavefront + +import ( + "github.com/sarchlab/akita/v3/sim" + // "gitlab.com/akita/mgpusim/v3/timing/wavefront" +) + +// A WfCompletionEvent marks the completion of a wavefront +type WfCompletionEvent struct { + *sim.EventBase + Wf *Wavefront +} + +// NewWfCompletionEvent returns a newly constructed WfCompleteEvent +func NewWfCompletionEvent( + time sim.VTimeInSec, + handler sim.Handler, + wf *Wavefront, +) *WfCompletionEvent { + evt := new(WfCompletionEvent) + evt.EventBase = sim.NewEventBase(time, handler) + evt.Wf = wf + return evt +} From bc7aec1f3992f355b6c48197d1c68e6d68708717 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Dec 2024 15:52:49 +0100 Subject: [PATCH 10/15] [Sampling]: fixed the lint issues --- samplinglib/stableengine.go | 68 ++++++++++++++++++---------------- samplinglib/wfsampling.go | 74 +++++++++++++++++++++---------------- timing/cu/computeunit.go | 20 +++++----- 3 files changed, 89 insertions(+), 73 deletions(-) diff --git a/samplinglib/stableengine.go b/samplinglib/stableengine.go index 173383e0..64a57d91 100644 --- a/samplinglib/stableengine.go +++ b/samplinglib/stableengine.go @@ -4,49 +4,55 @@ import ( "github.com/sarchlab/akita/v3/sim" ) +// WFFeature is used for recording the runtime info type WFFeature struct { Issuetime sim.VTimeInSec Finishtime sim.VTimeInSec } +// StableEngine is used to detect if the feature detecting is stable or not type StableEngine struct { - issuetime_sum sim.VTimeInSec - finishtime_sum sim.VTimeInSec - intervaltime_sum sim.VTimeInSec - mix_sum sim.VTimeInSec - issuetime_square_sum sim.VTimeInSec - rate float64 - granulary int - Wffeatures []WFFeature - boundary float64 - enableSampled bool - predTime sim.VTimeInSec + issuetimeSum sim.VTimeInSec + finishtimeSum sim.VTimeInSec + intervaltimeSum sim.VTimeInSec + mixSum sim.VTimeInSec + issuetimeSquareSum sim.VTimeInSec + rate float64 + granulary int + Wffeatures []WFFeature + boundary float64 + enableSampled bool + predTime sim.VTimeInSec } +// Analysis the data func (stable_engine *StableEngine) Analysis() { - - rate_bottom := sim.VTimeInSec(stable_engine.granulary)*stable_engine.issuetime_square_sum - stable_engine.issuetime_sum*stable_engine.issuetime_sum - rate_top := sim.VTimeInSec(stable_engine.granulary)*stable_engine.mix_sum - stable_engine.issuetime_sum*stable_engine.finishtime_sum - rate := float64(rate_top / rate_bottom) + rateBottom := sim.VTimeInSec(stable_engine.granulary)*stable_engine.issuetimeSquareSum - stable_engine.issuetimeSum*stable_engine.issuetimeSum + rateTop := sim.VTimeInSec(stable_engine.granulary)*stable_engine.mixSum - stable_engine.issuetimeSum*stable_engine.finishtimeSum + rate := float64(rateTop / rateBottom) stable_engine.rate = rate boundary := stable_engine.boundary - stable_engine.predTime = stable_engine.intervaltime_sum / sim.VTimeInSec(stable_engine.granulary) + stable_engine.predTime = stable_engine.intervaltimeSum / sim.VTimeInSec(stable_engine.granulary) if rate >= (1-boundary) && rate <= (1+boundary) { stable_engine.enableSampled = true } else { stable_engine.enableSampled = false } } + +// Reset all information func (stable_engine *StableEngine) Reset() { stable_engine.Wffeatures = nil - stable_engine.issuetime_sum = 0 - stable_engine.finishtime_sum = 0 - stable_engine.intervaltime_sum = 0 - stable_engine.mix_sum = 0 - stable_engine.issuetime_square_sum = 0 + stable_engine.issuetimeSum = 0 + stable_engine.finishtimeSum = 0 + stable_engine.intervaltimeSum = 0 + stable_engine.mixSum = 0 + stable_engine.issuetimeSquareSum = 0 stable_engine.predTime = 0 stable_engine.enableSampled = false } + +// Collect data func (stable_engine *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) { wffeature := WFFeature{ Issuetime: issuetime, @@ -54,11 +60,11 @@ func (stable_engine *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) } stable_engine.Wffeatures = append(stable_engine.Wffeatures, wffeature) - stable_engine.issuetime_sum += issuetime - stable_engine.finishtime_sum += finishtime - stable_engine.mix_sum += finishtime * issuetime - stable_engine.issuetime_square_sum += issuetime * issuetime - stable_engine.intervaltime_sum += (finishtime - issuetime) + stable_engine.issuetimeSum += issuetime + stable_engine.finishtimeSum += finishtime + stable_engine.mixSum += finishtime * issuetime + stable_engine.issuetimeSquareSum += issuetime * issuetime + stable_engine.intervaltimeSum += (finishtime - issuetime) if len(stable_engine.Wffeatures) == stable_engine.granulary { stable_engine.Analysis() ///delete old data @@ -66,10 +72,10 @@ func (stable_engine *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) stable_engine.Wffeatures = stable_engine.Wffeatures[1:] issuetime = wffeature2.Issuetime finishtime = wffeature2.Finishtime - stable_engine.issuetime_sum -= issuetime - stable_engine.finishtime_sum -= finishtime - stable_engine.mix_sum -= finishtime * issuetime - stable_engine.issuetime_square_sum -= issuetime * issuetime - stable_engine.intervaltime_sum -= (finishtime - issuetime) + stable_engine.issuetimeSum -= issuetime + stable_engine.finishtimeSum -= finishtime + stable_engine.mixSum -= finishtime * issuetime + stable_engine.issuetimeSquareSum -= issuetime * issuetime + stable_engine.intervaltimeSum -= (finishtime - issuetime) } } diff --git a/samplinglib/wfsampling.go b/samplinglib/wfsampling.go index ab9c82f0..727f313d 100644 --- a/samplinglib/wfsampling.go +++ b/samplinglib/wfsampling.go @@ -8,10 +8,16 @@ import ( "github.com/sarchlab/akita/v3/sim" ) +// SampledRunnerFlag is used to enable wf sampling var SampledRunnerFlag = flag.Bool("wf-sampling", false, "enable wavefront-level sampled simulation.") + +// SampledRunnerThresholdFlag is used to set the threshold of the sampling var SampledRunnerThresholdFlag = flag.Float64("sampled-threshold", 0.03, "the threshold of the sampled execution to enable sampling simulation.") + +// SampledRunnerGranularyFlag is used to set the granulary of the sampling var SampledRunnerGranularyFlag = flag.Int("sampled-granulary", 1024, "the granulary of the sampled execution to collect and analyze data.") +// SampledEngine is used to detect if the wavefront sampling is stable or not. type SampledEngine struct { predTime sim.VTimeInSec enableSampled bool @@ -21,17 +27,18 @@ type SampledEngine struct { FullSimWalltime float64 `json:"fullsimwalltime"` FullSimWalltimeStart time.Time dataidx uint64 - stable_engine *StableEngine - short_stable_engine *StableEngine + stableEngine *StableEngine + shortStableEngine *StableEngine predTimeSum sim.VTimeInSec predTimeNum uint64 granulary int } +// Reset all status func (sampled_engine *SampledEngine) Reset() { sampled_engine.FullSimWalltimeStart = time.Now() - sampled_engine.stable_engine.Reset() - sampled_engine.short_stable_engine.Reset() + sampled_engine.stableEngine.Reset() + sampled_engine.shortStableEngine.Reset() sampled_engine.predTime = 0 sampled_engine.predTimeNum = 0 sampled_engine.predTimeSum = 0 @@ -39,21 +46,20 @@ func (sampled_engine *SampledEngine) Reset() { sampled_engine.enableSampled = false } -// const granulary = 512 +// NewSampledEngine is used to new a sampled engine for wavefront sampling func NewSampledEngine(granulary int, boundary float64, control bool) *SampledEngine { - - stable_engine := &StableEngine{ + stableEngine := &StableEngine{ granulary: granulary, boundary: boundary, } - short_stable_engine := &StableEngine{ + shortStableEngine := &StableEngine{ granulary: granulary / 2, boundary: boundary, } ret := &SampledEngine{ - stable_engine: stable_engine, - short_stable_engine: short_stable_engine, - granulary: granulary / 2, + stableEngine: stableEngine, + shortStableEngine: shortStableEngine, + granulary: granulary / 2, } ret.Reset() if control { @@ -62,8 +68,10 @@ func NewSampledEngine(granulary int, boundary float64, control bool) *SampledEng return ret } +// Sampledengine is used to monitor wavefront sampling var Sampledengine *SampledEngine +// InitSampledEngine is used to initial all status and data structure func InitSampledEngine() { Sampledengine = NewSampledEngine(*SampledRunnerGranularyFlag, *SampledRunnerThresholdFlag, false) if *SampledRunnerFlag { @@ -73,53 +81,55 @@ func InitSampledEngine() { } } +// Disabled the sampling engine func (sampled_engine *SampledEngine) Disabled() { sampled_engine.disableEngine = true } + +// Enable the sampling engine func (sampled_engine *SampledEngine) Enable() { sampled_engine.disableEngine = false } + +// IfDisable the sampling engine func (sampled_engine *SampledEngine) IfDisable() bool { return sampled_engine.disableEngine } + +// Collect the runtime information func (sampled_engine *SampledEngine) Collect(issuetime sim.VTimeInSec, finishtime sim.VTimeInSec) { if sampled_engine.enableSampled || sampled_engine.disableEngine { //we do not need to collect data if sampling is enabled return } - sampled_engine.dataidx++ if sampled_engine.dataidx < 1024 { // discard the first 1024 data return } - - sampled_engine.stable_engine.Collect(issuetime, finishtime) - sampled_engine.short_stable_engine.Collect(issuetime, finishtime) - stable_engine := sampled_engine.stable_engine - short_stable_engine := sampled_engine.short_stable_engine - - if stable_engine.enableSampled { - - long_time := stable_engine.predTime - short_time := short_stable_engine.predTime - sampled_engine.predTime = short_stable_engine.predTime - diff := float64((long_time - short_time) / (long_time + short_time)) - - diff_boundary := *SampledRunnerThresholdFlag - if diff <= diff_boundary && diff >= -diff_boundary { + sampled_engine.stableEngine.Collect(issuetime, finishtime) + sampled_engine.shortStableEngine.Collect(issuetime, finishtime) + stableEngine := sampled_engine.stableEngine + shortStableEngine := sampled_engine.shortStableEngine + if stableEngine.enableSampled { + longTime := stableEngine.predTime + shortTime := shortStableEngine.predTime + sampled_engine.predTime = shortStableEngine.predTime + diff := float64((longTime - shortTime) / (longTime + shortTime)) + diffBoundary := *SampledRunnerThresholdFlag + if diff <= diffBoundary && diff >= -diffBoundary { sampled_engine.enableSampled = true - sampled_engine.predTime = short_time - sampled_engine.predTimeSum = short_time * sim.VTimeInSec(sampled_engine.granulary) + sampled_engine.predTime = shortTime + sampled_engine.predTimeSum = shortTime * sim.VTimeInSec(sampled_engine.granulary) sampled_engine.predTimeNum = uint64(sampled_engine.granulary) } - - } else if short_stable_engine.enableSampled { - sampled_engine.predTime = stable_engine.predTime + } else if shortStableEngine.enableSampled { + sampled_engine.predTime = stableEngine.predTime } if sampled_engine.enableSampled { log.Printf("Warp Sampling is enabled") } } +// Predict the execution time of the next wavefronts func (sampled_engine *SampledEngine) Predict() (sim.VTimeInSec, bool) { return sampled_engine.predTime, sampled_engine.enableSampled } diff --git a/timing/cu/computeunit.go b/timing/cu/computeunit.go index b3956c21..25840c5f 100644 --- a/timing/cu/computeunit.go +++ b/timing/cu/computeunit.go @@ -311,6 +311,8 @@ func (cu *ComputeUnit) processInputFromACE(now sim.VTimeInSec) bool { panic("unknown req type") } } + +// Handle the wavefront completion events func (cu *ComputeUnit) Handle(evt sim.Event) error { ctx := sim.HookCtx{ Domain: cu, @@ -339,8 +341,8 @@ func (cu *ComputeUnit) Handle(evt sim.Event) error { func (cu *ComputeUnit) handleWfCompletionEvent(evt *wavefront.WfCompletionEvent) error { wf := evt.Wf wf.State = wavefront.WfCompleted - s_ := cu.Scheduler - s := s_.(*SchedulerImpl) + sTmp := cu.Scheduler + s := sTmp.(*SchedulerImpl) if s.areAllOtherWfsInWGCompleted(wf.WG, wf) { now := evt.Time() @@ -369,20 +371,19 @@ func (cu *ComputeUnit) handleMapWGReq( tracing.TraceReqReceive(req, cu) //sampling - skip_simulate := false + skipSimulate := false if *samplinglib.SampledRunnerFlag { for _, wf := range wg.Wfs { cu.wftime[wf.UID] = now } wfpredicttime, wfsampled := samplinglib.Sampledengine.Predict() predtime := wfpredicttime - skip_simulate = wfsampled + skipSimulate = wfsampled for _, wf := range wg.Wfs { - - if skip_simulate { - predicted_time := predtime + now + if skipSimulate { + predictedTime := predtime + now wf.State = wavefront.WfSampledCompleted - newEvent := wavefront.NewWfCompletionEvent(predicted_time, cu, wf) + newEvent := wavefront.NewWfCompletionEvent(predictedTime, cu, wf) cu.Engine.Schedule(newEvent) tracing.StartTask(wf.UID, tracing.MsgIDAtReceiver(req, cu), @@ -393,9 +394,8 @@ func (cu *ComputeUnit) handleMapWGReq( ) } } - } - if !skip_simulate { + if !skipSimulate { for i, wf := range wg.Wfs { location := req.Wavefronts[i] cu.WfPools[location.SIMDID].AddWf(wf) From d93085b700c73d19cb357545dbf6643eb643ce21 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Dec 2024 16:06:15 +0100 Subject: [PATCH 11/15] [Sampling]: fixed the lint issues --- samplinglib/stableengine.go | 1 + timing/cp/internal/dispatching/dispatcher.go | 14 ++++++-------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/samplinglib/stableengine.go b/samplinglib/stableengine.go index 64a57d91..d355f962 100644 --- a/samplinglib/stableengine.go +++ b/samplinglib/stableengine.go @@ -1,3 +1,4 @@ +// Package samplinglib provides tools for performing sampling simulation package samplinglib import ( diff --git a/timing/cp/internal/dispatching/dispatcher.go b/timing/cp/internal/dispatching/dispatcher.go index 4d333629..280cc2f5 100644 --- a/timing/cp/internal/dispatching/dispatcher.go +++ b/timing/cp/internal/dispatching/dispatcher.go @@ -127,14 +127,12 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { location, ok := d.inflightWGs[rspToID] if ok { count += 1 - ///sampling - if *samplinglib.SampledRunnerFlag { - - for _, l := range location.locations { - wavefront := l.Wavefront - samplinglib.Sampledengine.Collect(wavefront.Issuetime, wavefront.Finishtime) - - } + } + ///sampling + if ok && (*samplinglib.SampledRunnerFlag) { + for _, l := range location.locations { + wavefront := l.Wavefront + samplinglib.Sampledengine.Collect(wavefront.Issuetime, wavefront.Finishtime) } } } From b69ad43ede38615a663c9b0a727e774c8824310d Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Dec 2024 16:13:02 +0100 Subject: [PATCH 12/15] [Sampling]: fixed the lint issues --- timing/cp/internal/dispatching/dispatcher.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/timing/cp/internal/dispatching/dispatcher.go b/timing/cp/internal/dispatching/dispatcher.go index 280cc2f5..50926391 100644 --- a/timing/cp/internal/dispatching/dispatcher.go +++ b/timing/cp/internal/dispatching/dispatcher.go @@ -114,6 +114,15 @@ func (d *DispatcherImpl) Tick(now sim.VTimeInSec) (madeProgress bool) { return madeProgress } +func (d *DispatcherImpl) collectSamplingData(locations []protocol.WfDispatchLocation) { + if *samplinglib.SampledRunnerFlag { + for _, l := range locations { + wavefront := l.Wavefront + samplinglib.Sampledengine.Collect(wavefront.Issuetime, wavefront.Finishtime) + } + } +} + func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { msg := d.dispatchingPort.Peek() if msg == nil { @@ -129,11 +138,8 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { count += 1 } ///sampling - if ok && (*samplinglib.SampledRunnerFlag) { - for _, l := range location.locations { - wavefront := l.Wavefront - samplinglib.Sampledengine.Collect(wavefront.Issuetime, wavefront.Finishtime) - } + if ok { + d.collectSamplingData(location.locations) } } From 646473ad9fe20ae20dee2d1abeed3af4a5b40c1b Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Fri, 20 Dec 2024 16:19:03 +0100 Subject: [PATCH 13/15] [Sampling]: fixed the lint issues of cyclomatic complexity larger than 10 --- timing/cp/internal/dispatching/dispatcher.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/timing/cp/internal/dispatching/dispatcher.go b/timing/cp/internal/dispatching/dispatcher.go index 50926391..22b2b1a8 100644 --- a/timing/cp/internal/dispatching/dispatcher.go +++ b/timing/cp/internal/dispatching/dispatcher.go @@ -136,11 +136,10 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { location, ok := d.inflightWGs[rspToID] if ok { count += 1 - } - ///sampling - if ok { + ///sampling d.collectSamplingData(location.locations) } + } if count == 0 { From 57c056166fa229fd04da63da74794ea659ad1bf1 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Mon, 23 Dec 2024 11:11:57 +0100 Subject: [PATCH 14/15] [Sampling]: fixed a lint issue --- timing/cp/internal/dispatching/dispatcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/timing/cp/internal/dispatching/dispatcher.go b/timing/cp/internal/dispatching/dispatcher.go index 22b2b1a8..66ec63a6 100644 --- a/timing/cp/internal/dispatching/dispatcher.go +++ b/timing/cp/internal/dispatching/dispatcher.go @@ -139,7 +139,6 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool { ///sampling d.collectSamplingData(location.locations) } - } if count == 0 { From 3a8c5816498a3fdf488976668af5286d73d5f5f6 Mon Sep 17 00:00:00 2001 From: Liu Changxi Date: Mon, 23 Dec 2024 11:58:48 +0100 Subject: [PATCH 15/15] [Sampling]: fixed the lint issue of too many characters per line --- samplinglib/stableengine.go | 66 +++++++++++++++++----------------- samplinglib/wfsampling.go | 70 +++++++++++++++++++------------------ 2 files changed, 69 insertions(+), 67 deletions(-) diff --git a/samplinglib/stableengine.go b/samplinglib/stableengine.go index d355f962..3d554161 100644 --- a/samplinglib/stableengine.go +++ b/samplinglib/stableengine.go @@ -27,56 +27,56 @@ type StableEngine struct { } // Analysis the data -func (stable_engine *StableEngine) Analysis() { - rateBottom := sim.VTimeInSec(stable_engine.granulary)*stable_engine.issuetimeSquareSum - stable_engine.issuetimeSum*stable_engine.issuetimeSum - rateTop := sim.VTimeInSec(stable_engine.granulary)*stable_engine.mixSum - stable_engine.issuetimeSum*stable_engine.finishtimeSum +func (se *StableEngine) Analysis() { + rateBottom := sim.VTimeInSec(se.granulary)*se.issuetimeSquareSum - se.issuetimeSum*se.issuetimeSum + rateTop := sim.VTimeInSec(se.granulary)*se.mixSum - se.issuetimeSum*se.finishtimeSum rate := float64(rateTop / rateBottom) - stable_engine.rate = rate - boundary := stable_engine.boundary - stable_engine.predTime = stable_engine.intervaltimeSum / sim.VTimeInSec(stable_engine.granulary) + se.rate = rate + boundary := se.boundary + se.predTime = se.intervaltimeSum / sim.VTimeInSec(se.granulary) if rate >= (1-boundary) && rate <= (1+boundary) { - stable_engine.enableSampled = true + se.enableSampled = true } else { - stable_engine.enableSampled = false + se.enableSampled = false } } // Reset all information -func (stable_engine *StableEngine) Reset() { - stable_engine.Wffeatures = nil - stable_engine.issuetimeSum = 0 - stable_engine.finishtimeSum = 0 - stable_engine.intervaltimeSum = 0 - stable_engine.mixSum = 0 - stable_engine.issuetimeSquareSum = 0 - stable_engine.predTime = 0 - stable_engine.enableSampled = false +func (se *StableEngine) Reset() { + se.Wffeatures = nil + se.issuetimeSum = 0 + se.finishtimeSum = 0 + se.intervaltimeSum = 0 + se.mixSum = 0 + se.issuetimeSquareSum = 0 + se.predTime = 0 + se.enableSampled = false } // Collect data -func (stable_engine *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) { +func (se *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) { wffeature := WFFeature{ Issuetime: issuetime, Finishtime: finishtime, } - stable_engine.Wffeatures = append(stable_engine.Wffeatures, wffeature) - stable_engine.issuetimeSum += issuetime - stable_engine.finishtimeSum += finishtime - stable_engine.mixSum += finishtime * issuetime - stable_engine.issuetimeSquareSum += issuetime * issuetime - stable_engine.intervaltimeSum += (finishtime - issuetime) - if len(stable_engine.Wffeatures) == stable_engine.granulary { - stable_engine.Analysis() + se.Wffeatures = append(se.Wffeatures, wffeature) + se.issuetimeSum += issuetime + se.finishtimeSum += finishtime + se.mixSum += finishtime * issuetime + se.issuetimeSquareSum += issuetime * issuetime + se.intervaltimeSum += (finishtime - issuetime) + if len(se.Wffeatures) == se.granulary { + se.Analysis() ///delete old data - wffeature2 := stable_engine.Wffeatures[0] - stable_engine.Wffeatures = stable_engine.Wffeatures[1:] + wffeature2 := se.Wffeatures[0] + se.Wffeatures = se.Wffeatures[1:] issuetime = wffeature2.Issuetime finishtime = wffeature2.Finishtime - stable_engine.issuetimeSum -= issuetime - stable_engine.finishtimeSum -= finishtime - stable_engine.mixSum -= finishtime * issuetime - stable_engine.issuetimeSquareSum -= issuetime * issuetime - stable_engine.intervaltimeSum -= (finishtime - issuetime) + se.issuetimeSum -= issuetime + se.finishtimeSum -= finishtime + se.mixSum -= finishtime * issuetime + se.issuetimeSquareSum -= issuetime * issuetime + se.intervaltimeSum -= (finishtime - issuetime) } } diff --git a/samplinglib/wfsampling.go b/samplinglib/wfsampling.go index 727f313d..2f5536f0 100644 --- a/samplinglib/wfsampling.go +++ b/samplinglib/wfsampling.go @@ -12,10 +12,12 @@ import ( var SampledRunnerFlag = flag.Bool("wf-sampling", false, "enable wavefront-level sampled simulation.") // SampledRunnerThresholdFlag is used to set the threshold of the sampling -var SampledRunnerThresholdFlag = flag.Float64("sampled-threshold", 0.03, "the threshold of the sampled execution to enable sampling simulation.") +var SampledRunnerThresholdFlag = flag.Float64("sampled-threshold", 0.03, + "the threshold of the sampled execution to enable sampling simulation.") // SampledRunnerGranularyFlag is used to set the granulary of the sampling -var SampledRunnerGranularyFlag = flag.Int("sampled-granulary", 1024, "the granulary of the sampled execution to collect and analyze data.") +var SampledRunnerGranularyFlag = flag.Int("sampled-granulary", 1024, + "the granulary of the sampled execution to collect and analyze data.") // SampledEngine is used to detect if the wavefront sampling is stable or not. type SampledEngine struct { @@ -35,15 +37,15 @@ type SampledEngine struct { } // Reset all status -func (sampled_engine *SampledEngine) Reset() { - sampled_engine.FullSimWalltimeStart = time.Now() - sampled_engine.stableEngine.Reset() - sampled_engine.shortStableEngine.Reset() - sampled_engine.predTime = 0 - sampled_engine.predTimeNum = 0 - sampled_engine.predTimeSum = 0 - sampled_engine.dataidx = 0 - sampled_engine.enableSampled = false +func (se *SampledEngine) Reset() { + se.FullSimWalltimeStart = time.Now() + se.stableEngine.Reset() + se.shortStableEngine.Reset() + se.predTime = 0 + se.predTimeNum = 0 + se.predTimeSum = 0 + se.dataidx = 0 + se.enableSampled = false } // NewSampledEngine is used to new a sampled engine for wavefront sampling @@ -82,54 +84,54 @@ func InitSampledEngine() { } // Disabled the sampling engine -func (sampled_engine *SampledEngine) Disabled() { - sampled_engine.disableEngine = true +func (se *SampledEngine) Disabled() { + se.disableEngine = true } // Enable the sampling engine -func (sampled_engine *SampledEngine) Enable() { - sampled_engine.disableEngine = false +func (se *SampledEngine) Enable() { + se.disableEngine = false } // IfDisable the sampling engine -func (sampled_engine *SampledEngine) IfDisable() bool { - return sampled_engine.disableEngine +func (se *SampledEngine) IfDisable() bool { + return se.disableEngine } // Collect the runtime information -func (sampled_engine *SampledEngine) Collect(issuetime sim.VTimeInSec, finishtime sim.VTimeInSec) { - if sampled_engine.enableSampled || sampled_engine.disableEngine { //we do not need to collect data if sampling is enabled +func (se *SampledEngine) Collect(issuetime sim.VTimeInSec, finishtime sim.VTimeInSec) { + if se.enableSampled || se.disableEngine { //we do not need to collect data if sampling is enabled return } - sampled_engine.dataidx++ - if sampled_engine.dataidx < 1024 { // discard the first 1024 data + se.dataidx++ + if se.dataidx < 1024 { // discard the first 1024 data return } - sampled_engine.stableEngine.Collect(issuetime, finishtime) - sampled_engine.shortStableEngine.Collect(issuetime, finishtime) - stableEngine := sampled_engine.stableEngine - shortStableEngine := sampled_engine.shortStableEngine + se.stableEngine.Collect(issuetime, finishtime) + se.shortStableEngine.Collect(issuetime, finishtime) + stableEngine := se.stableEngine + shortStableEngine := se.shortStableEngine if stableEngine.enableSampled { longTime := stableEngine.predTime shortTime := shortStableEngine.predTime - sampled_engine.predTime = shortStableEngine.predTime + se.predTime = shortStableEngine.predTime diff := float64((longTime - shortTime) / (longTime + shortTime)) diffBoundary := *SampledRunnerThresholdFlag if diff <= diffBoundary && diff >= -diffBoundary { - sampled_engine.enableSampled = true - sampled_engine.predTime = shortTime - sampled_engine.predTimeSum = shortTime * sim.VTimeInSec(sampled_engine.granulary) - sampled_engine.predTimeNum = uint64(sampled_engine.granulary) + se.enableSampled = true + se.predTime = shortTime + se.predTimeSum = shortTime * sim.VTimeInSec(se.granulary) + se.predTimeNum = uint64(se.granulary) } } else if shortStableEngine.enableSampled { - sampled_engine.predTime = stableEngine.predTime + se.predTime = stableEngine.predTime } - if sampled_engine.enableSampled { + if se.enableSampled { log.Printf("Warp Sampling is enabled") } } // Predict the execution time of the next wavefronts -func (sampled_engine *SampledEngine) Predict() (sim.VTimeInSec, bool) { - return sampled_engine.predTime, sampled_engine.enableSampled +func (se *SampledEngine) Predict() (sim.VTimeInSec, bool) { + return se.predTime, se.enableSampled }