-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(stream/refactoring) Upgrade BSM to api-v2 #91
base: master
Are you sure you want to change the base?
Changes from all commits
1debbb4
30cb05a
69cb151
17414d5
5cef5f5
ecb5147
84fb5bf
85b28b7
fac6bb3
68eb015
5fcca75
92641dd
92c7063
d41d326
ce03a50
6f23a77
2cd14e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,372 @@ | ||||||||||||||||||
-- | ||||||||||||||||||
-- Copyright © 2021 Centreon | ||||||||||||||||||
-- | ||||||||||||||||||
-- Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||
-- you may not use this file except in compliance with the Licensself.sc_event.event. | ||||||||||||||||||
-- You may obtain a copy of the License at | ||||||||||||||||||
-- | ||||||||||||||||||
-- http://www.apachself.sc_event.event.org/licenses/LICENSE-2.0 | ||||||||||||||||||
-- | ||||||||||||||||||
-- Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||
-- See the License for the specific language governing permissions and | ||||||||||||||||||
-- limitations under the Licensself.sc_event.event. | ||||||||||||||||||
-- | ||||||||||||||||||
-- For more information : [email protected] | ||||||||||||||||||
-- | ||||||||||||||||||
-- To work you need to provide to this script a Broker stream connector output configuration | ||||||||||||||||||
-- with the following informations: | ||||||||||||||||||
-- | ||||||||||||||||||
-- source_ci (string): Name of the transmiter, usually Centreon server name | ||||||||||||||||||
-- http_server_url (string): the full HTTP URL. Default: https://my.bsm.server:30005/bsmc/rest/events/ws-centreon/. | ||||||||||||||||||
-- http_proxy_string (string): the full proxy URL if needed to reach the BSM server. Default: empty. | ||||||||||||||||||
-- log_path (string): the log file to use | ||||||||||||||||||
-- log_level (number): the log level (0, 1, 2, 3) where 3 is the maximum level. 0 logs almost nothing. 1 logs only the beginning of the script and errors. 2 logs a reasonable amount of verbosself.sc_event.event. 3 logs almost everything possible, to be used only for debug. Recommended value in production: 1. | ||||||||||||||||||
-- max_buffer_size (number): how many events to store before sending them to the server. | ||||||||||||||||||
-- max_buffer_age (number): flush the events when the specified time (in second) is reached (even if max_size is not reached). | ||||||||||||||||||
|
||||||||||||||||||
-- Libraries | ||||||||||||||||||
local curl = require "cURL" | ||||||||||||||||||
|
||||||||||||||||||
-- Centreon lua core libraries | ||||||||||||||||||
local sc_common = require("centreon-stream-connectors-lib.sc_common") | ||||||||||||||||||
local sc_logger = require("centreon-stream-connectors-lib.sc_logger") | ||||||||||||||||||
local sc_broker = require("centreon-stream-connectors-lib.sc_broker") | ||||||||||||||||||
local sc_event = require("centreon-stream-connectors-lib.sc_event") | ||||||||||||||||||
local sc_params = require("centreon-stream-connectors-lib.sc_params") | ||||||||||||||||||
local sc_macros = require("centreon-stream-connectors-lib.sc_macros") | ||||||||||||||||||
local sc_flush = require("centreon-stream-connectors-lib.sc_flush") | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
-- EventQueue class | ||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
|
||||||||||||||||||
local EventQueue = {} | ||||||||||||||||||
EventQueue.__index = EventQueue | ||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
-- Constructor | ||||||||||||||||||
-- @param conf The table given by the init() function and returned from the GUI | ||||||||||||||||||
-- @return the new EventQueue | ||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
|
||||||||||||||||||
function EventQueue.new(params) | ||||||||||||||||||
local self = {} | ||||||||||||||||||
self.fail = false | ||||||||||||||||||
|
||||||||||||||||||
local mandatory_parameters = { | ||||||||||||||||||
"http_server_url" | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
-- set up log configuration | ||||||||||||||||||
local logfile = params.logfile or "/var/log/centreon-broker/bsm_event-apiv2.log" | ||||||||||||||||||
local log_level = params.log_level or 1 | ||||||||||||||||||
|
||||||||||||||||||
-- initiate mandatory objects | ||||||||||||||||||
self.sc_logger = sc_logger.new(logfile, log_level) | ||||||||||||||||||
self.sc_common = sc_common.new(self.sc_logger) | ||||||||||||||||||
self.sc_broker = sc_broker.new(self.sc_logger) | ||||||||||||||||||
self.sc_params = sc_params.new(self.sc_common, self.sc_logger) | ||||||||||||||||||
|
||||||||||||||||||
-- checking mandatory parameters and setting a fail flag | ||||||||||||||||||
if not self.sc_params:is_mandatory_config_set(mandatory_parameters, params) then | ||||||||||||||||||
self.fail = true | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- overriding default parameters for this stream connector if the default values doesn't suit the basic needs | ||||||||||||||||||
self.sc_params.params.accepted_categories = params.accepted_categories or "neb" | ||||||||||||||||||
self.sc_params.params.accepted_elements = params.accepted_elements or "host_status,service_status" | ||||||||||||||||||
self.sc_params.params.source_ci = params.source_ci or "Centreon" | ||||||||||||||||||
self.sc_params.params.max_output_length = params.max_output_length or 1024 | ||||||||||||||||||
|
||||||||||||||||||
-- apply users params and check syntax of standard ones | ||||||||||||||||||
self.sc_params:param_override(params) | ||||||||||||||||||
self.sc_params:check_params() | ||||||||||||||||||
|
||||||||||||||||||
self.sc_macros = sc_macros.new(self.sc_params.params, self.sc_logger) | ||||||||||||||||||
self.format_template = self.sc_params:load_event_format_file(true) | ||||||||||||||||||
self.sc_params:build_accepted_elements_info() | ||||||||||||||||||
self.sc_flush = sc_flush.new(self.sc_params.params, self.sc_logger) | ||||||||||||||||||
|
||||||||||||||||||
local categories = self.sc_params.params.bbdo.categories | ||||||||||||||||||
local elements = self.sc_params.params.bbdo.elements | ||||||||||||||||||
|
||||||||||||||||||
self.format_event = { | ||||||||||||||||||
[categories.neb.id] = { | ||||||||||||||||||
[elements.host_status.id] = function () return self:format_event_host() end, | ||||||||||||||||||
[elements.service_status.id] = function () return self:format_event_service() end | ||||||||||||||||||
}, | ||||||||||||||||||
[categories.bam.id] = {} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
self.send_data_method = { | ||||||||||||||||||
[1] = function (payload) return self:send_data(payload) end | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's a metadata system for queues now
Suggested change
|
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
self.build_payload_method = { | ||||||||||||||||||
[1] = function (payload, event) return self:build_payload(payload, event) end | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
-- return EventQueue object | ||||||||||||||||||
setmetatable(self, { __index = EventQueue }) | ||||||||||||||||||
return self | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
---- EventQueue:format_event method | ||||||||||||||||||
--------------------------------------------------------------------------------- | ||||||||||||||||||
function EventQueue:format_accepted_event() | ||||||||||||||||||
local category = self.sc_event.event.category | ||||||||||||||||||
local element = self.sc_event.event.element | ||||||||||||||||||
local template = self.sc_params.params.format_template[category][element] | ||||||||||||||||||
self.sc_logger:debug("[EventQueue:format_event]: starting format event") | ||||||||||||||||||
self.sc_event.event.formated_event = {} | ||||||||||||||||||
|
||||||||||||||||||
if self.format_template and template ~= nil and template ~= "" then | ||||||||||||||||||
self.sc_event.event.formated_event = self.sc_macros:replace_sc_macro(template, self.sc_event.event, true) | ||||||||||||||||||
else | ||||||||||||||||||
-- can't format event if stream connector is not handling this kind of event and that it is not handled with a template file | ||||||||||||||||||
if not self.format_event[category][element] then | ||||||||||||||||||
self.sc_logger:error("[format_event]: You are trying to format an event with category: " | ||||||||||||||||||
.. tostring(self.sc_params.params.reverse_category_mapping[category]) .. " and element: " | ||||||||||||||||||
.. tostring(self.sc_params.params.reverse_element_mapping[category][element]) | ||||||||||||||||||
.. ". If it is a not a misconfiguration, you should create a format file to handle this kind of element") | ||||||||||||||||||
else | ||||||||||||||||||
self.format_event[category][element]() | ||||||||||||||||||
end | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
self:add() | ||||||||||||||||||
self.sc_logger:debug("[EventQueue:format_event]: event formatting is finished") | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- Format XML file with host infoamtion | ||||||||||||||||||
function EventQueue:format_event_host() | ||||||||||||||||||
local xml_host_severity = self.sc_broker:get_severity(self.sc_event.event.host_id) | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not needed to call broker cache severity is already stored in |
||||||||||||||||||
local xml_url = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.action_url, 'no action url for this host') | ||||||||||||||||||
local xml_notes = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.notes, 'no notes found on host') | ||||||||||||||||||
|
||||||||||||||||||
if xml_host_severity == false then | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
nil and false are not the same, both can be handled with "if not" |
||||||||||||||||||
xml_host_severity = 0 | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
self.sc_event.event.formated_event = { | ||||||||||||||||||
hostname = self.sc_event.event.cache.host.name, | ||||||||||||||||||
host_severity = xml_host_severity, | ||||||||||||||||||
host_notes = xml_notes, | ||||||||||||||||||
url = xml_url, | ||||||||||||||||||
source_ci = self.sc_common:ifnil_or_empty(self.source_ci, 'Centreon'), | ||||||||||||||||||
source_host_id = self.sc_common:ifnil_or_empty(self.sc_event.event.host_id, 0), | ||||||||||||||||||
scheduled_downtime_depth = self.sc_common:ifnil_or_empty(self.sc_event.event.scheduled_downtime_depth, 0) | ||||||||||||||||||
} | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- Format XML file with service infoamtion | ||||||||||||||||||
function EventQueue:format_event_service() | ||||||||||||||||||
local xml_url = self.sc_common:ifnil_or_empty(self.sc_event.event.cache.host.notes_url, 'no url for this service') | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not needed to call broker cache severity is already stored in self.sc_event.event.cache.severity.service |
||||||||||||||||||
local xml_service_severity = self.sc_broker:get_severity(self.sc_event.event.host_id, self.sc_event.event.service_id) | ||||||||||||||||||
|
||||||||||||||||||
if xml_service_severity == false then | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
xml_service_severity = 0 | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
self.sc_event.event.formated_event = { | ||||||||||||||||||
hostname = self.sc_event.event.cache.host.name, | ||||||||||||||||||
svc_desc = self.sc_event.event.cache.service.description, | ||||||||||||||||||
state = self.sc_event.event.state, | ||||||||||||||||||
last_update = self.sc_event.event.last_update, | ||||||||||||||||||
output = string.match(self.sc_event.event.output, "^(.*)\n"), | ||||||||||||||||||
service_severity = xml_service_severity, | ||||||||||||||||||
url = xml_url, | ||||||||||||||||||
source_host_id = self.sc_common:ifnil_or_empty(self.sc_event.event.host_id, 0), | ||||||||||||||||||
source_svc_id = self.sc_common:ifnil_or_empty(self.sc_event.event.service_id, 0), | ||||||||||||||||||
scheduled_downtime_depth = self.sc_common:ifnil_or_empty(self.sc_event.event.scheduled_downtime_depth, 0) | ||||||||||||||||||
} | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
-- EventQueue:add method | ||||||||||||||||||
-- @param e An event | ||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
|
||||||||||||||||||
function EventQueue:add() | ||||||||||||||||||
-- store event in self.events lists | ||||||||||||||||||
local category = self.sc_event.event.category | ||||||||||||||||||
local element = self.sc_event.event.element | ||||||||||||||||||
|
||||||||||||||||||
self.sc_logger:debug("[EventQueue:add]: add event in queue category: " .. tostring(self.sc_params.params.reverse_category_mapping[category]) | ||||||||||||||||||
.. " element: " .. tostring(self.sc_params.params.reverse_element_mapping[category][element])) | ||||||||||||||||||
|
||||||||||||||||||
self.sc_logger:debug("[EventQueue:add]: queue size before adding event: " .. tostring(#self.sc_flush.queues[category][element].events)) | ||||||||||||||||||
self.sc_flush.queues[category][element].events[#self.sc_flush.queues[category][element].events + 1] = self.sc_event.event.formated_event | ||||||||||||||||||
|
||||||||||||||||||
self.sc_logger:info("[EventQueue:add]: queue size is now: " .. tostring(#self.sc_flush.queues[category][element].events) | ||||||||||||||||||
.. "max is: " .. tostring(self.sc_params.params.max_buffer_size)) | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
-- EventQueue:build_payload, concatenate data so it is ready to be sent | ||||||||||||||||||
-- @param payload {string} json encoded string | ||||||||||||||||||
-- @param event {table} the event that is going to be added to the payload | ||||||||||||||||||
-- @return payload {string} json encoded string | ||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
function EventQueue:build_payload(payload, event) | ||||||||||||||||||
if not payload then | ||||||||||||||||||
payload = "<event_data>" | ||||||||||||||||||
for index, xml_str in pairs(event) do | ||||||||||||||||||
payload = payload .. "<" .. tostring(index) .. ">" .. tostring(self.sc_common:xml_escape(xml_str)) .. "</" .. tostring(index) .. ">" | ||||||||||||||||||
end | ||||||||||||||||||
payload = payload .. "</event_data>" | ||||||||||||||||||
|
||||||||||||||||||
else | ||||||||||||||||||
payload = payload .. "<event_data>" | ||||||||||||||||||
for index, xml_str in pairs(event) do | ||||||||||||||||||
payload = payload .. "<" .. tostring(index) .. ">" .. tostring(self.sc_common:xml_escape(xml_str)) .. "</" .. tostring(index) .. ">" | ||||||||||||||||||
end | ||||||||||||||||||
payload = payload .. "</event_data>" | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
return payload | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
function EventQueue:send_data(payload) | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
need to add the metadata with the new system |
||||||||||||||||||
self.sc_logger:debug("[EventQueue:send_data]: Starting to send data") | ||||||||||||||||||
|
||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. before the send _data_test condition, there is a whole new system to create the curl command. in this case it would look like queue_metadata.headers = {
"Content-Type: text/xml",
"content-length: " .. string.len(payload)
}
self.sc_logger:log_curl_command(self.sc_params.params.http_server_url, queue_metadata, self.sc_params.params, payload) |
||||||||||||||||||
-- write payload in the logfile for test purpose | ||||||||||||||||||
if self.sc_params.params.send_data_test == 1 then | ||||||||||||||||||
self.sc_logger:notice("[send_data]: " .. tostring(payload)) | ||||||||||||||||||
return true | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
self.sc_logger:info("[EventQueue:send_data]: Going to send the following xml " .. tostring(payload)) | ||||||||||||||||||
self.sc_logger:info("[EventQueue:send_data]: BSM Http Server URL is: \"" .. tostring(self.sc_params.params.http_server_url .. "\"")) | ||||||||||||||||||
|
||||||||||||||||||
local http_response_body = "" | ||||||||||||||||||
local http_request = curl.easy() | ||||||||||||||||||
:setopt_url(self.sc_params.params.http_server_url) | ||||||||||||||||||
:setopt_writefunction( | ||||||||||||||||||
function (response) | ||||||||||||||||||
http_response_body = http_response_body .. tostring(response) | ||||||||||||||||||
end | ||||||||||||||||||
) | ||||||||||||||||||
:setopt(curl.OPT_TIMEOUT, self.sc_params.params.connection_timeout) | ||||||||||||||||||
:setopt(curl.OPT_SSL_VERIFYPEER, self.sc_params.params.allow_insecure_connection) | ||||||||||||||||||
:setopt( | ||||||||||||||||||
curl.OPT_HTTPHEADER, | ||||||||||||||||||
{ | ||||||||||||||||||
"Content-Type: text/xml", | ||||||||||||||||||
"content-length: " .. string.len(payload) | ||||||||||||||||||
} | ||||||||||||||||||
) | ||||||||||||||||||
Comment on lines
+256
to
+262
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
with the log_curl_command system, headers are already built before. Therefore, we can simplify this line |
||||||||||||||||||
|
||||||||||||||||||
-- set proxy address configuration | ||||||||||||||||||
if (self.sc_params.params.proxy_address ~= '') then | ||||||||||||||||||
if (self.sc_params.params.proxy_port ~= '') then | ||||||||||||||||||
http_request:setopt(curl.OPT_PROXY, self.sc_params.params.proxy_address .. ':' .. self.sc_params.params.proxy_port) | ||||||||||||||||||
else | ||||||||||||||||||
self.sc_logger:error("[EventQueue:send_data]: proxy_port parameter is not set but proxy_address is used") | ||||||||||||||||||
end | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- set proxy user configuration | ||||||||||||||||||
if (self.sc_params.params.proxy_username ~= '') then | ||||||||||||||||||
if (self.sc_params.params.proxy_password ~= '') then | ||||||||||||||||||
http_request:setopt(curl.OPT_PROXYUSERPWD, self.sc_params.params.proxy_username .. ':' .. self.sc_params.params.proxy_password) | ||||||||||||||||||
else | ||||||||||||||||||
self.sc_logger:error("[EventQueue:send_data]: proxy_password parameter is not set but proxy_username is used") | ||||||||||||||||||
end | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- adding the HTTP POST data | ||||||||||||||||||
http_request:setopt_postfields(payload) | ||||||||||||||||||
-- performing the HTTP request | ||||||||||||||||||
http_request:perform() | ||||||||||||||||||
-- collecting results | ||||||||||||||||||
http_response_code = http_request:getinfo(curl.INFO_RESPONSE_CODE) | ||||||||||||||||||
http_request:close() | ||||||||||||||||||
|
||||||||||||||||||
-- Handling the return code | ||||||||||||||||||
local retval = false | ||||||||||||||||||
if http_response_code == 202 or http_response_code == 200 then | ||||||||||||||||||
self.sc_logger:info("[EventQueue:send_data]: HTTP POST request successful: return code is " .. tostring(http_response_code)) | ||||||||||||||||||
retval = true | ||||||||||||||||||
else | ||||||||||||||||||
self.sc_logger:error("[EventQueue:send_data]: HTTP POST request FAILED, return code is " .. tostring(http_response_code) .. ". Message is: " .. tostring(http_response_body)) | ||||||||||||||||||
end | ||||||||||||||||||
return retval | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
-- Required functions for Broker StreamConnector | ||||||||||||||||||
-------------------------------------------------------------------------------- | ||||||||||||||||||
|
||||||||||||||||||
local queue | ||||||||||||||||||
|
||||||||||||||||||
-- Fonction init() | ||||||||||||||||||
function init(conf) | ||||||||||||||||||
queue = EventQueue.new(conf) | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- Fonction write() | ||||||||||||||||||
function write(event) | ||||||||||||||||||
-- skip event if a mandatory parameter is missing | ||||||||||||||||||
if queue.fail then | ||||||||||||||||||
queue.sc_logger:error("Skipping event because a mandatory parameter is not set") | ||||||||||||||||||
return false | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- initiate event object | ||||||||||||||||||
queue.sc_event = sc_event.new(event, queue.sc_params.params, queue.sc_common, queue.sc_logger, queue.sc_broker) | ||||||||||||||||||
|
||||||||||||||||||
if queue.sc_event:is_valid_category() then | ||||||||||||||||||
if queue.sc_event:is_valid_element() then | ||||||||||||||||||
-- format event if it is validated | ||||||||||||||||||
if queue.sc_event:is_valid_event() then | ||||||||||||||||||
queue:format_accepted_event() | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
--- log why the event has been dropped | ||||||||||||||||||
else | ||||||||||||||||||
queue.sc_logger:debug("dropping event because element is not valid. Event element is: " | ||||||||||||||||||
.. tostring(queue.sc_params.params.reverse_element_mapping[queue.sc_event.event.category][queue.sc_event.event.element])) | ||||||||||||||||||
end | ||||||||||||||||||
else | ||||||||||||||||||
queue.sc_logger:debug("dropping event because category is not valid. Event category is: " | ||||||||||||||||||
.. tostring(queue.sc_params.params.reverse_category_mapping[queue.sc_event.event.category])) | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
return flush() | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- flush method is called by broker every now and then (more often when broker has nothing else to do) | ||||||||||||||||||
function flush() | ||||||||||||||||||
local queues_size = queue.sc_flush:get_queues_size() | ||||||||||||||||||
|
||||||||||||||||||
-- nothing to flush | ||||||||||||||||||
if queues_size == 0 then | ||||||||||||||||||
return true | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- flush all queues because last global flush is too old | ||||||||||||||||||
if queue.sc_flush.last_global_flush < os.time() - queue.sc_params.params.max_all_queues_age then | ||||||||||||||||||
if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then | ||||||||||||||||||
return false | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
return true | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- flush queues because too many events are stored in them | ||||||||||||||||||
if queues_size > queue.sc_params.params.max_buffer_size then | ||||||||||||||||||
if not queue.sc_flush:flush_all_queues(queue.build_payload_method[1], queue.send_data_method[1]) then | ||||||||||||||||||
return false | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
return true | ||||||||||||||||||
end | ||||||||||||||||||
|
||||||||||||||||||
-- there are events in the queue but they were not ready to be send | ||||||||||||||||||
return false | ||||||||||||||||||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in today's world, we have a custom code feature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/centreon/centreon-stream-connector-scripts/blob/develop/modules/docs/custom_code.md