Skip to content

Commit

Permalink
Differentiate type of storage
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Oct 21, 2024
1 parent b7d2024 commit cbaf3c7
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 28 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ and the following GPU types:
and the following CPU types:
* n2-standard-32

xpk also supports Google Cloud Storage solutions:
* [Cloud Storage FUSE](https://cloud.google.com/storage/docs/gcs-fuse)

# Permissions needed on Cloud Console:

Artifact Registry Writer
Expand Down Expand Up @@ -118,6 +121,8 @@ cleanup with a `Cluster Delete`.
If you have failures with workloads not running, use `xpk inspector` to investigate
more.

If you need your Workloads to have persistent storage, use the `xpk storage` commands to learn more.

## Cluster Create

First set the project and zone through gcloud config or xpk arguments.
Expand Down Expand Up @@ -313,6 +318,30 @@ will fail the cluster creation process because Vertex AI Tensorboard is not supp
--tpu-type=v5litepod-16
```

## Storage Create
Currently xpk supports Cloud Storage FUSE, a FUSE adapter that lets you mount and access Cloud Storage buckets as local file systems, so applications can read and write objects in your bucket using standard file system semantics.

To use GCS FUSE with xpk, the user needs to create a [Storage Bucket](https://console.cloud.google.com/storage/)
and a manifest with a PersistentVolume and a PersistentVolumeClaim that mount to the Bucket. To learn how to properly
set up a PersistentVolume and PersistentVolumeClaim, visit the [GKE Cloud Storage documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver#provision-static).

Once the manifest is ready, the user can define:
`--type` - defines the type of storage; currently xpk supports `gcsfuse` only.
`--auto-mount` - if set to true, all workloads will have the given storage mounted by default.
`--mount-point` - defines the path at which the given storage should be mounted for a workload.
`--manifest` - defines the path to the manifest file containing the PersistentVolume and PersistentVolumeClaim definitions.
* Create simple Storage
```shell
python3 xpk.py storage create test-storage --project=$PROJECT \
--cluster=xpk-test --type=gcsfuse --auto-mount=false \
--mount-point='/test-mount-point' --readonly=false \
--manifest='pv-pvc-auto-mount.yaml'
```
## Workload Create
* Workload Create (submit training job):
Expand Down
44 changes: 25 additions & 19 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@
get_user_workload_for_pathways,
)
from ..core.storage import (
GCS_FUSE_TYPE,
XPK_SA,
Storage,
add_bucket_iam_members,
get_storage_volume_mounts_yaml,
get_storage_volume_mounts_yaml_for_gpu,
get_storage_volumes_yaml,
get_storage_volumes_yaml_for_gpu,
get_storages,
get_storages_to_mount,
)
from ..core.system_characteristics import (
AcceleratorType,
Expand Down Expand Up @@ -104,7 +105,7 @@
labels:
xpk.google.com/workload: {args.workload}
annotations:
{gcs_fuse_annotation}
{storage_annotations}
spec:
schedulerName: {args.scheduler}
restartPolicy: Never
Expand Down Expand Up @@ -141,7 +142,7 @@
template:
metadata:
annotations:
{gcs_fuse_annotation}
{storage_annotations}
spec:
parallelism: {args.num_nodes}
completions: {args.num_nodes}
Expand Down Expand Up @@ -218,7 +219,7 @@
template:
metadata:
annotations:
{gcs_fuse_annotation}
{storage_annotations}
spec:
terminationGracePeriodSeconds: {args.termination_grace_period_seconds}
serviceAccountName: {service_account}
Expand Down Expand Up @@ -434,15 +435,18 @@ def workload_create(args) -> None:
if return_code != 0:
xpk_exit(return_code)

storages: list[Storage] = get_storages(k8s_api_client, args.storage)
gcs_fuse_annotation = ''
storages: list[Storage] = get_storages_to_mount(k8s_api_client, args.storage)
gcs_fuse_storages = list(
filter(lambda storage: storage.type == GCS_FUSE_TYPE, storages)
)
storage_annotations = ''
service_account = ''
if len(storages) > 0:
gcs_fuse_annotation = GCS_FUSE_ANNOTATION
if len(gcs_fuse_storages) > 0:
storage_annotations = GCS_FUSE_ANNOTATION
service_account = XPK_SA
xpk_print(f'Detected Storages to add: {storages}')
xpk_print(f'Detected gcsfuse Storages to add: {gcs_fuse_storages}')
else:
xpk_print('No Storages to add detected')
xpk_print('No gcsfuse Storages to add detected')

# Create the workload file based on accelerator type or workload type.
if system.accelerator_type == AcceleratorType['GPU']:
Expand All @@ -465,10 +469,12 @@ def workload_create(args) -> None:
gpu_rxdm_image=get_gpu_rxdm_image(system),
gpu_rxdm_cmd=get_gpu_rxdm_cmd(system),
gpu_tcp_volume=get_gpu_tcp_volume(system),
storage_volumes=get_storage_volumes_yaml_for_gpu(storages),
storage_volume_mounts=get_storage_volume_mounts_yaml_for_gpu(storages),
gcs_fuse_annotation=gcs_fuse_annotation,
service_account=XPK_SA,
storage_volumes=get_storage_volumes_yaml_for_gpu(gcs_fuse_storages),
storage_volume_mounts=get_storage_volume_mounts_yaml_for_gpu(
gcs_fuse_storages
),
storage_annotations=storage_annotations,
service_account=service_account,
)
elif args.use_pathways and ensure_pathways_workload_prerequisites(
args, system
Expand All @@ -490,10 +496,10 @@ def workload_create(args) -> None:
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
backoff_limit=system.vms_per_slice * 4,
gcs_fuse_annotation=gcs_fuse_annotation,
storage_volumes=get_storage_volumes_yaml(storages),
storage_volume_mounts=get_storage_volume_mounts_yaml(storages),
service_account=XPK_SA,
storage_annotations=storage_annotations,
storage_volumes=get_storage_volumes_yaml(gcs_fuse_storages),
storage_volume_mounts=get_storage_volume_mounts_yaml(gcs_fuse_storages),
service_account=service_account,
)
else:
container, debugging_dashboard_id = get_user_workload_container(
Expand All @@ -511,7 +517,7 @@ def workload_create(args) -> None:
local_queue_name=LOCAL_QUEUE_NAME,
autoprovisioning_args=autoprovisioning_args,
volumes=get_volumes(args, system),
gcs_fuse_annotation=gcs_fuse_annotation,
storage_annotations=storage_annotations,
service_account=service_account,
)
tmp = write_tmp_file(yml_string)
Expand Down
16 changes: 11 additions & 5 deletions src/xpk/core/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
run_command_with_updates_retry,
run_commands,
)
from .storage import Storage, get_storages
from .storage import Storage, get_storages_to_mount, GCS_FUSE_TYPE
from .system_characteristics import (
AcceleratorType,
AcceleratorTypeToAcceleratorCharacteristics,
Expand Down Expand Up @@ -2589,9 +2589,12 @@ def get_volumes(args, system: SystemCharacteristics) -> str:
- name: shared-data
"""

storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage)
storages: list[Storage] = get_storages_to_mount(
setup_k8s_env(args), args.storage
)
for storage in storages:
volumes += f"""- name: {storage.pv}
if storage.type == GCS_FUSE_TYPE:
volumes += f"""- name: {storage.pv}
persistentVolumeClaim:
claimName: {storage.pvc}
readOnly: {storage.readonly}
Expand Down Expand Up @@ -2647,9 +2650,12 @@ def get_volume_mounts(args, system: SystemCharacteristics) -> str:
mountPath: /usr/share/workload
"""

storages: list[Storage] = get_storages(setup_k8s_env(args), args.storage)
storages: list[Storage] = get_storages_to_mount(
setup_k8s_env(args), args.storage
)
for storage in storages:
volume_mount_yaml += f"""- name: {storage.pv}
if storage.type == GCS_FUSE_TYPE:
volume_mount_yaml += f"""- name: {storage.pv}
mountPath: {storage.mount_point}
readOnly: {storage.readonly}
"""
Expand Down
4 changes: 2 additions & 2 deletions src/xpk/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,10 @@ def add_bucket_iam_members(args: Namespace, storages: list[Storage]) -> None:

policy.bindings.append({"role": role, "members": {member}})
bucket.set_iam_policy(policy)
print(f"Added {member} with role {role} to {storage.bucket}.")
xpk_print(f"Added {member} with role {role} to {storage.bucket}.")


def print_storages_for_cluster(storages: list[Storage], cluster: str):
def print_storages_for_cluster(storages: list[Storage]) -> None:
"""
Prints in human readable manner a table of Storage resources that belong to the specified cluster.
Expand Down
4 changes: 2 additions & 2 deletions src/xpk/templates/storage.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
apiVersion: xpk.x-k8s.io/v1
kind: Storage
metadata:
name:
name: $NAME
spec:
auto_mount:
cluster:
manifest:
mount_point:
readonly:
type:
type: $NAME
pvc:
pv:

0 comments on commit cbaf3c7

Please sign in to comment.