From 9dece0e7961ba8be689926616d5b95fdc8e77f6e Mon Sep 17 00:00:00 2001 From: Amit Prinz Setter Date: Wed, 30 Oct 2024 20:17:13 +0200 Subject: [PATCH] Bucket Notifications (#8337) Bucket Notifications - phase 1 Signed-off-by: Amit Prinz Setter --- config.js | 6 + package.json | 1 + src/api/bucket_api.js | 59 +++ src/api/common_api.js | 40 ++ src/api/object_api.js | 3 + src/cmd/manage_nsfs.js | 24 +- src/cmd/nsfs.js | 11 +- src/endpoint/endpoint.js | 14 +- src/endpoint/s3/ops/s3_delete_object.js | 2 + .../s3/ops/s3_get_bucket_notification.js | 21 +- src/endpoint/s3/ops/s3_post_object_restore.js | 7 +- .../s3/ops/s3_put_bucket_notification.js | 24 +- src/endpoint/s3/ops/s3_put_object.js | 7 + src/endpoint/s3/s3_bucket_logging.js | 64 ++- src/manage_nsfs/manage_nsfs_cli_errors.js | 2 +- src/manage_nsfs/manage_nsfs_constants.js | 11 +- src/manage_nsfs/manage_nsfs_validations.js | 4 +- src/sdk/bucketspace_fs.js | 27 ++ src/sdk/bucketspace_nb.js | 17 + src/sdk/config_fs.js | 2 + src/sdk/namespace_fs.js | 8 +- src/sdk/nb.d.ts | 3 + src/sdk/object_sdk.js | 17 + src/server/bg_workers.js | 10 + src/server/object_services/object_server.js | 2 + src/server/system_services/bucket_server.js | 29 ++ .../system_services/schemas/bucket_schema.js | 6 + .../schemas/nsfs_bucket_schema.js | 6 + src/util/notifications_util.js | 433 ++++++++++++++++++ 29 files changed, 821 insertions(+), 39 deletions(-) create mode 100644 src/util/notifications_util.js diff --git a/config.js b/config.js index c4761da129..f404044cce 100644 --- a/config.js +++ b/config.js @@ -674,6 +674,12 @@ config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH; config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging'; config.BUCKET_LOG_CONCURRENCY = 10; +//////////////////////////////// +// NOTIFICATIONS // +//////////////////////////////// +config.NOTIFICATION_LOG_NS = 'notification_logging'; +config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR; + /////////////////////////// // KEY ROTATOR // /////////////////////////// diff --git a/package.json b/package.json index ce66de78db..184964c13c 100644 --- a/package.json +++ b/package.json @@ -108,6 +108,7 @@ "nan": "2.20.0", "ncp": "2.0.0", "node-addon-api": "8.1.0", + "node-rdkafka": "3.0.1", "performance-now": "2.1.0", "pg": "8.13.0", "ping": "0.4.4", diff --git a/src/api/bucket_api.js b/src/api/bucket_api.js index 7c8fe09b15..2a2088b9fc 100644 --- a/src/api/bucket_api.js +++ b/src/api/bucket_api.js @@ -300,6 +300,59 @@ module.exports = { } }, + get_bucket_notification: { + method: 'GET', + params: { + type: 'object', + required: [ + 'name' + ], + properties: { + name: { $ref: 'common_api#/definitions/bucket_name' }, + } + }, + reply: { + type: 'object', + required: [ + 'notifications' + ], + properties: { + notifications: { + type: 'array', + items: { + $ref: 'common_api#/definitions/bucket_notification' + } + } + } + }, + auth: { + system: ['admin', 'user'] + } + }, + + put_bucket_notification: { + method: 'PUT', + params: { + type: 'object', + required: [ + 'notifications', + 'name', + ], + properties: { + name: { $ref: 'common_api#/definitions/bucket_name' }, + notifications: { + type: 'array', + items: { + $ref: 'common_api#/definitions/bucket_notification' + } + } + } + }, + auth: { + system: ['admin', 'user'], + } + }, + read_bucket_sdk_info: { method: 'GET', params: { @@ -1130,6 +1183,12 @@ module.exports = { bucket_info: { $ref: '#/definitions/bucket_info' }, + notifications: { + type: 'array', + items: { + $ref: 'common_api#/definitions/bucket_notification' + } + } } }, 
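Note on the schema: the bucket_notification definition referenced above (added to common_api.js below) describes each notification as an object with a mandatory Id and Connect plus an optional Events filter. A minimal sketch of a notifications array that would satisfy it — the Id, file path and event list here are hypothetical; Connect names a connection file on the endpoint host, parsed by src/util/notifications_util.js:

    [
        {
            "Id": "notif1",
            "Connect": "/path/to/kafka_connect.txt",
            "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:Delete"]
        }
    ]

If Events is omitted, every supported event on the bucket triggers a notification (see check_notif_relevant below).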
diff --git a/src/api/common_api.js b/src/api/common_api.js index 3bec3e53b1..3a6641ed77 100644 --- a/src/api/common_api.js +++ b/src/api/common_api.js @@ -1377,6 +1377,46 @@ module.exports = { type: 'string', }, } + }, + bucket_notification: { + type: 'object', + required: ['Id', 'Connect'], + properties: { + Id: { + type: 'string' + }, + Connect: { + type: 'string' + }, + Events: { + type: 'array', + items: { + type: 'string', + enum: [ + 's3:TestEvent', + 's3:ObjectCreated:*', + 's3:ObjectCreated:Put', + 's3:ObjectCreated:Post', + 's3:ObjectCreated:Copy', + 's3:ObjectCreated:CompleteMultipartUpload', + 's3:ObjectRemoved:*', + 's3:ObjectRemoved:Delete', + 's3:ObjectRemoved:DeleteMarkerCreated', + 's3:ObjectRestore:*', + 's3:ObjectRestore:Post', + 's3:ObjectRestore:Completed', + 's3:ObjectRestore:Delete', + 's3:ObjectTagging:*', + 's3:ObjectTagging:Put', + 's3:ObjectTagging:Delete', + /*We plan to support LifecycleExpiration + 's3:LifecycleExpiration:*', + 's3:LifecycleExpiration:Delete', + 's3:LifecycleExpiration:DeleteMarkerCreated',*/ + ], + } + } + } } } }; diff --git a/src/api/object_api.js b/src/api/object_api.js index 366c8d4660..aabae97bff 100644 --- a/src/api/object_api.js +++ b/src/api/object_api.js @@ -176,6 +176,7 @@ module.exports = { content_type: { type: 'string' }, content_encoding: { type: 'string' }, size: { type: 'integer' }, + seq: { type: 'integer' }, } }, auth: { system: ['admin', 'user'] } @@ -653,6 +654,7 @@ module.exports = { deleted_delete_marker: { type: 'boolean' }, created_version_id: { type: 'string' }, created_delete_marker: { type: 'boolean' }, + seq: { type: 'integer' }, } }, auth: { system: ['admin', 'user'] } @@ -690,6 +692,7 @@ module.exports = { deleted_delete_marker: { type: 'boolean' }, created_version_id: { type: 'string' }, created_delete_marker: { type: 'boolean' }, + seq: { type: 'integer' }, err_code: { type: 'string', enum: ['AccessDenied', 'InternalError'] diff --git a/src/cmd/manage_nsfs.js b/src/cmd/manage_nsfs.js index 7b24a8d048..2f5766679c 100644 --- a/src/cmd/manage_nsfs.js +++ b/src/cmd/manage_nsfs.js @@ -27,6 +27,7 @@ const { throw_cli_error, get_bucket_owner_account_by_name, is_name_update, is_access_key_update } = require('../manage_nsfs/manage_nsfs_cli_utils'); const manage_nsfs_validations = require('../manage_nsfs/manage_nsfs_validations'); const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance(); +const notifications_util = require('../util/notifications_util'); let config_fs; @@ -70,6 +71,8 @@ async function main(argv = minimist(process.argv.slice(2))) { await noobaa_cli_diagnose.manage_diagnose_operations(action, user_input, config_fs); } else if (type === TYPES.UPGRADE) { await noobaa_cli_upgrade.manage_upgrade_operations(action, user_input, config_fs); + } else if (type === TYPES.NOTIFICATION) { + await notification_management(); } else { throw_cli_error(ManageCLIError.InvalidType); } @@ -100,8 +103,9 @@ async function fetch_bucket_data(action, user_input) { should_create_underlying_storage: action === ACTIONS.ADD ? false : undefined, new_name: user_input.new_name === undefined ? undefined : String(user_input.new_name), fs_backend: user_input.fs_backend === undefined ? config.NSFS_NC_STORAGE_BACKEND : String(user_input.fs_backend), - force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag) - }; + force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? 
user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag),
+        notifications: user_input.notifications
+    };
 
     if (user_input.bucket_policy !== undefined) {
         if (typeof user_input.bucket_policy === 'string') {
@@ -196,7 +200,7 @@ async function get_bucket_status(data) {
  * @param {Object} data
  * @returns { Promise<{ code: typeof ManageCLIResponse.BucketUpdated, detail: Object }>}
  */
-async function update_bucket(data) {
+async function update_bucket(data, user_input) {
     const cur_name = data.name;
     const new_name = data.new_name;
     const name_update = is_name_update(data);
@@ -204,6 +208,14 @@
     data = _.omit(data, cli_bucket_flags_to_remove);
     let parsed_bucket_data;
+
+    if (user_input.notifications) {
+        //notifications are tested before the update is applied
+        const test_notif_err = await notifications_util.test_notifications(data);
+        if (test_notif_err) {
+            throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
+        }
+    }
     if (name_update) {
         parsed_bucket_data = await config_fs.create_bucket_config_file({ ...data, name: new_name });
         await config_fs.delete_bucket_config_file(cur_name);
@@ -271,7 +283,7 @@ async function bucket_management(action, user_input) {
     } else if (action === ACTIONS.STATUS) {
         response = await get_bucket_status(data);
     } else if (action === ACTIONS.UPDATE) {
-        response = await update_bucket(data);
+        response = await update_bucket(data, user_input);
     } else if (action === ACTIONS.DELETE) {
         const force = get_boolean_or_string_value(user_input.force);
         response = await delete_bucket(data, force);
@@ -706,5 +718,9 @@ async function logging_management() {
     await manage_nsfs_logging.export_bucket_logging(config_fs);
 }
 
+async function notification_management() {
+    await new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
+}
+
 exports.main = main;
 if (require.main === module) main();
diff --git a/src/cmd/nsfs.js b/src/cmd/nsfs.js
index 3a2a5645d7..4b630d44b6 100644
--- a/src/cmd/nsfs.js
+++ b/src/cmd/nsfs.js
@@ -121,7 +121,7 @@ function print_usage() {
 let nsfs_config_root;
 
 class NsfsObjectSDK extends ObjectSDK {
-    constructor(fs_root, fs_config, account, versioning, config_root) {
+    constructor(fs_root, fs_config, account, versioning, config_root, nsfs_system) {
         // const rpc_client_hooks = new_rpc_client_hooks();
         // rpc_client_hooks.account.read_account_by_access_key = async ({ access_key }) => {
         //     if (access_key) {
@@ -152,6 +152,7 @@ class NsfsObjectSDK extends ObjectSDK {
         this.nsfs_account = account;
         this.nsfs_versioning = versioning;
         this.nsfs_namespaces = {};
+        this.nsfs_system = nsfs_system;
         if (!config_root) {
             this._get_bucket_namespace = bucket_name => this._simple_get_single_bucket_namespace(bucket_name);
             this.load_requesting_account = auth_req => this._simple_load_requesting_account(auth_req);
@@ -239,6 +240,7 @@ class NsfsAccountSDK extends AccountSDK {
     }
 }
 
+/* eslint-disable max-statements */
 async function main(argv = minimist(process.argv.slice(2))) {
     try {
         config.DB_TYPE = 'none';
@@ -318,15 +320,16 @@ async function main(argv = minimist(process.argv.slice(2))) {
             nsfs_config_root,
         });
 
+        let system_data;
         if (!simple_mode) {
             // Do not move this function - we need to create/update RPM changes before starting the endpoint
             const config_fs = new ConfigFS(nsfs_config_root);
-            const system_data = await config_fs.get_system_config_file({ silent_if_missing: true });
+            system_data = await config_fs.get_system_config_file({ silent_if_missing:
true }); if (system_data && system_data[os.hostname()]) { const nc_upgrade_manager = new NCUpgradeManager(config_fs); await nc_upgrade_manager.update_rpm_upgrade(); } else { - await config_fs.init_nc_system(); + system_data = await config_fs.init_nc_system(); } } @@ -340,7 +343,7 @@ async function main(argv = minimist(process.argv.slice(2))) { forks, nsfs_config_root, init_request_sdk: (req, res) => { - req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root); + req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root, system_data); req.account_sdk = new NsfsAccountSDK(fs_root, fs_config, account, nsfs_config_root); } }); diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js index 2f2cc3687c..3fda31f315 100755 --- a/src/endpoint/endpoint.js +++ b/src/endpoint/endpoint.js @@ -65,6 +65,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma * sts_sdk?: StsSDK; * virtual_hosts?: readonly string[]; * bucket_logger?: PersistentLogger; + * notification_logger?: PersistentLogger; * }} EndpointRequest */ @@ -100,6 +101,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han /* eslint-disable max-statements */ async function main(options = {}) { let bucket_logger; + let notification_logger; try { // setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039. if (config.ENDPOINT_PROCESS_TITLE) { @@ -137,6 +139,12 @@ async function main(options = {}) { poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, }); + notification_logger = config.NOTIFICATION_LOG_DIR && + new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, { + locking: 'SHARED', + poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL, + }); + process.on('warning', e => dbg.warn(e.stack)); let internal_rpc_client; @@ -174,7 +182,8 @@ async function main(options = {}) { init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io); } - const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, bucket_logger); + const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, + bucket_logger, notification_logger); const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ true); const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root); @@ -266,7 +275,7 @@ async function main(options = {}) { * @param {readonly string[]} virtual_hosts * @returns {EndpointHandler} */ -function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) { +function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger, notification_logger) { const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler; const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? 
lambda_rest : unavailable_handler;
@@ -276,6 +285,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
         endpoint_utils.prepare_rest_request(req);
         req.virtual_hosts = virtual_hosts;
         if (logger) req.bucket_logger = logger;
+        if (notification_logger) req.notification_logger = notification_logger;
         init_request_sdk(req, res);
         if (req.url.startsWith('/2015-03-31/functions')) {
             return lambda_rest_handler(req, res);
diff --git a/src/endpoint/s3/ops/s3_delete_object.js b/src/endpoint/s3/ops/s3_delete_object.js
index 00e08af0d3..2aded1cb88 100644
--- a/src/endpoint/s3/ops/s3_delete_object.js
+++ b/src/endpoint/s3/ops/s3_delete_object.js
@@ -26,7 +26,9 @@ async function delete_object(req, res) {
     } else if (del_res.created_delete_marker) {
         res.setHeader('x-amz-version-id', del_res.created_version_id);
         res.setHeader('x-amz-delete-marker', 'true');
+        req.s3_event_method = 'DeleteMarkerCreated';
     }
+    res.seq = del_res.seq;
 }
 
 module.exports = {
diff --git a/src/endpoint/s3/ops/s3_get_bucket_notification.js b/src/endpoint/s3/ops/s3_get_bucket_notification.js
index c7d2d04253..b29c6b85c8 100644
--- a/src/endpoint/s3/ops/s3_get_bucket_notification.js
+++ b/src/endpoint/s3/ops/s3_get_bucket_notification.js
@@ -5,10 +5,23 @@
  * http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETnotification.html
  */
 async function get_bucket_notification(req) {
-    await req.object_sdk.read_bucket({ name: req.params.bucket });
-    return {
-        NotificationConfiguration: ''
-    };
+
+    const result = await req.object_sdk.get_bucket_notification({
+        bucket_name: req.params.bucket,
+    });
+
+
+    const reply = result && result.length > 0 ?
+        {
+            //return result inside TopicConfiguration tag
+            NotificationConfiguration: {
+                TopicConfiguration: result
+            }
+        } :
+        //if there's no notification, return an empty NotificationConfiguration tag
+        { NotificationConfiguration: {} };
+
+    return reply;
 }
 
 module.exports = {
diff --git a/src/endpoint/s3/ops/s3_post_object_restore.js b/src/endpoint/s3/ops/s3_post_object_restore.js
index 31ff552062..e3d014aabd 100644
--- a/src/endpoint/s3/ops/s3_post_object_restore.js
+++ b/src/endpoint/s3/ops/s3_post_object_restore.js
@@ -18,11 +18,14 @@ async function post_object_restore(req, res) {
         encryption,
     };
 
-    const accepted = await req.object_sdk.restore_object(params);
-    if (accepted) {
+    const restore_object_result = await req.object_sdk.restore_object(params);
+    if (restore_object_result.accepted) {
         res.statusCode = 202;
+        //no need to set s3_event_method, it is 'Post' by default because req.method == 'Post'
     } else {
         res.statusCode = 200;
+        req.s3_event_method = 'Completed';
+        res.restore_object_result = restore_object_result;
     }
 }
 
diff --git a/src/endpoint/s3/ops/s3_put_bucket_notification.js b/src/endpoint/s3/ops/s3_put_bucket_notification.js
index 65073b06b3..f167782430 100644
--- a/src/endpoint/s3/ops/s3_put_bucket_notification.js
+++ b/src/endpoint/s3/ops/s3_put_bucket_notification.js
@@ -7,9 +7,27 @@ const S3Error = require('../s3_errors').S3Error;
  * http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTnotification.html
  */
 async function put_bucket_notification(req) {
-    await req.object_sdk.read_bucket({ name: req.params.bucket });
-    // TODO S3 put_bucket_notification not implemented
-    throw new S3Error(S3Error.NotImplemented);
+
+    const topic_configuration = req.body.NotificationConfiguration?.TopicConfiguration;
+    if (!topic_configuration ||
+        typeof topic_configuration !== 'object') throw new S3Error(S3Error.MalformedXML);
+
+
+    //align with the request format that aws s3api sends
+
for (const notif of topic_configuration) { + if (Array.isArray(notif.Id)) notif.Id = notif.Id[0]; + notif.Connect = Array.isArray(notif.Topic) ? notif.Topic[0] : notif.Topic; + notif.Events = notif.Event; + delete notif.Event; + delete notif.Topic; + } + + const reply = await req.object_sdk.put_bucket_notification({ + bucket_name: req.params.bucket, + notifications: topic_configuration + }); + + return reply; } module.exports = { diff --git a/src/endpoint/s3/ops/s3_put_object.js b/src/endpoint/s3/ops/s3_put_object.js index 6d5898f7e5..c6efd66177 100644 --- a/src/endpoint/s3/ops/s3_put_object.js +++ b/src/endpoint/s3/ops/s3_put_object.js @@ -34,6 +34,8 @@ async function put_object(req, res) { dbg.log0('PUT OBJECT', req.params.bucket, req.params.key, req.headers['x-amz-copy-source'] || '', encryption || ''); + //for copy, use correct s3_event_method. otherwise, just use default (req.method) + req.s3_event_method = copy_source ? 'Copy' : undefined; const source_stream = req.chunked_content ? s3_utils.decode_chunked_upload(req) : req; const reply = await req.object_sdk.upload_object({ @@ -76,6 +78,11 @@ async function put_object(req, res) { }; } res.setHeader('ETag', `"${reply.etag}"`); + + if (reply.seq) { + res.seq = reply.seq; + delete reply.seq; + } } diff --git a/src/endpoint/s3/s3_bucket_logging.js b/src/endpoint/s3/s3_bucket_logging.js index e643d9a386..47acf8e4c2 100644 --- a/src/endpoint/s3/s3_bucket_logging.js +++ b/src/endpoint/s3/s3_bucket_logging.js @@ -6,22 +6,62 @@ const http_utils = require('../../util/http_utils'); const dgram = require('node:dgram'); const { Buffer } = require('node:buffer'); const config = require('../../../config'); +const {compose_notification, check_notif_relevant} = require('../../util/notifications_util'); async function send_bucket_op_logs(req, res) { - if (req.params && req.params.bucket && req.op_name !== 'put_bucket') { + if (req.params && req.params.bucket && + !(req.op_name === 'put_bucket' || + req.op_name === 'put_bucket_notification' || + req.op_name === 'get_bucket_notification' + )) { + //potentially, there could be two writes to two different files. 
//we want to await all writes together, instead of serially,
+    //so we aggregate the writes and issue them only at the end
+    const writes_aggregate = [];
+
     const bucket_info = await req.object_sdk.read_bucket_sdk_config_info(req.params.bucket);
     dbg.log2("read_bucket_sdk_config_info = ", bucket_info);
     if (is_bucket_logging_enabled(bucket_info)) {
         dbg.log2("Bucket logging is enabled for Bucket : ", req.params.bucket);
-        await endpoint_bucket_op_logs(req.op_name, req, res, bucket_info);
+        endpoint_bucket_op_logs(req.op_name, req, res, bucket_info, writes_aggregate);
+    }
+
+    if (req.notification_logger && bucket_info.notifications) {
+        for (const notif_conf of bucket_info.notifications) {
+            if (check_notif_relevant(notif_conf, req)) {
+                const notif = {
+                    meta: {
+                        connect: notif_conf.Connect,
+                        name: notif_conf.Id
+                    },
+                    notif: compose_notification(req, res, bucket_info, notif_conf)
+                };
+                dbg.log1("logging notif ", notif_conf, ", notif = ", notif);
+                writes_aggregate.push({
+                    file: req.notification_logger,
+                    buffer: JSON.stringify(notif)
+                });
+            }
+        }
+    }
+
+    //by now we have all possible writes,
+    //issue them concurrently and then await them
+    if (writes_aggregate.length > 0) {
+        const promises = [];
+        for (const write of writes_aggregate) {
+            promises.push(write.file.append(write.buffer));
+        }
+        await Promise.all(promises);
     }
 }
 }
 
 function is_bucket_logging_enabled(source_bucket) {
-
     if (!source_bucket ||
         !source_bucket.bucket_info.logging) {
         return false;
     }
@@ -54,7 +94,7 @@ const create_syslog_udp_socket = (() => {
 })();
 
-async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) {
+function endpoint_bucket_op_logs(op_name, req, res, source_bucket, writes_aggregate) {
     // 1 - Get all the information to be logged in a log message.
     // 2 - Format it and send it to log bucket/syslog.
@@ -63,14 +103,18 @@ async function endpoint_bucket_op_logs(op_name, req, res, source_bucket) { switch (config.BUCKET_LOG_TYPE) { case 'PERSISTENT': { - await req.bucket_logger.append(JSON.stringify(s3_log)); + //remember this write in writes_aggregate, + //it'll be issued later (with other potential writes) + writes_aggregate.push({ + file: req.bucket_logger, + buffer: JSON.stringify(s3_log) + }); break; } default: { send_op_logs_to_syslog(req.object_sdk.rpc_client.rpc.router.syslog, s3_log); } } - } function send_op_logs_to_syslog(syslog, s3_log) { @@ -95,17 +139,17 @@ function get_bucket_log_record(op_name, source_bucket, req, res) { status_code = res.statusCode; } const log = { - noobaa_bucket_logging: "true", op: req.method, bucket_owner: source_bucket.bucket_owner, source_bucket: req.params.bucket, object_key: req.originalUrl, - log_bucket: source_bucket.bucket_info.logging.log_bucket, - log_prefix: source_bucket.bucket_info.logging.log_prefix, remote_ip: client_ip, request_uri: req.originalUrl, http_status: status_code, - request_id: req.request_id + request_id: req.request_id, + noobaa_bucket_logging: true, + log_bucket: source_bucket.bucket_info.logging.log_bucket, + log_prefix: source_bucket.bucket_info.logging.log_prefix, }; return log; diff --git a/src/manage_nsfs/manage_nsfs_cli_errors.js b/src/manage_nsfs/manage_nsfs_cli_errors.js index ba88e9936c..19ad998768 100644 --- a/src/manage_nsfs/manage_nsfs_cli_errors.js +++ b/src/manage_nsfs/manage_nsfs_cli_errors.js @@ -94,7 +94,7 @@ ManageCLIError.InvalidArgumentType = Object.freeze({ ManageCLIError.InvalidType = Object.freeze({ code: 'InvalidType', - message: 'Invalid type, available types are account, bucket, logging, whitelist or upgrade', + message: 'Invalid type, available types are account, bucket, logging, whitelist, upgrade or notification', http_code: 400, }); diff --git a/src/manage_nsfs/manage_nsfs_constants.js b/src/manage_nsfs/manage_nsfs_constants.js index e39134422d..4e4ae5605a 100644 --- a/src/manage_nsfs/manage_nsfs_constants.js +++ b/src/manage_nsfs/manage_nsfs_constants.js @@ -8,7 +8,8 @@ const TYPES = Object.freeze({ GLACIER: 'glacier', LOGGING: 'logging', DIAGNOSE: 'diagnose', - UPGRADE: 'upgrade' + UPGRADE: 'upgrade', + NOTIFICATION: 'notification' }); const ACTIONS = Object.freeze({ @@ -58,8 +59,8 @@ const VALID_OPTIONS_ANONYMOUS_ACCOUNT = { }; const VALID_OPTIONS_BUCKET = { - 'add': new Set(['name', 'owner', 'path', 'bucket_policy', 'fs_backend', 'force_md5_etag', FROM_FILE, ...CLI_MUTUAL_OPTIONS]), - 'update': new Set(['name', 'owner', 'path', 'bucket_policy', 'fs_backend', 'new_name', 'force_md5_etag', ...CLI_MUTUAL_OPTIONS]), + 'add': new Set(['name', 'owner', 'path', 'bucket_policy', 'fs_backend', 'force_md5_etag', 'notifications', FROM_FILE, ...CLI_MUTUAL_OPTIONS]), + 'update': new Set(['name', 'owner', 'path', 'bucket_policy', 'fs_backend', 'new_name', 'force_md5_etag', 'notifications', ...CLI_MUTUAL_OPTIONS]), 'delete': new Set(['name', 'force', ...CLI_MUTUAL_OPTIONS]), 'list': new Set(['wide', 'name', ...CLI_MUTUAL_OPTIONS]), 'status': new Set(['name', ...CLI_MUTUAL_OPTIONS]), @@ -134,7 +135,9 @@ const OPTION_TYPE = { expected_version: 'string', expected_hosts: 'string', custom_upgrade_scripts_dir: 'string', - skip_verification: 'boolean' + skip_verification: 'boolean', + //notifications + notifications: 'object' }; const BOOLEAN_STRING_VALUES = ['true', 'false']; diff --git a/src/manage_nsfs/manage_nsfs_validations.js b/src/manage_nsfs/manage_nsfs_validations.js index 5f01df6d71..af837a44de 100644 
--- a/src/manage_nsfs/manage_nsfs_validations.js
+++ b/src/manage_nsfs/manage_nsfs_validations.js
@@ -171,8 +171,8 @@ function validate_options_type_by_value(input_options_with_data) {
         if (BOOLEAN_STRING_OPTIONS.has(option) && validate_boolean_string_value(value)) {
             continue;
         }
-        // special case for bucket_policy (from_file)
-        if (option === 'bucket_policy' && type_of_value === 'object') {
+        // special case for bucket_policy and notifications (from_file)
+        if ((option === 'bucket_policy' || option === 'notifications') && type_of_value === 'object') {
             continue;
         }
         const details = `type of flag ${option} should be ${type_of_option}`;
diff --git a/src/sdk/bucketspace_fs.js b/src/sdk/bucketspace_fs.js
index 4a2a6e0eac..82433eb03a 100644
--- a/src/sdk/bucketspace_fs.js
+++ b/src/sdk/bucketspace_fs.js
@@ -645,6 +645,33 @@ class BucketSpaceFS extends BucketSpaceSimpleFS {
         }
     }
 
+    /////////////////////////
+    // BUCKET NOTIFICATION //
+    /////////////////////////
+
+    async put_bucket_notification(params) {
+        try {
+            const { bucket_name, notifications } = params;
+            dbg.log0('BucketSpaceFS.put_bucket_notification: Bucket name', bucket_name, ", notifications ", notifications);
+            const bucket = await this.config_fs.get_bucket_by_name(bucket_name);
+            bucket.notifications = notifications;
+            await this.config_fs.update_bucket_config_file(bucket);
+        } catch (error) {
+            throw translate_error_codes(error, entity_enum.BUCKET);
+        }
+    }
+
+    async get_bucket_notification(params) {
+        try {
+            const { bucket_name } = params;
+            dbg.log0('BucketSpaceFS.get_bucket_notification: Bucket name', bucket_name);
+            const bucket = await this.config_fs.get_bucket_by_name(bucket_name);
+            return { notifications: bucket.notifications || [] };
+        } catch (error) {
+            throw translate_error_codes(error, entity_enum.BUCKET);
+        }
+    }
+
     /////////////////////////
     // DEFAULT OBJECT LOCK //
     /////////////////////////
diff --git a/src/sdk/bucketspace_nb.js b/src/sdk/bucketspace_nb.js
index 4e1038fcd7..69c918fe63 100644
--- a/src/sdk/bucketspace_nb.js
+++ b/src/sdk/bucketspace_nb.js
@@ -229,6 +229,23 @@ class BucketSpaceNB {
         });
     }
 
+    /////////////////////////
+    // BUCKET NOTIFICATION //
+    /////////////////////////
+
+    async put_bucket_notification(params) {
+        return this.rpc_client.bucket.put_bucket_notification({
+            name: params.bucket_name,
+            notifications: params.notifications
+        });
+    }
+
+    async get_bucket_notification(params) {
+        return this.rpc_client.bucket.get_bucket_notification({
+            name: params.bucket_name
+        });
+    }
+
     /////////////////////////
     // DEFAULT OBJECT LOCK //
     /////////////////////////
diff --git a/src/sdk/config_fs.js b/src/sdk/config_fs.js
index 29f5ce2fcf..4868afe4d5 100644
--- a/src/sdk/config_fs.js
+++ b/src/sdk/config_fs.js
@@ -1048,12 +1048,14 @@ class ConfigFS {
                 updated_system_json = this._get_new_system_json_data();
                 await this.create_system_config_file(JSON.stringify(updated_system_json));
                 dbg.log0('created NC system data with version: ', pkg.version);
+                return updated_system_json;
             } else {
-                if (updated_system_json[hostname]?.current_version) return;
+                if (updated_system_json[hostname]?.current_version) return updated_system_json;
                 const new_host_data = this._get_new_hostname_data();
                 updated_system_json = { ...updated_system_json, new_host_data };
                 await this.update_system_config_file(JSON.stringify(updated_system_json));
                 dbg.log0('updated NC system data with version: ', pkg.version);
+                return updated_system_json;
             }
         } catch (err) {
             const msg = 'failed to create/update NC system data due to - ' + err.message;
diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js
index 575934f02f..91b7bc7627 100644
--- a/src/sdk/namespace_fs.js
+++ b/src/sdk/namespace_fs.js
@@ -2150,7 +2150,7 @@ class NamespaceFS {
      * - XATTR_RESTORE_EXPIRY
      * @param {*} params
      * @param {nb.ObjectSDK} object_sdk
-     * @returns {Promise<boolean>}
+     * @returns {Promise<Object>}
      */
     async restore_object(params, object_sdk) {
         dbg.log0('namespace_fs.restore_object:', params);
@@ -2187,7 +2187,7 @@
                 });
 
                 // Should result in HTTP: 202 Accepted
-                return true;
+                return {accepted: true};
             }
 
             if (restore_status.state === GlacierBackend.RESTORE_STATUS_ONGOING) {
@@ -2207,7 +2207,9 @@
                 });
 
                 // Should result in HTTP: 200 OK
-                return false;
+                return {accepted: false,
+                    expires_on,
+                    storage_class: s3_utils.STORAGE_CLASS_GLACIER};
             }
         } catch (error) {
             dbg.error('namespace_fs.restore_object: failed with error: ', error, file_path);
diff --git a/src/sdk/nb.d.ts b/src/sdk/nb.d.ts
index 50d13b8c06..84290a5c22 100644
--- a/src/sdk/nb.d.ts
+++ b/src/sdk/nb.d.ts
@@ -852,6 +852,9 @@ interface BucketSpace {
     delete_bucket_policy(params: object): Promise<any>;
     get_bucket_policy(params: object, object_sdk: ObjectSDK): Promise<any>;
 
+    put_bucket_notification(params: object): Promise<any>;
+    get_bucket_notification(params: object): Promise<any>;
+
     get_object_lock_configuration(params: object, object_sdk: ObjectSDK): Promise<any>;
     put_object_lock_configuration(params: object, object_sdk: ObjectSDK): Promise<any>;
 
diff --git a/src/sdk/object_sdk.js b/src/sdk/object_sdk.js
index 03189af0e2..3f350ba532 100644
--- a/src/sdk/object_sdk.js
+++ b/src/sdk/object_sdk.js
@@ -1064,6 +1064,23 @@ class ObjectSDK {
             });
         }
     }
+
+    /////////////////////////
+    // BUCKET NOTIFICATION //
+    /////////////////////////
+
+    async put_bucket_notification(params) {
+        const bs = this._get_bucketspace();
+        const res = await bs.put_bucket_notification(params);
+        bucket_namespace_cache.invalidate_key(params.bucket_name);
+        return res;
+    }
+
+    async get_bucket_notification(params) {
+        const { bucket } = await bucket_namespace_cache.get_with_cache({ sdk: this, name: params.bucket_name });
+        return bucket.notifications;
+    }
+
     ////////////////////
     //  OBJECT LOCK   //
     ////////////////////
diff --git a/src/server/bg_workers.js b/src/server/bg_workers.js
index ca046a6a65..4cb3dacfeb 100644
--- a/src/server/bg_workers.js
+++ b/src/server/bg_workers.js
@@ -41,6 +41,7 @@ const db_cleaner = require('./bg_services/db_cleaner');
 const { KeyRotator } = require('./bg_services/key_rotator');
 const prom_reporting = require('./analytic_services/prometheus_reporting');
 const { TieringTTLWorker } = require('./bg_services/tier_ttl_worker');
+const { Notificator } = require('../util/notifications_util');
 
 const MASTER_BG_WORKERS = [
     'scrubber',
@@ -236,6 +237,15 @@ function run_master_workers() {
             client: server_rpc.client
         }));
     }
+
+    if (config.NOTIFICATION_LOG_DIR) {
+        register_bg_worker(new Notificator({
+            name: 'Notificator',
+            client: server_rpc.client,
+        }));
+    } else {
+        dbg.warn('NOTIFICATIONS NOT ENABLED');
+    }
 }
 
 async function main() {
diff --git a/src/server/object_services/object_server.js b/src/server/object_services/object_server.js
index 4c4009c50b..f5418324f3 100644
--- a/src/server/object_services/object_server.js
+++ b/src/server/object_services/object_server.js
@@ -462,6 +462,7 @@ async function complete_object_upload(req) {
         encryption: obj.encryption,
         size: set_updates.size,
         content_type: obj.content_type,
+        seq: set_updates.version_seq,
     };
 }
 
@@ -881,6 +882,7 @@ async function delete_object(req) {
     if (obj) {
         dbg.log1(`${obj.key} was deleted by ${req.account && req.account.email.unwrap()}`);
     }
+    reply.seq =
await MDStore.instance().alloc_object_version_seq(); return reply; } diff --git a/src/server/system_services/bucket_server.js b/src/server/system_services/bucket_server.js index 01738d5d07..7898f15740 100644 --- a/src/server/system_services/bucket_server.js +++ b/src/server/system_services/bucket_server.js @@ -548,6 +548,32 @@ async function delete_bucket_encryption(req) { }); } +/** + * + * NOTIFICATIONS + * + */ +async function put_bucket_notification(req) { + dbg.log0('put_bucket_notification:', req.rpc_params); + const bucket = find_bucket(req); + await system_store.make_changes({ + update: { + buckets: [{ + _id: bucket._id, + notifications: req.rpc_params.notifications + }] + } + }); +} + + +async function get_bucket_notification(req) { + dbg.log0('get_bucket_notification:', req.rpc_params); + const bucket = find_bucket(req); + return { + notifications: bucket.notifications ? bucket.notifications : [], + }; +} /** * @@ -612,6 +638,7 @@ async function read_bucket_sdk_info(req) { unused_refresh_tiering_alloc: bucket.tiering && node_allocator.refresh_tiering_alloc(bucket.tiering), }) .then(get_bucket_info), + notifications: bucket.notifications, }; if (bucket.namespace) { @@ -2066,6 +2093,8 @@ exports.get_bucket_website = get_bucket_website; exports.delete_bucket_policy = delete_bucket_policy; exports.put_bucket_policy = put_bucket_policy; exports.get_bucket_policy = get_bucket_policy; +exports.put_bucket_notification = put_bucket_notification; +exports.get_bucket_notification = get_bucket_notification; exports.update_all_buckets_default_pool = update_all_buckets_default_pool; diff --git a/src/server/system_services/schemas/bucket_schema.js b/src/server/system_services/schemas/bucket_schema.js index 9e77fe6d8f..4b1c1168d1 100644 --- a/src/server/system_services/schemas/bucket_schema.js +++ b/src/server/system_services/schemas/bucket_schema.js @@ -275,5 +275,11 @@ module.exports = { logging: { $ref: 'common_api#/definitions/bucket_logging', }, + notifications: { + type: 'array', + items: { + $ref: 'common_api#/definitions/bucket_notification' + } + }, } }; diff --git a/src/server/system_services/schemas/nsfs_bucket_schema.js b/src/server/system_services/schemas/nsfs_bucket_schema.js index c059d93eb6..5a38368fa4 100644 --- a/src/server/system_services/schemas/nsfs_bucket_schema.js +++ b/src/server/system_services/schemas/nsfs_bucket_schema.js @@ -78,5 +78,11 @@ module.exports = { lifecycle_configuration_rules: { $ref: 'common_api#/definitions/bucket_lifecycle_configuration', }, + notifications: { + type: 'array', + items: { + $ref: 'common_api#/definitions/bucket_notification' + } + }, } }; diff --git a/src/util/notifications_util.js b/src/util/notifications_util.js new file mode 100644 index 0000000000..af061a6f1f --- /dev/null +++ b/src/util/notifications_util.js @@ -0,0 +1,433 @@ +/* Copyright (C) 2024 NooBaa */ +'use strict'; + +const dbg = require('../util/debug_module')(__filename); +const config = require('../../config'); +const { PersistentLogger, LogFile } = require('../util/persistent_logger'); +const Kafka = require('node-rdkafka'); +const os = require('os'); +const fs = require('fs'); +const http = require('http'); +const https = require('https'); +const { get_process_fs_context } = require('./native_fs_utils'); +const nb_native = require('../util/nb_native'); +const http_utils = require('../util/http_utils'); + +const OP_TO_EVENT = Object.freeze({ + put_object: { name: 'ObjectCreated' }, + post_object: { name: 'ObjectCreated' }, + post_object_uploadId: { name: 'ObjectCreated', 
method: 'CompleteMultipartUpload' },
+    delete_object: { name: 'ObjectRemoved' },
+    post_object_restore: { name: 'ObjectRestore' },
+    put_object_acl: { name: 'ObjectAcl' },
+    put_object_tagging: { name: 'ObjectTagging' },
+    delete_object_tagging: { name: 'ObjectTagging' },
+});
+
+class Notificator {
+
+    /**
+     *
+     * @param {Object} options
+     */
+
+    constructor({name, fs_context}) {
+        this.name = name;
+        this.connect_str_to_connection = new Map();
+        this.notif_to_connect = new Map();
+        this.fs_context = fs_context ?? get_process_fs_context();
+    }
+
+    async run_batch() {
+        if (!this._can_run()) return;
+        try {
+            await this.process_notification_files();
+        } catch (err) {
+            dbg.error('Notificator failure:', err);
+        }
+
+        return 100; //TODO
+    }
+
+    _can_run() {
+        //requiring system_store takes about 2 seconds for the first time
+        //this time is unnecessarily added to manage_nsfs runtime
+        //so we try to reduce the places where system store is required to a minimum
+        const system_store = require('../server/system_services/system_store').get_instance();
+        const system_utils = require('../server/utils/system_utils');
+
+        if (!system_store.is_finished_initial_load) {
+            dbg.log0('system_store did not finish initial load');
+            return false;
+        }
+        const system = system_store.data.systems[0];
+        if (!system || system_utils.system_in_maintenance(system._id)) return false;
+
+        return true;
+    }
+
+    /**
+     * This function processes the persistent notification log files
+     * and sends their notifications
+     */
+    async process_notification_files() {
+        const seen_nodes = new Set();
+        const entries = await nb_native().fs.readdir(this.fs_context, config.NOTIFICATION_LOG_DIR);
+        for (const entry of entries) {
+            if (!entry.name.endsWith('.log')) continue;
+            //get namespace
+            const namespace_index = entry.name.indexOf(config.NOTIFICATION_LOG_NS);
+            if (namespace_index === -1) continue;
+            const node_namespace = entry.name.substring(0, namespace_index + config.NOTIFICATION_LOG_NS.length);
+            if (seen_nodes.has(node_namespace)) {
+                //already handled this node name
+                continue;
+            } else {
+                seen_nodes.add(node_namespace);
+            }
+            dbg.log1("process_notification_files node_namespace =", node_namespace, ", file =", entry.name);
+            const log = new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_namespace, { locking: 'EXCLUSIVE' });
+            try {
+                await log.process(async (file, failure_append) => await this._notify(this.fs_context, file, failure_append));
+            } catch (err) {
+                dbg.error('processing notifications log file failed', log.file);
+                throw err;
+            } finally {
+                await log.close();
+                this.notif_to_connect.clear();
+            }
+        }
+    }
+
+    /**
+     * @param {nb.NativeFSContext} fs_context
+     * @param {string} log_file
+     * @returns {Promise<boolean>}
+     */
+    async _notify(fs_context, log_file, failure_append) {
+        const file = new LogFile(fs_context, log_file);
+        const send_promises = [];
+        await file.collect_and_process(async str => {
+            const notif = JSON.parse(str);
+            dbg.log2("notifying with notification =", notif);
+            let connect = this.notif_to_connect.get(notif.meta.name);
+            if (!connect) {
+                connect = parse_connect_file(notif.meta.connect);
+                this.notif_to_connect.set(notif.meta.name, connect);
+            }
+            let connection = this.connect_str_to_connection.get(notif.meta.name);
+            if (!connection) {
+                connection = get_connection(connect);
+                try {
+                    await connection.connect();
+                } catch (err) {
+                    //failed to connect
+                    dbg.error("Connection failed for", connect);
+                    await failure_append(str);
+                    return;
+                }
+                this.connect_str_to_connection.set(notif.meta.name, connection);
+            }
+            const send_promise = connection.promise_notify(notif, failure_append);
+            if (send_promise) send_promises.push(send_promise);
+        });
+        //note we can't reject promises here, since Promise.all() rejects on the
+        //first rejected promise, and that would not await the other send promises
+        await Promise.all(send_promises);
+
+        //as failed sends are written to the failure log,
+        //we can remove the currently processed persistent file
+        return true;
+    }
+}
+
+class HttpNotificator {
+
+    constructor(connect_obj) {
+        this.connect_obj = connect_obj;
+        this.protocol = connect_obj.notification_protocol.toLowerCase() === 'https' ? https : http;
+    }
+
+    connect() {
+        this.agent = new this.protocol.Agent(this.connect_obj.agent_request_object);
+    }
+
+    promise_notify(notif, promise_failure_cb) {
+        return new Promise(resolve => {
+            const req = this.protocol.request({
+                agent: this.agent,
+                method: 'POST',
+                ...this.connect_obj.request_options_object},
+                result => {
+                    //result.pipe(process.stdout);
+                    resolve();
+                });
+            req.on('error', err => {
+                if (req.destroyed) {
+                    //error emitted because of timeout, nothing more to do
+                    return;
+                }
+                dbg.error("Notify err =", err);
+                promise_failure_cb(JSON.stringify(notif)).then(resolve);
+            });
+            req.on('timeout', () => {
+                dbg.error("Notify timeout");
+                req.destroy();
+                promise_failure_cb(JSON.stringify(notif)).then(resolve);
+            });
+            req.write(JSON.stringify(notif.notif));
+            req.end();
+        });
+    }
+
+    destroy() {
+        this.agent.destroy();
+    }
+}
+
+class KafkaNotificator {
+
+    constructor(connect_obj) {
+        this.connect_obj = connect_obj;
+    }
+
+    async connect() {
+        //the kafka client doesn't like options it is not familiar with,
+        //so delete them before connecting
+        const connect_for_kafka = structuredClone(this.connect_obj);
+        delete connect_for_kafka.topic;
+        delete connect_for_kafka.notification_protocol;
+        delete connect_for_kafka.name;
+        this.connection = new Kafka.HighLevelProducer(connect_for_kafka);
+        await new Promise((res, rej) => {
+            this.connection.on('ready', () => {
+                res();
+            });
+            this.connection.on('connection.failure', err => {
+                rej(err);
+            });
+            this.connection.on('event.log', arg => {
+                dbg.log1("event log", arg);
+            });
+            this.connection.connect();
+        });
+        this.connection.setPollInterval(100);
+    }
+
+    promise_notify(notif, promise_failure_cb) {
+        const connect_obj = this.connect_obj;
+        return new Promise(resolve => {
+            this.connection.produce(
+                connect_obj.topic,
+                null,
+                Buffer.from(JSON.stringify(notif.notif)),
+                null,
+                Date.now(),
+                (err, offset) => {
+                    if (err) {
+                        promise_failure_cb(JSON.stringify(notif)).then(resolve);
+                    } else {
+                        resolve();
+                    }
+                }
+            );
+        });
+    }
+
+    destroy() {
+        this.connection.flush(10000);
+        this.connection.disconnect();
+    }
+}
+
+//replace properties starting with 'local_file_'
+//with the content of the pointed file
+//(useful for loading tls certificates into http options object)
+function load_files(object) {
+    if (typeof object !== 'object') return;
+    for (const key in object) {
+        if (key.startsWith('local_file_')) {
+            const new_key = key.substring('local_file_'.length);
+            const content = fs.readFileSync(object[key], 'utf-8');
+            object[new_key] = content;
+            delete object[key];
+        } else {
+            load_files(object[key]);
+        }
+    }
+    dbg.log2('load_files for obj =', object);
+}
+
+function parse_connect_file(connect_filepath) {
+    const connect = {};
+    const connect_strs = fs.readFileSync(connect_filepath, 'utf-8').split(os.EOL);
+    for (const connect_str of connect_strs) {
+        if (connect_str === '') continue;
+        const kv =
connect_str.split('=');
+        //parse JSON values
+        if (kv[0].endsWith('object')) {
+            kv[1] = JSON.parse(kv[1]);
+        }
+        connect[kv[0]] = kv[1];
+    }
+    //parse file contents (useful for tls cert files)
+    load_files(connect);
+    return connect;
+}
+
+
+function get_connection(connect) {
+    switch (connect.notification_protocol.toLowerCase()) {
+        case 'http':
+        case 'https':
+        {
+            return new HttpNotificator(connect);
+        }
+        case 'kafka': {
+            return new KafkaNotificator(connect);
+        }
+        default: {
+            dbg.error("Unknown notification protocol", connect.notification_protocol);
+            //nothing more to possibly do with this notification, don't append to failure log
+        }
+    }
+}
+
+
+async function test_notifications(bucket) {
+    for (const notif of bucket.notifications) {
+        const connect = parse_connect_file(notif.Connect);
+        dbg.log1("testing notif", notif);
+        try {
+            const connection = get_connection(connect);
+            await connection.connect();
+            await connection.promise_notify({notif: "test notification"}, async err => err);
+            connection.destroy();
+        } catch (err) {
+            dbg.error("Connection failed for", connect);
+            return err;
+        }
+    }
+}
+
+//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
+function compose_notification(req, res, bucket, notif_conf) {
+    let eTag = res.getHeader('ETag');
+    //eslint-disable-next-line
+    if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
+        //strip the quote character from each side
+        eTag = eTag.substring(1, eTag.length - 1);
+    }
+
+    const event = OP_TO_EVENT[req.op_name];
+    const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
+    const event_time = new Date();
+
+    const notif = {
+        eventVersion: '2.3',
+        eventSource: _get_system_name(req) + ':s3',
+        eventTime: event_time.toISOString(),
+        eventName: event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized),
+        userIdentity: {
+            principalId: req.object_sdk.requesting_account.name,
+        },
+        requestParameters: {
+            sourceIPAddress: http_utils.parse_client_ip(req),
+        },
+        responseElements: {
+            "x-amz-request-id": req.request_id,
+            "x-amz-id-2": req.request_id,
+        },
+        s3: {
+            s3SchemaVersion: "1.0",
+            configurationId: notif_conf.Id,
+            bucket: {
+                name: bucket.name,
+                ownerIdentity: {
+                    principalId: bucket.bucket_owner.unwrap(),
+                },
+                arn: "arn:aws:s3:::" + bucket.name,
+            },
+            object: {
+                key: req.params.key,
+                size: res.getHeader('content-length'),
+                eTag,
+                versionId: res.getHeader('x-amz-version-id'),
+            },
+        }
+    };
+
+    //handle glacierEventData
+    if (res.restore_object_result) {
+        notif.glacierEventData = {
+            restoreEventData: {
+                lifecycleRestorationExpiryTime: res.restore_object_result.expires_on.toISOString(),
+                lifecycleRestoreStorageClass: res.restore_object_result.storage_class,
+            },
+        };
+    }
+
+    //handle sequencer
+    if (res.seq) {
+        //in noobaa-ns we have a sequence from db
+        notif.s3.object.sequencer = res.seq;
+    } else {
+        //fallback to time-based sequence
+        notif.s3.object.sequencer = event_time.getTime().toString(16);
+    }
+
+    const records = [notif];
+
+    return {Records: records};
+}
+
+
+
+function _get_system_name(req) {
+
+    if (req.object_sdk.nsfs_config_root) {
+        const name = Object.keys(req.object_sdk.nsfs_system)[0];
+        return name;
+    } else {
+        //see comment on Notificator._can_run() for the require here
+        const system_store = require('../server/system_services/system_store').get_instance();
+        return system_store.data.systems[0].name;
+    }
+}
+
+function check_notif_relevant(notif, req) {
+    const op_event = OP_TO_EVENT[req.op_name];
+    if (!op_event) {
//s3 op is not relevant for notifications
+        return false;
+    }
+
+    //if no events were specified, always notify
+    if (!notif.Events) return true;
+
+    //check if the request's event is in the notification's events list
+    for (const notif_event of notif.Events) {
+        const notif_event_elems = notif_event.split(':');
+        const notif_event_name = notif_event_elems[1];
+        const notif_event_method = notif_event_elems[2];
+        if (notif_event_name.toLowerCase() !== op_event.name.toLowerCase()) continue;
+        //is there a filter by method?
+        if (notif_event_method === '*') {
+            //no filtering on method. we've passed the filter and need to send a notification
+            return true;
+        }
+        //take the request's method in this order:
+        //1 op_event.method - in case the method can be inferred from req.op_name, eg s3_post_object_uploadId
+        //2 req.s3_event_method - explicitly set by the op, eg DeleteMarkerCreated
+        //3 default to req.method (aka "http verb") eg get/post/delete
+        const op_method = op_event.method || req.s3_event_method || req.method;
+        if (notif_event_method.toLowerCase() === op_method.toLowerCase()) return true;
+    }
+
+    //request does not match any of the requested events
+    return false;
+}
+
+exports.Notificator = Notificator;
+exports.test_notifications = test_notifications;
+exports.compose_notification = compose_notification;
+exports.check_notif_relevant = check_notif_relevant;
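For reference, parse_connect_file above reads the Connect file as key=value lines: empty lines are skipped (there is no comment syntax), keys ending with 'object' have their values parsed as JSON, and load_files then replaces any key prefixed with 'local_file_' by the contents of the file it points to (useful for TLS material). Two hypothetical connect files under those rules — the host, paths and topic name are made up:

    notification_protocol=kafka
    topic=bucket_events
    metadata.broker.list=localhost:9092

Everything except notification_protocol, topic and name is passed through to the node-rdkafka HighLevelProducer, so metadata.broker.list above is plain librdkafka configuration. An https variant:

    notification_protocol=https
    agent_request_object={"keepAlive":true}
    request_options_object={"host":"notif.example.com","port":8443,"path":"/notify","timeout":1500,"local_file_ca":"/path/to/ca.pem"}

Here load_files would replace local_file_ca with a ca key holding the PEM content, and the whole object is spread into the https.request() options by HttpNotificator.promise_notify.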
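And a sketch of the JSON body an external server would receive for a put_object, as composed by compose_notification above (all values are hypothetical; note that eventSource carries the local system name rather than AWS's 'aws:s3'):

    {"Records":[{
        "eventVersion": "2.3",
        "eventSource": "my-noobaa-system:s3",
        "eventTime": "2024-10-30T18:17:13.000Z",
        "eventName": "ObjectCreated:Put",
        "userIdentity": { "principalId": "account1" },
        "requestParameters": { "sourceIPAddress": "10.0.0.7" },
        "responseElements": { "x-amz-request-id": "req-123", "x-amz-id-2": "req-123" },
        "s3": {
            "s3SchemaVersion": "1.0",
            "configurationId": "notif1",
            "bucket": {
                "name": "bucket1",
                "ownerIdentity": { "principalId": "account1" },
                "arn": "arn:aws:s3:::bucket1"
            },
            "object": {
                "key": "file.txt",
                "size": "512",
                "eTag": "d41d8cd98f00b204e9800998ecf8427e",
                "versionId": "v-123",
                "sequencer": "192c4bbd8e0"
            }
        }
    }]}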