-
Notifications
You must be signed in to change notification settings - Fork 3
/
ideal.go
152 lines (125 loc) · 3.63 KB
/
ideal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
// run ideal event loop until completion.
import (
"container/heap"
"container/list"
"sort"
)
const (
NUM_HOSTS = 144
PROPAGATION_DELAY = 0.4 // microseconds
HOSTS_IN_RACK = 16
)
func removeFromActiveFlows(ls *list.List, f *Flow) {
for e := ls.Front(); e != nil; e = e.Next() {
if e.Value.(*Flow) == f {
ls.Remove(e)
break
}
}
}
func getOracleFCT(flow *Flow, bw float64) (float64, float64) {
pd := PROPAGATION_DELAY * 4
td := (float64(flow.Size) / (bw * 1e9 / 8)) * 1e6 // microseconds
return pd, td
}
// input: linked list of flows
// output: sorted slice of flows, number of flows
func getSortedFlows(actives *list.List) SortedFlows {
sortedFlows := make(SortedFlows, actives.Len())
i := 0
for e := actives.Front(); e != nil; e = e.Next() {
sortedFlows[i] = e.Value.(*Flow)
i++
}
sort.Sort(sortedFlows)
return sortedFlows
}
// input: eventQueue of FlowArrival events, topology bandwidth (to determine oracle FCT)
// output: slice of pointers to completed Flows
func ideal(bandwidth float64, logger chan LogEvent, eventQueue EventQueue) []*Flow {
heap.Init(&eventQueue)
activeFlows := list.New()
completedFlows := make([]*Flow, 0)
var srcPorts [NUM_HOSTS]*Flow
var dstPorts [NUM_HOSTS]*Flow
var currentTime float64
for len(eventQueue) > 0 {
ev := heap.Pop(&eventQueue).(*Event)
if ev.Cancelled {
continue
}
if ev.Time < currentTime {
panic("going backwards!")
}
currentTime = ev.Time
flow := ev.Flow
switch ev.Type {
case FlowArrival:
logger <- LogEvent{Time: currentTime, Type: LOG_FLOW_ARRIVAL, Flow: flow}
prop_delay, trans_delay := getOracleFCT(flow, bandwidth)
flow.TimeRemaining = trans_delay
flow.OracleFct = prop_delay + trans_delay
activeFlows.PushBack(flow)
case FlowSourceFree:
flow.FinishSending = true
flow.FinishEvent = makeCompletionEvent(currentTime+2*PROPAGATION_DELAY, flow, FlowDestFree)
heap.Push(&eventQueue, flow.FinishEvent)
case FlowDestFree:
if !flow.FinishSending {
panic("finish without finishSending")
}
removeFromActiveFlows(activeFlows, flow)
flow.End = currentTime + 2*PROPAGATION_DELAY // send an ACK
flow.Finish = true
completedFlows = append(completedFlows, flow)
logger <- LogEvent{Time: currentTime, Type: LOG_FLOW_FINISHED, Flow: flow}
}
for i := 0; i < len(srcPorts); i++ {
if srcPorts[i] != nil {
inProgressFlow := srcPorts[i]
if inProgressFlow.LastTime == 0 {
panic("flow in progress without LastTime set")
}
if inProgressFlow.FinishEvent == nil {
panic("flow in progress without FinishEvent set")
}
inProgressFlow.TimeRemaining -= (currentTime - inProgressFlow.LastTime)
inProgressFlow.LastTime = 0
if !inProgressFlow.FinishSending {
inProgressFlow.FinishEvent.Cancelled = true
inProgressFlow.FinishEvent = nil
}
}
srcPorts[i] = nil
dstPorts[i] = nil
}
sortedActiveFlows := getSortedFlows(activeFlows)
for _, f := range sortedActiveFlows {
src := f.Source
dst := f.Dest
if f.FinishSending {
if f.Finish {
panic("finished flow in actives")
}
if srcPorts[src] != nil || dstPorts[dst] != nil {
panic("ports taken on still sending flow")
}
dstPorts[dst] = f
continue
}
if srcPorts[src] == nil && dstPorts[dst] == nil {
//this flow gets scheduled.
f.LastTime = currentTime
srcPorts[src] = f
dstPorts[dst] = f
if f.FinishEvent != nil {
panic("flow being scheduled, finish event non-nil")
}
f.FinishEvent = makeCompletionEvent(currentTime+f.TimeRemaining, f, FlowSourceFree)
heap.Push(&eventQueue, f.FinishEvent)
}
}
}
return completedFlows
}