Skip to content

Commit

Permalink
Bucket Notifications (#8337)
Browse files Browse the repository at this point in the history
Bucket Notifications - phase 1

Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz authored Oct 30, 2024
1 parent 9ad101e commit 9dece0e
Show file tree
Hide file tree
Showing 29 changed files with 821 additions and 39 deletions.
6 changes: 6 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,12 @@ config.PERSISTENT_BUCKET_LOG_DIR = process.env.GUARANTEED_LOGS_PATH;
config.PERSISTENT_BUCKET_LOG_NS = 'bucket_logging';
config.BUCKET_LOG_CONCURRENCY = 10;

////////////////////////////////
// NOTIFICATIONS //
////////////////////////////////
config.NOTIFICATION_LOG_NS = 'notification_logging';
config.NOTIFICATION_LOG_DIR = process.env.NOTIFICATION_LOG_DIR;

///////////////////////////
// KEY ROTATOR //
///////////////////////////
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"nan": "2.20.0",
"ncp": "2.0.0",
"node-addon-api": "8.1.0",
"node-rdkafka": "3.0.1",
"performance-now": "2.1.0",
"pg": "8.13.0",
"ping": "0.4.4",
Expand Down
59 changes: 59 additions & 0 deletions src/api/bucket_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,59 @@ module.exports = {
}
},

get_bucket_notification: {
method: 'GET',
params: {
type: 'object',
required: [
'name'
],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
}
},
reply: {
type: 'object',
required: [
'notifications'
],
properties: {
notifications: {
type: 'array',
items: {
$ref: 'common_api#/definitions/bucket_notification'
}
}
}
},
auth: {
system: ['admin', 'user']
}
},

put_bucket_notification: {
method: 'PUT',
params: {
type: 'object',
required: [
'notifications',
'name',
],
properties: {
name: { $ref: 'common_api#/definitions/bucket_name' },
notifications: {
type: 'array',
items: {
$ref: 'common_api#/definitions/bucket_notification'
}
}
}
},
auth: {
system: ['admin', 'user'],
}
},

read_bucket_sdk_info: {
method: 'GET',
params: {
Expand Down Expand Up @@ -1130,6 +1183,12 @@ module.exports = {
bucket_info: {
$ref: '#/definitions/bucket_info'
},
notifications: {
type: 'array',
items: {
$ref: 'common_api#/definitions/bucket_notification'
}
}
}
},

Expand Down
40 changes: 40 additions & 0 deletions src/api/common_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,46 @@ module.exports = {
type: 'string',
},
}
},
bucket_notification: {
type: 'object',
required: ['Id', 'Connect'],
properties: {
Id: {
type: 'string'
},
Connect: {
type: 'string'
},
Events: {
type: 'array',
items: {
type: 'string',
enum: [
's3:TestEvent',
's3:ObjectCreated:*',
's3:ObjectCreated:Put',
's3:ObjectCreated:Post',
's3:ObjectCreated:Copy',
's3:ObjectCreated:CompleteMultipartUpload',
's3:ObjectRemoved:*',
's3:ObjectRemoved:Delete',
's3:ObjectRemoved:DeleteMarkerCreated',
's3:ObjectRestore:*',
's3:ObjectRestore:Post',
's3:ObjectRestore:Completed',
's3:ObjectRestore:Delete',
's3:ObjectTagging:*',
's3:ObjectTagging:Put',
's3:ObjectTagging:Delete',
/*We plan to support LifecycleExpiration
's3:LifecycleExpiration:*',
's3:LifecycleExpiration:Delete',
's3:LifecycleExpiration:DeleteMarkerCreated',*/
],
}
}
}
}
}
};
3 changes: 3 additions & 0 deletions src/api/object_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ module.exports = {
content_type: { type: 'string' },
content_encoding: { type: 'string' },
size: { type: 'integer' },
seq: { type: 'integer' },
}
},
auth: { system: ['admin', 'user'] }
Expand Down Expand Up @@ -653,6 +654,7 @@ module.exports = {
deleted_delete_marker: { type: 'boolean' },
created_version_id: { type: 'string' },
created_delete_marker: { type: 'boolean' },
seq: { type: 'integer' },
}
},
auth: { system: ['admin', 'user'] }
Expand Down Expand Up @@ -690,6 +692,7 @@ module.exports = {
deleted_delete_marker: { type: 'boolean' },
created_version_id: { type: 'string' },
created_delete_marker: { type: 'boolean' },
seq: { type: 'integer' },
err_code: {
type: 'string',
enum: ['AccessDenied', 'InternalError']
Expand Down
24 changes: 20 additions & 4 deletions src/cmd/manage_nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const { throw_cli_error, get_bucket_owner_account_by_name,
is_name_update, is_access_key_update } = require('../manage_nsfs/manage_nsfs_cli_utils');
const manage_nsfs_validations = require('../manage_nsfs/manage_nsfs_validations');
const nc_mkm = require('../manage_nsfs/nc_master_key_manager').get_instance();
const notifications_util = require('../util/notifications_util');

let config_fs;

Expand Down Expand Up @@ -70,6 +71,8 @@ async function main(argv = minimist(process.argv.slice(2))) {
await noobaa_cli_diagnose.manage_diagnose_operations(action, user_input, config_fs);
} else if (type === TYPES.UPGRADE) {
await noobaa_cli_upgrade.manage_upgrade_operations(action, user_input, config_fs);
} else if (type === TYPES.NOTIFICATION) {
await notification_management();
} else {
throw_cli_error(ManageCLIError.InvalidType);
}
Expand Down Expand Up @@ -100,8 +103,9 @@ async function fetch_bucket_data(action, user_input) {
should_create_underlying_storage: action === ACTIONS.ADD ? false : undefined,
new_name: user_input.new_name === undefined ? undefined : String(user_input.new_name),
fs_backend: user_input.fs_backend === undefined ? config.NSFS_NC_STORAGE_BACKEND : String(user_input.fs_backend),
force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag)
};
force_md5_etag: user_input.force_md5_etag === undefined || user_input.force_md5_etag === '' ? user_input.force_md5_etag : get_boolean_or_string_value(user_input.force_md5_etag),
notifications: user_input.notifications
};

if (user_input.bucket_policy !== undefined) {
if (typeof user_input.bucket_policy === 'string') {
Expand Down Expand Up @@ -196,14 +200,22 @@ async function get_bucket_status(data) {
* @param {Object} data
* @returns { Promise<{ code: typeof ManageCLIResponse.BucketUpdated, detail: Object }>}
*/
async function update_bucket(data) {
async function update_bucket(data, user_input) {
const cur_name = data.name;
const new_name = data.new_name;
const name_update = is_name_update(data);
const cli_bucket_flags_to_remove = ['new_name'];
data = _.omit(data, cli_bucket_flags_to_remove);

let parsed_bucket_data;

if (user_input.notifications) {
//notifications are tested before they can be updated
const test_notif_err = await notifications_util.test_notifications(data);
if (test_notif_err) {
throw_cli_error(ManageCLIError.InvalidArgument, "Failed to update notifications", test_notif_err);
}
}
if (name_update) {
parsed_bucket_data = await config_fs.create_bucket_config_file({ ...data, name: new_name });
await config_fs.delete_bucket_config_file(cur_name);
Expand Down Expand Up @@ -271,7 +283,7 @@ async function bucket_management(action, user_input) {
} else if (action === ACTIONS.STATUS) {
response = await get_bucket_status(data);
} else if (action === ACTIONS.UPDATE) {
response = await update_bucket(data);
response = await update_bucket(data, user_input);
} else if (action === ACTIONS.DELETE) {
const force = get_boolean_or_string_value(user_input.force);
response = await delete_bucket(data, force);
Expand Down Expand Up @@ -706,5 +718,9 @@ async function logging_management() {
await manage_nsfs_logging.export_bucket_logging(config_fs);
}

async function notification_management() {
new notifications_util.Notificator({fs_context: config_fs.fs_context}).process_notification_files();
}

exports.main = main;
if (require.main === module) main();
11 changes: 7 additions & 4 deletions src/cmd/nsfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ function print_usage() {
let nsfs_config_root;

class NsfsObjectSDK extends ObjectSDK {
constructor(fs_root, fs_config, account, versioning, config_root) {
constructor(fs_root, fs_config, account, versioning, config_root, nsfs_system) {
// const rpc_client_hooks = new_rpc_client_hooks();
// rpc_client_hooks.account.read_account_by_access_key = async ({ access_key }) => {
// if (access_key) {
Expand Down Expand Up @@ -152,6 +152,7 @@ class NsfsObjectSDK extends ObjectSDK {
this.nsfs_account = account;
this.nsfs_versioning = versioning;
this.nsfs_namespaces = {};
this.nsfs_system = nsfs_system;
if (!config_root) {
this._get_bucket_namespace = bucket_name => this._simple_get_single_bucket_namespace(bucket_name);
this.load_requesting_account = auth_req => this._simple_load_requesting_account(auth_req);
Expand Down Expand Up @@ -239,6 +240,7 @@ class NsfsAccountSDK extends AccountSDK {
}
}

/* eslint-disable max-statements */
async function main(argv = minimist(process.argv.slice(2))) {
try {
config.DB_TYPE = 'none';
Expand Down Expand Up @@ -318,15 +320,16 @@ async function main(argv = minimist(process.argv.slice(2))) {
nsfs_config_root,
});

let system_data;
if (!simple_mode) {
// Do not move this function - we need to create/update RPM changes before starting the endpoint
const config_fs = new ConfigFS(nsfs_config_root);
const system_data = await config_fs.get_system_config_file({ silent_if_missing: true });
system_data = await config_fs.get_system_config_file({ silent_if_missing: true });
if (system_data && system_data[os.hostname()]) {
const nc_upgrade_manager = new NCUpgradeManager(config_fs);
await nc_upgrade_manager.update_rpm_upgrade();
} else {
await config_fs.init_nc_system();
system_data = await config_fs.init_nc_system();
}
}

Expand All @@ -340,7 +343,7 @@ async function main(argv = minimist(process.argv.slice(2))) {
forks,
nsfs_config_root,
init_request_sdk: (req, res) => {
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root);
req.object_sdk = new NsfsObjectSDK(fs_root, fs_config, account, versioning, nsfs_config_root, system_data);
req.account_sdk = new NsfsAccountSDK(fs_root, fs_config, account, nsfs_config_root);
}
});
Expand Down
14 changes: 12 additions & 2 deletions src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dbg.log0('endpoint: replacing old umask: ', old_umask.toString(8), 'with new uma
* sts_sdk?: StsSDK;
* virtual_hosts?: readonly string[];
* bucket_logger?: PersistentLogger;
* notification_logger?: PersistentLogger;
* }} EndpointRequest
*/

Expand Down Expand Up @@ -100,6 +101,7 @@ async function create_https_server(ssl_cert_info, honorCipherOrder, endpoint_han
/* eslint-disable max-statements */
async function main(options = {}) {
let bucket_logger;
let notification_logger;
try {
// setting process title needed for letting GPFS to identify the noobaa endpoint processes see issue #8039.
if (config.ENDPOINT_PROCESS_TITLE) {
Expand Down Expand Up @@ -137,6 +139,12 @@ async function main(options = {}) {
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

notification_logger = config.NOTIFICATION_LOG_DIR &&
new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
locking: 'SHARED',
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
});

process.on('warning', e => dbg.warn(e.stack));

let internal_rpc_client;
Expand Down Expand Up @@ -174,7 +182,8 @@ async function main(options = {}) {
init_request_sdk = create_init_request_sdk(rpc, internal_rpc_client, object_io);
}

const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false, bucket_logger);
const endpoint_request_handler = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ false,
bucket_logger, notification_logger);
const endpoint_request_handler_sts = create_endpoint_handler(init_request_sdk, virtual_hosts, /*is_sts?*/ true);

const ssl_cert_info = await ssl_utils.get_ssl_cert_info('S3', options.nsfs_config_root);
Expand Down Expand Up @@ -266,7 +275,7 @@ async function main(options = {}) {
* @param {readonly string[]} virtual_hosts
* @returns {EndpointHandler}
*/
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger, notification_logger) {
const blob_rest_handler = process.env.ENDPOINT_BLOB_ENABLED === 'true' ? blob_rest : unavailable_handler;
const lambda_rest_handler = config.DB_TYPE === 'mongodb' ? lambda_rest : unavailable_handler;

Expand All @@ -276,6 +285,7 @@ function create_endpoint_handler(init_request_sdk, virtual_hosts, sts, logger) {
endpoint_utils.prepare_rest_request(req);
req.virtual_hosts = virtual_hosts;
if (logger) req.bucket_logger = logger;
if (notification_logger) req.notification_logger = notification_logger;
init_request_sdk(req, res);
if (req.url.startsWith('/2015-03-31/functions')) {
return lambda_rest_handler(req, res);
Expand Down
2 changes: 2 additions & 0 deletions src/endpoint/s3/ops/s3_delete_object.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ async function delete_object(req, res) {
} else if (del_res.created_delete_marker) {
res.setHeader('x-amz-version-id', del_res.created_version_id);
res.setHeader('x-amz-delete-marker', 'true');
req.s3_event_method = 'DeleteMarkerCreated';
}
res.seq = del_res.seq;
}

module.exports = {
Expand Down
21 changes: 17 additions & 4 deletions src/endpoint/s3/ops/s3_get_bucket_notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
* http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETnotification.html
*/
async function get_bucket_notification(req) {
await req.object_sdk.read_bucket({ name: req.params.bucket });
return {
NotificationConfiguration: ''
};

const result = await req.object_sdk.get_bucket_notification({
bucket_name: req.params.bucket,
});


const reply = result && result.length > 0 ?
{
//return result inside TopicConfiguration tag
NotificationConfiguration: {
TopicConfiguration: result
}
} :
//if there's no notification, reuturn empty NotificationConfiguration tag
{ NotificationConfiguration: {} };

return reply;
}

module.exports = {
Expand Down
Loading

0 comments on commit 9dece0e

Please sign in to comment.