From b18175b6e1c54a44d50e311092dcceac4b113395 Mon Sep 17 00:00:00 2001 From: xumin Date: Wed, 15 Jan 2025 15:58:41 +0800 Subject: [PATCH] refactor(sync.v2): improve readability and debugability 1. abstract functions and reshape the code, including error handling 2. extra call to notify CP the state of sync; 3. commit DB purge before full sync; 4. more debuging logs; 5. sync once retry will not count timeout as retry, allowing DP to sync deltas right after a full sync KAG-6177 --- kong/clustering/services/sync/rpc.lua | 240 +++++++++++++++++--------- 1 file changed, 156 insertions(+), 84 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 56e2a9f1d271..929f21546a19 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -8,6 +8,8 @@ local constants = require("kong.constants") local concurrency = require("kong.concurrency") local isempty = require("table.isempty") local events = require("kong.runloop.events") +local yield = require("kong.tools.yield").yield +local build_router = require("kong.runloop.handler").build_router local insert_entity_for_txn = declarative.insert_entity_for_txn @@ -26,6 +28,7 @@ local ipairs = ipairs local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR +local ngx_INFO = ngx.INFO local ngx_DEBUG = ngx.DEBUG @@ -173,7 +176,109 @@ local function is_rpc_ready() end -local function do_sync() +local function lmdb_purge() + ngx_log(ngx_INFO, "purging the LMDB") + + local t = assert(txn.begin(512)) + t:db_drop(false) + t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH) + assert(t:commit()) -- not much we can do if this fails + + -- we are at a unready state + -- consider the config empty + assert(kong.core_cache:purge()) + assert(kong.cache:purge()) +end + + +local function reset_dp() + ngx_log(ngx_INFO, "resetting the data plane") + + lmdb_purge() + assert(build_router("init")) +end + + +local do_sync + + +local function error_handler(err, is_full_sync) + ngx_log(ngx_ERR, err) + + if is_full_sync then + -- in case of full sync, reset the DP to initial state and retry immediately + reset_dp() + return do_sync() -- retry without releasing the mutex as the DP has nothing to do except syncing + end + + return nil, err +end + + +local function lmdb_upsert(db, t, delta, opts) + local delta_type = delta.type + local delta_entity = delta.entity + + -- upsert the entity + -- does the entity already exists? + local old_entity, err = db[delta_type]:select(delta_entity) + + if err then + return nil, err + end + + if old_entity then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) + if not res then + return nil, err + end + end + + local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts) + if not res then + return nil, err + end + + return { delta_type, old_entity and "update" or "create", delta_entity, old_entity, } +end + + +local function lmdb_delete(db, t, delta, opts, is_full_sync) + local delta_type = delta.type + + -- NOTE: the first page of full sync MUST NOT contain delete events + -- assert(not t.purged, "unexpected delete event in frist page of full sync") + + local old_entity, err = db[delta_type]:select(delta.pk, opts) + if err then + return nil, err + end + + -- entity does not exist, no need to delete + if not old_entity then + return true + end + + -- delete the entity + local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) + + if is_full_sync then + local entity_not_found = err and err:match("MDB_NOTFOUND") + -- full sync needs extra tolerance for errors + if not res and not entity_not_found then + return nil, err + end + else + if not res then + return nil, err + end + end + + return { delta_type, "delete", old_entity, } +end + + +function do_sync() if not is_rpc_ready() then return nil, "rpc is not ready" end @@ -182,22 +287,21 @@ local function do_sync() local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) if not ns_deltas then - ngx_log(ngx_ERR, "sync get_delta error: ", err) - return true + return error_handler("sync get_delta error: " .. err) end -- ns_deltas should look like: - -- { default = { deltas = { ... }, wipe = true, }, } + -- { default = { deltas = { ... } }, } local ns_delta = ns_deltas.default if not ns_delta then - return nil, "default namespace does not exist inside params" + return error_handler("default namespace does not exist inside params") end local deltas = ns_delta.deltas if not deltas then - return nil, "sync get_delta error: deltas is null" + return error_handler("sync get_delta error: deltas is null") end if isempty(deltas) then @@ -221,9 +325,12 @@ local function do_sync() local t = txn.begin(512) - local wipe = ns_delta.wipe - if wipe then - t:db_drop(false) + local is_full_sync = ns_delta.wipe + if is_full_sync then + -- a sync begins + -- reset the dataplane to a clean state + ngx_log(ngx_INFO, "[kong.sync.v2] sync begins") + lmdb_purge() end local db = kong.db @@ -239,72 +346,26 @@ local function do_sync() local delta_version = delta.version local delta_type = delta.type local delta_entity = delta.entity - local ev + local delta_is_upsert = delta_entity ~= nil and delta_entity ~= ngx_null + local operation_name = delta_is_upsert and "upsert" or "delete" + local operation = delta_is_upsert and lmdb_upsert or lmdb_delete + + ngx_log(ngx_DEBUG, + "[kong.sync.v2] ", operation_name, " entity", + ", version: ", delta_version, + ", type: ", delta_type) -- delta should have ws_id to generate the correct lmdb key -- if entity is workspaceable -- set the correct workspace for item opts.workspace = delta.ws_id - if delta_entity ~= nil and delta_entity ~= ngx_null then - -- upsert the entity - -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_entity) - if err then - return nil, err - end - - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) - if not res then - return nil, err - end - end - - local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts) - if not res then - return nil, err - end - - ngx_log(ngx_DEBUG, - "[kong.sync.v2] update entity", - ", version: ", delta_version, - ", type: ", delta_type) - - -- wipe the whole lmdb, should not have events - if not wipe then - ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, } - end - - else - -- delete the entity, opts for getting correct lmdb key - local old_entity, err = db[delta_type]:select(delta.pk, opts) -- composite key - if err then - return nil, err - end - - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) - if not res then - return nil, err - end - end - - ngx_log(ngx_DEBUG, - "[kong.sync.v2] delete entity", - ", version: ", delta_version, - ", type: ", delta_type) - - -- wipe the whole lmdb, should not have events - if not wipe then - ev = { delta_type, "delete", old_entity, } - end - end -- if delta_entity ~= nil and delta_entity ~= ngx_null - - -- wipe the whole lmdb, should not have events - if not wipe then + local ev, err = operation(db, t, delta, opts) + if not ev then + return error_handler("failed to " .. operation_name .. " entity: " .. err, is_full_sync) + end + + if not is_full_sync then crud_events_n = crud_events_n + 1 crud_events[crud_events_n] = ev end @@ -323,16 +384,18 @@ local function do_sync() -- record the default workspace into LMDB for any of the following case: -- * wipe is false, but the default workspace has been changed -- * wipe is true (full sync) - if default_ws_changed or wipe then + if default_ws_changed or is_full_sync then t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace) end local ok, err = t:commit() if not ok then - return nil, err + return error_handler("failed to commit transaction: " .. err, is_full_sync) end - if wipe then + if is_full_sync then + -- all deltas are applied. Emit events to notify the changes + ngx_log(ngx_INFO, "[kong.sync.v2] sync ends") kong.core_cache:purge() kong.cache:purge() @@ -341,16 +404,12 @@ local function do_sync() -- Full sync could rebuild route, plugins and balancer route, so their -- hashes are nil. local reconfigure_data = { kong.default_workspace, nil, nil, nil, } - local ok, err = events.declarative_reconfigure_notify(reconfigure_data) - if not ok then - return nil, err - end + return events.declarative_reconfigure_notify(reconfigure_data) + end - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.entity, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) - end + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.entity, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) end return true @@ -363,9 +422,12 @@ local function sync_handler(premature) end local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, do_sync) + if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end + + return res, err end @@ -387,7 +449,7 @@ function sync_once_impl(premature, retry_count) return end - sync_handler() + local _, err = sync_handler() -- check if "kong.sync.v2.notify_new_version" updates the latest version @@ -400,17 +462,27 @@ function sync_once_impl(premature, retry_count) local current_version = get_current_version() if current_version >= latest_notified_version then ngx_log(ngx_DEBUG, "version already updated") - return + return sync_handler() -- call get_delta once more to report to the CP that we are up-to-date end -- retry if the version is not updated retry_count = retry_count or 0 + -- we do not count a timed out sync. just retry + if err ~= "timeout" then + retry_count = retry_count + 1 + end + + -- in some cases, the new spawned timer will be switched to immediately, + -- preventing the coroutine who possesses the mutex to run + -- to let other coroutines has a chance to run + yield() + if retry_count > MAX_RETRY then ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) return end - return start_sync_once_timer(retry_count + 1) + return start_sync_once_timer(retry_count) end