From 3f9525c73c75513d5d719d46767beb481f1c7b58 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 8 Jan 2025 15:54:07 +0800 Subject: [PATCH 1/3] remove field _is_sky_managed --- sky/data/storage.py | 55 +++++++++++++---------------------- sky/utils/controller_utils.py | 12 ++++++-- sky/utils/schemas.py | 3 -- 3 files changed, 30 insertions(+), 40 deletions(-) diff --git a/sky/data/storage.py b/sky/data/storage.py index 018cb2797ca..2916282e1a1 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -392,11 +392,11 @@ def upload(self) -> None: """ raise NotImplementedError - def delete(self) -> None: + def delete(self, force_delete_bucket: bool = False) -> None: """Removes the Storage from the cloud.""" raise NotImplementedError - def _delete_sub_path(self) -> None: + def delete_sub_path(self) -> None: """Removes objects from the sub path in the bucket.""" raise NotImplementedError @@ -550,8 +550,6 @@ def __init__( mode: StorageMode = StorageMode.MOUNT, sync_on_reconstruction: bool = True, # pylint: disable=invalid-name - _is_sky_managed: Optional[bool] = None, - # pylint: disable=invalid-name _bucket_sub_path: Optional[str] = None ) -> None: """Initializes a Storage object. @@ -591,16 +589,6 @@ def __init__( there. This is set to false when the Storage object is created not for direct use, e.g. for 'sky storage delete', or the storage is being re-used, e.g., for `sky start` on a stopped cluster. - _is_sky_managed: Optional[bool]; Indicates if the storage is managed - by Sky. Without this argument, the controller's behavior differs - from the local machine. For example, if a bucket does not exist: - Local Machine (is_sky_managed=True) → - Controller (is_sky_managed=False). - With this argument, the controller aligns with the local machine, - ensuring it retains the is_sky_managed information from the YAML. - During teardown, if is_sky_managed is True, the controller should - delete the bucket. Otherwise, it might mistakenly delete only the - sub-path, assuming is_sky_managed is False. _bucket_sub_path: Optional[str]; The subdirectory to use for the storage object. """ @@ -610,7 +598,6 @@ def __init__( self.mode = mode assert mode in StorageMode self.sync_on_reconstruction = sync_on_reconstruction - self._is_sky_managed = _is_sky_managed self._bucket_sub_path = _bucket_sub_path # TODO(romilb, zhwu): This is a workaround to support storage deletion @@ -1024,7 +1011,6 @@ def add_store(self, source=self.source, region=region, sync_on_reconstruction=self.sync_on_reconstruction, - is_sky_managed=self._is_sky_managed, _bucket_sub_path=self._bucket_sub_path) except exceptions.StorageBucketCreateError: # Creation failed, so this must be sky managed store. Add failure @@ -1100,7 +1086,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: else: global_user_state.set_storage_handle(self.name, self.handle) elif self.force_delete: - store.delete() + store.delete(force_delete_bucket=True) # Remove store from bookkeeping del self.stores[store_type] else: @@ -1109,7 +1095,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: self.handle.remove_store(store) store.delete() elif self.force_delete: - store.delete() + store.delete(force_delete_bucket=True) self.stores = {} # Remove storage from global_user_state if present global_user_state.remove_storage(self.name) @@ -1157,8 +1143,6 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage': mode_str = config.pop('mode', None) force_delete = config.pop('_force_delete', None) # pylint: disable=invalid-name - _is_sky_managed = config.pop('_is_sky_managed', None) - # pylint: disable=invalid-name _bucket_sub_path = config.pop('_bucket_sub_path', None) if force_delete is None: force_delete = False @@ -1180,7 +1164,6 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage': source=source, persistent=persistent, mode=mode, - _is_sky_managed=_is_sky_managed, _bucket_sub_path=_bucket_sub_path) if store is not None: storage_obj.add_store(StoreType(store.upper())) @@ -1205,12 +1188,9 @@ def add_if_not_none(key: str, value: Optional[Any]): add_if_not_none('source', self.source) stores = None - is_sky_managed = self._is_sky_managed if self.stores: stores = ','.join([store.value for store in self.stores]) - is_sky_managed = list(self.stores.values())[0].is_sky_managed add_if_not_none('store', stores) - add_if_not_none('_is_sky_managed', is_sky_managed) add_if_not_none('persistent', self.persistent) add_if_not_none('mode', self.mode.value) if self.force_delete: @@ -1415,8 +1395,9 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: - if self._bucket_sub_path is not None and not self.is_sky_managed: + def delete(self, force_delete_bucket: bool = False) -> None: + if (not force_delete_bucket and self._bucket_sub_path is not None and + not self.is_sky_managed): return self._delete_sub_path() deleted_by_skypilot = self._delete_s3_bucket(self.name) @@ -1900,8 +1881,9 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: - if self._bucket_sub_path is not None and not self.is_sky_managed: + def delete(self, force_delete_bucket: bool = False) -> None: + if (not force_delete_bucket and self._bucket_sub_path is not None and + not self.is_sky_managed): return self._delete_sub_path() deleted_by_skypilot = self._delete_gcs_bucket(self.name) @@ -2730,9 +2712,10 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: + def delete(self, force_delete_bucket: bool = False) -> None: """Deletes the storage.""" - if self._bucket_sub_path is not None and not self.is_sky_managed: + if (not force_delete_bucket and self._bucket_sub_path is not None and + not self.is_sky_managed): return self._delete_sub_path() deleted_by_skypilot = self._delete_az_bucket(self.name) @@ -3183,8 +3166,9 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: - if self._bucket_sub_path is not None and not self.is_sky_managed: + def delete(self, force_delete_bucket: bool = False) -> None: + if (not force_delete_bucket and self._bucket_sub_path is not None and + not self.is_sky_managed): return self._delete_sub_path() deleted_by_skypilot = self._delete_r2_bucket(self.name) @@ -3665,8 +3649,9 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: - if self._bucket_sub_path is not None and not self.is_sky_managed: + def delete(self, force_delete_bucket: bool = False) -> None: + if (not force_delete_bucket and self._bucket_sub_path is not None and + not self.is_sky_managed): return self._delete_sub_path() self._delete_cos_bucket() @@ -4100,7 +4085,7 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self) -> None: + def delete(self, force_delete_bucket: bool = False) -> None: deleted_by_skypilot = self._delete_oci_bucket(self.name) if deleted_by_skypilot: msg_str = f'Deleted OCI bucket {self.name}.' diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 39623085bbb..57ca7fa4ea6 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -700,6 +700,11 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: # we store all these files in same bucket from config. bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None) store_kwargs: Dict[str, Any] = {} + # Controllers don't have the knowledge of whether the bucket is managed by + # sky or not, By default we consider the sky create and managed the + # intermediate bucket so we let controller delete the buckets after job + # finishes. + force_delete = True if bucket_wth_prefix is None: store_type = store_cls = sub_path = None storage_account_name = region = None @@ -713,7 +718,8 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: store_kwargs['storage_account_name'] = storage_account_name if region is not None: store_kwargs['region'] = region - + # If the bucket is not managed by sky, we should not force delete it. + force_delete = False # Step 1: Translate the workdir to SkyPilot storage. new_storage_mounts = {} if task.workdir is not None: @@ -745,6 +751,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.COPY, stores=stores, _bucket_sub_path=bucket_sub_path) + storage_obj.force_delete = force_delete new_storage_mounts[constants.SKY_REMOTE_WORKDIR] = storage_obj # Check of the existence of the workdir in file_mounts is done in # the task construction. @@ -782,6 +789,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.COPY, stores=stores, _bucket_sub_path=bucket_sub_path) + storage_obj.force_delete = force_delete new_storage_mounts[dst] = storage_obj logger.info(f' {colorama.Style.DIM}Folder : {src!r} ' f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}') @@ -821,7 +829,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.MOUNT, stores=stores, _bucket_sub_path=file_mounts_tmp_subpath) - + storage_obj.force_delete = force_delete new_storage_mounts[file_mount_remote_tmp_dir] = storage_obj if file_mount_remote_tmp_dir in original_storage_mounts: with ux_utils.print_exception_no_traceback(): diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 3194dc79da5..cf74ea2309c 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -299,9 +299,6 @@ def get_storage_schema(): mode.value for mode in storage.StorageMode ] }, - '_is_sky_managed': { - 'type': 'boolean', - }, '_bucket_sub_path': { 'type': 'string', }, From ce4500173864d333da2d43919f264b39ced7dfb0 Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 8 Jan 2025 17:45:24 +0800 Subject: [PATCH 2/3] deduce is_sky_managed --- sky/data/storage.py | 62 +++++++++++++++++++++-------------- sky/utils/controller_utils.py | 12 ++----- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/sky/data/storage.py b/sky/data/storage.py index 2916282e1a1..aa9b0cf16e0 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -78,6 +78,11 @@ _STORAGE_LOG_FILE_NAME = 'storage_sync.log' +def _is_sky_managed_intermediate_bucket(bucket_name: str) -> bool: + return re.match(r'skypilot-filemounts-.+-[a-f0-9]{8}', + bucket_name) is not None + + def get_cached_enabled_storage_clouds_or_refresh( raise_if_no_cloud_access: bool = False) -> List[str]: # This is a temporary solution until https://github.com/skypilot-org/skypilot/issues/1943 # pylint: disable=line-too-long @@ -392,7 +397,7 @@ def upload(self) -> None: """ raise NotImplementedError - def delete(self, force_delete_bucket: bool = False) -> None: + def delete(self) -> None: """Removes the Storage from the cloud.""" raise NotImplementedError @@ -1086,7 +1091,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: else: global_user_state.set_storage_handle(self.name, self.handle) elif self.force_delete: - store.delete(force_delete_bucket=True) + store.delete() # Remove store from bookkeeping del self.stores[store_type] else: @@ -1095,7 +1100,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None: self.handle.remove_store(store) store.delete() elif self.force_delete: - store.delete(force_delete_bucket=True) + store.delete() self.stores = {} # Remove storage from global_user_state if present global_user_state.remove_storage(self.name) @@ -1364,7 +1369,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def upload(self): """Uploads source to store bucket. @@ -1395,9 +1402,8 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: - if (not force_delete_bucket and self._bucket_sub_path is not None and - not self.is_sky_managed): + def delete(self) -> None: + if self._bucket_sub_path is not None and not self.is_sky_managed: return self._delete_sub_path() deleted_by_skypilot = self._delete_s3_bucket(self.name) @@ -1848,7 +1854,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def upload(self): """Uploads source to store bucket. @@ -1881,9 +1889,8 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: - if (not force_delete_bucket and self._bucket_sub_path is not None and - not self.is_sky_managed): + def delete(self) -> None: + if self._bucket_sub_path is not None and not self.is_sky_managed: return self._delete_sub_path() deleted_by_skypilot = self._delete_gcs_bucket(self.name) @@ -2424,7 +2431,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def _update_storage_account_name_and_resource(self): self.storage_account_name, self.resource_group_name = ( @@ -2712,10 +2721,9 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: + def delete(self) -> None: """Deletes the storage.""" - if (not force_delete_bucket and self._bucket_sub_path is not None and - not self.is_sky_managed): + if self._bucket_sub_path is not None and not self.is_sky_managed: return self._delete_sub_path() deleted_by_skypilot = self._delete_az_bucket(self.name) @@ -3135,7 +3143,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def upload(self): """Uploads source to store bucket. @@ -3166,9 +3176,8 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: - if (not force_delete_bucket and self._bucket_sub_path is not None and - not self.is_sky_managed): + def delete(self) -> None: + if self._bucket_sub_path is not None and not self.is_sky_managed: return self._delete_sub_path() deleted_by_skypilot = self._delete_r2_bucket(self.name) @@ -3615,7 +3624,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def upload(self): """Uploads files from local machine to bucket. @@ -3649,9 +3660,8 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: - if (not force_delete_bucket and self._bucket_sub_path is not None and - not self.is_sky_managed): + def delete(self) -> None: + if self._bucket_sub_path is not None and not self.is_sky_managed: return self._delete_sub_path() self._delete_cos_bucket() @@ -4060,7 +4070,9 @@ def initialize(self): # object (i.e., did not exist in global_user_state) and we should # set the is_sky_managed property. # If is_sky_managed is specified, then we take no action. - self.is_sky_managed = is_new_bucket + self.is_sky_managed = (is_new_bucket or + _is_sky_managed_intermediate_bucket( + self.name)) def upload(self): """Uploads source to store bucket. @@ -4085,7 +4097,7 @@ def upload(self): raise exceptions.StorageUploadError( f'Upload failed for store {self.name}') from e - def delete(self, force_delete_bucket: bool = False) -> None: + def delete(self) -> None: deleted_by_skypilot = self._delete_oci_bucket(self.name) if deleted_by_skypilot: msg_str = f'Deleted OCI bucket {self.name}.' diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 57ca7fa4ea6..39623085bbb 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -700,11 +700,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: # we store all these files in same bucket from config. bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None) store_kwargs: Dict[str, Any] = {} - # Controllers don't have the knowledge of whether the bucket is managed by - # sky or not, By default we consider the sky create and managed the - # intermediate bucket so we let controller delete the buckets after job - # finishes. - force_delete = True if bucket_wth_prefix is None: store_type = store_cls = sub_path = None storage_account_name = region = None @@ -718,8 +713,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: store_kwargs['storage_account_name'] = storage_account_name if region is not None: store_kwargs['region'] = region - # If the bucket is not managed by sky, we should not force delete it. - force_delete = False + # Step 1: Translate the workdir to SkyPilot storage. new_storage_mounts = {} if task.workdir is not None: @@ -751,7 +745,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.COPY, stores=stores, _bucket_sub_path=bucket_sub_path) - storage_obj.force_delete = force_delete new_storage_mounts[constants.SKY_REMOTE_WORKDIR] = storage_obj # Check of the existence of the workdir in file_mounts is done in # the task construction. @@ -789,7 +782,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.COPY, stores=stores, _bucket_sub_path=bucket_sub_path) - storage_obj.force_delete = force_delete new_storage_mounts[dst] = storage_obj logger.info(f' {colorama.Style.DIM}Folder : {src!r} ' f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}') @@ -829,7 +821,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None: mode=storage_lib.StorageMode.MOUNT, stores=stores, _bucket_sub_path=file_mounts_tmp_subpath) - storage_obj.force_delete = force_delete + new_storage_mounts[file_mount_remote_tmp_dir] = storage_obj if file_mount_remote_tmp_dir in original_storage_mounts: with ux_utils.print_exception_no_traceback(): From abfbe219e75cc7c9af0d555c088e0ecffa3ca22a Mon Sep 17 00:00:00 2001 From: ZePing Guo Date: Wed, 8 Jan 2025 17:46:44 +0800 Subject: [PATCH 3/3] restore function name --- sky/data/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sky/data/storage.py b/sky/data/storage.py index aa9b0cf16e0..5937b8fd73c 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -401,7 +401,7 @@ def delete(self) -> None: """Removes the Storage from the cloud.""" raise NotImplementedError - def delete_sub_path(self) -> None: + def _delete_sub_path(self) -> None: """Removes objects from the sub path in the bucket.""" raise NotImplementedError