diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index a25441239c79..f8265788247c 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -6,6 +6,9 @@ local hooks = require("kong.hooks") local constants = require("kong.constants") +local ipairs = ipairs + + local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL @@ -45,69 +48,71 @@ end function _M:register_dao_hooks(is_cp) - if is_cp then - hooks.register_hook("dao:insert:post", function(row, name, options, ws_id) - local deltas = { - { - ["type"] = name, - id = row.id, - ws_id = ws_id, - row = row, }, - } - - local res, err = self.strategy:insert_delta(deltas) - if not res then - return nil, err - end + if not is_cp then + return + end - local latest_version = self.strategy:get_latest_version() + hooks.register_hook("dao:insert:post", function(row, name, options, ws_id) + local deltas = { + { + ["type"] = name, + id = row.id, + ws_id = ws_id, + row = row, }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + return nil, err + end - for _, node in ipairs(get_all_nodes_with_sync_cap()) do - res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) - if not res then - if not err:find("requested capability does not exist", nil, true) then - ngx.log(ngx.ERR, "unable to notify new version: ", err) - end + local latest_version = self.strategy:get_latest_version() - else - ngx.log(ngx.ERR, "notified ", node, " ", latest_version) + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx.log(ngx.ERR, "unable to notify new version: ", err) end - end - - return row, name, options, ws_id - end) - hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries) - local deltas = { - { - ["type"] = name, - id = row.id, - ws_id = ws_id, - row = ngx.null, }, - } - - local res, err = self.strategy:insert_delta(deltas) - if not res then - return nil, err + else + ngx.log(ngx.ERR, "notified ", node, " ", latest_version) end + end - local latest_version = self.strategy:get_latest_version() + return row, name, options, ws_id + end) -- dao:insert:post + + hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries) + local deltas = { + { + ["type"] = name, + id = row.id, + ws_id = ws_id, + row = ngx.null, }, + } + + local res, err = self.strategy:insert_delta(deltas) + if not res then + return nil, err + end - for _, node in ipairs(get_all_nodes_with_sync_cap()) do - res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) - if not res then - if not err:find("requested capability does not exist", nil, true) then - ngx.log(ngx.ERR, "unable to notify new version: ", err) - end + local latest_version = self.strategy:get_latest_version() - else - ngx.log(ngx.ERR, "notified ", node, " ", latest_version) + for _, node in ipairs(get_all_nodes_with_sync_cap()) do + res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version) + if not res then + if not err:find("requested capability does not exist", nil, true) then + ngx.log(ngx.ERR, "unable to notify new version: ", err) end + + else + ngx.log(ngx.ERR, "notified ", node, " ", latest_version) end + end - return row, name, options, ws_id, cascade_entries - end) - end + return row, name, options, ws_id, cascade_entries + end) -- dao:delete:post end diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index bcb49dcc1a7c..7123d31aa762 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -9,10 +9,17 @@ local constants = require("kong.constants") local concurrency = require("kong.concurrency") -local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY -local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } +local assert = assert +local tonumber = tonumber +local ipairs = ipairs +local string_format = string.format local ngx_log = ngx.log local ngx_ERR = ngx.ERR +local ngx_null = ngx.null + + +local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY +local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } function _M.new(strategy) @@ -36,13 +43,13 @@ function _M:init(manager, is_cp) last_seen = ngx.time(), hostname = node_id, ip = "127.0.0.1", - version = "3.6.0.0", + version = "3.7.0.0", -- TODO sync_status = "normal", - config_hash = string.format("%032d", version), - rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, + config_hash = string_format("%032d", version), + rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, }) if not ok then - ngx.log(ngx.ERR, "unable to update clustering data plane status: ", err) + ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err) end return self.strategy:get_delta(version) @@ -73,14 +80,14 @@ function _M:sync_once(delay) local delta, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", tonumber(declarative.get_current_hash()) or 0) if not delta then - ngx.log(ngx.ERR, "sync get_delta error: ", err) + ngx_log(ngx_ERR, "sync get_delta error: ", err) return true end local version = 0 for _, d in ipairs(delta) do - if d.row ~= ngx.null then + if d.row ~= ngx_null then assert(kong.db[d.type]:delete({ id = d.id, })) @@ -94,7 +101,7 @@ function _M:sync_once(delay) if d.version ~= version then version = d.version - assert(lmdb.set(DECLARATIVE_HASH_KEY, string.format("%032d", version))) + assert(lmdb.set(DECLARATIVE_HASH_KEY, string_format("%032d", version))) end end