Skip to content

Commit

Permalink
Map and serialize every endpoint connection
Browse files Browse the repository at this point in the history
Note: a single connection can listen to a maximum of 1024 streams
  • Loading branch information
mussonero committed Oct 16, 2020
1 parent 2851b5f commit 3282989
Showing 1 changed file with 149 additions and 91 deletions.
240 changes: 149 additions & 91 deletions src/binance_websocket.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
Author: tensaix2j
Date : 2017/10/15
C++ library for Binance API.
*/

Expand All @@ -12,14 +12,11 @@
#include <libwebsockets.h>
#include <map>
#include <csignal>
#include <cassert>

using namespace binance;
using namespace std;

static struct lws_context *context;
static map<lws*, CB> handles;
static lws_sorted_usec_list_t _sul;
static atomic<int> lws_service_cancelled(0);
void connect_client(lws_sorted_usec_list_t *sul);

Expand All @@ -28,13 +25,17 @@ void connect_client(lws_sorted_usec_list_t *sul);
* the connection bound to it
*/
struct endpoint_connection {
lws_sorted_usec_list_t sul; /* schedule connection retry */
lws_sorted_usec_list_t _sul; /* schedule connection retry */
struct lws *wsi; /* related wsi if any */
uint16_t retry_count; /* count of consequetive retries */
lws* conn;
CB callback_jason_func;
CB json_cb;
char* ws_path;
} endpoint_prop;
};

static std::map<int,int> concurrent;
static std::map<int, endpoint_connection> endpoints_prop;
static pthread_mutex_t lock_concurrent; /* serialize access */

/*
* The retry and backoff policy we want to use for our client connections
Expand Down Expand Up @@ -86,17 +87,24 @@ const struct lws_extension extensions[] = {

int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
struct endpoint_connection *endpoint_prop = (struct endpoint_connection *)user;
int m;

switch (reason)
{
case LWS_CALLBACK_CLIENT_ESTABLISHED :
lwsl_user("%s: established\n", __func__);
lws_callback_on_writable(wsi);
endpoint_prop->wsi = wsi;
case LWS_CALLBACK_CLIENT_ESTABLISHED :
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
lws_callback_on_writable(wsi);
m = n.second;
endpoints_prop[m].wsi = wsi;
lwsl_user("%s: connection established with success concurrent:%d ws_path::%s\n",
__func__, n.second, endpoints_prop[n.second].ws_path);
break;
}
}
break;

case LWS_CALLBACK_CLIENT_RECEIVE :
case LWS_CALLBACK_CLIENT_RECEIVE :
{
// Handle incomming messages here.
try
Expand All @@ -106,24 +114,40 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
Json::Value json_result;
reader.parse(str_result , json_result);

if (handles.find(wsi) != handles.end())
handles[wsi](json_result);
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
m = n.second;
endpoints_prop[n.second].json_cb(json_result);
endpoints_prop[m].retry_count = 0;
lwsl_user("%s: incomming messages from %s\n",
__func__ , endpoints_prop[n.second].ws_path);
break;
}
}
}
catch (exception &e)
{
Logger::write_log("<binance::Websocket::event_cb> Error parsing incoming message : %s\n", e.what());
return 1;
Logger::write_log("<binance::Websocket::event_cb> Error parsing incoming message : %s\n", e.what());
return 1;
}
}
break;
break;

case LWS_CALLBACK_CLIENT_WRITEABLE :
break;
case LWS_CALLBACK_CLIENT_WRITEABLE :
break;

case LWS_CALLBACK_CLOSED :
goto do_retry;
lwsl_err("CALLBACK_CLOSED: %s\n",
in ? (char *)in : "");
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
m = n.second;
goto do_retry;
}
}
break;

case LWS_CALLBACK_GET_THREAD_ID:
case LWS_CALLBACK_GET_THREAD_ID:
{
#ifdef __APPLE__
// On OS X pthread_threadid_np() is used, as pthread_self() returns a structure.
Expand All @@ -138,19 +162,28 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
break;

case LWS_CALLBACK_CLIENT_CONNECTION_ERROR :
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
in ? (char *)in : "(null)");
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
lws_cancel_service(lws_get_context(wsi));
atomic_store(&lws_service_cancelled, 1);
return -1;
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
atomic_store(&lws_service_cancelled, 1);
endpoints_prop.erase(n.second);
concurrent.erase(n.second);
lwsl_err("CLIENT_CONNECTION_ERROR Unknown WIS: %s\n",
in ? (char *)in : "(null)");
return -1;
}
}
break;

case LWS_CALLBACK_CLIENT_CLOSED:
lwsl_err("CLIENT_CALLBACK_CLIENT_CLOSED: %s\n",
in ? (char *)in : "");
goto do_retry;
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
m = n.second;
goto do_retry;
}
}
break;

default :
// Make compiler happy regarding unhandled enums.
Expand All @@ -161,16 +194,25 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s

do_retry:
try{
if (lws_retry_sul_schedule_retry_wsi(wsi, &endpoint_prop->sul, connect_client,
&endpoint_prop->retry_count))
if (lws_retry_sul_schedule_retry_wsi(endpoints_prop[m].wsi, &endpoints_prop[m]._sul, connect_client,
&endpoints_prop[m].retry_count))
{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
lws_cancel_service(lws_get_context(wsi));
lwsl_err("%s: connection attempts exhausted\n", __func__);
atomic_store(&lws_service_cancelled, 1);
return -1;
if(endpoints_prop[m].retry_count > (LWS_ARRAY_SIZE(backoff_ms))){
endpoints_prop.erase(m);
concurrent.erase(m);
atomic_store(&lws_service_cancelled, 1);
return -1;
}
{
lwsl_err("%s: connection attempts exhausted,we will keep retrying count:%d ws_path:%s\n",
__func__, endpoints_prop[m].retry_count, endpoints_prop[m].ws_path);
atomic_store(&lws_service_cancelled, 0);
lws_sul_schedule(lws_get_context(endpoints_prop[m].wsi), 0, &endpoints_prop[m]._sul, connect_client, 10 * LWS_US_PER_SEC);
return 0;
}
}
lwsl_user("%s: connection attempts success, retrying count:%d ws_path:%s\n",
__func__, endpoints_prop[m].retry_count, endpoints_prop[m].ws_path);
}catch (exception &e)
{
Logger::write_log("<binance::Websocket::event_cb> Error do_retry message : %s\n", e.what());
Expand Down Expand Up @@ -205,53 +247,54 @@ sigint_handler(int sig)
*/
void connect_client(lws_sorted_usec_list_t *sul)
{
struct endpoint_connection *endpoint_prop = lws_container_of(&_sul, struct endpoint_connection, sul);
struct lws_client_connect_info ccinfo;

memset(&ccinfo, 0, sizeof(ccinfo));

ccinfo.context = context;
ccinfo.port = BINANCE_WS_PORT;
ccinfo.address = BINANCE_WS_HOST;
ccinfo.path = endpoint_prop->ws_path;
ccinfo.host = lws_canonical_hostname(context);
ccinfo.origin = "origin";
ccinfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
ccinfo.protocol = protocols[0].name;
ccinfo.local_protocol_name = protocols[0].name;
ccinfo.retry_and_idle_policy = &retry;
ccinfo.userdata = endpoint_prop;
/*
* We store the new wsi here early in the connection process,
* this gives the callback a way to identify which wsi faced the error
* even before the new wsi is returned and even if ultimately no wsi is returned.
*/
ccinfo.pwsi = &endpoint_prop->wsi;

endpoint_prop->conn = lws_client_connect_via_info(&ccinfo);
if (!endpoint_prop->conn)
{
/*
* Failed... schedule a retry... we can't use the _retry_wsi()
* convenience wrapper api here because no valid wsi at this
* point.
*/
if (lws_retry_sul_schedule(context, 0, sul, &retry,
connect_client, &endpoint_prop->retry_count))
{
lwsl_err("%s: connection attempts exhausted\n", __func__);
atomic_store(&lws_service_cancelled, 1);
assert(endpoint_prop->wsi);
if ((endpoint_prop->wsi) && handles.find(endpoint_prop->wsi) != handles.end())
handles.erase(endpoint_prop->wsi);
return;
}
else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
for (std::pair<int, int> n : concurrent) {
if (&endpoints_prop[n.second]._sul == sul) {
lwsl_user("%s: success ws_path::%s\n",
__func__, endpoints_prop[n.second].ws_path);
struct lws_client_connect_info ccinfo;

memset(&ccinfo, 0, sizeof(ccinfo));

ccinfo.context = context;
ccinfo.port = BINANCE_WS_PORT;
ccinfo.address = BINANCE_WS_HOST;
ccinfo.path = endpoints_prop[n.second].ws_path;
ccinfo.host = lws_canonical_hostname(context);
ccinfo.origin = "origin";
ccinfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_PIPELINE;
ccinfo.protocol = protocols[0].name;
ccinfo.local_protocol_name = protocols[0].name;
ccinfo.retry_and_idle_policy = &retry;
ccinfo.userdata = &endpoints_prop[n.second];
/*
* We store the new wsi here early in the connection process,
* this gives the callback a way to identify which wsi faced the error
* even before the new wsi is returned and even if ultimately no wsi is returned.
*/
ccinfo.pwsi = &endpoints_prop[n.second].wsi;

endpoints_prop[n.second].conn = lws_client_connect_via_info(&ccinfo);
if (!endpoints_prop[n.second].conn)
{
/*
* Failed... schedule a retry... we can't use the _retry_wsi()
* convenience wrapper api here because no valid wsi at this
* point.
*/
if (lws_retry_sul_schedule(context, 0, &endpoints_prop[n.second]._sul, &retry,
connect_client, &endpoints_prop[n.second].retry_count))
{
lwsl_err("%s: connection attempts exhausted\n", __func__);
endpoints_prop.erase(n.second);
concurrent.erase(n.second);
atomic_store(&lws_service_cancelled, 1);
return;
}
}
break;
}
}else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}

}

void binance::Websocket::init()
Expand Down Expand Up @@ -282,13 +325,27 @@ void binance::Websocket::init()
// Register call backs
void binance::Websocket::connect_endpoint(CB cb, const char* path)
{
struct endpoint_connection *endpoint_prop = lws_container_of(&_sul, struct endpoint_connection, sul);
endpoint_prop->ws_path = const_cast<char *>(path);
endpoint_prop->callback_jason_func = cb;
connect_client(&_sul);
pthread_mutex_lock(&lock_concurrent);
if(concurrent.size() > 1024){
lwsl_err("%s: maximum of 1024 connect_endpoints reached,\n",
__func__);
pthread_mutex_unlock(&lock_concurrent);
return;
}
int n = concurrent.size();
concurrent.emplace(std::pair<int,int>(n,n));
endpoints_prop[n].ws_path = const_cast<char *>(path);
endpoints_prop[n].json_cb = cb;
connect_client(&endpoints_prop[n]._sul);

if (!lws_service_cancelled)
/* schedule the first client connection attempt to happen immediately */
lws_sul_schedule(context, 0, &endpoint_prop->sul, connect_client, 1);
{
/* schedule the first client connection attempt to happen immediately */
lws_sul_schedule(context, 0, &endpoints_prop[n]._sul, connect_client, 1);
lwsl_user("%s: concurrent:%d ws_path::%s\n",
__func__, n, endpoints_prop[n].ws_path);
}
pthread_mutex_unlock(&lock_concurrent);
}

// Entering event loop
Expand All @@ -306,7 +363,8 @@ void binance::Websocket::enter_event_loop(std::chrono::hours hours)
}
} while (n >= 0 && std::chrono::steady_clock::now() < end);

atomic_store(&lws_service_cancelled, 0);
concurrent.clear();
atomic_store(&lws_service_cancelled, 1);

lws_context_destroy(context);
}
}

0 comments on commit 3282989

Please sign in to comment.