Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Sampling]: add wavefront sampling into MGPUSim. To merge to v3 branch #126

Open
wants to merge 18 commits into
base: v3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kernels/grid.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type Wavefront struct {
InitExecMask uint64

WorkItems []*WorkItem
//for sampling
Finishtime sim.VTimeInSec
Issuetime sim.VTimeInSec
}

// NewWavefront returns a new Wavefront.
Expand Down
3 changes: 2 additions & 1 deletion samples/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/sarchlab/akita/v3/tracing"
"github.com/sarchlab/mgpusim/v3/benchmarks"
"github.com/sarchlab/mgpusim/v3/driver"
"github.com/sarchlab/mgpusim/v3/samplinglib"
"github.com/tebeka/atexit"
)

Expand Down Expand Up @@ -64,7 +65,7 @@ func (r *Runner) Init() *Runner {
r.parseGPUFlag()

log.SetFlags(log.Llongfile | log.Ldate | log.Ltime)

samplinglib.InitSampledEngine()
if r.Timing {
r.buildTimingPlatform()
} else {
Expand Down
82 changes: 82 additions & 0 deletions samplinglib/stableengine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Package samplinglib provides tools for performing sampling simulation
package samplinglib

import (
"github.com/sarchlab/akita/v3/sim"
)

// WFFeature records the timing of one wavefront sample: when it was issued
// and when it finished.
type WFFeature struct {
// Issuetime is the issue time of the sampled wavefront.
Issuetime sim.VTimeInSec
// Finishtime is the finish time of the sampled wavefront.
Finishtime sim.VTimeInSec
}

// StableEngine keeps a sliding window of wavefront timing samples together
// with running sums over that window, and decides (in Analysis) whether the
// relationship between issue time and finish time is stable enough to enable
// sampling.
type StableEngine struct {
// issuetimeSum is the sum of issue times of the samples in the window.
issuetimeSum sim.VTimeInSec
// finishtimeSum is the sum of finish times of the samples in the window.
finishtimeSum sim.VTimeInSec
// intervaltimeSum is the sum of (finish - issue) over the window.
intervaltimeSum sim.VTimeInSec
// mixSum is the sum of finish*issue over the window.
mixSum sim.VTimeInSec
// issuetimeSquareSum is the sum of issue*issue over the window.
issuetimeSquareSum sim.VTimeInSec
// rate is the value most recently computed by Analysis.
rate float64
// granulary is the window length: Analysis runs when the window holds this
// many samples.
granulary int
// Wffeatures holds the samples currently in the window, oldest first.
Wffeatures []WFFeature
// boundary is the allowed deviation of rate from 1 for the window to be
// considered stable.
boundary float64
// enableSampled is set by Analysis: true when rate is within
// [1-boundary, 1+boundary].
enableSampled bool
// predTime is intervaltimeSum divided by granulary, as computed by Analysis.
predTime sim.VTimeInSec
}

// Analysis evaluates the running sums of the current window.
//
// It computes the least-squares slope of finish time against issue time,
//
//	(n*sum(issue*finish) - sum(issue)*sum(finish)) /
//	(n*sum(issue^2)      - sum(issue)^2)
//
// with n = granulary, stores it in rate, and stores the mean
// (finish - issue) interval in predTime. enableSampled becomes true only when
// the slope lies within [1-boundary, 1+boundary].
//
// NOTE(review): assumes sim.VTimeInSec is a floating-point type, so a zero
// denominator yields NaN/Inf (and therefore enableSampled == false) rather
// than a panic — confirm.
func (se *StableEngine) Analysis() {
	n := sim.VTimeInSec(se.granulary)

	denominator := n*se.issuetimeSquareSum - se.issuetimeSum*se.issuetimeSum
	numerator := n*se.mixSum - se.issuetimeSum*se.finishtimeSum

	se.rate = float64(numerator / denominator)
	se.predTime = se.intervaltimeSum / n

	lower, upper := 1-se.boundary, 1+se.boundary
	se.enableSampled = se.rate >= lower && se.rate <= upper
}

// Reset drops every collected sample and clears the running sums, the
// predicted time and the stability verdict. The configured granulary and
// boundary, and the last computed rate, are left untouched.
func (se *StableEngine) Reset() {
	se.Wffeatures = nil
	se.enableSampled = false
	se.predTime = 0

	se.issuetimeSum, se.finishtimeSum, se.intervaltimeSum = 0, 0, 0
	se.mixSum, se.issuetimeSquareSum = 0, 0
}

// Collect appends one (issue, finish) sample to the window and updates the
// running sums. When the window reaches granulary samples, Analysis is run
// over it and the oldest sample is then evicted (its contribution is
// subtracted from every sum), so the window slides by one sample per call.
func (se *StableEngine) Collect(issuetime, finishtime sim.VTimeInSec) {
	se.Wffeatures = append(se.Wffeatures, WFFeature{
		Issuetime:  issuetime,
		Finishtime: finishtime,
	})

	se.issuetimeSum += issuetime
	se.finishtimeSum += finishtime
	se.mixSum += finishtime * issuetime
	se.issuetimeSquareSum += issuetime * issuetime
	se.intervaltimeSum += (finishtime - issuetime)

	if len(se.Wffeatures) != se.granulary {
		return
	}

	se.Analysis()

	// Evict the oldest sample so the next call completes a full window again.
	oldest := se.Wffeatures[0]
	se.Wffeatures = se.Wffeatures[1:]

	se.issuetimeSum -= oldest.Issuetime
	se.finishtimeSum -= oldest.Finishtime
	se.mixSum -= oldest.Finishtime * oldest.Issuetime
	se.issuetimeSquareSum -= oldest.Issuetime * oldest.Issuetime
	se.intervaltimeSum -= (oldest.Finishtime - oldest.Issuetime)
}
137 changes: 137 additions & 0 deletions samplinglib/wfsampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package samplinglib

import (
"flag"
"log"
"time"

"github.com/sarchlab/akita/v3/sim"
)

// SampledRunnerFlag is the "wf-sampling" command-line flag. When true,
// wavefront-level sampled simulation is enabled. It defaults to false.
var SampledRunnerFlag = flag.Bool("wf-sampling", false, "enable wavefront-level sampled simulation.")

// SampledRunnerThresholdFlag is the "sampled-threshold" command-line flag,
// the threshold used when deciding whether sampling can be enabled. It
// defaults to 0.03.
var SampledRunnerThresholdFlag = flag.Float64("sampled-threshold", 0.03,
"the threshold of the sampled execution to enable sampling simulation.")

// SampledRunnerGranularyFlag is the "sampled-granulary" command-line flag,
// the number of samples per analysis window. It defaults to 1024.
var SampledRunnerGranularyFlag = flag.Int("sampled-granulary", 1024,
"the granulary of the sampled execution to collect and analyze data.")

// SampledEngine decides whether wavefront sampling can be enabled. It feeds
// wavefront timings into two StableEngine windows (a long one and a short
// one) and exposes the resulting prediction through Predict.
type SampledEngine struct {
// predTime is the predicted wavefront execution time returned by Predict.
predTime sim.VTimeInSec
// enableSampled is true once sampling has been switched on; Collect then
// stops recording data.
enableSampled bool
// disableEngine turns the whole engine off (see Enable / Disabled).
disableEngine bool
// Simtime, Walltime and FullSimWalltime are not written by the code in this
// file. NOTE(review): presumably filled in by reporting code elsewhere —
// confirm.
Simtime float64 `json:"simtime"`
Walltime float64 `json:"walltime"`
FullSimWalltime float64 `json:"fullsimwalltime"`
// FullSimWalltimeStart is set to the current wall-clock time by Reset.
FullSimWalltimeStart time.Time
// dataidx counts Collect calls since the last Reset; it drives the warm-up
// skip in Collect.
dataidx uint64
// stableEngine is the long-window detector (full granulary).
stableEngine *StableEngine
// shortStableEngine is the short-window detector (half granulary).
shortStableEngine *StableEngine
// predTimeSum and predTimeNum are initialised in Collect at the moment
// sampling is enabled (predTime*granulary and granulary respectively).
predTimeSum sim.VTimeInSec
predTimeNum uint64
// granulary is half of the granulary passed to NewSampledEngine, i.e. the
// short window's length.
granulary int
}

// Reset restarts the wall-clock timer, resets both stable engines and clears
// the prediction state, the sample counter and the sampling-enabled flag.
// The disableEngine switch is not affected.
func (se *SampledEngine) Reset() {
	se.FullSimWalltimeStart = time.Now()

	se.stableEngine.Reset()
	se.shortStableEngine.Reset()

	se.enableSampled = false
	se.dataidx = 0
	se.predTime, se.predTimeSum, se.predTimeNum = 0, 0, 0
}

// NewSampledEngine creates a SampledEngine for wavefront sampling.
//
// granulary is the window length of the long stable engine; the short stable
// engine and the SampledEngine itself use granulary/2. boundary is passed to
// both stable engines. The returned engine has already been Reset.
func NewSampledEngine(granulary int, boundary float64, control bool) *SampledEngine {
	half := granulary / 2

	engine := &SampledEngine{
		stableEngine:      &StableEngine{granulary: granulary, boundary: boundary},
		shortStableEngine: &StableEngine{granulary: half, boundary: boundary},
		granulary:         half,
	}
	engine.Reset()

	// NOTE(review): disableEngine is still at its zero value (false) here, so
	// this branch has no observable effect and control is effectively
	// ignored. Confirm the intended meaning of control before changing it.
	if control {
		engine.disableEngine = false
	}

	return engine
}

// Sampledengine is the package-level SampledEngine used to monitor wavefront
// sampling. It is nil until InitSampledEngine has been called.
var Sampledengine *SampledEngine

// InitSampledEngine creates the package-level Sampledengine from the
// sampled-granulary and sampled-threshold flags, then enables or disables it
// according to the wf-sampling flag.
func InitSampledEngine() {
	Sampledengine = NewSampledEngine(*SampledRunnerGranularyFlag, *SampledRunnerThresholdFlag, false)

	if !*SampledRunnerFlag {
		Sampledengine.Disabled()
		return
	}

	Sampledengine.Enable()
}

// Disabled turns the sampling engine off; while it is off, Collect ignores
// every sample.
func (se *SampledEngine) Disabled() {
se.disableEngine = true
}

// Enable turns the sampling engine on so that Collect records samples again.
func (se *SampledEngine) Enable() {
se.disableEngine = false
}

// IfDisable reports whether the sampling engine is currently disabled.
func (se *SampledEngine) IfDisable() bool {
return se.disableEngine
}

// Collect feeds the issue and finish time of one wavefront into the long- and
// short-window stable engines and decides whether sampling can be enabled.
//
// Nothing is recorded once sampling is enabled or while the engine is
// disabled. The first 1023 calls after a Reset are also dropped as warm-up:
// the counter is incremented before it is compared with 1024.
//
// Sampling is enabled when the long window reports a stable rate and the
// relative difference between the long and short predicted times,
// (long-short)/(long+short), lies within the boundary this engine was
// constructed with. That boundary is read from the engine rather than from
// the global sampled-threshold flag, so an engine built with a custom
// boundary is checked against its own configuration; for the engine created
// by InitSampledEngine the two values are the same.
func (se *SampledEngine) Collect(issuetime sim.VTimeInSec, finishtime sim.VTimeInSec) {
	// No more data is needed once sampling is on, or when the engine is off.
	if se.enableSampled || se.disableEngine {
		return
	}

	se.dataidx++
	if se.dataidx < 1024 { // warm-up: skip the earliest samples
		return
	}

	long, short := se.stableEngine, se.shortStableEngine
	long.Collect(issuetime, finishtime)
	short.Collect(issuetime, finishtime)

	switch {
	case long.enableSampled:
		longTime, shortTime := long.predTime, short.predTime
		se.predTime = shortTime

		diff := float64((longTime - shortTime) / (longTime + shortTime))
		diffBoundary := long.boundary
		if diff <= diffBoundary && diff >= -diffBoundary {
			se.enableSampled = true
			se.predTimeSum = shortTime * sim.VTimeInSec(se.granulary)
			se.predTimeNum = uint64(se.granulary)
			log.Printf("Warp Sampling is enabled")
		}
	case short.enableSampled:
		se.predTime = long.predTime
	}
}

// Predict returns the currently predicted wavefront execution time and
// whether sampling is enabled.
func (se *SampledEngine) Predict() (sim.VTimeInSec, bool) {
return se.predTime, se.enableSampled
}
5 changes: 4 additions & 1 deletion timing/cp/commandprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/sarchlab/akita/v3/sim"
"github.com/sarchlab/akita/v3/tracing"
"github.com/sarchlab/mgpusim/v3/protocol"
"github.com/sarchlab/mgpusim/v3/samplinglib"
"github.com/sarchlab/mgpusim/v3/timing/cp/internal/dispatching"
"github.com/sarchlab/mgpusim/v3/timing/cp/internal/resource"
"github.com/sarchlab/mgpusim/v3/timing/pagemigrationcontroller"
Expand Down Expand Up @@ -297,7 +298,9 @@ func (p *CommandProcessor) processLaunchKernelReq(
if d == nil {
return false
}

if *samplinglib.SampledRunnerFlag {
samplinglib.Sampledengine.Reset()
}
d.StartDispatching(req)
p.ToDriver.Retrieve(now)

Expand Down
14 changes: 13 additions & 1 deletion timing/cp/internal/dispatching/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sarchlab/akita/v3/tracing"
"github.com/sarchlab/mgpusim/v3/kernels"
"github.com/sarchlab/mgpusim/v3/protocol"
"github.com/sarchlab/mgpusim/v3/samplinglib"
"github.com/sarchlab/mgpusim/v3/timing/cp/internal/resource"
)

Expand Down Expand Up @@ -113,6 +114,15 @@ func (d *DispatcherImpl) Tick(now sim.VTimeInSec) (madeProgress bool) {
return madeProgress
}

// collectSamplingData forwards the issue and finish time of every wavefront
// in locations to the global sampling engine. It does nothing unless the
// wf-sampling flag is set.
func (d *DispatcherImpl) collectSamplingData(locations []protocol.WfDispatchLocation) {
	if !*samplinglib.SampledRunnerFlag {
		return
	}

	for i := range locations {
		wf := locations[i].Wavefront
		samplinglib.Sampledengine.Collect(wf.Issuetime, wf.Finishtime)
	}
}

func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool {
msg := d.dispatchingPort.Peek()
if msg == nil {
Expand All @@ -123,9 +133,11 @@ func (d *DispatcherImpl) processMessagesFromCU(now sim.VTimeInSec) bool {
case *protocol.WGCompletionMsg:
count := 0
for _, rspToID := range msg.RspTo {
_, ok := d.inflightWGs[rspToID]
location, ok := d.inflightWGs[rspToID]
if ok {
count += 1
///sampling
d.collectSamplingData(location.locations)
}
}

Expand Down
Loading
Loading