Skip to content

Commit

Permalink
minor style clean
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Jul 4, 2024
1 parent 78fef43 commit 2692ea9
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 60 deletions.
107 changes: 56 additions & 51 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ local hooks = require("kong.hooks")
local constants = require("kong.constants")


local ipairs = ipairs


local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL


Expand Down Expand Up @@ -45,69 +48,71 @@ end


function _M:register_dao_hooks(is_cp)
if is_cp then
hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end
if not is_cp then
return
end

local latest_version = self.strategy:get_latest_version()
hooks.register_hook("dao:insert:post", function(row, name, options, ws_id)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = row, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
local latest_version = self.strategy:get_latest_version()

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
end

return row, name, options, ws_id
end)

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

local latest_version = self.strategy:get_latest_version()
return row, name, options, ws_id
end) -- dao:insert:post

hooks.register_hook("dao:delete:post", function(row, name, options, ws_id, cascade_entries)
local deltas = {
{
["type"] = name,
id = row.id,
ws_id = ws_id,
row = ngx.null, },
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
return nil, err
end

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end
local latest_version = self.strategy:get_latest_version()

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
for _, node in ipairs(get_all_nodes_with_sync_cap()) do
res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", latest_version)
if not res then
if not err:find("requested capability does not exist", nil, true) then
ngx.log(ngx.ERR, "unable to notify new version: ", err)
end

else
ngx.log(ngx.ERR, "notified ", node, " ", latest_version)
end
end

return row, name, options, ws_id, cascade_entries
end)
end
return row, name, options, ws_id, cascade_entries
end) -- dao:delete:post
end


Expand Down
25 changes: 16 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ local constants = require("kong.constants")
local concurrency = require("kong.concurrency")


local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local assert = assert
local tonumber = tonumber
local ipairs = ipairs
local string_format = string.format
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_null = ngx.null


local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


function _M.new(strategy)
Expand All @@ -36,13 +43,13 @@ function _M:init(manager, is_cp)
last_seen = ngx.time(),
hostname = node_id,
ip = "127.0.0.1",
version = "3.6.0.0",
version = "3.7.0.0", -- TODO
sync_status = "normal",
config_hash = string.format("%032d", version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
config_hash = string_format("%032d", version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
})
if not ok then
ngx.log(ngx.ERR, "unable to update clustering data plane status: ", err)
ngx_log(ngx_ERR, "unable to update clustering data plane status: ", err)
end

return self.strategy:get_delta(version)
Expand Down Expand Up @@ -73,14 +80,14 @@ function _M:sync_once(delay)
local delta, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
tonumber(declarative.get_current_hash()) or 0)
if not delta then
ngx.log(ngx.ERR, "sync get_delta error: ", err)
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

local version = 0

for _, d in ipairs(delta) do
if d.row ~= ngx.null then
if d.row ~= ngx_null then
assert(kong.db[d.type]:delete({
id = d.id,
}))
Expand All @@ -94,7 +101,7 @@ function _M:sync_once(delay)

if d.version ~= version then
version = d.version
assert(lmdb.set(DECLARATIVE_HASH_KEY, string.format("%032d", version)))
assert(lmdb.set(DECLARATIVE_HASH_KEY, string_format("%032d", version)))
end
end

Expand Down

0 comments on commit 2692ea9

Please sign in to comment.