Skip to content

Commit

Permalink
Eliminate eviction process; use same connection (#1495)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakeSiFive authored Dec 20, 2023
1 parent 9f596a5 commit 33032fe
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 166 deletions.
131 changes: 21 additions & 110 deletions src/job_cache/daemon_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,9 @@ struct CacheDbImpl {
OutputSymlinks output_symlinks;
Transaction transact;
SelectMatchingJobs matching_jobs;
std::unique_ptr<EvictionPolicy> policy;

CacheDbImpl(const std::string &_dir)
CacheDbImpl(EvictionConfig config, const std::string &_dir)
: db(std::make_unique<job_cache::Database>(_dir)),
jobs(db),
input_files(db),
Expand All @@ -663,7 +664,21 @@ struct CacheDbImpl {
output_dirs(db),
output_symlinks(db),
transact(db),
matching_jobs(db) {}
matching_jobs(db) {
switch (config.type) {
case EvictionPolicyType::TTL:
wcl::log::info("Using TTL eviction policy, seconds_to_live = %lu",
config.ttl.seconds_to_live)();
policy = std::make_unique<TTLEvictionPolicy>(config.ttl.seconds_to_live);
break;
case EvictionPolicyType::LRU:
wcl::log::info("Using LRU eviction policy, low = %lu, max = %lu", config.lru.low_size,
config.lru.max_size)();
policy = std::make_unique<LRUEvictionPolicy>(config.lru.low_size, config.lru.max_size);
break;
}
policy->init(db, _dir);
}
};

DaemonCache::DaemonCache(std::string dir, std::string bulk_dir, EvictionConfig config)
Expand All @@ -679,9 +694,7 @@ DaemonCache::DaemonCache(std::string dir, std::string bulk_dir, EvictionConfig c
key = rng.unique_name();
listen_socket_fd = create_cache_socket(".", key);

impl = std::make_unique<CacheDbImpl>(".");

launch_evict_loop();
impl = std::make_unique<CacheDbImpl>(config, ".");
}

int DaemonCache::run() {
Expand Down Expand Up @@ -983,17 +996,7 @@ FindJobResponse DaemonCache::read(const FindJobRequest &find_request) {
redirect_path(input_dir);
}

EvictionCommand cmd(EvictionCommandType::Read, job_id);

std::string msg = cmd.serialize();
msg += '\0';

wcl::log::info("Sending Read command to eviction loop")();
if (write(evict_stdin, msg.data(), msg.size()) == -1) {
wcl::log::warning("Failed to send eviction update: %s", strerror(errno))();
} else {
wcl::log::info("Successfully sent eviction the job")();
}
impl->policy->read(job_id);

return FindJobResponse(wcl::make_some<MatchingJob>(std::move(result)));
}
Expand Down Expand Up @@ -1078,102 +1081,10 @@ void DaemonCache::add(const AddJobRequest &add_request) {
std::string job_dir = wcl::join_paths(job_group_dir, std::to_string(job_id));
rename_no_fail(tmp_job_dir.c_str(), job_dir.c_str());

EvictionCommand cmd(EvictionCommandType::Write, job_id);

std::string msg = cmd.serialize();
msg += '\0';

wcl::log::info("Sending Write command to eviction loop")();
if (write(evict_stdin, msg.data(), msg.size()) == -1) {
wcl::log::warning("Failed to send eviction update: %s", strerror(errno))();
} else {
wcl::log::info("Sucessfully sent eviction add update")();
}
}

void DaemonCache::launch_evict_loop() {
const size_t read_side = 0;
const size_t write_side = 1;

int stdinPipe[2];
int stdoutPipe[2];

if (pipe(stdinPipe) < 0) {
wcl::log::error("Failed to allocate eviction pipe: %s", strerror(errno)).urgent()();
exit(1);
}

if (pipe(stdoutPipe) < 0) {
wcl::log::error("Failed to allocate eviction pipe: %s", strerror(errno)).urgent()();
exit(1);
}

int pid = fork();

// error forking
if (pid < 0) {
wcl::log::error("Failed to fork eviction process: %s", strerror(errno)).urgent()();
exit(1);
}

// child
if (pid == 0) {
if (dup2(stdinPipe[read_side], STDIN_FILENO) == -1) {
wcl::log::error("Failed to dup2 stdin pipe for eviction process: %s", strerror(errno))
.urgent()();
exit(1);
}

if (dup2(stdoutPipe[write_side], STDOUT_FILENO) == -1) {
wcl::log::error("Failed to dup2 stdin pipe for eviction process: %s", strerror(errno))
.urgent()();
exit(1);
}

close(stdinPipe[read_side]);
close(stdinPipe[write_side]);
close(stdoutPipe[read_side]);
close(stdoutPipe[write_side]);

wcl::log::info("Launching eviction loop")();

// Finally enter the eviction loop, if it exits cleanly
// go ahead and exit with its result.
std::unique_ptr<EvictionPolicy> policy;
switch (config.type) {
case EvictionPolicyType::TTL:
wcl::log::info("Using TTL eviction policy, seconds_to_live = %lu",
config.ttl.seconds_to_live)();
policy = std::make_unique<TTLEvictionPolicy>(config.ttl.seconds_to_live);
break;
case EvictionPolicyType::LRU:
wcl::log::info("Using LRU eviction policy, low = %lu, max = %lu", config.lru.low_size,
config.lru.max_size)();
policy = std::make_unique<LRUEvictionPolicy>(config.lru.low_size, config.lru.max_size);
break;
}
int result = eviction_loop(".", std::move(policy));
exit(result);
}

// parent
if (pid > 0) {
close(stdinPipe[read_side]);
close(stdoutPipe[write_side]);

evict_pid = pid;
evict_stdin = stdinPipe[write_side];
evict_stdout = stdoutPipe[read_side];
}
}

void DaemonCache::reap_evict_loop() {
close(evict_stdin);
close(evict_stdout);
waitpid(evict_pid, nullptr, 0);
impl->policy->write(job_id);
}

DaemonCache::~DaemonCache() { reap_evict_loop(); }
DaemonCache::~DaemonCache() {}

void DaemonCache::handle_new_client() {
// Accept the new client socket. We accept as non-blocking so that we can
Expand Down
3 changes: 0 additions & 3 deletions src/job_cache/daemon_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ class DaemonCache {
std::unordered_map<int, MessageSender> message_senders;
bool exit_now = false;

void launch_evict_loop();
void reap_evict_loop();

FindJobResponse read(const FindJobRequest &find_request);
void add(const AddJobRequest &add_request);
void remove_corrupt_job(int64_t job_id);
Expand Down
50 changes: 6 additions & 44 deletions src/job_cache/eviction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,10 @@ static void garbage_collect_orphan_folders(std::shared_ptr<job_cache::Database>
}
}

void LRUEvictionPolicy::init(const std::string& cache_dir) {
std::shared_ptr<job_cache::Database> db = std::make_unique<job_cache::Database>(cache_dir);
void LRUEvictionPolicy::init(std::shared_ptr<job_cache::Database> db,
const std::string& cache_dir) {
impl = std::make_unique<LRUEvictionPolicyImpl>(cache_dir, db);

// To keep this thread alive, we assign it to a static thread object.
// This starts the collection but if the programs ends so too will this thread.
gc_thread = std::thread(garbage_collect_orphan_folders, db);
garbage_collect_orphan_folders(db);
}

void LRUEvictionPolicy::read(int job_id) { impl->mark_new_use(job_id); }
Expand All @@ -427,14 +424,11 @@ LRUEvictionPolicy::LRUEvictionPolicy(uint64_t low, uint64_t max)

LRUEvictionPolicy::~LRUEvictionPolicy() {}

void TTLEvictionPolicy::init(const std::string& cache_dir) {
std::shared_ptr<job_cache::Database> db = std::make_unique<job_cache::Database>(cache_dir);
void TTLEvictionPolicy::init(std::shared_ptr<job_cache::Database> db,
const std::string& cache_dir) {
impl = std::make_unique<TTLEvictionPolicyImpl>(cache_dir, db);
impl->cleanup(seconds_to_live);

// To keep this thread alive, we assign it to a static thread object.
// This starts the collection but if the programs ends so too will this thread.
gc_thread = std::thread(garbage_collect_orphan_folders, db);
garbage_collect_orphan_folders(db);
}

void TTLEvictionPolicy::read(int job_id) {}
Expand All @@ -453,36 +447,4 @@ TTLEvictionPolicy::TTLEvictionPolicy(uint64_t seconds_to_live) : seconds_to_live

TTLEvictionPolicy::~TTLEvictionPolicy() {}

int eviction_loop(const std::string& cache_dir, std::unique_ptr<EvictionPolicy> policy) {
policy->init(cache_dir);

// Famous last words "a timeout of a year is plenty"
MessageParser msg_parser(STDIN_FILENO, 60 * 60 * 24 * 30 * 12);
MessageParserState state;
do {
std::vector<std::string> msgs;
state = msg_parser.read_messages(msgs);

for (const auto& m : msgs) {
auto cmd = EvictionCommand::parse(m);
if (!cmd) {
exit(EXIT_FAILURE);
}

switch (cmd->type) {
case EvictionCommandType::Read:
policy->read(cmd->job_id);
break;
case EvictionCommandType::Write:
policy->write(cmd->job_id);
break;
default:
exit(EXIT_FAILURE);
}
}
} while (state == MessageParserState::Continue);

exit(state == MessageParserState::StopSuccess ? EXIT_SUCCESS : EXIT_FAILURE);
}

} // namespace job_cache
16 changes: 7 additions & 9 deletions src/job_cache/eviction_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include "job_cache/db_helpers.h"

namespace job_cache {

struct EvictionPolicy {
virtual void init(const std::string& cache_dir) = 0;
virtual void init(std::shared_ptr<job_cache::Database> db, const std::string& cache_dir) = 0;
virtual void read(int id) = 0;
virtual void write(int id) = 0;
virtual ~EvictionPolicy() {}
};

struct NilEvictionPolicy : EvictionPolicy {
virtual void init(const std::string& cache_dir) override {
virtual void init(std::shared_ptr<job_cache::Database> db,
const std::string& cache_dir) override {
std::cerr << "NilEvictionPolicy::init()" << std::endl;
}

Expand All @@ -56,7 +58,6 @@ class LRUEvictionPolicy : public EvictionPolicy {
std::unique_ptr<LRUEvictionPolicyImpl> impl;
uint64_t max_cache_size;
uint64_t low_cache_size;
std::thread gc_thread;

public:
explicit LRUEvictionPolicy(uint64_t low_cache_size, uint64_t max_cache_size);
Expand All @@ -65,7 +66,7 @@ class LRUEvictionPolicy : public EvictionPolicy {
LRUEvictionPolicy(LRUEvictionPolicy&&) = delete;
virtual ~LRUEvictionPolicy();

virtual void init(const std::string& cache_dir) override;
virtual void init(std::shared_ptr<job_cache::Database> db, const std::string& cache_dir) override;

virtual void read(int id) override;

Expand All @@ -78,7 +79,6 @@ class TTLEvictionPolicy : public EvictionPolicy {
// We need to touch the database so we use pimpl to hide the implementation
std::unique_ptr<TTLEvictionPolicyImpl> impl;
uint64_t seconds_to_live;
std::thread gc_thread;

public:
explicit TTLEvictionPolicy(uint64_t seconds_to_live);
Expand All @@ -87,13 +87,11 @@ class TTLEvictionPolicy : public EvictionPolicy {
TTLEvictionPolicy(TTLEvictionPolicy&&) = delete;
virtual ~TTLEvictionPolicy();

virtual void init(const std::string& cache_dir) override;
virtual void init(std::shared_ptr<job_cache::Database> db, const std::string& cache_dir) override;

virtual void read(int id) override;

virtual void write(int id) override;
};

int eviction_loop(const std::string& cache_dir, std::unique_ptr<EvictionPolicy> policy);

} // namespace job_cache

0 comments on commit 33032fe

Please sign in to comment.