Skip to content

Commit

Permalink
Merge pull request #96 from DICL/refactor-small-asyncchannel-changes
Browse files Browse the repository at this point in the history
Changed queue of messages to smart ptrs
  • Loading branch information
vicentebolea committed May 15, 2016
2 parents c20b20d + 66c5174 commit 70584d6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
18 changes: 7 additions & 11 deletions src/network/asyncchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ void AsyncChannel::do_write (Message* m) {

stringstream ss;
ss << setfill('0') << setw(header_size) << str.length() << str;
//string* to_write = new string(ss.str());
// TODO messages_queue may not be thread-safe.
messages_queue.emplace (new string(ss.str()));

// TODO messages_queue may not be thread-safe: Let's leave that task to PeerDFS
messages_queue.emplace (make_unique<string>(ss.str()));
if (!is_writing.exchange(true)) {
do_write_impl ();
}
}
// }}}
// do_write_impl {{{
void AsyncChannel::do_write_impl () {
auto to_write = messages_queue.front();
async_write (*sender, buffer(*to_write), boost::asio::transfer_exactly(to_write->length()) ,boost::bind (&AsyncChannel::on_write,
this, ph::error, ph::bytes_transferred));
auto& to_write = messages_queue.front();
async_write (*sender, buffer(*to_write), transfer_exactly(to_write->length()),
boost::bind (&AsyncChannel::on_write, this, ph::error, ph::bytes_transferred));
}
// }}}
// on_write {{{
Expand All @@ -61,11 +61,10 @@ void AsyncChannel::on_write (const boost::system::error_code& ec,
logger->info ("Message could not reach err=%s",
ec.message().c_str());

//sleep(10);
do_write_impl();
} else {
delete messages_queue.front();
messages_queue.pop();

if (!messages_queue.empty()) {
do_write_impl ();
} else {
Expand All @@ -91,17 +90,14 @@ void AsyncChannel::read_coroutine (yield_context yield) {
while (true) {
size_t l = async_read (*receiver, buffer(header, header_size), yield[ec]);
if (l != (size_t)header_size or ec) {
logger->info ("HEADER size %d", l);
throw std::runtime_error("header size");
}

size_t size = atoi(header);
l = read (*receiver, body.prepare(size));
if (l != size) {
logger->info ("Body size %d != %d", l, size);
throw std::runtime_error("body size");
}
//if (ec) throw 1;

body.commit (l);
string str ((istreambuf_iterator<char>(&body)),
Expand Down
2 changes: 1 addition & 1 deletion src/network/asyncchannel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class AsyncChannel: public Channel {

NetObserver* node = nullptr;
tcp::socket *sender, *receiver;
std::queue<std::string*> messages_queue;
int id;
std::queue<std::unique_ptr<std::string>> messages_queue;
std::atomic<bool> is_writing;
};

Expand Down

0 comments on commit 70584d6

Please sign in to comment.