Skip to content

Commit

Permalink
Merge pull request #107 from DICL/refactor-update
Browse files Browse the repository at this point in the history
Refactor update
  • Loading branch information
vicentebolea authored Jun 17, 2016
2 parents 33b61d6 + da78eff commit 765bb0e
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ bin_PROGRAMS = eclipse_node dfs
messages_files = src/messages/boundaries.cc \
src/messages/message.cc \
src/messages/keyvalue.cc \
src/messages/offsetkv.cc \
src/messages/keyrequest.cc \
src/messages/control.cc \
src/messages/boost_impl.cc \
src/messages/factory.cc \
src/messages/fileinfo.cc \
src/messages/blockinfo.cc \
src/messages/blockupdate.cc \
src/messages/task.cc \
src/messages/reply.cc \
src/messages/filerequest.cc \
Expand Down
121 changes: 114 additions & 7 deletions src/fs/dfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,10 @@ namespace eclipse{
return EXIT_SUCCESS;
}

int DFS::partial_get(int argc, char* argv[]) {
int DFS::pget(int argc, char* argv[]) {
string file_name = "";
if (argc < 5) {
cout << "[INFO] dfs partial_get file_name start_offset read_byte" << endl;
cout << "[INFO] dfs pget file_name start_offset read_byte" << endl;
return EXIT_FAILURE;
} else {
Histogram boundaries(NUM_NODES, 0);
Expand Down Expand Up @@ -474,7 +474,7 @@ namespace eclipse{
cerr << "[ERR] Wrong read byte." << endl;
return EXIT_FAILURE;
}
string outfile = "partial_" + file_name;
string outfile = "p_" + file_name;
ofstream f(outfile);
int block_seq = 0;
uint64_t passed_byte = 0;
Expand Down Expand Up @@ -502,12 +502,12 @@ namespace eclipse{
} else {
start_pos = 0;
}
string sub_str = block_content.substr(start_pos);
if (read_byte_cnt + sub_str.length() > read_byte) {
uint32_t read_length = block_content.length();
if (read_byte_cnt + read_length > read_byte) {
final_block = true;
uint32_t sub_length = read_byte - read_byte_cnt;
sub_str = block_content.substr(start_pos, sub_length);
read_length = read_byte - read_byte_cnt;
}
string sub_str = msg->content.substr(start_pos, read_length);
f << sub_str;
read_byte_cnt += sub_str.length();
tmp_socket->close();
Expand All @@ -521,4 +521,111 @@ namespace eclipse{
cout << "[INFO] " << file_name << " is read." << endl;
return EXIT_SUCCESS;
}

int DFS::update(int argc, char* argv[]) {
string ori_file_name = "";
if (argc < 5) {
cout << "[INFO] dfs update original_file new_file start_offset" << endl;
return EXIT_FAILURE;
} else {
Histogram boundaries(NUM_NODES, 0);
boundaries.initialize();

ori_file_name = argv[2];
string new_file_name = argv[3];
uint64_t start_offset = (uint64_t)atoi(argv[4]);
uint32_t file_hash_key = h(ori_file_name);
auto socket = connect (file_hash_key);
FileExist fe;
fe.name = ori_file_name;
send_message(socket.get(), &fe);
auto rep = read_reply<Reply> (socket.get());

if (rep->message != "TRUE") {
cerr << "[ERR] " << ori_file_name << " doesn't exist." << endl;
return EXIT_FAILURE;
}
FileRequest fr;
fr.name = ori_file_name;

ifstream myfile(new_file_name);
myfile.seekg(0, myfile.end);
uint64_t new_file_size = myfile.tellg();

send_message(socket.get(), &fr);
auto fd = read_reply<FileDescription> (socket.get());
socket->close();
if (start_offset + new_file_size > fd->size) {
cerr << "[ERR] Wrong file size." << endl;
return EXIT_FAILURE;
}
myfile.seekg(0, myfile.beg);
char *buffer = new char[new_file_size];
myfile.read(buffer, new_file_size);
string sbuffer(buffer);
delete[] buffer;
myfile.close();

int block_seq = 0;
uint64_t passed_byte = 0;
uint64_t write_byte_cnt = 0;
uint32_t ori_start_pos = 0;
uint32_t to_write_byte = new_file_size;
bool first_block = true;
bool final_block = false;
for (auto block_name : fd->blocks) {
// pass until find the block which has start_offset
if (passed_byte + fd->block_size[block_seq] < start_offset) {
passed_byte += fd->block_size[block_seq];
block_seq++;
continue;
} else {
// If this block is the first one of updating blocks,
// start position will be start_offset - passed_byte.
// Otherwise, start position will be 0.
uint32_t hash_key = fd->hash_keys[block_seq];
if (first_block) {
first_block = false;
ori_start_pos = start_offset - passed_byte;
} else {
ori_start_pos = 0;
}
// write length means the lenght which should be repliaced in THIS block.
// to_write_byte means remaining total bytes to write
// If this block is the last one, write_length should be same as to_write_byte
// Otherwise, write_length should be same as block_size - start position
uint32_t write_length = fd->block_size[block_seq] - ori_start_pos;
block_seq++;
if (to_write_byte < write_length) {
final_block = true;
write_length = to_write_byte;
}
// send message
BlockUpdate bu;
bu.name = block_name;
bu.replica = fd->replica;
bu.hash_key = hash_key;
bu.pos = ori_start_pos;
bu.len = write_length;
bu.content = sbuffer.substr(write_byte_cnt, write_length);
auto tmp_socket = connect(boundaries.get_index(hash_key));
send_message(tmp_socket.get(), &bu);
auto reply = read_reply<Reply> (tmp_socket.get());
tmp_socket->close();
if (reply->message != "OK") {
cerr << "[ERR] Failed to upload file. Details: " << reply->details << endl;
return EXIT_FAILURE;
}
// calculate total write bytes and remaining write bytes
write_byte_cnt += write_length;
if (final_block) {
break;
}
to_write_byte -= write_length;
}
}
}
cout << "[INFO] " << ori_file_name << " is updated." << endl;
return EXIT_SUCCESS;
}
}
3 changes: 2 additions & 1 deletion src/fs/dfs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "../messages/factory.hh"
#include "../messages/fileinfo.hh"
#include "../messages/blockinfo.hh"
#include "../messages/blockupdate.hh"
#include "../messages/fileexist.hh"
#include "../messages/filerequest.hh"
#include "../messages/filelist.hh"
Expand Down Expand Up @@ -57,7 +58,7 @@ namespace eclipse {
int rm(int argc, char* argv[]);
int format(int argc, char* argv[]);
int show(int argc, char* argv[]);
int partial_get(int argc, char* argv[]);
int pget(int argc, char* argv[]);
int update(int argc, char* argv[]);
};
}
28 changes: 16 additions & 12 deletions src/fs/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,42 @@ int main(int argc, char* argv[]) {
string op = argv[1];
if (op.compare("put") == 0) {
dfs.put(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("get") == 0) {
dfs.get(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("cat") == 0) {
dfs.cat(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("ls") == 0) {
dfs.ls(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("rm") == 0) {
dfs.rm(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("format") == 0) {
dfs.format(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("show") == 0) {
dfs.show(argc, argv);
return 0;
return EXIT_SUCCESS;
}
else if (op.compare("partial_get") == 0) {
dfs.DFS::partial_get(argc, argv);
return 0;
else if (op.compare("pget") == 0) {
dfs.DFS::pget(argc, argv);
return EXIT_SUCCESS;
}
else if (op.compare("update") == 0) {
dfs.DFS::update(argc, argv);
return EXIT_SUCCESS;
}
}
cerr << "[ERR] Unknown operation" << endl;
cout << "[INFO] dfs put|get|cat|ls|rm|format|partial_get" << endl;
return -1;
cout << "[INFO] dfs put|get|cat|ls|rm|format|pget|update" << endl;
return EXIT_FAILURE;
}
5 changes: 5 additions & 0 deletions src/messages/blockupdate.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "blockupdate.hh"

using namespace eclipse::messages;

std::string BlockUpdate::get_type() const { return "BlockUpdate"; }
18 changes: 18 additions & 0 deletions src/messages/blockupdate.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once
#include "message.hh"
#include <cstdint>

namespace eclipse {
namespace messages {
struct BlockUpdate: public Message {
std::string get_type() const override;

std::string name;
uint32_t hash_key;
int replica;
uint32_t pos;
uint32_t len;
std::string content;
};
}
}
2 changes: 2 additions & 0 deletions src/messages/boost_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
//! 4) Also here
BOOST_CLASS_EXPORT(eclipse::messages::Boundaries);
BOOST_CLASS_EXPORT(eclipse::messages::KeyValue);
BOOST_CLASS_EXPORT(eclipse::messages::OffsetKeyValue);
BOOST_CLASS_EXPORT(eclipse::messages::Control);
BOOST_CLASS_EXPORT(eclipse::messages::KeyRequest);
BOOST_CLASS_EXPORT(eclipse::messages::Task);
BOOST_CLASS_EXPORT(eclipse::messages::FileInfo);
BOOST_CLASS_EXPORT(eclipse::messages::FileList);
BOOST_CLASS_EXPORT(eclipse::messages::BlockInfo);
BOOST_CLASS_EXPORT(eclipse::messages::BlockUpdate);
BOOST_CLASS_EXPORT(eclipse::messages::Reply);
BOOST_CLASS_EXPORT(eclipse::messages::CacheInfo);
BOOST_CLASS_EXPORT(eclipse::messages::FileRequest);
Expand Down
25 changes: 25 additions & 0 deletions src/messages/boost_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "filedel.hh"
#include "formatrequest.hh"
#include "fileexist.hh"
#include "offsetkv.hh"
#include "blockupdate.hh"

#include <boost/serialization/export.hpp>
#include <boost/serialization/access.hpp>
Expand Down Expand Up @@ -62,6 +64,16 @@ template <typename Archive>
ar & BOOST_SERIALIZATION_NVP(k.value);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::OffsetKeyValue& k, unsigned int) {
ar & BASE_OBJECT(Message, k);
ar & BOOST_SERIALIZATION_NVP(k.key);
ar & BOOST_SERIALIZATION_NVP(k.name);
ar & BOOST_SERIALIZATION_NVP(k.value);
ar & BOOST_SERIALIZATION_NVP(k.pos);
ar & BOOST_SERIALIZATION_NVP(k.len);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::Control& c, unsigned int) {
ar & BASE_OBJECT(Message, c);
Expand Down Expand Up @@ -102,6 +114,17 @@ template <typename Archive>
ar & BOOST_SERIALIZATION_NVP(c.content);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::BlockUpdate& c, unsigned int) {
ar & BASE_OBJECT(Message, c);
ar & BOOST_SERIALIZATION_NVP(c.name);
ar & BOOST_SERIALIZATION_NVP(c.hash_key);
ar & BOOST_SERIALIZATION_NVP(c.replica);
ar & BOOST_SERIALIZATION_NVP(c.pos);
ar & BOOST_SERIALIZATION_NVP(c.len);
ar & BOOST_SERIALIZATION_NVP(c.content);
}

template <typename Archive>
void serialize (Archive& ar, eclipse::messages::Task& c, unsigned int) {
ar & BASE_OBJECT(Message, c);
Expand Down Expand Up @@ -189,10 +212,12 @@ BOOST_SERIALIZATION_ASSUME_ABSTRACT(eclipse::messages::Message);
BOOST_CLASS_TRACKING(eclipse::messages::Message, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::Boundaries, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::KeyValue, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::OffsetKeyValue, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::Control, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::KeyRequest, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::FileInfo, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::BlockInfo, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::BlockUpdate, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::Task, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::FileList, boost::serialization::track_never);
BOOST_CLASS_TRACKING(eclipse::messages::Reply, boost::serialization::track_never);
Expand Down
11 changes: 11 additions & 0 deletions src/messages/offsetkv.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include "offsetkv.hh"

namespace eclipse {
namespace messages {

OffsetKeyValue::OffsetKeyValue (uint32_t k, std::string n, std::string v, uint32_t p, uint32_t l) : key(k), name(n), value(v), pos(p), len(l) { }

std::string OffsetKeyValue::get_type() const { return "OffsetKeyValue"; }

}
}
20 changes: 20 additions & 0 deletions src/messages/offsetkv.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include "message.hh"
#include <string>

namespace eclipse {
namespace messages {

struct OffsetKeyValue: public Message {
OffsetKeyValue () = default;
OffsetKeyValue (uint32_t, std::string, std::string, uint32_t, uint32_t);

std::string get_type() const override;
uint32_t key;
std::string name, value;
uint32_t pos, len;
};

}
}
Loading

0 comments on commit 765bb0e

Please sign in to comment.