Skip to content

Commit

Permalink
Fix etcd keepalive and watcher (#62)
Browse the repository at this point in the history
* Fix etcd keepalive

* Fix leaseId var name

* Fix keepalive thread being implicitly destroyed, fix tests

* [skip ci] add windows binary

* Fix etcd watcher and reconnection delay timer

* Fix broken CI builds

* [skip ci] add windows binary
  • Loading branch information
rsafonseca authored Jul 9, 2024
1 parent 4e736a6 commit 7d01490
Show file tree
Hide file tree
Showing 18 changed files with 67 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
- name: Install Conan
id: conan
uses: turtlebrowser/get-conan@main
env:
PIP_BREAK_SYSTEM_PACKAGES: 1

- name: Update conan profile
run: conan profile detect
Expand Down
5 changes: 3 additions & 2 deletions cpp-lib/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class PitayaCpp(ConanFile):
exports = 'version.txt'

def requirements(self):
self.requires("zlib/1.2.13")
self.requires("protobuf/3.21.12", force=True)
self.requires("zlib/1.3.1")
self.requires("protobuf/3.21.9", visible=True, force=True)
self.requires("boost/1.83.0", force=True)
self.requires("openssl/3.2.1", force=True)
self.requires("grpc/1.54.3")
Expand All @@ -44,6 +44,7 @@ def requirements(self):

def build_requirements(self):
self.tool_requires("grpc/1.54.3")
self.tool_requires("protobuf/3.21.9")

def layout(self):
cmake_layout(self, build_folder='_builds')
Expand Down
6 changes: 5 additions & 1 deletion cpp-lib/src/pitaya/c_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ CSDConfig::TryGetConfig(pitaya::EtcdServiceDiscoveryConfig& config)
config.logServerDetails = logServerDetails;
config.syncServersIntervalSec = std::chrono::seconds(syncServersIntervalSec);
config.maxNumberOfRetries = maxNumberOfRetries;
config.retryDelayMilliseconds = retryDelayMilliseconds;
if (retryDelayMilliseconds > 0) {
config.retryDelayMilliseconds = retryDelayMilliseconds;
} else {
config.retryDelayMilliseconds = 100;
}
return ParseServerTypeFilters(config.serverTypeFilters, this->serverTypeFilters);
}

Expand Down
1 change: 1 addition & 0 deletions cpp-lib/src/pitaya/etcd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class EtcdClient
virtual void CancelWatch() = 0;

virtual void LeaseKeepAlive(int64_t ttl,
int64_t leaseId,
std::function<void(std::exception_ptr)> onExit) = 0;
virtual void StopLeaseKeepAlive() = 0;
};
Expand Down
18 changes: 10 additions & 8 deletions cpp-lib/src/pitaya/etcd_client_v3.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "pitaya/etcd_client_v3.h"

#include "pitaya.h"

#include "spdlog/sinks/stdout_color_sinks.h"
#include "pitaya/utils.h"

using std::placeholders::_1;

Expand All @@ -13,8 +12,7 @@ EtcdClientV3::EtcdClientV3(const std::string& endpoint,
bool logHeartbeat,
std::chrono::seconds grpcTimeout,
const char* loggerName)
: _log(loggerName ? spdlog::get(loggerName)->clone("etcd_client_v3")
: spdlog::stdout_color_mt("etcd_client_v3"))
: _log(utils::CloneLoggerOrCreate(loggerName,"etcd_client_v3"))
, _endpoint(endpoint)
, _prefix(prefix)
, _client(endpoint)
Expand Down Expand Up @@ -91,9 +89,12 @@ EtcdClientV3::List(const std::string& prefix)
}

void
EtcdClientV3::LeaseKeepAlive(int64_t ttl, std::function<void(std::exception_ptr)> onExit)
EtcdClientV3::LeaseKeepAlive(int64_t ttl, int64_t leaseId, std::function<void(std::exception_ptr)> onExit)
{
etcd::KeepAlive _leaseKeepAlive(_client, onExit, ttl, ttl);

_log->info("Starting keepalive with {} seconds interval for leaseID: {}", ttl/3, leaseId );
_leaseKeepAlive = std::make_shared<etcd::KeepAlive>(_client, onExit, ttl/3, leaseId);

}

void
Expand All @@ -108,9 +109,10 @@ void
EtcdClientV3::Watch(std::function<void(WatchResponse)> onWatch)
{
try {
_log->info("Starting ETCD Watcher with {} prefix.", _prefix );
_onWatch = std::move(onWatch);
_watcher = std::unique_ptr<etcd::Watcher>(
new etcd::Watcher(_endpoint, _prefix, std::bind(&EtcdClientV3::OnWatch, this, _1)));
_watcher = std::make_shared<etcd::Watcher>(_client, _prefix, std::bind(&EtcdClientV3::OnWatch, this, _1), true);

} catch (const std::runtime_error& exc) {
throw PitayaException(exc.what());
}
Expand Down
4 changes: 3 additions & 1 deletion cpp-lib/src/pitaya/etcd_client_v3.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <etcd/Client.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/KeepAlive.hpp>
#include "spdlog/spdlog.h"

namespace pitaya {

Expand All @@ -27,6 +28,7 @@ class EtcdClientV3 : public EtcdClient
void CancelWatch() override;

void LeaseKeepAlive(int64_t ttl,
int64_t leaseId,
std::function<void(std::exception_ptr)> onExit) override;
void StopLeaseKeepAlive() override;

Expand All @@ -38,7 +40,7 @@ class EtcdClientV3 : public EtcdClient
std::string _endpoint;
std::string _prefix;
etcd::Client _client;
std::unique_ptr<etcd::Watcher> _watcher;
std::shared_ptr<etcd::Watcher> _watcher;
std::function<void(WatchResponse)> _onWatch;
std::shared_ptr<etcd::KeepAlive> _leaseKeepAlive;
};
Expand Down
15 changes: 9 additions & 6 deletions cpp-lib/src/pitaya/etcdv3_service_discovery/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Worker::Worker(const EtcdServiceDiscoveryConfig& config,
, _server(std::move(server))
, _etcdClient(std::move(etcdClient))
, _log(utils::CloneLoggerOrCreate(loggerName, kLogTag))
, _numKeepAliveRetriesLeft(5)
, _numKeepAliveRetriesLeft(_config.maxNumberOfRetries)
, _syncServersTicker(config.syncServersIntervalSec, std::bind(&Worker::SyncServers, this)) {

if (_config.logServerSync) {
Expand Down Expand Up @@ -124,6 +124,7 @@ Worker::StartThread()
switch (job.info) {
case JobInfo::SyncServers: {
_log->debug("Will synchronize servers");


ListResponse res = _etcdClient->List(_config.etcdPrefix + "servers/metagame/");

Expand Down Expand Up @@ -221,7 +222,7 @@ Worker::StartThread()

while (_numKeepAliveRetriesLeft > 0) {
_log->info("ETCD retries left: {}", _numKeepAliveRetriesLeft);
auto delay_milliseconds = 300 << (5 - _numKeepAliveRetriesLeft);
auto delay_milliseconds = _config.retryDelayMilliseconds << (_config.maxNumberOfRetries - _numKeepAliveRetriesLeft);
_log->info("ETCD retry waiting for {}ms", delay_milliseconds);
std::this_thread::sleep_for(std::chrono::milliseconds(delay_milliseconds));

Expand Down Expand Up @@ -276,8 +277,8 @@ Worker::StartLeaseKeepAlive()
return;
}

_etcdClient->LeaseKeepAlive(_config.heartbeatTTLSec.count(), [this](std::exception_ptr exc) {
try {
std::function<void (std::exception_ptr)> handler = [this](std::exception_ptr exc) {
try {
if (exc) {
_log->error("lease keep alive failed!");
std::lock_guard<decltype(_jobQueue)> lock(_jobQueue);
Expand All @@ -291,7 +292,9 @@ Worker::StartLeaseKeepAlive()
} catch(const std::out_of_range& e) {
_log->error("ETCD Lease expired: {}", e.what());
}
});
};
_etcdClient->LeaseKeepAlive(_config.heartbeatTTLSec.count(), _leaseId, handler);

}

bool
Expand All @@ -302,7 +305,7 @@ Worker::Init()
_log->error("Service discovery bootstrap failed");
return false;
}

StartLeaseKeepAlive();
_syncServersTicker.Start();
return true;
Expand Down
Loading

0 comments on commit 7d01490

Please sign in to comment.