Skip to content

Commit

Permalink
Merge pull request #79 from DICL/fix-dfs
Browse files Browse the repository at this point in the history
big file size bugs are fixed
  • Loading branch information
vicentebolea committed Apr 11, 2016
2 parents 939e021 + a96608f commit dd95dd5
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 21 deletions.
2 changes: 1 addition & 1 deletion doc/manual
Submodule manual updated from e4c081 to d4a436
4 changes: 3 additions & 1 deletion src/fs/dfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ int DFS::get(int argc, char* argv[])
auto fd = read_reply<FileDescription> (socket.get());

ofstream f (file_name);
socket->close();
int block_seq = 0;
for (auto block_name : fd->blocks) {
uint32_t hash_key = fd->hash_keys[block_seq++];
Expand Down Expand Up @@ -274,6 +275,7 @@ int DFS::rm(int argc, char* argv[])

send_message(socket.get(), &fr);
auto fd = read_reply<FileDescription>(socket.get());
socket->close();

unsigned int block_seq = 0;
for (auto block_name : fd->blocks) {
Expand All @@ -288,7 +290,7 @@ int DFS::rm(int argc, char* argv[])
cerr << "[ERR] " << block_name << "doesn't exist." << endl;
return EXIT_FAILURE;
}

socket->close();
}

FileDel file_del;
Expand Down
20 changes: 10 additions & 10 deletions src/network/p2p.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,16 @@ void P2P::do_write (Message* m) {
string str = save_message (m);
stringstream ss;
ss << setfill('0') << setw(header_size) << str.length() << str;
string* to_write = new string(ss.str());

async_write (client,
buffer(ss.str()), boost::bind (&P2P::on_write, this,
ph::error, ph::bytes_transferred, m));
async_write (client, buffer(*to_write), boost::bind (&P2P::on_write, this,
ph::error, ph::bytes_transferred, m, to_write));
}
// }}}
// on_write {{{
void P2P::on_write (const boost::system::error_code& ec,
size_t s, Message* m) {
size_t s, Message* m, string* str) {
delete str;
if (ec) {
logger->info ("Message could not reach err=%s",
ec.message().c_str());
Expand All @@ -83,7 +84,7 @@ void P2P::on_write (const boost::system::error_code& ec,
// }}}
// read_coroutine {{{
void P2P::read_coroutine (yield_context yield) {
boost::asio::streambuf body;
char * body;
boost::system::error_code ec;
char header [17];
auto* sock = server;
Expand All @@ -95,13 +96,12 @@ void P2P::read_coroutine (yield_context yield) {
if (l != (size_t)header_size) continue;

size_t size = atoi(header);
l = async_read (*sock, body.prepare(size), yield[ec]);
body = new char[size];
l = async_read (*sock, buffer(body, size), yield[ec]);

if (!ec) {
body.commit (l);
string str ((istreambuf_iterator<char>(&body)),
istreambuf_iterator<char>());
body.consume (l);
string str (body);
delete[] body;

Message* msg = nullptr;
msg = load_message(str);
Expand Down
3 changes: 2 additions & 1 deletion src/network/p2p.hh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <string>
#include <vector>
#include <boost/asio/spawn.hpp>
#include <sstream>

namespace eclipse {
namespace network {
Expand All @@ -22,7 +23,7 @@ class P2P: public AsyncChannel {
protected:
void on_connect (const boost::system::error_code&);
void on_write (const boost::system::error_code&, size_t,
Message*);
Message*, std::string*);

void do_read ();
void read_coroutine (boost::asio::yield_context);
Expand Down
19 changes: 12 additions & 7 deletions src/network/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ void Server::do_write (Message* m) {
stringstream ss;
ss << setfill('0') << setw(header_size) << str.length() << str;

async_write (*server,
buffer(ss.str()), boost::bind (&Server::on_write, this,
ph::error, ph::bytes_transferred, m));
string* to_write = new string(ss.str());

async_write (*server, buffer(*to_write), boost::bind (&Server::on_write, this,
ph::error, ph::bytes_transferred, m, to_write));
}
// }}}
// on_write {{{
void Server::on_write (const boost::system::error_code& ec,
size_t s, Message* m) {
size_t s, Message* m, string* str) {
delete str;
if (ec) {
logger->info ("Message could not reach err=%s",
ec.message().c_str());
Expand All @@ -74,6 +76,7 @@ void Server::read_coroutine (yield_context yield) {

size_t size = atoi(header);
l = async_read (*sock, body.prepare(size), yield[ec]);
logger->info ("Server: l=%d", l);
if (ec) throw 1;

body.commit (l);
Expand All @@ -88,12 +91,14 @@ void Server::read_coroutine (yield_context yield) {
}
} catch (...) {
if (ec == boost::asio::error::eof)
logger->info ("Closing server socket to client");
logger->info ("Server: Closing server socket to client");
else
logger->info ("Message arrived error=%s",
logger->info ("Server: Message arrived error=%s",
ec.message().c_str());

server->close();
if (server != nullptr)
server->close();

delete server;
server = nullptr;
node->on_disconnect();
Expand Down
2 changes: 1 addition & 1 deletion src/network/server.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Server: public AsyncChannel {

protected:
void on_write (const boost::system::error_code&, size_t,
Message*);
Message*, std::string*);

void do_read ();
void read_coroutine (boost::asio::yield_context);
Expand Down
1 change: 1 addition & 0 deletions src/nodes/peerdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void PeerDFS::insert (uint32_t hash_key, std::string name, std::string v) {
int which_node = boundaries->get_index(hash_key);

if (which_node == id) {
logger->info ("[DFS] Saving locally KEY: %s", name.c_str());
string file_path = disk_path + string("/") + name;
ofstream file (file_path);
file << v;
Expand Down

0 comments on commit dd95dd5

Please sign in to comment.