Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering): introduce incremental sync for clustering #13157

Merged
merged 163 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
163 commits
Select commit Hold shift + click to select a range
937f118
Revert "fix(rpc): disable cluster_rpc for 3.7"
dndx May 16, 2024
a07e83b
WIP incremental sync
dndx Jun 3, 2024
fb278b8
add migration to luarocks file
dndx Jun 4, 2024
d80109e
fix bugs in RPC handler
dndx Jun 4, 2024
883a531
fix pagination bug
dndx Jun 4, 2024
2e2c6bf
feat(dao): support diff transaction (#13586)
chronolaw Sep 5, 2024
7b3391f
WIP
dndx Sep 5, 2024
63e202b
refactored import logic so it can be shared by DB-less import and
dndx Sep 9, 2024
fd22f06
add update and upsert support
dndx Sep 10, 2024
a290b60
fixed default workspace recording issue
dndx Sep 10, 2024
e5a140e
fix incorrect type for select_by_name
dndx Sep 10, 2024
84439b4
fix(pdk/vault): handle crud events for dbless (#13663)
chronolaw Sep 20, 2024
5b1463d
lint fix
chronolaw Sep 20, 2024
96f4242
migration test
chronolaw Sep 20, 2024
e62f918
migration test
chronolaw Sep 20, 2024
ab4ecd2
pdk/vault.lua
chronolaw Sep 20, 2024
a0a8148
fix spec/01-unit/01-db/10-declarative_spec.lua
chronolaw Sep 20, 2024
8486b70
spec/01-unit/01-db/04-dao_spec.lua
chronolaw Sep 20, 2024
dcf81f6
fix spec/02-integration/20-wasm/10-wasmtime_spec.lua
chronolaw Sep 20, 2024
f30b373
kong.clustering.tls only in http subsystem
chronolaw Sep 20, 2024
c400726
if not exist in 024_370_to_380.lua
chronolaw Sep 20, 2024
87236d8
kong.default_workspace
chronolaw Sep 21, 2024
8450026
sync/hooks.lua clean
chronolaw Sep 21, 2024
68d5e65
sync/init.lua clean
chronolaw Sep 21, 2024
d3703e9
sync/rpc.lua clean
chronolaw Sep 21, 2024
df4c97f
sync/strategies/postgres.lua clean
chronolaw Sep 21, 2024
ee162d7
kong/runloop/events.lua clean
chronolaw Sep 21, 2024
14c899b
schema/others/declarative_config.lua clean
chronolaw Sep 21, 2024
87e9d54
try to check is_foreign
chronolaw Sep 21, 2024
0cbc301
tmp fix 01-unit/01-db/10-declarative_spec.lua
chronolaw Sep 21, 2024
8702c64
workspaces returns {id = lmdb.get}
chronolaw Sep 23, 2024
d4582f6
encode_base64 last_key
chronolaw Sep 23, 2024
29013d5
enable kong.clustering in init.lua
chronolaw Sep 23, 2024
f2ca3e2
enable kong.clustering # 2 in init.lua
chronolaw Sep 23, 2024
67ee03f
some todo comments
chronolaw Sep 24, 2024
55da472
save dp ipaddr in rpc manager
chronolaw Sep 24, 2024
a8e96aa
dedicated_config_processing = "off" in tests
chronolaw Sep 24, 2024
2d56cf8
clean
chronolaw Sep 24, 2024
5db5100
enable_privileged_agent
chronolaw Sep 24, 2024
70b3c00
kong.core_cache
chronolaw Sep 24, 2024
368e7c0
conf.cluster_incremental_sync
chronolaw Sep 24, 2024
806cbde
migrations/core/024_380_to_390
chronolaw Sep 24, 2024
5477bd7
01-unit/01-db/11-declarative_lmdb_spec.lua
chronolaw Sep 24, 2024
7495125
tmp pass 11-declarative_lmdb_spec.lua
chronolaw Sep 24, 2024
fe1f256
09-hybrid_mode/03-pki_spec.lua
chronolaw Sep 24, 2024
b7df082
test lint
chronolaw Sep 25, 2024
d59f48b
clustering init
chronolaw Sep 25, 2024
89f427d
18-hybrid_rpc
chronolaw Sep 25, 2024
73a6880
fix foreign value key
chronolaw Sep 25, 2024
9faaa48
detect_changes correctly
chronolaw Sep 25, 2024
a1c0765
fix insert_entity_for_txn
chronolaw Sep 25, 2024
b72ea77
fix 08-lazy_export_spec.lua
chronolaw Sep 25, 2024
2991bf7
skip 09-hybrid_mode/12-errors_spec.lua
chronolaw Sep 25, 2024
861c69b
skip 09-hybrid_mode/09-config-compat_spec.lua
chronolaw Sep 25, 2024
062bc48
add ttl for v2.get_delta
chronolaw Sep 26, 2024
0357020
fix some in 09-hybrid_mode/01-sync_spec.lua
chronolaw Sep 26, 2024
f777362
skip 02-cmd/14-vault_spec.lua
chronolaw Sep 26, 2024
4988a0a
get_default_workspace, fix 02-cmd/14-vault_spec.lua
chronolaw Sep 26, 2024
4345d33
rename to UNINIT_WORKSPACE_ID
chronolaw Sep 26, 2024
1fa9fd5
fix 07-sdk/03-cluster_spec.lua
chronolaw Sep 26, 2024
8ca8b52
20-wasm/06-clustering_spec.lua add inc_sync
chronolaw Sep 26, 2024
8b300a4
09-hybrid_mode/11-status_spec.lua add inc_sync
chronolaw Sep 26, 2024
c37cc57
clean get_default_workspace
chronolaw Sep 26, 2024
0e5eb56
skip some flaky cases
chronolaw Sep 26, 2024
9d58c61
skip_inc_sync for some tests
chronolaw Sep 26, 2024
c6b324d
skip another test
chronolaw Sep 27, 2024
57a8da6
09-key-auth/04-hybrid_mode_spec.lua
chronolaw Sep 27, 2024
d7bf07c
init events in rpc.concentrator
chronolaw Sep 27, 2024
80b41c7
skip 09-hybrid_mode/10-forward-proxy_spec.lua
chronolaw Sep 27, 2024
bbdf0aa
skip 09-hybrid_mode/09-node-id-persistence_spec.lua
chronolaw Sep 27, 2024
04cae45
skip 09-hybrid_mode/01-sync_spec.lua
chronolaw Sep 27, 2024
91ed117
try to skip 20-wasm/10-wasmtime_spec.lua
chronolaw Sep 27, 2024
e00406a
inc_sync on 05-ocsp_spec.lua
chronolaw Sep 27, 2024
2009e11
inc_sync on 13-deprecations_spec.lua
chronolaw Sep 27, 2024
20057c4
skip flaky 04-hybrid_mode_spec.lua
chronolaw Sep 27, 2024
f25066d
skip 11-correlation-id/02-schema_spec.lua
chronolaw Sep 27, 2024
3e46c4b
setenv in 20-wasm/10-wasmtime_spec.lua
chronolaw Sep 27, 2024
db2db1d
setenv in 20-wasm/10-wasmtime_spec.lua
chronolaw Sep 27, 2024
90da6d8
events init in sync service
chronolaw Sep 27, 2024
ec8e2f1
code clean
chronolaw Sep 27, 2024
620714c
dbless(no dp) no events
chronolaw Sep 27, 2024
2521605
rpc.get_peer_ip
chronolaw Sep 28, 2024
1bb8043
clean kong/db/strategies/off/init.lua
chronolaw Sep 28, 2024
8a762c4
fix spec/02-integration/02-cmd/14-vault_spec.lua
chronolaw Sep 28, 2024
8a95acc
Revert "fix spec/02-integration/02-cmd/14-vault_spec.lua"
chronolaw Sep 28, 2024
a37c1d8
fix sync rpc mistake
chronolaw Sep 28, 2024
cb87516
Revert "Revert "fix spec/02-integration/02-cmd/14-vault_spec.lua""
chronolaw Sep 28, 2024
8e08356
clean declarative_config.lua
chronolaw Sep 29, 2024
59d2cbc
clean 18-hybrid_rpc/01-rpc_spec.lua
chronolaw Sep 29, 2024
cd20aa1
18-hybrid_rpc/03-inert_spec.lua rpc_capabilities
chronolaw Sep 29, 2024
9577c39
20-wasm/10-wasmtime_spec.lua
chronolaw Sep 29, 2024
6e5cf2c
try to fix 10-wasmtime_spec.lua
chronolaw Sep 29, 2024
771a69d
Revert "try to fix 10-wasmtime_spec.lua"
chronolaw Sep 29, 2024
bae8a2f
fix 04-admin_api/15-off_spec.lua
chronolaw Sep 29, 2024
508f051
skip 10-wasmtime_spec.lua
chronolaw Sep 29, 2024
cbeb927
log lvl in hooks.lua
chronolaw Sep 29, 2024
745bef2
refactor hook funcs
chronolaw Sep 29, 2024
6d80187
clean rpc.sync.init
chronolaw Sep 29, 2024
3c4c4b0
clean sync/rpc.lua
chronolaw Sep 29, 2024
b8f3bd4
clean sync/rpc.lua #2
chronolaw Sep 29, 2024
ba9af4c
clean sync/strategies/postgres.lua
chronolaw Sep 29, 2024
0457da9
clean hooks.lua
chronolaw Sep 29, 2024
1b0316d
conf loader init
chronolaw Sep 29, 2024
170bb71
fix mistake in hooks.lua
chronolaw Sep 29, 2024
e017d0f
clean strategies/off/init.lua
chronolaw Sep 29, 2024
9197188
lmdb.get may be nil
chronolaw Sep 29, 2024
04fc10e
Revert "migrations/core/024_380_to_390"
chronolaw Sep 29, 2024
3728ff6
only init hooks in cp
chronolaw Sep 29, 2024
b5c1b34
clean declarative/import.lua
chronolaw Sep 30, 2024
7d4444d
try to preserve nulls in entity
chronolaw Sep 30, 2024
0842d09
check null when inserting txn
chronolaw Sep 30, 2024
ca9c6c6
small style
chronolaw Sep 30, 2024
7cb9a22
refactor _set_entity_for_txn()
chronolaw Sep 30, 2024
ba10aac
clean 01-db/10-declarative_spec.lua
chronolaw Sep 30, 2024
875bae7
09-hybrid_mode/05-ocsp_spec.lua
chronolaw Sep 30, 2024
952b4ea
clean hooks.lua
chronolaw Sep 30, 2024
1c6e565
clean rpc.lua
chronolaw Sep 30, 2024
1b17440
fix delete event in rpc.lua
chronolaw Sep 30, 2024
635c79e
Revert "fix delete event in rpc.lua"
chronolaw Sep 30, 2024
a8e7f40
clean rpc.lua
chronolaw Sep 30, 2024
8c83e15
clean hooks.lua
chronolaw Oct 2, 2024
d25167b
spec/02-integration/04-admin_api/15-off_spec.lua
chronolaw Oct 2, 2024
b3ebbdf
applay chobits suggestions
chronolaw Oct 8, 2024
bea39b8
disable inc_sync for testing
chronolaw Oct 8, 2024
2f53772
fix(incremental): reduce the use of timers (#13732)
chobits Oct 8, 2024
a06885b
fix 04-admin_api/15-off_spec.lua
chronolaw Oct 9, 2024
63eff2e
typo fix in test
chronolaw Oct 9, 2024
edefc69
handler.lua style clean
chronolaw Oct 9, 2024
8230d22
fix incorrect return value from Workspaces:select_by_name (#13733)
chobits Oct 10, 2024
3fd0d9e
default ws_id if not exists in lmdb
chronolaw Oct 10, 2024
45ba99b
lint fix
chronolaw Oct 10, 2024
dd1e217
tests(helpers): fixed incorrect conf.database setting in get_db_utils
chobits Oct 10, 2024
9262391
remove incorrect comment
chronolaw Oct 10, 2024
a4c6860
remove unused param `new_version`
chronolaw Oct 10, 2024
b47d302
comment change
chronolaw Oct 10, 2024
ce075cd
some comments
chronolaw Oct 11, 2024
04aa262
clean rpc.lua
chronolaw Oct 11, 2024
98f86e5
dont return the default value from select_by_name for dbless mode (#1…
chobits Oct 11, 2024
6aaf011
check get_latest_version()
chronolaw Oct 11, 2024
c22b78e
return default workspace constant value if master could not get it fr…
chobits Oct 11, 2024
64f500c
code clean: start_sync_timer
chronolaw Oct 12, 2024
072df3c
small clean
chronolaw Oct 12, 2024
faa01fe
clean sync postgres
chronolaw Oct 12, 2024
33554f6
clean import.lua
chronolaw Oct 12, 2024
6d34229
fix strategies/postgres.lua
chronolaw Oct 12, 2024
3307076
clean import.lua
chronolaw Oct 12, 2024
1dea181
restore pagesize in db conncetor
chronolaw Oct 12, 2024
ffc4fa0
fix(tests): 10-wasmtime_spec.lua: enable inc_sync
chobits Oct 12, 2024
b41d1a8
refactor _set_entity_for_txn() in import.lua
chronolaw Oct 12, 2024
7aae9bd
Revert "restore pagesize in db conncetor"
chronolaw Oct 14, 2024
1a15452
clean hooks.lua
chronolaw Oct 14, 2024
4db1fc8
clean post_hook_delete_func in hooks.lua
chronolaw Oct 14, 2024
bbedc8e
refactor do_sync/sync_handler
chronolaw Oct 14, 2024
9dcf4f0
exchange do_sync/sync_handler
chronolaw Oct 14, 2024
cdd8e5d
clean import.lua
chronolaw Oct 14, 2024
305653c
fix post_hook_delete_func
chronolaw Oct 14, 2024
b280566
clean post_hook_delete_func
chronolaw Oct 14, 2024
51d915a
dont need ynamic-log-level-rpc.yml
chronolaw Oct 15, 2024
3df6275
update changelog entry
chronolaw Oct 15, 2024
23344c8
Fix(tests): enable incremental sync for hybrid case (#13746)
chobits Oct 16, 2024
4af810c
Revert "dont need ynamic-log-level-rpc.yml"
chronolaw Oct 16, 2024
2e6cafb
remove unused `remove_nulls` in off/init.lua
chronolaw Oct 16, 2024
7c02759
clean pdk/vault.lua
chronolaw Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/kong/cp-dp-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: "Added a remote procedure call (RPC) framework for Hybrid mode deployments."
type: feature
scope: Clustering
6 changes: 6 additions & 0 deletions changelog/unreleased/kong/dynamic-log-level-rpc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message: |
Dynamic log level over Hybrid mode RPC which allows setting DP log level
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
to a different level for specified duration before reverting back
to the `kong.conf` configured value.
type: feature
scope: Clustering
9 changes: 7 additions & 2 deletions kong-3.9.0-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ build = {
["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua",
["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua",
["kong.clustering.tls"] = "kong/clustering/tls.lua",
["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",

["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua",
["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua",
Expand All @@ -99,6 +98,12 @@ build = {
["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua",
["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua",

["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua",
["kong.clustering.services.sync"] = "kong/clustering/services/sync/init.lua",
["kong.clustering.services.sync.rpc"] = "kong/clustering/services/sync/rpc.lua",
["kong.clustering.services.sync.hooks"] = "kong/clustering/services/sync/hooks.lua",
["kong.clustering.services.sync.strategies.postgres"] = "kong/clustering/services/sync/strategies/postgres.lua",

["kong.cluster_events"] = "kong/cluster_events/init.lua",
["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua",
["kong.cluster_events.strategies.off"] = "kong/cluster_events/strategies/off.lua",
Expand Down Expand Up @@ -289,7 +294,6 @@ build = {
["kong.db.strategies.postgres.plugins"] = "kong/db/strategies/postgres/plugins.lua",
["kong.db.strategies.off"] = "kong/db/strategies/off/init.lua",
["kong.db.strategies.off.connector"] = "kong/db/strategies/off/connector.lua",
["kong.db.strategies.off.tags"] = "kong/db/strategies/off/tags.lua",

["kong.db.migrations.state"] = "kong/db/migrations/state.lua",
["kong.db.migrations.subsystems"] = "kong/db/migrations/subsystems.lua",
Expand All @@ -316,6 +320,7 @@ build = {
["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua",
["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua",
["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua",
["kong.db.migrations.core.024_370_to_380"] = "kong/db/migrations/core/024_370_to_380.lua",
["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua",
["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua",
["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua",
Expand Down
19 changes: 15 additions & 4 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function _M.new(conf, node_id)
-- clients[node_id]: { socket1 => true, socket2 => true, ... }
clients = {},
client_capabilities = {},
client_ips = {}, -- store DP node's ip addr
node_id = node_id,
conf = conf,
cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
Expand Down Expand Up @@ -75,16 +76,18 @@ end


function _M:_remove_socket(socket)
local sockets = assert(self.clients[socket.node_id])
local node_id = socket.node_id
local sockets = assert(self.clients[node_id])

assert(sockets[socket])

sockets[socket] = nil

if table_isempty(sockets) then
self.clients[socket.node_id] = nil
self.client_capabilities[socket.node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(socket.node_id))
self.clients[node_id] = nil
self.client_ips[node_id] = nil
self.client_capabilities[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end

Expand Down Expand Up @@ -255,6 +258,9 @@ function _M:handle_websocket()
local s = socket.new(self, wb, node_id)
self:_add_socket(s, rpc_capabilities)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -362,4 +368,9 @@ function _M:get_peers()
end


function _M:get_peer_ip(node_id)
return self.client_ips[node_id]
end


return _M
179 changes: 179 additions & 0 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
local _M = {}
local _MT = { __index = _M, }


local hooks = require("kong.hooks")
local EMPTY = require("kong.tools.table").EMPTY


local ipairs = ipairs
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG


local DEFAULT_PAGE_SIZE = 512


function _M.new(strategy)
local self = {
strategy = strategy,
}

return setmetatable(self, _MT)
end


local function get_all_nodes_with_sync_cap()
local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we not use :each() here instead of simply fetching one page? It seems that this function will only ever return DEFAULT_PAGE_SIZE entries even if there are more in the database.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err then
return nil, "unable to query DB " .. err
end

if not res then
return EMPTY
end

local ret = {}
local ret_n = 0

for _, row in ipairs(res) do
for _, c in ipairs(row.rpc_capabilities) do
if c == "kong.sync.v2" then
ret_n = ret_n + 1
ret[ret_n] = row.id
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
break
end
end
end

return ret
end


function _M:notify_all_nodes()
jschmid1 marked this conversation as resolved.
Show resolved Hide resolved
local latest_version, err = self.strategy:get_latest_version()
if not latest_version then
ngx_log(ngx_ERR, "can not get the latest version: ", err)
return
end

local msg = { default = { new_version = latest_version, }, }

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
local res, err = kong.rpc:call(node, "kong.sync.v2.notify_new_version", msg)
if not res then
if not err:find("requested capability does not exist", nil, true) then
chobits marked this conversation as resolved.
Show resolved Hide resolved
ngx_log(ngx_ERR, "unable to notify new version: ", err)
end

else
ngx_log(ngx_DEBUG, "notified ", node, " ", latest_version)
end
end
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
local deltas = {
{
type = name,
id = row.id,
ws_id = ws_id,
row = is_delete and ngx_null or row,
},
}

local res, err = self.strategy:insert_delta(deltas)
if not res then
self.strategy:cancel_txn()
return nil, err
end

res, err = self.strategy:commit_txn()
if not res then
self.strategy:cancel_txn()
return nil, err
end

self:notify_all_nodes()

return row -- for other hooks
end


-- only control plane has these delta operations
function _M:register_dao_hooks()
local function is_db_export(name)
local db_export = kong.db[name].schema.db_export
return db_export == nil or db_export == true
end

-- common hook functions (pre/fail/post)

local function pre_hook_func(entity, name, options)
if not is_db_export(name) then
return true
end

return self.strategy:begin_txn()
end

local function fail_hook_func(err, entity, name)
if not is_db_export(name) then
return
end

local res, err = self.strategy:cancel_txn()
if not res then
ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err))
end
end

local function post_hook_writer_func(row, name, options, ws_id)
if not is_db_export(name) then
return row
end

return self:entity_delta_writer(row, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
end

local dao_hooks = {
-- dao:insert
["dao:insert:pre"] = pre_hook_func,
["dao:insert:fail"] = fail_hook_func,
["dao:insert:post"] = post_hook_writer_func,

-- dao:delete
["dao:delete:pre"] = pre_hook_func,
["dao:delete:fail"] = fail_hook_func,
["dao:delete:post"] = post_hook_delete_func,

-- dao:update
["dao:update:pre"] = pre_hook_func,
["dao:update:fail"] = fail_hook_func,
["dao:update:post"] = post_hook_writer_func,

-- dao:upsert
["dao:upsert:pre"] = pre_hook_func,
["dao:upsert:fail"] = fail_hook_func,
["dao:upsert:post"] = post_hook_writer_func,
}

for ev, func in pairs(dao_hooks) do
hooks.register_hook(ev, func)
end
end


return _M
63 changes: 63 additions & 0 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
local _M = {}
local _MT = { __index = _M, }


local events = require("kong.clustering.events")
local strategy = require("kong.clustering.services.sync.strategies.postgres")
local rpc = require("kong.clustering.services.sync.rpc")


-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
jschmid1 marked this conversation as resolved.
Show resolved Hide resolved
local EACH_SYNC_DELAY = 30 -- seconds


function _M.new(db, is_cp)
local strategy = strategy.new(db)

local self = {
db = db,
strategy = strategy,
rpc = rpc.new(strategy),
is_cp = is_cp,
}

-- only cp needs hooks
if is_cp then
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
self.hooks = require("kong.clustering.services.sync.hooks").new(strategy)
chobits marked this conversation as resolved.
Show resolved Hide resolved
end

return setmetatable(self, _MT)
end


function _M:init(manager)
if self.hooks then
self.hooks:register_dao_hooks()
end
self.rpc:init(manager, self.is_cp)
end


function _M:init_worker()
-- is CP, enable clustering broadcasts
if self.is_cp then
events.init()

self.strategy:init_worker()
return
end

-- is DP, sync only in worker 0
if ngx.worker.id() ~= 0 then
return
end

-- sync to CP ASAP
assert(self.rpc:sync_once(FIRST_SYNC_DELAY))

assert(self.rpc:sync_every(EACH_SYNC_DELAY))
end


return _M
Loading
Loading