Skip to content

Commit

Permalink
[feat] Add support for visit pattern in LSM index class
Browse files Browse the repository at this point in the history
  • Loading branch information
gesalous committed Feb 9, 2024
1 parent acc00b1 commit 0086448
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 48 deletions.
132 changes: 91 additions & 41 deletions src/fdb5/toc/LSMIndex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

#include <assert.h>
#include <openssl/md5.h>
#include <parallax.h>
#include <signal.h>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <shared_mutex>
#include <unordered_map>
#include "ParallaxSerDes.h"
#include "eckit/config/Resource.h"
#include "eckit/io/Offset.h"
Expand All @@ -22,11 +24,12 @@
#include "fdb5/toc/BTreeIndex.h"
#include "fdb5/toc/FieldRef.h"
#include "fdb5/toc/TocIndex.h"
#include "structures.h"

#define PARALLAX_VOLUME_ENV_VAR "PARH5_VOLUME"
#define PARALLAX_VOLUME_FORMAT_ENV_VAR "PARH5_VOLUME_FORMAT"
#define PARALLAX_VOLUME_FORMAT "PARH5_VOLUME_FORMAT"
#define PARALLAX_VOLUME "par.dat"
#define PARALLAX_L0_SIZE (16 * 1024 * 1024);
#define PARALLAX_L0_SIZE (16 * 1024 * 1024UL);
#define PARALLAX_GROWTH_FACTOR 8
/* The value must be between 256 and 65535 (inclusive) */
#define PARALLAX_VOL_CONNECTOR_VALUE ((H5VL_class_value_t)12202)
Expand Down Expand Up @@ -61,13 +64,32 @@ namespace fdb5 {
//----------------------------------------------------------------------------------------------------------------------
class ParallaxStore {
const char* par_volume_name;
std::unordered_map<std::string, void*> valueMap;
mutable std::shared_mutex mapMutex;


public:
static const char* getVolumeName(void) {
static ParallaxStore& getInstance(void) {
static ParallaxStore instance;
return instance.par_volume_name;
return instance;
}

const char* getVolumeName(void) {
return par_volume_name;
}

void* getParallaxVolume(const std::string& key) {
std::shared_lock<std::shared_mutex> lock(mapMutex);
auto it = valueMap.find(key);
return (it != valueMap.end()) ? it->second : nullptr;
}

void setParallaxVolume(const std::string& key, void* value) {
std::unique_lock<std::shared_mutex> lock(mapMutex);
valueMap[key] = value;
}


// Disallow copying and assignment
ParallaxStore(const ParallaxStore&) = delete;
ParallaxStore& operator=(const ParallaxStore&) = delete;
Expand All @@ -77,8 +99,8 @@ class ParallaxStore {
if (NULL == par_volume_name)
par_volume_name = PARALLAX_VOLUME;

const char* parh5_format_volume = getenv(PARALLAX_VOLUME_FORMAT_ENV_VAR);
if (NULL != par_volume_name) {
const char* parh5_format_volume = getenv(PARALLAX_VOLUME_FORMAT);
if (NULL != parh5_format_volume && strcmp(parh5_format_volume,"ON") == 0) {
const char* error = par_format((char*)par_volume_name, 128);
if (error) {
LSM_FATAL("Failed to format volume %s", par_volume_name);
Expand All @@ -97,20 +119,34 @@ class ParallaxStore {
}
};

// class LSMIndexVisitor : public fdb5::BTreeIndexVisitor {
// public:
// virtual ~LSMIndexVisitor() override;

// virtual void visit(const std::string& key, const fdb5::FieldRef&) override {
// }
// };

class LSMIndex : public BTreeIndex {
par_handle parallax_handle;

public:
LSMIndex(const eckit::PathName& path, bool readOnly, off_t offset) {

ParallaxStore& parallax_store = ParallaxStore::getInstance();
parallax_handle = parallax_store.getParallaxVolume(path.asString());
if (parallax_handle)
return;


// create the dummy index file so fdb-hammer does not nag
std::ofstream outfile(path.asString());
// ... write to the file or do other things with it.
std::string dbName = md5(path.asString());
std::cout << "DB name is " << path.asString() << "hash name: " << dbName << std::endl;
const char* volume_name = ParallaxStore::getInstance().getVolumeName();

par_db_options db_options = {.volume_name = (char*)ParallaxStore::getVolumeName(),
par_db_options db_options = {.volume_name = (char*)volume_name,
.db_name = dbName.c_str(),
.create_flag = PAR_CREATE_DB,
.options = par_get_default_options()};
Expand All @@ -126,10 +162,11 @@ class LSMIndex : public BTreeIndex {

if (parallax_handle == NULL && error_message)
LSM_FATAL("Error uppon opening the DB, error %s", error_message);
parallax_store.setParallaxVolume(path.asString(), parallax_handle);
}

~LSMIndex() {
LSM_DEBUG("Destroying LSM index.");
// LSM_DEBUG("Destroying LSM index.");
}

bool get(const ::std::string& key, FieldRef& data) const {
Expand All @@ -139,24 +176,29 @@ class LSMIndex : public BTreeIndex {

bool set(const std::string& key, const FieldRef& data) {
// LSM_DEBUG("LSM set operation. %s", key.c_str());
// raise(SIGINT);
fdb5::ParallaxSerDes<32> serializer;
eckit::DumpLoad& baseRef = serializer;
FieldRefLocation::UriID uriId = data.uriId();
const eckit::Offset& offset = data.offset();
const eckit::Length& length = data.length();
baseRef.beginObject("test");
offset.dump(baseRef);
length.dump(baseRef);
baseRef.dump(uriId);
baseRef.endObject();
// fdb5::ParallaxSerDes<32> serializer;
// eckit::DumpLoad& baseRef = serializer;
// FieldRefLocation::UriID uriId = data.uriId();
// const eckit::Offset& offset = data.offset();
// const eckit::Length& length = data.length();
// baseRef.beginObject("test");
// offset.dump(baseRef);
// length.dump(baseRef);
// baseRef.dump(uriId);
// baseRef.endObject();
const char* error_msg = NULL;
const char* key_str = key.c_str();
par_key_value KV;
KV.k.size = strlen(key_str) + 1;
KV.k.data = key_str;
KV.v.val_size = serializer.getSize();
KV.v.val_buffer = (char*)serializer.getBuffer();
// KV.v.val_size = serializer.getSize();
// KV.v.val_buffer = (char*)serializer.getBuffer();
KV.v.val_size = sizeof(FieldRef);
// Creating a copy of FieldRef
FieldRef dataCopy = data; // Copy constructor is used here
// Obtaining a char* pointer to the copy
KV.v.val_buffer = reinterpret_cast<char*>(&dataCopy);

par_put(this->parallax_handle, &KV, &error_msg);
if (error_msg) {
std::cout << "Sorry Parallax put failed reason: " << error_msg << std ::endl;
Expand All @@ -169,25 +211,49 @@ class LSMIndex : public BTreeIndex {

void flush() {
LSM_DEBUG("LSM flush operation.");
par_sync(this->parallax_handle);
}

void sync() {
LSM_DEBUG("LSM sync operation.");
par_sync(this->parallax_handle);
}

void flock() {
LSM_DEBUG("LSM flock operation.");
// LSM_DEBUG("LSM flock operation.");
}

void funlock() {
LSM_DEBUG("LSM funlock operation.");
}

void visit(BTreeIndexVisitor& visitor) const {
LSM_DEBUG("LSM visit operation.");
char zero = 0;
struct par_key start = {.size = 1, .data = &zero};
const char* error = nullptr;
par_scanner scanner = par_init_scanner(parallax_handle, &start, PAR_GREATER_OR_EQUAL, &error);
if (error)
LSM_FATAL("Init of scanner failed");
while (par_is_valid(scanner)) {
struct par_key parallax_key = par_get_key(scanner);
struct par_value parallax_value = par_get_value(scanner);
const std::string key = std::string(parallax_key.data, parallax_key.size);
if (parallax_value.val_size != sizeof(FieldRef))
LSM_FATAL("Values of FieldRef is wrong");

// Cast parallax_value.data to a FieldRef pointer
const FieldRef* fieldRefPtr = reinterpret_cast<const FieldRef*>(parallax_value.val_buffer);

// Make a copy of the FieldRef object
FieldRef fieldRefCopy = *fieldRefPtr;
visitor.visit(key, fieldRefCopy);
par_get_next(scanner);
}
par_close_scanner(scanner);
}

void preload() {
LSM_DEBUG("PRELOAD bitches");
LSM_DEBUG("Nothing to preload here we are PARALLAX");
}

private:
Expand All @@ -208,22 +274,6 @@ class LSMIndex : public BTreeIndex {
};


class LSMIndexVisitor {
BTreeIndexVisitor& visitor_;

public:
LSMIndexVisitor(BTreeIndexVisitor& visitor) :
visitor_(visitor) {
LSM_DEBUG("Initializing Index Visitor.");
}

void clear() {
}

void push_back(const std::pair<std::string, FieldRef>& kv) {
visitor_.visit(kv.first, kv.second);
}
};

static BTreeIndexBuilder<LSMIndex> lsmIndexBuilder("LSMIndex");

Expand Down
7 changes: 4 additions & 3 deletions src/fdb5/toc/ParallaxSerDes.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#ifndef PARALLAXSERDES_H
#define PARALLAXSERDES_H
#include <unistd.h>
#include <cstring>
#include <iostream>
#include <string>
#include <unistd.h>
#include "eckit/persist/DumpLoad.h"

#define SERDES_FATAL(...) \
do { \
char buffer[1024]; \
snprintf(buffer, sizeof(buffer), __VA_ARGS__); \
::std::cout << __FILE__ << ":" << __func__ << ":" << __LINE__ << " " \
<< " FATAL(unimplemented): " << buffer << ::std::endl; \
std::cerr << __FILE__ << ":" << __func__ << ":" << __LINE__ << " " \
<< " FATAL(unimplemented): " << buffer << "\n"; \
_exit(EXIT_FAILURE); \
} while (0);

Expand Down
8 changes: 4 additions & 4 deletions src/fdb5/toc/TocStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ std::unique_ptr<FieldLocation> TocStore::archive(const Key& key, const void* dat

eckit::Offset position = dh.position();
// gesalous hack
// long len = length;
//long len = length;
long len = dh.write(data, length);

ASSERT(len == length);
Expand All @@ -72,9 +72,9 @@ std::unique_ptr<FieldLocation> TocStore::archive(const Key& key, const void* dat

void TocStore::flush() {
// gesalous
std::cout << "File: " << __FILE__
<< ", Function: " << __func__
<< ", Line: " << __LINE__ << std::endl;
// std::cout << "File: " << __FILE__
// << ", Function: " << __func__
// << ", Line: " << __LINE__ << std::endl;
if (!dirty_) {
return;
}
Expand Down

0 comments on commit 0086448

Please sign in to comment.