Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix etcd keepalive and watcher #62

Merged
merged 7 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
- name: Install Conan
id: conan
uses: turtlebrowser/get-conan@main
env:
PIP_BREAK_SYSTEM_PACKAGES: 1

- name: Update conan profile
run: conan profile detect
Expand Down
5 changes: 3 additions & 2 deletions cpp-lib/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class PitayaCpp(ConanFile):
exports = 'version.txt'

def requirements(self):
self.requires("zlib/1.2.13")
self.requires("protobuf/3.21.12", force=True)
self.requires("zlib/1.3.1")
self.requires("protobuf/3.21.9", visible=True, force=True)
self.requires("boost/1.83.0", force=True)
self.requires("openssl/3.2.1", force=True)
self.requires("grpc/1.54.3")
Expand All @@ -44,6 +44,7 @@ def requirements(self):

def build_requirements(self):
self.tool_requires("grpc/1.54.3")
self.tool_requires("protobuf/3.21.9")

def layout(self):
cmake_layout(self, build_folder='_builds')
Expand Down
6 changes: 5 additions & 1 deletion cpp-lib/src/pitaya/c_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ CSDConfig::TryGetConfig(pitaya::EtcdServiceDiscoveryConfig& config)
config.logServerDetails = logServerDetails;
config.syncServersIntervalSec = std::chrono::seconds(syncServersIntervalSec);
config.maxNumberOfRetries = maxNumberOfRetries;
config.retryDelayMilliseconds = retryDelayMilliseconds;
if (retryDelayMilliseconds > 0) {
config.retryDelayMilliseconds = retryDelayMilliseconds;
} else {
config.retryDelayMilliseconds = 100;
}
return ParseServerTypeFilters(config.serverTypeFilters, this->serverTypeFilters);
}

Expand Down
1 change: 1 addition & 0 deletions cpp-lib/src/pitaya/etcd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class EtcdClient
virtual void CancelWatch() = 0;

virtual void LeaseKeepAlive(int64_t ttl,
int64_t leaseId,
std::function<void(std::exception_ptr)> onExit) = 0;
virtual void StopLeaseKeepAlive() = 0;
};
Expand Down
18 changes: 10 additions & 8 deletions cpp-lib/src/pitaya/etcd_client_v3.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#include "pitaya/etcd_client_v3.h"

#include "pitaya.h"

#include "spdlog/sinks/stdout_color_sinks.h"
#include "pitaya/utils.h"

using std::placeholders::_1;

Expand All @@ -13,8 +12,7 @@ EtcdClientV3::EtcdClientV3(const std::string& endpoint,
bool logHeartbeat,
std::chrono::seconds grpcTimeout,
const char* loggerName)
: _log(loggerName ? spdlog::get(loggerName)->clone("etcd_client_v3")
: spdlog::stdout_color_mt("etcd_client_v3"))
: _log(utils::CloneLoggerOrCreate(loggerName,"etcd_client_v3"))
, _endpoint(endpoint)
, _prefix(prefix)
, _client(endpoint)
Expand Down Expand Up @@ -91,9 +89,12 @@ EtcdClientV3::List(const std::string& prefix)
}

void
EtcdClientV3::LeaseKeepAlive(int64_t ttl, std::function<void(std::exception_ptr)> onExit)
EtcdClientV3::LeaseKeepAlive(int64_t ttl, int64_t leaseId, std::function<void(std::exception_ptr)> onExit)
{
etcd::KeepAlive _leaseKeepAlive(_client, onExit, ttl, ttl);

_log->info("Starting keepalive with {} seconds interval for leaseID: {}", ttl/3, leaseId );
_leaseKeepAlive = std::make_shared<etcd::KeepAlive>(_client, onExit, ttl/3, leaseId);

}

void
Expand All @@ -108,9 +109,10 @@ void
EtcdClientV3::Watch(std::function<void(WatchResponse)> onWatch)
{
try {
_log->info("Starting ETCD Watcher with {} prefix.", _prefix );
_onWatch = std::move(onWatch);
_watcher = std::unique_ptr<etcd::Watcher>(
new etcd::Watcher(_endpoint, _prefix, std::bind(&EtcdClientV3::OnWatch, this, _1)));
_watcher = std::make_shared<etcd::Watcher>(_client, _prefix, std::bind(&EtcdClientV3::OnWatch, this, _1), true);

} catch (const std::runtime_error& exc) {
throw PitayaException(exc.what());
}
Expand Down
4 changes: 3 additions & 1 deletion cpp-lib/src/pitaya/etcd_client_v3.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <etcd/Client.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/KeepAlive.hpp>
#include "spdlog/spdlog.h"

namespace pitaya {

Expand All @@ -27,6 +28,7 @@ class EtcdClientV3 : public EtcdClient
void CancelWatch() override;

void LeaseKeepAlive(int64_t ttl,
int64_t leaseId,
std::function<void(std::exception_ptr)> onExit) override;
void StopLeaseKeepAlive() override;

Expand All @@ -38,7 +40,7 @@ class EtcdClientV3 : public EtcdClient
std::string _endpoint;
std::string _prefix;
etcd::Client _client;
std::unique_ptr<etcd::Watcher> _watcher;
std::shared_ptr<etcd::Watcher> _watcher;
std::function<void(WatchResponse)> _onWatch;
std::shared_ptr<etcd::KeepAlive> _leaseKeepAlive;
};
Expand Down
15 changes: 9 additions & 6 deletions cpp-lib/src/pitaya/etcdv3_service_discovery/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Worker::Worker(const EtcdServiceDiscoveryConfig& config,
, _server(std::move(server))
, _etcdClient(std::move(etcdClient))
, _log(utils::CloneLoggerOrCreate(loggerName, kLogTag))
, _numKeepAliveRetriesLeft(5)
, _numKeepAliveRetriesLeft(_config.maxNumberOfRetries)
, _syncServersTicker(config.syncServersIntervalSec, std::bind(&Worker::SyncServers, this)) {

if (_config.logServerSync) {
Expand Down Expand Up @@ -124,6 +124,7 @@ Worker::StartThread()
switch (job.info) {
case JobInfo::SyncServers: {
_log->debug("Will synchronize servers");


ListResponse res = _etcdClient->List(_config.etcdPrefix + "servers/metagame/");

Expand Down Expand Up @@ -221,7 +222,7 @@ Worker::StartThread()

while (_numKeepAliveRetriesLeft > 0) {
_log->info("ETCD retries left: {}", _numKeepAliveRetriesLeft);
auto delay_milliseconds = 300 << (5 - _numKeepAliveRetriesLeft);
auto delay_milliseconds = _config.retryDelayMilliseconds << (_config.maxNumberOfRetries - _numKeepAliveRetriesLeft);
_log->info("ETCD retry waiting for {}ms", delay_milliseconds);
std::this_thread::sleep_for(std::chrono::milliseconds(delay_milliseconds));

Expand Down Expand Up @@ -276,8 +277,8 @@ Worker::StartLeaseKeepAlive()
return;
}

_etcdClient->LeaseKeepAlive(_config.heartbeatTTLSec.count(), [this](std::exception_ptr exc) {
try {
std::function<void (std::exception_ptr)> handler = [this](std::exception_ptr exc) {
try {
if (exc) {
_log->error("lease keep alive failed!");
std::lock_guard<decltype(_jobQueue)> lock(_jobQueue);
Expand All @@ -291,7 +292,9 @@ Worker::StartLeaseKeepAlive()
} catch(const std::out_of_range& e) {
_log->error("ETCD Lease expired: {}", e.what());
}
});
};
_etcdClient->LeaseKeepAlive(_config.heartbeatTTLSec.count(), _leaseId, handler);

}

bool
Expand All @@ -302,7 +305,7 @@ Worker::Init()
_log->error("Service discovery bootstrap failed");
return false;
}

StartLeaseKeepAlive();
_syncServersTicker.Start();
return true;
Expand Down
Loading