From cc705a948cf6797f669b64649605f007f6f26a42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A1bio=20Urquiza?= Date: Mon, 20 Mar 2023 10:31:33 -0300 Subject: [PATCH 1/7] Fix redis connection race condition on many workers (#666) --- src/nchan_module.c | 25 ++++-------- src/store/memory/ipc-handlers.c | 31 +++++++++++++++ src/store/memory/ipc-handlers.h | 2 + src/store/memory/memstore.c | 70 +++++++++++++++++++++++---------- 4 files changed, 89 insertions(+), 39 deletions(-) diff --git a/src/nchan_module.c b/src/nchan_module.c index ad6f3534..95b51521 100644 --- a/src/nchan_module.c +++ b/src/nchan_module.c @@ -39,7 +39,6 @@ int nchan_redis_stats_enabled = 0; static void nchan_publisher_body_handler(ngx_http_request_t *r); -static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r); //#define DEBUG_LEVEL NGX_LOG_WARN //#define DEBUG_LEVEL NGX_LOG_DEBUG @@ -746,18 +745,6 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) { return NGX_OK; } - if(cf->redis.enabled && !nchan_store_redis_ready(cf)) { - //using redis, and it's not ready yet - if(r->method == NGX_HTTP_POST || r->method == NGX_HTTP_PUT) { - //discard request body before responding - nchan_http_publisher_handler(r, nchan_publisher_unavailable_body_handler); - } - else { - nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0); - } - return NGX_OK; - } - if(cf->pub.websocket || cf->pub.http) { char *err; if(!nchan_parse_message_buffer_config(r, cf, &err)) { @@ -841,6 +828,13 @@ ngx_int_t nchan_pubsub_handler(ngx_http_request_t *r) { #if FAKESHARD memstore_sub_debug_start(); #endif + + if(cf->redis.enabled && ngx_process_slot == memstore_channel_owner(channel_id) && !nchan_store_redis_ready(cf)) { + //using redis, and it's not ready yet + nchan_respond_status(r, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0); + return NGX_OK; + } + if((msg_id = nchan_subscriber_get_msg_id(r)) == NULL) { goto bad_msgid; } @@ -1249,11 +1243,6 @@ static ngx_int_t nchan_publisher_body_authorize_handler(ngx_http_request_t *r, v return NGX_OK; } -static void nchan_publisher_unavailable_body_handler(ngx_http_request_t *r) { - nchan_http_finalize_request(r, NGX_HTTP_SERVICE_UNAVAILABLE); - return; -} - static void nchan_publisher_body_handler(ngx_http_request_t *r) { ngx_str_t *channel_id; nchan_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_nchan_module); diff --git a/src/store/memory/ipc-handlers.c b/src/store/memory/ipc-handlers.c index 55df2fd3..bee7c1d9 100644 --- a/src/store/memory/ipc-handlers.c +++ b/src/store/memory/ipc-handlers.c @@ -47,6 +47,8 @@ L(benchmark_finish_reply) \ L(redis_stats_request) \ L(redis_stats_reply) \ + L(redis_conn_ready) \ + L(redis_conn_ready_reply) \ @@ -1205,6 +1207,35 @@ static void receive_redis_stats_reply(ngx_int_t sender, redis_stats_request_data shm_free(nchan_store_memory_shmem, data->stats); } +////////// REDIS CONN READY DATA //////////////// +typedef struct { + ngx_int_t ready; + nchan_loc_conf_t *cf; + callback_pt callback; + void *privdata; +} redis_conn_ready_data_t; + +ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata) { + DBG("send redis_conn_ready to %i", dst); + redis_conn_ready_data_t data; + DEBUG_MEMZERO(&data); + data.ready = 0; + data.cf = cf; + data.callback = callback; + data.privdata = privdata; + + return ipc_cmd(redis_conn_ready, dst, &data); +} + +static void receive_redis_conn_ready(ngx_int_t sender, redis_conn_ready_data_t *d) { + DBG("received redis_conn_ready request for privdata %p", 
d->privdata); + d->ready = nchan_store_redis_ready(d->cf); + ipc_cmd(redis_conn_ready_reply, sender, d); +} + +static void receive_redis_conn_ready_reply(ngx_int_t sender, redis_conn_ready_data_t *d) { + d->callback(d->ready, NULL, d->privdata); +} #define MAKE_ipc_cmd_handler(val) [offsetof(ipc_handlers_t, val)/sizeof(ipc_handler_pt)] = (ipc_handler_pt )receive_ ## val, static ipc_handler_pt ipc_cmd_handler[] = { diff --git a/src/store/memory/ipc-handlers.h b/src/store/memory/ipc-handlers.h index e22c4a0d..7fd9c19c 100644 --- a/src/store/memory/ipc-handlers.h +++ b/src/store/memory/ipc-handlers.h @@ -26,3 +26,5 @@ ngx_int_t memstore_ipc_broadcast_benchmark_finish(void); ngx_int_t memstore_ipc_broadcast_benchmark_abort(void); ngx_int_t memstore_ipc_broadcast_redis_stats_request(void *nodeset, callback_pt cb, void *pd); + +ngx_int_t memstore_ipc_send_redis_conn_ready(ngx_int_t dst, nchan_loc_conf_t *cf, callback_pt callback, void* privdata); \ No newline at end of file diff --git a/src/store/memory/memstore.c b/src/store/memory/memstore.c index f8b4ee8b..e1e1c7a8 100755 --- a/src/store/memory/memstore.c +++ b/src/store/memory/memstore.c @@ -2079,12 +2079,13 @@ static void subscribe_data_free(subscribe_data_t *d) { #define SUB_CHANNEL_NOTSURE 2 static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d); -static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d); +static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d); +static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d); static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) { ngx_int_t owner = memstore_channel_owner(channel_id); subscribe_data_t *d = subscribe_data_alloc(sub->cf->redis.enabled ? 
-1 : owner); - + assert(d != NULL); d->channel_owner = owner; @@ -2095,24 +2096,51 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) d->channel_exists = 0; d->group_channel_limit_pass = 0; d->msg_id = sub->last_msgid; - - if(sub->cf->subscribe_only_existing_channel || sub->cf->max_channel_subscribers > 0) { + + if(sub->cf->redis.enabled && memstore_slot() != owner) { + ngx_int_t rc; sub->fn->reserve(sub); d->reserved = 1; - if(memstore_slot() != owner) { - ngx_int_t rc; - rc = memstore_ipc_send_channel_existence_check(owner, channel_id, sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d); - if(rc == NGX_DECLINED) { // out of memory - nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d); - return NGX_ERROR; + rc = memstore_ipc_send_redis_conn_ready(d->channel_owner, sub->cf, (callback_pt)nchan_store_subscribe_stage2, d); + if(rc == NGX_DECLINED) { // out of memory + nchan_store_subscribe_stage2(0, NULL, d); + return NGX_ERROR; + } + } else { + return nchan_store_subscribe_stage2(1, NULL, d); + } + return NGX_OK; +} + +static ngx_int_t nchan_store_subscribe_stage2(ngx_int_t continue_subscription, void* _, subscribe_data_t *d) { + if(continue_subscription) { + if(d->sub->cf->subscribe_only_existing_channel || d->sub->cf->max_channel_subscribers > 0) { + if(!d->reserved) { + d->sub->fn->reserve(d->sub); + d->reserved = 1; + } + if(memstore_slot() != d->channel_owner) { + ngx_int_t rc; + rc = memstore_ipc_send_channel_existence_check(d->channel_owner, d->channel_id, d->sub->cf, (callback_pt )nchan_store_subscribe_channel_existence_check_callback, d); + if(rc == NGX_DECLINED) { // out of memory + nchan_store_subscribe_channel_existence_check_callback(SUB_CHANNEL_UNAUTHORIZED, NULL, d); + return NGX_ERROR; + } + } + else { + return nchan_store_subscribe_stage3(SUB_CHANNEL_NOTSURE, NULL, d); } } else { - return nchan_store_subscribe_continued(SUB_CHANNEL_NOTSURE, NULL, d); + return nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d); } - } - else { - return nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d); + } else { + //using redis, and it's not ready yet + if(d->sub->fn->release(d->sub, 0) == NGX_OK) { + d->reserved = 0; + nchan_respond_status(d->sub->request, NGX_HTTP_SERVICE_UNAVAILABLE, NULL, NULL, 0); + } + subscribe_data_free(d); } return NGX_OK; } @@ -2120,7 +2148,7 @@ static ngx_int_t nchan_store_subscribe(ngx_str_t *channel_id, subscriber_t *sub) static ngx_int_t nchan_store_subscribe_channel_existence_check_callback(ngx_int_t channel_status, void* _, subscribe_data_t *d) { if(d->sub->fn->release(d->sub, 0) == NGX_OK) { d->reserved = 0; - return nchan_store_subscribe_continued(channel_status, _, d); + return nchan_store_subscribe_stage3(channel_status, _, d); } else {//don't go any further, the sub has been deleted subscribe_data_free(d); @@ -2143,7 +2171,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo /* else if (cf->max_channel_subscribers > 0) { // don't check this anymore -- a total subscribers count check is less - // useful as a per-instance check, which is handled in nchan_store_subscribe_continued + // useful as a per-instance check, which is handled in nchan_store_subscribe_stage3 // shared total subscriber count check can be re-enabled with another config setting channel_status = channel->subscribers >= cf->max_channel_subscribers ? 
SUB_CHANNEL_UNAUTHORIZED : SUB_CHANNEL_AUTHORIZED; } @@ -2152,7 +2180,7 @@ static ngx_int_t redis_subscribe_channel_existence_callback(ngx_int_t status, vo channel_status = SUB_CHANNEL_AUTHORIZED; } - nchan_store_subscribe_continued(channel_status, NULL, data); + nchan_store_subscribe_stage3(channel_status, NULL, data); } else { //error!! @@ -2196,7 +2224,7 @@ static ngx_int_t group_subscribe_channel_limit_reached(ngx_int_t rc, nchan_chann if(d->sub->status != DEAD) { if(chaninfo) { //ok, channel already exists. - nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d); + nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d); } else { //nope. no channel, no subscribing. @@ -2219,14 +2247,14 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t if(shm_group) { if(!shm_group->limit.channels || (shm_group->channels < shm_group->limit.channels)) { d->group_channel_limit_pass = 1; - rc = nchan_store_subscribe_continued(SUB_CHANNEL_AUTHORIZED, NULL, d); + rc = nchan_store_subscribe_stage3(SUB_CHANNEL_AUTHORIZED, NULL, d); } else if (shm_group->limit.channels && shm_group->channels == shm_group->limit.channels){ //no new channels! rc = nchan_store_find_channel(d->channel_id, d->sub->cf, (callback_pt )group_subscribe_channel_limit_reached, d); } else { - rc = nchan_store_subscribe_continued(SUB_CHANNEL_UNAUTHORIZED, NULL, d); + rc = nchan_store_subscribe_stage3(SUB_CHANNEL_UNAUTHORIZED, NULL, d); } } @@ -2246,7 +2274,7 @@ static ngx_int_t group_subscribe_channel_limit_check(ngx_int_t _, nchan_group_t return rc; } -static ngx_int_t nchan_store_subscribe_continued(ngx_int_t channel_status, void* _, subscribe_data_t *d) { +static ngx_int_t nchan_store_subscribe_stage3(ngx_int_t channel_status, void* _, subscribe_data_t *d) { memstore_channel_head_t *chanhead = NULL; int retry_null_chanhead = 1; //store_message_t *chmsg; From 693417038c79728493ea464d7ead215c12abdd26 Mon Sep 17 00:00:00 2001 From: Leo P Date: Tue, 25 Jul 2023 10:43:20 -0400 Subject: [PATCH 2/7] fix: subscriber info for Redis 7 cluster --- src/store/redis/rdsstore.c | 2 +- .../redis/redis-lua-scripts/request_subscriber_info.lua | 7 ++++--- src/store/redis/redis_lua_commands.c | 9 +++++---- src/store/redis/redis_lua_commands.h | 4 ++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/store/redis/rdsstore.c b/src/store/redis/rdsstore.c index 9302caa0..e88024c7 100755 --- a/src/store/redis/rdsstore.c +++ b/src/store/redis/rdsstore.c @@ -2673,7 +2673,7 @@ static ngx_int_t nchan_store_request_subscriber_info(ngx_str_t *channel_id, ngx_ } node_command_time_start(node, NCHAN_REDIS_CMD_CHANNEL_REQUEST_SUBSCRIBER_INFO); - nchan_redis_script(request_subscriber_info, node, &redis_request_subscriber_info_callback, NULL, channel_id, "%i", (int )request_id); + nchan_redis_script(request_subscriber_info, node, &redis_request_subscriber_info_callback, NULL, channel_id, "%s %i", nodeset->use_spublish ? 
"SPUBLISH" : "PUBLISH", (int )request_id); return NGX_DONE; } diff --git a/src/store/redis/redis-lua-scripts/request_subscriber_info.lua b/src/store/redis/redis-lua-scripts/request_subscriber_info.lua index afe40905..76f6fed0 100644 --- a/src/store/redis/redis-lua-scripts/request_subscriber_info.lua +++ b/src/store/redis/redis-lua-scripts/request_subscriber_info.lua @@ -1,15 +1,16 @@ ---input: keys: [], values: [ namespace, channel_id, info_response_id ] +--input: keys: [], values: [ namespace, channel_id, publish_cmd, info_response_id ] --output: -nothing- local ns = ARGV[1] local channel_id = ARGV[2] -local response_id = tonumber(ARGV[3]) +local publish_cmd = ARGV[3] +local response_id = tonumber(ARGV[4]) local pubsub = ('%s{channel:%s}:pubsub'):format(ns, channel_id) redis.call('echo', ' ####### REQUEST_SUBSCRIBER_INFO #######') local alert_msgpack =cmsgpack.pack({"alert", "subscriber info", response_id}) -redis.call('PUBLISH', pubsub, alert_msgpack) +redis.call(publish_cmd, pubsub, alert_msgpack) return true diff --git a/src/store/redis/redis_lua_commands.c b/src/store/redis/redis_lua_commands.c index 643caa02..0e4734d9 100644 --- a/src/store/redis/redis_lua_commands.c +++ b/src/store/redis/redis_lua_commands.c @@ -832,20 +832,21 @@ redis_lua_scripts_t redis_lua_scripts = { "\n" "return {ch, new_channel}\n"}, - {"request_subscriber_info", "93c500e094dfc5364251854eeac8d4331a0223c0", - "--input: keys: [], values: [ namespace, channel_id, info_response_id ]\n" + {"request_subscriber_info", "d0c593df4182f327f432da0161f85bbf51bbad04", + "--input: keys: [], values: [ namespace, channel_id, publish_cmd, info_response_id ]\n" "--output: -nothing-\n" "\n" "local ns = ARGV[1]\n" "local channel_id = ARGV[2]\n" - "local response_id = tonumber(ARGV[3])\n" + "local publish_cmd = ARGV[3]\n" + "local response_id = tonumber(ARGV[4])\n" "local pubsub = ('%s{channel:%s}:pubsub'):format(ns, channel_id)\n" "\n" "redis.call('echo', ' ####### REQUEST_SUBSCRIBER_INFO #######')\n" "\n" "local alert_msgpack =cmsgpack.pack({\"alert\", \"subscriber info\", response_id})\n" "\n" - "redis.call('PUBLISH', pubsub, alert_msgpack)\n" + "redis.call(publish_cmd, pubsub, alert_msgpack)\n" "\n" "return true\n"}, diff --git a/src/store/redis/redis_lua_commands.h b/src/store/redis/redis_lua_commands.h index 0fa489c5..bdf54837 100644 --- a/src/store/redis/redis_lua_commands.h +++ b/src/store/redis/redis_lua_commands.h @@ -52,7 +52,7 @@ typedef struct { //output: channel_hash {ttl, time_last_subscriber_seen, subscribers, last_message_id, messages}, channel_created_just_now? 
redis_lua_script_t publish; - //input: keys: [], values: [ namespace, channel_id, info_response_id ] + //input: keys: [], values: [ namespace, channel_id, publish_cmd, info_response_id ] //output: -nothing- redis_lua_script_t request_subscriber_info; @@ -76,7 +76,7 @@ typedef struct { } redis_lua_scripts_t; extern redis_lua_scripts_t redis_lua_scripts; extern const int redis_lua_scripts_count; -#define REDIS_LUA_SCRIPTS_ALL_HASHES "f3b5cc02b9902e94db37949cf0eba6b3fb30376f 7bfe076302b20eeb2e5b0a325599325c96fabc80 a928e8b91abe4c7be327b4ed79a7e0b4a6a13236 0c5d0e0663393ed714801cbe68940d4c8f81e076 fb9c46d33b3798a11d4eca6e0f7a3f92beba8685 304efcd42590f99e0016686572530defd3de1383 3490d5bc3fdc7ed065d9d54b4a0cb8ad6b62c180 35696def4f2ec62f9b91a72fa17bba7a4e4cb6cf 1c68d9e05fe55e2992a917c41255f23e1330b255 93c500e094dfc5364251854eeac8d4331a0223c0 2fca046fa783d6cc25e493c993c407e59998e6e8 24643f71942769759df94b4ddfea15925612f595 51f4b6919ec97d42f5a9a7a10ee7742579b6a8f4" +#define REDIS_LUA_SCRIPTS_ALL_HASHES "f3b5cc02b9902e94db37949cf0eba6b3fb30376f 7bfe076302b20eeb2e5b0a325599325c96fabc80 a928e8b91abe4c7be327b4ed79a7e0b4a6a13236 0c5d0e0663393ed714801cbe68940d4c8f81e076 fb9c46d33b3798a11d4eca6e0f7a3f92beba8685 304efcd42590f99e0016686572530defd3de1383 3490d5bc3fdc7ed065d9d54b4a0cb8ad6b62c180 35696def4f2ec62f9b91a72fa17bba7a4e4cb6cf 1c68d9e05fe55e2992a917c41255f23e1330b255 d0c593df4182f327f432da0161f85bbf51bbad04 2fca046fa783d6cc25e493c993c407e59998e6e8 24643f71942769759df94b4ddfea15925612f595 51f4b6919ec97d42f5a9a7a10ee7742579b6a8f4" #define REDIS_LUA_SCRIPTS_COUNT 13 #define REDIS_LUA_SCRIPTS_EACH(script) \ for((script)=(redis_lua_script_t *)&redis_lua_scripts; (script) < (redis_lua_script_t *)(&redis_lua_scripts + 1); (script)++) From 7765ec14be78c32a3522c019b35b79cc1b585968 Mon Sep 17 00:00:00 2001 From: "Leo P." Date: Fri, 26 Jan 2024 02:23:25 -0500 Subject: [PATCH 3/7] fix: invalid message id may result in worker crash (expired message retrieval was at times not being handled correctly) --- dev/test.rb | 32 +++++++++++++++++++++++++++- src/store/spool.c | 54 +++++++++++++++++++++++------------------------ 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/dev/test.rb b/dev/test.rb index e0d7b7b2..4d32a1f7 100755 --- a/dev/test.rb +++ b/dev/test.rb @@ -1214,7 +1214,37 @@ def test_urlencoded_msgid sub2.terminate end - def test_invalid_etag + def test_invalid_msgid + chan_id=short_id + pub = Publisher.new url("/pub/2_sec_message_timeout/#{chan_id}"), accept: 'text/json' + + pub.post "1. one!!" + sleep 3 + + sub = Subscriber.new(url("/sub/broadcast/#{chan_id}"), 1, client: :websocket, quit_message: 'FIN', retry_delay: 1, timeout: 20) + sub.run + sleep 0.5 + + + sub2 = Subscriber.new(url("/sub/broadcast/#{chan_id}?last_event_id=1:0"), 1, client: :websocket, quit_message: 'FIN', retry_delay: 1, timeout: 20) + sub2.run + sleep 0.5 + + pub.post "2..two" + + pub.post "FIN" + sub.wait + sub2.wait + pub.messages.remove_old 1 + verify(pub, sub) + verify(pub, sub2) + sub2.terminate + sub.terminate + + end + + + def test_invalid_multi_etag chan_id=short_id pub = Publisher.new url("/pub/#{chan_id}"), accept: 'text/json' diff --git a/src/store/spool.c b/src/store/spool.c index 5dcd466c..062023e9 100755 --- a/src/store/spool.c +++ b/src/store/spool.c @@ -247,35 +247,33 @@ static ngx_int_t spool_nextmsg(subscriber_pool_t *spool, nchan_msg_id_t *new_las ERR("nextmsg id same as curmsg (%V)", msgid_to_str(&spool->id)); assert(0); } + + newspool = !immortal_spool ? 
find_spool(spl, &new_id) : get_spool(spl, &new_id); + + if(newspool != NULL) { + //move subs to next, already existing spool + assert(spool != newspool); + spool_transfer_subscribers(spool, newspool, 0); + if(!immortal_spool && spool->reserved == 0) destroy_spool(spool); + } else { - newspool = !immortal_spool ? find_spool(spl, &new_id) : get_spool(spl, &new_id); + //next spool doesn't exist. reuse this one as the next + ngx_rbtree_node_t *node; + assert(!immortal_spool); + node = rbtree_node_from_data(spool); + rbtree_remove_node(&spl->spoolseed, node); + nchan_copy_msg_id(&spool->id, &new_id, NULL); + rbtree_insert_node(&spl->spoolseed, node); + spool->msg_status = MSG_INVALID; + spool->msg = NULL; + newspool = spool; - if(newspool != NULL) { - //move subs to next, already existing spool - assert(spool != newspool); - spool_transfer_subscribers(spool, newspool, 0); - if(!immortal_spool && spool->reserved == 0) destroy_spool(spool); - } - else { - //next spool doesn't exist. reuse this one as the next - ngx_rbtree_node_t *node; - assert(!immortal_spool); - node = rbtree_node_from_data(spool); - rbtree_remove_node(&spl->spoolseed, node); - nchan_copy_msg_id(&spool->id, &new_id, NULL); - rbtree_insert_node(&spl->spoolseed, node); - spool->msg_status = MSG_INVALID; - spool->msg = NULL; - newspool = spool; - - /* - newspool = get_spool(spl, &new_id); - assert(spool != newspool); - spool_transfer_subscribers(spool, newspool, 0); - destroy_spool(spool); - */ - } - + /* + newspool = get_spool(spl, &new_id); + assert(spool != newspool); + spool_transfer_subscribers(spool, newspool, 0); + destroy_spool(spool); + */ if(newspool->non_internal_sub_count > 0 && spl->handlers->use != NULL) { spl->handlers->use(spl, spl->handlers_privdata); @@ -354,7 +352,7 @@ static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetc // ♫ It's gonna be the future soon ♫ if(spool->id.time == NCHAN_NTH_MSGID_TIME) { //wait for message in the NEWEST_ID spool - spool_nextmsg(spool, &latest_msg_id); + spool_nextmsg(spool, &latest_msg_id); } else { spool->msg_status = findmsg_status; From bacb7b674899e37b56a277f9d4107d1b14696cd7 Mon Sep 17 00:00:00 2001 From: "Leo P." 
Date: Fri, 26 Jan 2024 10:53:41 -0500 Subject: [PATCH 4/7] fix: some invalid OR expired message ids may result in worker crash --- dev/test.rb | 27 ++++++++++++++++++++++++++- src/store/spool.c | 38 +++----------------------------------- 2 files changed, 29 insertions(+), 36 deletions(-) diff --git a/dev/test.rb b/dev/test.rb index 4d32a1f7..3f4e32df 100755 --- a/dev/test.rb +++ b/dev/test.rb @@ -1214,7 +1214,7 @@ def test_urlencoded_msgid sub2.terminate end - def test_invalid_msgid + def test_expired_msgid chan_id=short_id pub = Publisher.new url("/pub/2_sec_message_timeout/#{chan_id}"), accept: 'text/json' @@ -1244,6 +1244,31 @@ def test_invalid_msgid end + def test_invalid_msgid + chan_id=short_id + + sub = Subscriber.new(url("/sub/broadcast/#{chan_id}?last_event_id=12:0"), 1, timeout: 20) + sub.run + sleep 0.2 + sub.terminate + + + sub = Subscriber.new(url("/sub/broadcast/#{chan_id}?last_event_id=1:0"), 1, quit_message: 'FIN', timeout: 20) + sub.run + sleep 0.2 + + + pub = Publisher.new url("/pub/#{chan_id}"), accept: 'text/json' + pub.post "HEY" + pub.post "FIN" + + sub.wait + + verify pub, sub + sub.terminate + end + + def test_invalid_multi_etag chan_id=short_id pub = Publisher.new url("/pub/#{chan_id}"), accept: 'text/json' diff --git a/src/store/spool.c b/src/store/spool.c index 062023e9..52da29ee 100755 --- a/src/store/spool.c +++ b/src/store/spool.c @@ -370,42 +370,10 @@ static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetc break; case MSG_NOTFOUND: - if(spl->fetching_strategy == NCHAN_SPOOL_FETCH_IGNORE_MSG_NOTFOUND) { - spool->msg_status = prev_status; - break; - } - /*fallthrough*/ + spool_nextmsg(spool, &latest_msg_id); + break; case MSG_EXPIRED: - //is this right? - //TODO: maybe message-expired notification - spool->msg_status = findmsg_status; - spool_respond_general(spool, NULL, NGX_HTTP_NO_CONTENT, NULL, 0); - nuspool = get_spool(spool->spooler, &oldest_msg_id); - if(spool != nuspool) { - spool_transfer_subscribers(spool, nuspool, 1); - if(spool->reserved == 0) { - destroy_spool(spool); - } - spool_fetch_msg(nuspool); - } - else if(spool->id.tagcount == 1 && nchan_compare_msgids(&spool->id, &oldest_msg_id) == 0) { - // oldest msgid not found or expired. that means there are no messages in this channel, - // so move these subscribers over to the current_msg_spool - nuspool = get_spool(spool->spooler, &latest_msg_id); - assert(spool != nuspool); - spool_transfer_subscribers(spool, nuspool, 1); - if(spool->reserved == 0) { - destroy_spool(spool); - } - } - else if(spool == &spool->spooler->current_msg_spool) { - //sit there and wait, i guess - spool->msg_status = MSG_EXPECTED; - } - else { - ERR("Unexpected spool == nuspool during spool fetch_msg_callback. This is weird, please report this to the developers. 
findmsg_status: %i", findmsg_status); - assert(0); - } + spool_nextmsg(spool, &oldest_msg_id); break; case MSG_PENDING: From 0492d01aef78d419bc0908ab4ebe08c3ac53952d Mon Sep 17 00:00:00 2001 From: Ben Naylor Date: Mon, 29 Jan 2024 08:58:29 +0100 Subject: [PATCH 5/7] Remove unused variables creating compile errors --- src/store/spool.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/store/spool.c b/src/store/spool.c index 52da29ee..95dd4e96 100755 --- a/src/store/spool.c +++ b/src/store/spool.c @@ -301,8 +301,7 @@ static ngx_int_t spool_nextmsg(subscriber_pool_t *spool, nchan_msg_id_t *new_las static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetchmsg_data_t *data) { nchan_msg_status_t findmsg_status = code; - nchan_msg_status_t prev_status; - subscriber_pool_t *spool, *nuspool; + subscriber_pool_t *spool; channel_spooler_t *spl = data->spooler; int free_msg_id = 1; @@ -335,8 +334,6 @@ static ngx_int_t spool_fetch_msg_callback(ngx_int_t code, nchan_msg_t *msg, fetc return NGX_ERROR; } - prev_status = spool->msg_status; - switch(findmsg_status) { case MSG_FOUND: spool->msg_status = findmsg_status; From bcb823029c122c471d1e1f541ad58fcc703e171c Mon Sep 17 00:00:00 2001 From: "Leo P." Date: Wed, 28 Feb 2024 17:18:18 -0500 Subject: [PATCH 6/7] node_disconnect should respect the node->connecting state and clean up the connector --- src/store/redis/redis_nodeset.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/store/redis/redis_nodeset.c b/src/store/redis/redis_nodeset.c index 2c4646c4..ef57b224 100644 --- a/src/store/redis/redis_nodeset.c +++ b/src/store/redis/redis_nodeset.c @@ -1733,9 +1733,16 @@ int node_connect(redis_node_t *node) { } int node_disconnect(redis_node_t *node, int disconnected_state) { + assert(disconnected_state <= REDIS_NODE_DISCONNECTED); + + if(node->connecting) { + node_connector_fail(node, NULL); + assert(node->state == REDIS_NODE_FAILED); //this is what node_connector_fail should have done + node->state = disconnected_state; + return 1; + } ngx_int_t prev_state = node->state; node->state = disconnected_state; - node->connecting = 0; redisAsyncContext *ac; redisContext *c; if(node->connect_timeout) { @@ -2342,6 +2349,11 @@ static void node_connector_callback(redisAsyncContext *ac, void *rep, void *priv ngx_str_t rest; redis_lua_script_t *scripts = (redis_lua_script_t *)&redis_lua_scripts; + if(!node->connecting) { + //must have been called during a disconnect cleanup callback + return; + } + switch(node->state) { case REDIS_NODE_CONNECTION_TIMED_OUT: return node_connector_fail(node, "connection timed out"); From 3995f5d07d26674f9836cb894cfd651333a7869b Mon Sep 17 00:00:00 2001 From: "Leo P." Date: Wed, 28 Feb 2024 17:19:19 -0500 Subject: [PATCH 7/7] fix: May fail to connect to Redis in regular master-slave mode when more than 1 server is specified in the upstream. 
--- src/store/redis/redis_nodeset.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/store/redis/redis_nodeset.c b/src/store/redis/redis_nodeset.c index ef57b224..c65c152d 100644 --- a/src/store/redis/redis_nodeset.c +++ b/src/store/redis/redis_nodeset.c @@ -1495,7 +1495,7 @@ static void node_discover_slave(redis_node_t *master, redis_connect_params_t *rc redis_node_t *slave; if((slave = nodeset_node_find_by_connect_params(master->nodeset, rcp))!= NULL) { //we know about it already - if(slave->role != REDIS_NODE_ROLE_SLAVE && slave->state > REDIS_NODE_GET_INFO) { + if(slave->role != REDIS_NODE_ROLE_SLAVE && slave->state > REDIS_NODE_GETTING_INFO) { node_log_notice(slave, "Node appears to have changed to slave -- need to update"); node_set_role(slave, REDIS_NODE_ROLE_UNKNOWN); node_disconnect(slave, REDIS_NODE_FAILED); @@ -1522,7 +1522,7 @@ static void node_discover_slave(redis_node_t *master, redis_connect_params_t *rc static void node_discover_master(redis_node_t *slave, redis_connect_params_t *rcp) { redis_node_t *master; if ((master = nodeset_node_find_by_connect_params(slave->nodeset, rcp)) != NULL) { - if(master->role != REDIS_NODE_ROLE_MASTER && master->state > REDIS_NODE_GET_INFO) { + if(master->role != REDIS_NODE_ROLE_MASTER && master->state > REDIS_NODE_GETTING_INFO) { node_log_notice(master, "Node appears to have changed to master -- need to update"); node_set_role(master, REDIS_NODE_ROLE_UNKNOWN); node_disconnect(master, REDIS_NODE_FAILED);
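Of the series, the first patch is the subtle one: channels are sharded across workers and each worker holds its own Redis connection, so a subscribe request handled by one worker has to check whether the channel *owner's* Redis connection is ready, not its own — hence the new redis_conn_ready IPC round-trip. The sketch below is a minimal, self-contained model of that ownership/readiness check, not nchan's actual code: NUM_WORKERS, redis_ready[], channel_owner(), owner_redis_ready() and subscribe() are illustrative stand-ins, and the synchronous call here replaces the asynchronous memstore_ipc_send_redis_conn_ready() round-trip that resumes in nchan_store_subscribe_stage2()/_stage3() in the patch.

/* Simplified model of the owner-aware readiness check from patch 1.
 * Everything here is illustrative; see the diff above for the real flow. */
#include <stdio.h>

#define NUM_WORKERS 4

/* each worker knows only the state of its own Redis connection */
static int redis_ready[NUM_WORKERS] = {1, 0, 1, 1};

/* channels are sharded across workers by hashing the channel id */
static int channel_owner(const char *channel_id) {
  unsigned h = 0;
  for (const char *p = channel_id; *p; p++) {
    h = h * 31u + (unsigned char)*p;
  }
  return (int)(h % NUM_WORKERS);
}

/* stand-in for the redis_conn_ready IPC round-trip: ask the owning worker */
static int owner_redis_ready(const char *channel_id) {
  return redis_ready[channel_owner(channel_id)];
}

static void subscribe(int my_slot, const char *channel_id) {
  int owner = channel_owner(channel_id);
  if (!owner_redis_ready(channel_id)) {
    /* the owner's connection is not up yet: answer 503 now rather than
     * parking the subscriber on a connection that does not exist */
    printf("worker %d: '%s' owned by worker %d, Redis not ready -> 503\n",
           my_slot, channel_id, owner);
    return;
  }
  printf("worker %d: '%s' owned by worker %d, subscribing\n",
         my_slot, channel_id, owner);
}

int main(void) {
  subscribe(0, "/foo");
  subscribe(0, "/bar");
  subscribe(0, "/baz");
  return 0;
}

Whichever worker the request lands on, the readiness that matters is the owner's: the code removed in nchan_module.c answered 503 based on the handling worker's own connection state, which appears to be the source of the race on startup with many workers, since workers connect to Redis at different times.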