Skip to content

Commit

Permalink
Merge pull request #8485 from nadavMiz/delete_version_concurrency
Browse files Browse the repository at this point in the history
NSFS | GPFS | versioning | fix delete version concurrency issues
  • Loading branch information
nadavMiz authored Oct 29, 2024
2 parents 348fe72 + 3a05283 commit 15eab01
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,7 @@ class NamespaceFS {
dbg.warn(`NamespaceFS.read_object_md: retrying retries=${retries} file_path=${file_path}`, err);
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
}
}
this._throw_if_delete_marker(stat, params);
Expand Down Expand Up @@ -1037,6 +1038,7 @@ class NamespaceFS {
{bucket_path: this.bucket_path, object_name: params.key}, err);
throw err;
}
await P.delay(get_random_delay(config.NSFS_RANDOM_DELAY_BASE, 0, 50));
}
}
this._throw_if_delete_marker(stat, params);
Expand Down Expand Up @@ -2847,6 +2849,13 @@ class NamespaceFS {
const deleted_latest = file_path === latest_version_path;
if (deleted_latest) {
gpfs_options = await this._open_files_gpfs(fs_context, file_path, undefined, undefined, undefined, undefined, true);
if (gpfs_options) {
const src_stat = await gpfs_options.delete_version.src_file.stat(fs_context);
if (this._is_mismatch_version_id(src_stat, version_id)) {
dbg.warn('NamespaceFS._delete_single_object_versioned mismatch version_id', file_path, version_id, this._get_version_id_by_xattr(src_stat));
throw error_utils.new_error_code('MISMATCH_VERSION', 'file version does not match the version we asked for');
}
}
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
await native_fs_utils.safe_unlink(fs_context, file_path, version_info,
gpfs_options?.delete_version, bucket_tmp_dir_path);
Expand Down Expand Up @@ -3007,15 +3016,17 @@ class NamespaceFS {
let latest_ver_info;
for (;;) {
try {
// get latest version_id if exists
// TODO get latest version from file in POSIX like in GPFS path
latest_ver_info = await this._get_version_info(fs_context, latest_ver_path);
const versioned_path = latest_ver_info && this._get_version_path(params.key, latest_ver_info.version_id_str);
const versioned_info = latest_ver_info && await this._get_version_info(fs_context, versioned_path);

dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info, versioned_path, versioned_info);
dbg.log1('Namespace_fs._delete_latest_version:', latest_ver_info);
if (latest_ver_info) {
if (is_gpfs) {
gpfs_options = await this._open_files_gpfs(fs_context, latest_ver_path, undefined, undefined, undefined,
undefined, true, versioned_info);
undefined, true);
const latest_fd = gpfs_options?.move_to_dst?.dst_file;
latest_ver_info = latest_fd && await this._get_version_info(fs_context, undefined, latest_fd);
}
const versioned_path = latest_ver_info && this._get_version_path(params.key, latest_ver_info.version_id_str);

const suspended_and_latest_is_not_null = this._is_versioning_suspended() &&
latest_ver_info.version_id_str !== NULL_VERSION_ID;
Expand Down Expand Up @@ -3172,7 +3183,7 @@ class NamespaceFS {
let dir_file;
let versioned_file;
try {
// open /versions/key_ver file if exists
// open /versions/key_ver file if exists. TODO is versioned_file needed
versioned_file = versioned_info && await native_fs_utils.open_file(fs_context, this.bucket_path, versioned_info.path, 'r');

// open files for deletion flow
Expand Down

0 comments on commit 15eab01

Please sign in to comment.