
Commit

add timing
Roger Waleffe authored and Roger Waleffe committed Nov 19, 2023
1 parent 9343884 commit 417b2a4
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion src/cpp/src/pipeline/pipeline_gpu.cpp
@@ -44,6 +44,8 @@ void updateEvalForBatch(Pipeline* pipeline_, shared_ptr<Batch> batch) {
void RemoteLoadWorker::run() {
while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
// NOTE: this "train" is probably not set correctly all the time
shared_ptr<Batch> batch = std::make_shared<Batch>(pipeline_->dataloader_->train_);

@@ -77,6 +79,8 @@ void RemoteLoadWorker::run() {
} else {
((PipelineCPU *)pipeline_)->loaded_batches_->blocking_push(batch);
}
t.stop();
std::cout<<"remote load: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
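
The timing code above and below leans on a small Timer utility from the codebase whose definition is not part of this diff. For orientation, here is a minimal stand-in inferred purely from usage; the constructor flag, method names, and millisecond units are assumptions, not the project's actual implementation:

#include <chrono>
#include <cstdint>

// Hypothetical stand-in for the project's Timer, inferred from the calls in
// this diff: Timer t = Timer(false); t.start(); ... t.stop(); t.getDuration().
class Timer {
  public:
    explicit Timer(bool gpu) : gpu_(gpu) {}  // flag assumed to toggle GPU timing

    void start() { start_ = std::chrono::steady_clock::now(); }
    void stop() { stop_ = std::chrono::steady_clock::now(); }

    // Assumed to return the elapsed time between start() and stop() in ms.
    int64_t getDuration() const {
        return std::chrono::duration_cast<std::chrono::milliseconds>(stop_ - start_).count();
    }

  private:
    bool gpu_;
    std::chrono::steady_clock::time_point start_;
    std::chrono::steady_clock::time_point stop_;
};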
@@ -85,7 +89,12 @@ void RemoteToDeviceWorker::run() {
void RemoteToDeviceWorker::run() {
while (!done_) {
while (!paused_) {
Timer t = Timer(false);  // reused below: first times the queue wait, then the work itself
t.start();
auto tup = ((PipelineGPU *)pipeline_)->loaded_batches_->blocking_pop();
t.stop();
std::cout<<"remote to block: "<<t.getDuration()<<"\n";
t.start();
bool popped = std::get<0>(tup);
shared_ptr<Batch> batch = std::get<1>(tup);
if (!popped) {
@@ -108,6 +117,8 @@ void RemoteToDeviceWorker::run() {

batch->creator_id_ = pipeline_->model_->pg_gloo_->pg->getRank();
batch->remoteTo(pipeline_->model_->pg_gloo_->pg, child, tag);
t.stop();
std::cout<<"remote to: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
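
Every consumer stage in this file follows the same shape: block on a queue pop that yields a (popped, batch) tuple, break out of the loop if the pop was interrupted by a pipeline flush, and otherwise process the batch. Below is a self-contained sketch of that pop contract; BlockingQueue here is illustrative, not the project's actual queue class:

#include <condition_variable>
#include <deque>
#include <mutex>
#include <tuple>

// Illustrative queue with the pop contract the workers rely on: blocking_pop()
// waits until an item arrives or the queue is flushed, and the bool in the
// returned tuple tells the caller which of the two happened.
template <typename T>
class BlockingQueue {
  public:
    void blocking_push(T item) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            items_.push_back(std::move(item));
        }
        cv_.notify_one();
    }

    std::tuple<bool, T> blocking_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return flushed_ || !items_.empty(); });
        if (items_.empty()) {
            return {false, T{}};  // flushed while empty: caller breaks out
        }
        T item = std::move(items_.front());
        items_.pop_front();
        return {true, std::move(item)};
    }

    void flush() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            flushed_ = true;
        }
        cv_.notify_all();
    }

  private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<T> items_;
    bool flushed_ = false;
};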
@@ -120,14 +131,21 @@ void BatchToDeviceWorker::run() {

while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
auto tup = ((PipelineGPU *)pipeline_)->loaded_batches_->blocking_pop();
t.stop();
std::cout<<"batch to block: "<<t.getDuration()<<"\n";
t.start();
bool popped = std::get<0>(tup);
shared_ptr<Batch> batch = std::get<1>(tup);
if (!popped) {
break;
}

batchToDevice(pipeline_, batch);
t.stop();
std::cout<<"batch to: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
@@ -139,7 +157,12 @@ void ComputeWorkerGPU::run() {

while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
auto tup = ((PipelineGPU *)pipeline_)->device_loaded_batches_[gpu_id_]->blocking_pop();
t.stop();
std::cout<<"compute block: "<<t.getDuration()<<"\n";
t.start();
bool popped = std::get<0>(tup);
shared_ptr<Batch> batch = std::get<1>(tup);
if (!popped) {
@@ -229,6 +252,8 @@ void ComputeWorkerGPU::run() {
batch->clear();
}
}
t.stop();
std::cout<<"compute: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
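
One caveat for the "compute" number: GPU kernels launched through libtorch are asynchronous, so a host-side timer can stop before the device has actually finished the batch. If Timer(false) means pure CPU wall-clock timing (an assumption, see the sketch above), synchronizing just before t.stop() would make the measurement cover the GPU work too; a hedged sketch:

#include <torch/torch.h>

// Sketch only: call immediately before t.stop() in ComputeWorkerGPU::run() so
// asynchronous CUDA work launched for the batch is included in the duration.
inline void syncBeforeTiming() {
    if (torch::cuda::is_available()) {
        torch::cuda::synchronize();  // blocks until all queued kernels finish
    }
}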
@@ -260,7 +285,12 @@ void EncodeNodesWorkerGPU::run() {
void BatchToHostWorker::run() {
while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
auto tup = ((PipelineGPU *)pipeline_)->device_update_batches_[gpu_id_]->blocking_pop();
t.stop();
std::cout<<"batch to host block: "<<t.getDuration()<<"\n";
t.start();
bool popped = std::get<0>(tup);
shared_ptr<Batch> batch = std::get<1>(tup);
if (!popped) {
@@ -285,6 +315,8 @@ void BatchToHostWorker::run() {
}

((PipelineGPU *)pipeline_)->update_batches_->blocking_push(batch);
t.stop();
std::cout<<"batch to host: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
@@ -293,7 +325,12 @@ void BatchToHostWorker::run() {
void RemoteToHostWorker::run() {
while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
auto tup = ((PipelineGPU *)pipeline_)->update_batches_->blocking_pop();
t.stop();
std::cout<<"remote to host block: "<<t.getDuration()<<"\n";
t.start();
bool popped = std::get<0>(tup);
shared_ptr<Batch> batch = std::get<1>(tup);
if (!popped) {
@@ -330,7 +367,8 @@ void RemoteToHostWorker::run() {
lock.unlock();

batch->remoteTo(pipeline_->model_->pg_gloo_->pg, parent, tag);

t.stop();
std::cout<<"remote to host: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
@@ -339,6 +377,8 @@ void RemoteToHostWorker::run() {
void RemoteListenForUpdatesWorker::run() {
while (!done_) {
while (!paused_) {
Timer t = Timer(false);
t.start();
// NOTE: this "train" is probably not set correctly all the time
shared_ptr<Batch> batch = std::make_shared<Batch>(pipeline_->dataloader_->train_);

@@ -370,6 +410,8 @@ void RemoteListenForUpdatesWorker::run() {
batch->remoteReceive(pipeline_->model_->pg_gloo_->pg, child, tag);

((PipelineGPU *)pipeline_)->update_batches_->blocking_push(batch);
t.stop();
std::cout<<"remote listen: "<<t.getDuration()<<"\n";
}
nanosleep(&sleep_time_, NULL);
}
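A last note on reading the output: all of these workers write to std::cout concurrently, and a chained << statement is not atomic, so timing lines from different stages can interleave. If the log becomes hard to parse, C++20's std::osyncstream makes each statement come out whole; a minimal sketch (logDuration is a hypothetical helper, not part of this commit):

#include <cstdint>
#include <iostream>
#include <syncstream>

// Emit one timing line atomically even when several pipeline workers log at
// once; requires C++20 for std::osyncstream.
inline void logDuration(const char* stage, int64_t ms) {
    std::osyncstream(std::cout) << stage << ": " << ms << "\n";
}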
