-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscheduler.h
200 lines (161 loc) · 4.73 KB
/
scheduler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#pragma once
#include <queue>
#include <memory>
#include <cassert>
#include "timer.h"
namespace wrr_queue
{
class mem_operation
{
public:
typedef enum {
OP_READ = 0,
OP_WRITE
} type;
mem_operation(uint64_t page, type op_type, uint64_t time = 0ull) : m_page(page), m_type(op_type), m_time(time) { ; }
mem_operation(const mem_operation ©) : m_page(copy.m_page), m_type(copy.m_type), m_time(copy.m_time) { ; }
mem_operation(const mem_operation ©, uint64_t time) : m_page(copy.m_page), m_type(copy.m_type), m_time(time) { ;}
static bool cmp(const mem_operation &left, const mem_operation &right) { return left.m_time > right.m_time; }
type get_type() const
{
return m_type;
}
uint64_t get_page() const
{
return m_page;
}
uint64_t get_time() const
{
return m_time;
}
private:
type m_type;
uint64_t m_page;
uint64_t m_time;
};
class cmp
{
public:
bool operator()(const mem_operation &left, const mem_operation &right) const
{
return mem_operation::cmp(left, right);
}
};
class queue : public time_dependent
{
public:
queue() { ; }
virtual void wait()
{
;
}
// report time of the next event
// 0 - now or
virtual uint64_t next_event() const
{
return m_queue.top().get_time();
}
virtual bool is_available() const
{
return m_queue.size();
}
uint64_t size() const
{
return m_queue.size();
}
void push(const mem_operation &op)
{
m_queue.push(op);
}
mem_operation pop()
{
mem_operation ret = m_queue.top();
m_queue.pop();
return ret;
}
private:
std::priority_queue<mem_operation, std::vector<mem_operation>, cmp> m_queue;
};
class scheduler : public timer
{
public:
scheduler(std::vector<std::shared_ptr<queue> > &input_queues, std::vector<uint32_t> &weights,
uint64_t device_read_time, uint64_t device_write_time, uint64_t device_queue_size) :
m_device_read_time(device_read_time), m_device_write_time(device_write_time), m_device_queue_size(device_queue_size),
m_device_queue(std::make_shared<queue>())
{
assert(input_queues.size() == weights.size());
m_input_queues = input_queues;
m_weights = weights;
m_queue_finishing_time.resize(weights.size());
m_queue_operations_count.resize(weights.size());
m_queue_responce_time.resize(weights.size());
}
void run()
{
bool stop = false;
uint32_t max_weight = *std::max_element(m_weights.begin(), m_weights.end());
std::vector<time_dependent*> waiter_list;
for(auto v : m_input_queues)
waiter_list.push_back(v.get());
this->wait_any(waiter_list);
while (!stop)
{
m_service_round_weights = m_weights;
//service round
for (uint32_t i = 0; i < max_weight; ++i)
{
for (uint32_t j = 0; j < m_input_queues.size(); ++j)
{
if (m_service_round_weights[j] == 0 || !m_input_queues[j]->is_available() ||
m_input_queues[j]->next_event() > this->report_time())
continue;
--m_service_round_weights[j];
mem_operation op = m_input_queues[j]->pop();
uint64_t op_time = op.get_type() == mem_operation::OP_READ ? m_device_read_time : m_device_write_time;
if (m_device_queue->size() > m_device_queue_size)
{
this->wait(m_device_queue.get());
m_device_queue->pop();
}
uint64_t finish_time = op_time + this->report_time();
m_device_queue->push(mem_operation(op, finish_time));
m_queue_responce_time[j].push_back(finish_time - op.get_time());
++m_queue_operations_count[j];
if (m_input_queues[j]->size() == 0)
m_queue_finishing_time[j] = this->report_time() + op_time;
this->wait_any(waiter_list);
}
}
//end of service round
bool _stop = true;
for (auto q : m_input_queues)
_stop &= !q->is_available();
stop = _stop;
}
}
const std::vector<uint64_t> &get_finishing_time() const
{
return m_queue_finishing_time;
}
const std::vector<uint64_t> &get_operations_count() const
{
return m_queue_operations_count;
}
const std::vector<std::vector<uint64_t> > &get_responce_time() const
{
return m_queue_responce_time;
}
private:
std::shared_ptr<queue> m_device_queue;
std::vector<std::shared_ptr<queue> > m_input_queues;
std::vector<uint32_t> m_weights;
std::vector<uint32_t> m_service_round_weights;
const uint64_t m_device_read_time;
const uint64_t m_device_write_time;
const uint64_t m_device_queue_size;
std::vector<uint64_t> m_queue_finishing_time;
std::vector<uint64_t> m_queue_operations_count;
std::vector<std::vector<uint64_t> > m_queue_responce_time;
};
}