Skip to content

Commit

Permalink
[Storage] Continue storage deletion in sky storage delete when some…
Browse files Browse the repository at this point in the history
… fail (#4454)

* feat: support storage deletion recovery

* fix: check handle None later and add storage name consistency check

* Apply suggestions from code review

Co-authored-by: Tian Xia <[email protected]>

* refactor: use storage_name=* to replace a trivial function

* refactor: fine-grained catch

* revert: run_in_parallel `continue_on_errors`

* chore: remove TODO

* style: comment on why *

Co-authored-by: Tian Xia <[email protected]>

* revert: use broad-except

---------

Co-authored-by: Tian Xia <[email protected]>
  • Loading branch information
andylizf and cblmemo authored Jan 17, 2025
1 parent bdde47b commit 0a810ee
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 26 deletions.
14 changes: 10 additions & 4 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3530,11 +3530,11 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
if sum([bool(names), all]) != 1:
raise click.UsageError('Either --all or a name must be specified.')
if all:
storages = sky.storage_ls()
if not storages:
# Use '*' to get all storages.
names = global_user_state.get_glob_storage_name(storage_name='*')
if not names:
click.echo('No storage(s) to delete.')
return
names = [s['name'] for s in storages]
else:
names = _get_glob_storages(names)
if names:
Expand All @@ -3548,7 +3548,13 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
abort=True,
show_default=True)

subprocess_utils.run_in_parallel(sky.storage_delete, names)
def delete_storage(name: str) -> None:
try:
sky.storage_delete(name)
except Exception as e: # pylint: disable=broad-except
click.secho(f'Error deleting storage {name}: {e}', fg='red')

subprocess_utils.run_in_parallel(delete_storage, names)


@cli.group(cls=_NaturalOrderGroup)
Expand Down
13 changes: 8 additions & 5 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,11 @@ def storage_delete(name: str) -> None:
handle = global_user_state.get_handle_from_storage_name(name)
if handle is None:
raise ValueError(f'Storage name {name!r} not found.')
else:
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()

assert handle.storage_name == name, (
f'In global_user_state, storage name {name!r} does not match '
f'handle.storage_name {handle.storage_name!r}')
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()
23 changes: 16 additions & 7 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,18 +1083,16 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
if not self.stores:
logger.info('No backing stores found. Deleting storage.')
global_user_state.remove_storage(self.name)
if store_type:
if store_type is not None:
store = self.stores[store_type]
is_sky_managed = store.is_sky_managed
# We delete a store from the cloud if it's sky managed. Else just
# remove handle and return
if is_sky_managed:
if store.is_sky_managed:
self.handle.remove_store(store)
store.delete()
# Check remaining stores - if none is sky managed, remove
# the storage from global_user_state.
delete = all(
s.is_sky_managed is False for s in self.stores.values())
delete = all(not s.is_sky_managed for s in self.stores.values())
if delete:
global_user_state.remove_storage(self.name)
else:
Expand Down Expand Up @@ -1689,6 +1687,9 @@ def _delete_s3_bucket(self, bucket_name: str) -> bool:
Returns:
bool; True if bucket was deleted, False if it was deleted externally.
Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -2179,6 +2180,11 @@ def _delete_gcs_bucket(
Returns:
bool; True if bucket was deleted, False if it was deleted externally.
Raises:
StorageBucketDeleteError: If deleting the bucket fails.
PermissionError: If the bucket is external and the user is not
allowed to delete it.
"""
if _bucket_sub_path is not None:
command_suffix = f'/{_bucket_sub_path}'
Expand Down Expand Up @@ -3478,6 +3484,9 @@ def _delete_r2_bucket(self, bucket_name: str) -> bool:
Returns:
bool; True if bucket was deleted, False if it was deleted externally.
Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -3932,7 +3941,7 @@ def _create_cos_bucket(self,

def _delete_cos_bucket_objects(self,
bucket: Any,
prefix: Optional[str] = None):
prefix: Optional[str] = None) -> None:
bucket_versioning = self.s3_resource.BucketVersioning(bucket.name)
if bucket_versioning.status == 'Enabled':
if prefix is not None:
Expand All @@ -3947,7 +3956,7 @@ def _delete_cos_bucket_objects(self,
res = list(bucket.objects.delete())
logger.debug(f'Deleted bucket\'s content:\n{res}, prefix: {prefix}')

def _delete_cos_bucket(self):
def _delete_cos_bucket(self) -> None:
bucket = self.s3_resource.Bucket(self.name)
try:
self._delete_cos_bucket_objects(bucket)
Expand Down
2 changes: 1 addition & 1 deletion sky/global_user_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ def get_storage_names_start_with(starts_with: str) -> List[str]:


def get_storage() -> List[Dict[str, Any]]:
rows = _DB.cursor.execute('select * from storage')
rows = _DB.cursor.execute('SELECT * FROM storage')
records = []
for name, launched_at, handle, last_use, status in rows:
# TODO: use namedtuple instead of dict
Expand Down
20 changes: 11 additions & 9 deletions sky/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ def run_in_parallel(func: Callable,
num_threads: Optional[int] = None) -> List[Any]:
"""Run a function in parallel on a list of arguments.
The function 'func' should raise a CommandError if the command fails.
Args:
func: The function to run in parallel
args: Iterable of arguments to pass to func
Expand All @@ -112,19 +110,23 @@ def run_in_parallel(func: Callable,
Returns:
A list of the return values of the function func, in the same order as the
arguments.
arguments.
Raises:
Exception: The first exception encountered.
"""
# Short-circuit for short lists
if len(args) == 0:
return []
# Short-circuit for single element
if len(args) == 1:
return [func(args[0])]
# Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long
processes = num_threads if num_threads is not None else get_parallel_threads(
)

processes = (num_threads
if num_threads is not None else get_parallel_threads())

with pool.ThreadPool(processes=processes) as p:
# Run the function in parallel on the arguments, keeping the order.
return list(p.imap(func, args))
ordered_iterators = p.imap(func, args)
return list(ordered_iterators)


def handle_returncode(returncode: int,
Expand Down

0 comments on commit 0a810ee

Please sign in to comment.