From daf66197212a30f5a1d49839ac159cc9de9ce633 Mon Sep 17 00:00:00 2001 From: Kibeom Jin Date: Sat, 11 Jun 2016 11:33:34 +0900 Subject: [PATCH 1/3] update (=offset overwrite) is done --- Makefile.am | 2 + src/fs/dfs.cc | 104 +++++++++++++++++++++++++++++++++--- src/fs/dfs.hh | 3 +- src/fs/main.cc | 28 +++++----- src/messages/blockupdate.cc | 5 ++ src/messages/blockupdate.hh | 18 +++++++ src/messages/boost_impl.cc | 2 + src/messages/boost_impl.hh | 25 +++++++++ src/messages/offsetkv.cc | 11 ++++ src/messages/offsetkv.hh | 20 +++++++ src/nodes/local_io.cc | 9 ++++ src/nodes/local_io.hh | 2 + src/nodes/peerdfs.cc | 59 +++++++++++++++++++- src/nodes/peerdfs.hh | 3 ++ src/nodes/remotedfs.cc | 20 +++++++ src/nodes/remotedfs.hh | 1 + 16 files changed, 291 insertions(+), 21 deletions(-) create mode 100644 src/messages/blockupdate.cc create mode 100644 src/messages/blockupdate.hh create mode 100644 src/messages/offsetkv.cc create mode 100644 src/messages/offsetkv.hh diff --git a/Makefile.am b/Makefile.am index 7bfc4cea..5bc3b2f1 100644 --- a/Makefile.am +++ b/Makefile.am @@ -8,12 +8,14 @@ bin_PROGRAMS = eclipse_node dfs messages_files = src/messages/boundaries.cc \ src/messages/message.cc \ src/messages/keyvalue.cc \ + src/messages/offsetkv.cc \ src/messages/keyrequest.cc \ src/messages/control.cc \ src/messages/boost_impl.cc \ src/messages/factory.cc \ src/messages/fileinfo.cc \ src/messages/blockinfo.cc \ + src/messages/blockupdate.cc \ src/messages/task.cc \ src/messages/reply.cc \ src/messages/filerequest.cc \ diff --git a/src/fs/dfs.cc b/src/fs/dfs.cc index 36781bc8..706db680 100644 --- a/src/fs/dfs.cc +++ b/src/fs/dfs.cc @@ -441,10 +441,10 @@ namespace eclipse{ return EXIT_SUCCESS; } - int DFS::partial_get(int argc, char* argv[]) { + int DFS::pget(int argc, char* argv[]) { string file_name = ""; if (argc < 5) { - cout << "[INFO] dfs partial_get file_name start_offset read_byte" << endl; + cout << "[INFO] dfs pget file_name start_offset read_byte" << endl; return EXIT_FAILURE; } else { Histogram boundaries(NUM_NODES, 0); @@ -474,7 +474,7 @@ namespace eclipse{ cerr << "[ERR] Wrong read byte." << endl; return EXIT_FAILURE; } - string outfile = "partial_" + file_name; + string outfile = "p_" + file_name; ofstream f(outfile); int block_seq = 0; uint64_t passed_byte = 0; @@ -502,12 +502,12 @@ namespace eclipse{ } else { start_pos = 0; } - string sub_str = block_content.substr(start_pos); - if (read_byte_cnt + sub_str.length() > read_byte) { + uint32_t read_length = block_content.length(); + if (read_byte_cnt + read_length > read_byte) { final_block = true; - uint32_t sub_length = read_byte - read_byte_cnt; - sub_str = block_content.substr(start_pos, sub_length); + read_length = read_byte - read_byte_cnt; } + string sub_str = msg->content.substr(start_pos, read_length); f << sub_str; read_byte_cnt += sub_str.length(); tmp_socket->close(); @@ -521,4 +521,94 @@ namespace eclipse{ cout << "[INFO] " << file_name << " is read." << endl; return EXIT_SUCCESS; } + + int DFS::update(int argc, char* argv[]) { + string ori_file_name = ""; + if (argc < 5) { + cout << "[INFO] dfs update original_file new_file start_offset" << endl; + return EXIT_FAILURE; + } else { + Histogram boundaries(NUM_NODES, 0); + boundaries.initialize(); + + ori_file_name = argv[2]; + string new_file_name = argv[3]; + uint64_t start_offset = (uint64_t)atoi(argv[4]); + uint32_t file_hash_key = h(ori_file_name); + auto socket = connect (file_hash_key); + FileExist fe; + fe.name = ori_file_name; + send_message(socket.get(), &fe); + auto rep = read_reply (socket.get()); + + if (rep->message != "TRUE") { + cerr << "[ERR] " << ori_file_name << " doesn't exist." << endl; + return EXIT_FAILURE; + } + FileRequest fr; + fr.name = ori_file_name; + + ifstream myfile(new_file_name); + myfile.seekg(0, myfile.end); + uint64_t new_file_size = myfile.tellg(); + + send_message(socket.get(), &fr); + auto fd = read_reply (socket.get()); + socket->close(); + if (start_offset + new_file_size > fd->size) { + cerr << "[ERR] Wrong file size." << endl; + return EXIT_FAILURE; + } + myfile.seekg(0, myfile.beg); + char *buffer = new char[new_file_size]; + myfile.read(buffer, new_file_size); + string sbuffer(buffer); + delete[] buffer; + + int block_seq = 0; + uint64_t passed_byte = 0; + uint64_t write_byte_cnt = 0; + uint32_t ori_start_pos = 0; + uint32_t to_write_byte = new_file_size; + bool first_block = true; + bool final_block = false; + for (auto block_name : fd->blocks) { + if (passed_byte + fd->block_size[block_seq] < start_offset) { + passed_byte += fd->block_size[block_seq]; + block_seq++; + continue; + } else { + uint32_t hash_key = fd->hash_keys[block_seq]; + auto tmp_socket = connect(boundaries.get_index(hash_key)); + if (first_block) { + first_block = false; + ori_start_pos = start_offset - passed_byte; + } else { + ori_start_pos = 0; + } + uint32_t write_length = fd->block_size[block_seq++]; + if (to_write_byte < write_length) { + final_block = true; + write_length = to_write_byte; + } + BlockUpdate bu; + bu.name = block_name; + bu.replica = fd->replica; + bu.hash_key = hash_key; + bu.pos = ori_start_pos; + bu.len = write_length; + bu.content = sbuffer.substr(write_byte_cnt, write_length); + send_message(tmp_socket.get(), &bu); + write_byte_cnt += write_length; + tmp_socket->close(); + if (final_block) { + break; + } + } + } + myfile.close(); + } + cout << "[INFO] " << ori_file_name << " is updated." << endl; + return EXIT_SUCCESS; + } } diff --git a/src/fs/dfs.hh b/src/fs/dfs.hh index 6253433e..cf15a820 100644 --- a/src/fs/dfs.hh +++ b/src/fs/dfs.hh @@ -6,6 +6,7 @@ #include "../messages/factory.hh" #include "../messages/fileinfo.hh" #include "../messages/blockinfo.hh" +#include "../messages/blockupdate.hh" #include "../messages/fileexist.hh" #include "../messages/filerequest.hh" #include "../messages/filelist.hh" @@ -57,7 +58,7 @@ namespace eclipse { int rm(int argc, char* argv[]); int format(int argc, char* argv[]); int show(int argc, char* argv[]); - int partial_get(int argc, char* argv[]); + int pget(int argc, char* argv[]); int update(int argc, char* argv[]); }; } diff --git a/src/fs/main.cc b/src/fs/main.cc index 5d2e1eb7..e9396bdd 100644 --- a/src/fs/main.cc +++ b/src/fs/main.cc @@ -11,38 +11,42 @@ int main(int argc, char* argv[]) { string op = argv[1]; if (op.compare("put") == 0) { dfs.put(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("get") == 0) { dfs.get(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("cat") == 0) { dfs.cat(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("ls") == 0) { dfs.ls(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("rm") == 0) { dfs.rm(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("format") == 0) { dfs.format(argc, argv); - return 0; + return EXIT_SUCCESS; } else if (op.compare("show") == 0) { dfs.show(argc, argv); - return 0; + return EXIT_SUCCESS; } - else if (op.compare("partial_get") == 0) { - dfs.DFS::partial_get(argc, argv); - return 0; + else if (op.compare("pget") == 0) { + dfs.DFS::pget(argc, argv); + return EXIT_SUCCESS; + } + else if (op.compare("update") == 0) { + dfs.DFS::update(argc, argv); + return EXIT_SUCCESS; } } cerr << "[ERR] Unknown operation" << endl; - cout << "[INFO] dfs put|get|cat|ls|rm|format|partial_get" << endl; - return -1; + cout << "[INFO] dfs put|get|cat|ls|rm|format|pget|update" << endl; + return EXIT_FAILURE; } diff --git a/src/messages/blockupdate.cc b/src/messages/blockupdate.cc new file mode 100644 index 00000000..9dae6669 --- /dev/null +++ b/src/messages/blockupdate.cc @@ -0,0 +1,5 @@ +#include "blockupdate.hh" + +using namespace eclipse::messages; + +std::string BlockUpdate::get_type() const { return "BlockUpdate"; } diff --git a/src/messages/blockupdate.hh b/src/messages/blockupdate.hh new file mode 100644 index 00000000..375f6b12 --- /dev/null +++ b/src/messages/blockupdate.hh @@ -0,0 +1,18 @@ +#pragma once +#include "message.hh" +#include + +namespace eclipse { +namespace messages { + struct BlockUpdate: public Message { + std::string get_type() const override; + + std::string name; + uint32_t hash_key; + int replica; + uint32_t pos; + uint32_t len; + std::string content; + }; +} +} diff --git a/src/messages/boost_impl.cc b/src/messages/boost_impl.cc index 5d631c8c..6211e993 100644 --- a/src/messages/boost_impl.cc +++ b/src/messages/boost_impl.cc @@ -6,12 +6,14 @@ //! 4) Also here BOOST_CLASS_EXPORT(eclipse::messages::Boundaries); BOOST_CLASS_EXPORT(eclipse::messages::KeyValue); +BOOST_CLASS_EXPORT(eclipse::messages::OffsetKeyValue); BOOST_CLASS_EXPORT(eclipse::messages::Control); BOOST_CLASS_EXPORT(eclipse::messages::KeyRequest); BOOST_CLASS_EXPORT(eclipse::messages::Task); BOOST_CLASS_EXPORT(eclipse::messages::FileInfo); BOOST_CLASS_EXPORT(eclipse::messages::FileList); BOOST_CLASS_EXPORT(eclipse::messages::BlockInfo); +BOOST_CLASS_EXPORT(eclipse::messages::BlockUpdate); BOOST_CLASS_EXPORT(eclipse::messages::Reply); BOOST_CLASS_EXPORT(eclipse::messages::CacheInfo); BOOST_CLASS_EXPORT(eclipse::messages::FileRequest); diff --git a/src/messages/boost_impl.hh b/src/messages/boost_impl.hh index c241ca0c..3a8f841a 100644 --- a/src/messages/boost_impl.hh +++ b/src/messages/boost_impl.hh @@ -24,6 +24,8 @@ #include "filedel.hh" #include "formatrequest.hh" #include "fileexist.hh" +#include "offsetkv.hh" +#include "blockupdate.hh" #include #include @@ -62,6 +64,16 @@ template ar & BOOST_SERIALIZATION_NVP(k.value); } +template + void serialize (Archive& ar, eclipse::messages::OffsetKeyValue& k, unsigned int) { + ar & BASE_OBJECT(Message, k); + ar & BOOST_SERIALIZATION_NVP(k.key); + ar & BOOST_SERIALIZATION_NVP(k.name); + ar & BOOST_SERIALIZATION_NVP(k.value); + ar & BOOST_SERIALIZATION_NVP(k.pos); + ar & BOOST_SERIALIZATION_NVP(k.len); + } + template void serialize (Archive& ar, eclipse::messages::Control& c, unsigned int) { ar & BASE_OBJECT(Message, c); @@ -102,6 +114,17 @@ template ar & BOOST_SERIALIZATION_NVP(c.content); } +template + void serialize (Archive& ar, eclipse::messages::BlockUpdate& c, unsigned int) { + ar & BASE_OBJECT(Message, c); + ar & BOOST_SERIALIZATION_NVP(c.name); + ar & BOOST_SERIALIZATION_NVP(c.hash_key); + ar & BOOST_SERIALIZATION_NVP(c.replica); + ar & BOOST_SERIALIZATION_NVP(c.pos); + ar & BOOST_SERIALIZATION_NVP(c.len); + ar & BOOST_SERIALIZATION_NVP(c.content); + } + template void serialize (Archive& ar, eclipse::messages::Task& c, unsigned int) { ar & BASE_OBJECT(Message, c); @@ -189,10 +212,12 @@ BOOST_SERIALIZATION_ASSUME_ABSTRACT(eclipse::messages::Message); BOOST_CLASS_TRACKING(eclipse::messages::Message, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::Boundaries, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::KeyValue, boost::serialization::track_never); +BOOST_CLASS_TRACKING(eclipse::messages::OffsetKeyValue, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::Control, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::KeyRequest, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::FileInfo, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::BlockInfo, boost::serialization::track_never); +BOOST_CLASS_TRACKING(eclipse::messages::BlockUpdate, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::Task, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::FileList, boost::serialization::track_never); BOOST_CLASS_TRACKING(eclipse::messages::Reply, boost::serialization::track_never); diff --git a/src/messages/offsetkv.cc b/src/messages/offsetkv.cc new file mode 100644 index 00000000..374274a4 --- /dev/null +++ b/src/messages/offsetkv.cc @@ -0,0 +1,11 @@ +#include "offsetkv.hh" + +namespace eclipse { +namespace messages { + +OffsetKeyValue::OffsetKeyValue (uint32_t k, std::string n, std::string v, uint32_t p, uint32_t l) : key(k), name(n), value(v), pos(p), len(l) { } + +std::string OffsetKeyValue::get_type() const { return "OffsetKeyValue"; } + +} +} diff --git a/src/messages/offsetkv.hh b/src/messages/offsetkv.hh new file mode 100644 index 00000000..bd451ec7 --- /dev/null +++ b/src/messages/offsetkv.hh @@ -0,0 +1,20 @@ +#pragma once + +#include "message.hh" +#include + +namespace eclipse { +namespace messages { + +struct OffsetKeyValue: public Message { + OffsetKeyValue () = default; + OffsetKeyValue (uint32_t, std::string, std::string, uint32_t, uint32_t); + + std::string get_type() const override; + uint32_t key; + std::string name, value; + uint32_t pos, len; +}; + +} +} diff --git a/src/nodes/local_io.cc b/src/nodes/local_io.cc index f023aa95..39c072fb 100644 --- a/src/nodes/local_io.cc +++ b/src/nodes/local_io.cc @@ -20,6 +20,15 @@ void Local_io::write (std::string name, std::string v) { file.close(); } // }}} +// update {{{ +void Local_io::update (std::string name, std::string v, uint32_t pos, uint32_t len) { + string file_path = disk_path + string("/") + name; + fstream file (file_path, ios::out|ios::in); + file.seekp(pos, ios::beg); + file.write(v.c_str(), len); + file.close(); +} +// }}} // read {{{ std::string Local_io::read (string name) { ifstream in (disk_path + string("/") + name); diff --git a/src/nodes/local_io.hh b/src/nodes/local_io.hh index cebb439c..cfd8e4b5 100644 --- a/src/nodes/local_io.hh +++ b/src/nodes/local_io.hh @@ -1,5 +1,6 @@ #pragma once #include +#include namespace eclipse { @@ -10,6 +11,7 @@ namespace eclipse { class Local_io { public: void write (std::string, std::string); + void update (std::string, std::string, uint32_t, uint32_t); std::string read (std::string); void remove (std::string); bool format (); diff --git a/src/nodes/peerdfs.cc b/src/nodes/peerdfs.cc index bbc50378..a8e05029 100644 --- a/src/nodes/peerdfs.cc +++ b/src/nodes/peerdfs.cc @@ -46,6 +46,21 @@ void PeerDFS::insert(uint32_t hash_key, std::string name, std::string v) { } } // }}} +// update {{{ +void PeerDFS::update(uint32_t hash_key, std::string name, std::string v, uint32_t p, uint32_t l) { + int which_node = boundaries->get_index(hash_key); + + if (which_node == id) { + INFO("[DFS] Updating locally KEY: %s", name.c_str()); + local_io.update(name, v, p, l); + + } else { + INFO("[DFS] Forwaring KEY: %s -> %d", name.c_str(), which_node); + OffsetKeyValue okv (hash_key, name, v, p, l); + network->send(which_node, &okv); + } +} +// }}} // request {{{ void PeerDFS::request(uint32_t key, string name , req_func f) { int idx = boundaries->get_index(key); @@ -68,7 +83,7 @@ void PeerDFS::close() { exit(EXIT_SUCCESS); } // process (KeyValue* m) {{{ template<> void PeerDFS::process(KeyValue* m) { auto key = m->key; - auto name = m->name; + auto name = m->name; int which_node = boundaries->get_index(key); if (which_node == id or m->destination == id) { @@ -83,6 +98,18 @@ template<> void PeerDFS::process(KeyValue* m) { } } // }}} +// process (OffsetKeyValue* m) {{{ +template<> void PeerDFS::process(OffsetKeyValue* m) { + auto key = m->key; + auto name = m->name; + + int which_node = boundaries->get_index(key); + if (which_node == id or m->destination == id) { + INFO("Update key = %s", name.c_str()); + update(key, m->name, m->value, m->pos, m->len); + } +} +// }}} // process (KeyRequest* m) {{{ template<> void PeerDFS::process (KeyRequest* m) { INFO("Arrived req key = %s", m->key.c_str()); @@ -112,6 +139,12 @@ template<> void PeerDFS::process(BlockInfo* m) { logger->info("real host = %d", id); } // }}} +// process (BlockUpdate* m) {{{ +template<> void PeerDFS::process(BlockUpdate* m) { + local_io.update(m->name, m->content, m->pos, m->len); + logger->info("block update real host = %d", id); +} +// }}} // process (BlockDel* m) {{{ template<> void PeerDFS::process (BlockDel* m) { local_io.remove(m->name); @@ -123,6 +156,9 @@ void PeerDFS::on_read (Message* m, int) { if (type == "KeyValue") { auto m_ = dynamic_cast(m); process(m_); + } else if (type == "OffsetKeyValue") { + auto m_ = dynamic_cast(m); + process(m_); } else if (type == "Control") { auto m_ = dynamic_cast(m); process(m_); @@ -132,6 +168,9 @@ void PeerDFS::on_read (Message* m, int) { } else if (type == "BlockInfo") { auto m_ = dynamic_cast(m); process(m_); + } else if (type == "BlockUpdate") { + auto m_ = dynamic_cast(m); + process(m_); } else if (type == "BlockDel") { auto m_ = dynamic_cast(m); process(m_); @@ -164,6 +203,7 @@ bool PeerDFS::insert_file(messages::FileInfo* f) { // }}} // insert_block {{{ bool PeerDFS::insert_block(messages::BlockInfo* m) { + logger->info("DEBUG: insert_block\n"); directory.insert_block_metadata(*m); int which_node = boundaries->get_index(m->hash_key); int tmp_node; @@ -179,6 +219,23 @@ bool PeerDFS::insert_block(messages::BlockInfo* m) { return true; } // }}} +// update_block {{{ +bool PeerDFS::update_block(messages::BlockUpdate* m) { + // directory.update_block_metadata(*m); maybe needed later + int which_node = boundaries->get_index(m->hash_key); + int tmp_node; + for (int i=0; ireplica; i++) { + if(i%2 == 1) { + tmp_node = (which_node + (i+1)/2 + network_size) % network_size; + } else { + tmp_node = (which_node - i/2 + network_size) % network_size; + } + uint32_t tmp_hash_key = boundaries->random_within_boundaries(tmp_node); + update(tmp_hash_key, m->name, m->content, m->pos, m->len); + } + return true; +} +// }}} // delete_block {{{ bool PeerDFS::delete_block(messages::BlockDel* m) { directory.delete_block_metadata(m->file_name, m->seq); diff --git a/src/nodes/peerdfs.hh b/src/nodes/peerdfs.hh index 663d9c87..4fce9511 100644 --- a/src/nodes/peerdfs.hh +++ b/src/nodes/peerdfs.hh @@ -4,6 +4,7 @@ #include "local_io.hh" #include "../network/asyncnode.hh" #include "../messages/blockinfo.hh" +#include "../messages/blockupdate.hh" #include "../messages/fileinfo.hh" #include "../messages/keyrequest.hh" #include "../messages/filerequest.hh" @@ -33,10 +34,12 @@ class PeerDFS: public Node, public AsyncNode { void on_disconnect(int) override; virtual void insert (uint32_t, std::string, std::string); + virtual void update (uint32_t, std::string, std::string, uint32_t, uint32_t); virtual void request (uint32_t, std::string, req_func); void close (); bool insert_block (messages::BlockInfo*); + bool update_block (messages::BlockUpdate*); bool insert_file (messages::FileInfo*); bool delete_block (messages::BlockDel*); bool delete_file (messages::FileDel*); diff --git a/src/nodes/remotedfs.cc b/src/nodes/remotedfs.cc index b97ff64c..62ed2e64 100644 --- a/src/nodes/remotedfs.cc +++ b/src/nodes/remotedfs.cc @@ -15,6 +15,7 @@ RemoteDFS::RemoteDFS (PeerDFS* p, network::Network* net) : Router(net) { using std::placeholders::_2; auto& rt = routing_table; rt.insert({"BlockInfo", bind(&RemoteDFS::insert_block, this, _1, _2)}); + rt.insert({"BlockUpdate", bind(&RemoteDFS::update_block, this, _1, _2)}); rt.insert({"FileInfo", bind(&RemoteDFS::insert_file, this, _1, _2)}); rt.insert({"FileRequest", bind(&RemoteDFS::request_file, this, _1, _2)}); rt.insert({"BlockRequest", bind(&RemoteDFS::request_block, this, _1, _2)}); @@ -44,6 +45,25 @@ void RemoteDFS::insert_block (messages::Message* m_, int n_channel) { network->send(n_channel, &reply); } // }}} +// BlockUpdate {{{ +void RemoteDFS::update_block (messages::Message* m_, int n_channel) { + auto m = dynamic_cast (m_); + logger->info ("BlockUpdate received"); + + bool ret = peer_dfs->update_block(m); + Reply reply; + + if (ret) { + reply.message = "OK"; + + } else { + reply.message = "FAIL"; + reply.details = "Block update failed"; + } + + network->send(n_channel, &reply); +} +// }}} // delete_block {{{ void RemoteDFS::delete_block (messages::Message* m_, int n_channel) { auto m = dynamic_cast (m_); diff --git a/src/nodes/remotedfs.hh b/src/nodes/remotedfs.hh index b8b19283..edfd4d19 100644 --- a/src/nodes/remotedfs.hh +++ b/src/nodes/remotedfs.hh @@ -14,6 +14,7 @@ class RemoteDFS: public Router { ~RemoteDFS () = default; void insert_block (messages::Message*, int); + void update_block (messages::Message*, int); void insert_file (messages::Message*, int); void request_file (messages::Message*, int); void request_block (messages::Message*, int); From 9f6e2476643b5c6b5ef22c1133ba3ad59b0230b7 Mon Sep 17 00:00:00 2001 From: Kibeom Jin Date: Thu, 16 Jun 2016 06:59:18 +0900 Subject: [PATCH 2/3] bug fixed --- src/fs/dfs.cc | 16 ++++++++++++---- src/nodes/local_io.cc | 12 ++++++++++++ src/nodes/local_io.hh | 1 + 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/fs/dfs.cc b/src/fs/dfs.cc index 706db680..59010c1f 100644 --- a/src/fs/dfs.cc +++ b/src/fs/dfs.cc @@ -564,6 +564,7 @@ namespace eclipse{ myfile.read(buffer, new_file_size); string sbuffer(buffer); delete[] buffer; + myfile.close(); int block_seq = 0; uint64_t passed_byte = 0; @@ -579,14 +580,15 @@ namespace eclipse{ continue; } else { uint32_t hash_key = fd->hash_keys[block_seq]; - auto tmp_socket = connect(boundaries.get_index(hash_key)); + // auto tmp_socket = connect(boundaries.get_index(hash_key)); if (first_block) { first_block = false; ori_start_pos = start_offset - passed_byte; } else { ori_start_pos = 0; } - uint32_t write_length = fd->block_size[block_seq++]; + uint32_t write_length = fd->block_size[block_seq] - ori_start_pos; + block_seq++; if (to_write_byte < write_length) { final_block = true; write_length = to_write_byte; @@ -598,15 +600,21 @@ namespace eclipse{ bu.pos = ori_start_pos; bu.len = write_length; bu.content = sbuffer.substr(write_byte_cnt, write_length); + auto tmp_socket = connect(boundaries.get_index(hash_key)); send_message(tmp_socket.get(), &bu); - write_byte_cnt += write_length; + auto reply = read_reply (tmp_socket.get()); tmp_socket->close(); + if (reply->message != "OK") { + cerr << "[ERR] Failed to upload file. Details: " << reply->details << endl; + return EXIT_FAILURE; + } + write_byte_cnt += write_length; if (final_block) { break; } + to_write_byte -= write_length; } } - myfile.close(); } cout << "[INFO] " << ori_file_name << " is updated." << endl; return EXIT_SUCCESS; diff --git a/src/nodes/local_io.cc b/src/nodes/local_io.cc index 39c072fb..426783dc 100644 --- a/src/nodes/local_io.cc +++ b/src/nodes/local_io.cc @@ -39,6 +39,18 @@ std::string Local_io::read (string name) { return value; } // }}} +// pread {{{ +std::string Local_io::pread (string name, uint32_t pos, uint32_t len) { + ifstream in (disk_path + string("/") + name); + in.seekg(pos, in.beg); + char *buffer = new char[len]; + in.read(buffer, len); + string value(buffer); + delete[] buffer; + in.close(); + return value; +} +// }}} // format {{{ bool Local_io::format () { string fs_path = context.settings.get("path.scratch"); diff --git a/src/nodes/local_io.hh b/src/nodes/local_io.hh index cfd8e4b5..36ced10e 100644 --- a/src/nodes/local_io.hh +++ b/src/nodes/local_io.hh @@ -13,6 +13,7 @@ class Local_io { void write (std::string, std::string); void update (std::string, std::string, uint32_t, uint32_t); std::string read (std::string); + std::string pread (std::string, uint32_t, uint32_t); void remove (std::string); bool format (); From 8cf80c3aa615091d2d1c9cf582e1dfc08dc94c13 Mon Sep 17 00:00:00 2001 From: Kibeom Jin Date: Fri, 17 Jun 2016 16:52:57 +0900 Subject: [PATCH 3/3] comment added --- src/fs/dfs.cc | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/fs/dfs.cc b/src/fs/dfs.cc index 59010c1f..8e56fa81 100644 --- a/src/fs/dfs.cc +++ b/src/fs/dfs.cc @@ -574,25 +574,33 @@ namespace eclipse{ bool first_block = true; bool final_block = false; for (auto block_name : fd->blocks) { + // pass until find the block which has start_offset if (passed_byte + fd->block_size[block_seq] < start_offset) { passed_byte += fd->block_size[block_seq]; block_seq++; continue; } else { + // If this block is the first one of updating blocks, + // start position will be start_offset - passed_byte. + // Otherwise, start position will be 0. uint32_t hash_key = fd->hash_keys[block_seq]; - // auto tmp_socket = connect(boundaries.get_index(hash_key)); if (first_block) { first_block = false; ori_start_pos = start_offset - passed_byte; } else { ori_start_pos = 0; } + // write length means the lenght which should be repliaced in THIS block. + // to_write_byte means remaining total bytes to write + // If this block is the last one, write_length should be same as to_write_byte + // Otherwise, write_length should be same as block_size - start position uint32_t write_length = fd->block_size[block_seq] - ori_start_pos; block_seq++; if (to_write_byte < write_length) { final_block = true; write_length = to_write_byte; } + // send message BlockUpdate bu; bu.name = block_name; bu.replica = fd->replica; @@ -608,6 +616,7 @@ namespace eclipse{ cerr << "[ERR] Failed to upload file. Details: " << reply->details << endl; return EXIT_FAILURE; } + // calculate total write bytes and remaining write bytes write_byte_cnt += write_length; if (final_block) { break;