
Move the helper functions that resize the A3 cluster into the GPU utils module
michelle-yooh committed Feb 3, 2025
1 parent 27e775a commit a751d02
Showing 3 changed files with 88 additions and 191 deletions.
96 changes: 3 additions & 93 deletions dags/multipod/maxtext_gpu_end_to_end.py
@@ -16,17 +16,14 @@
 
 
 import datetime
-import tempfile
 
 from airflow import models
-from airflow.decorators import task
-from airflow.hooks.subprocess import SubprocessHook
 from airflow.utils.task_group import TaskGroup
 from dags import composer_env
 from dags.common import test_owner
-from dags.common.vm_resource import XpkClusters, CpuVersion, DockerImage, GpuVersion, Project, TpuVersion, Zone
+from dags.common.vm_resource import XpkClusters, DockerImage
 from dags.multipod.configs import gke_config
-from xlml.utils import gke
+from xlml.utils.gpu import scale_up_a3_cluster, scale_down_a3_cluster
 
 # Run once a day at 4 am UTC (8 pm PST)
 SCHEDULED_TIME = "0 4 * * *" if composer_env.is_prod_env() else None
@@ -35,93 +32,6 @@
 A3_NUM_NODES = 10
 
 
-def configure_project_and_cluster(project: str, cluster_name: str, zone: str):
-  region = gke.zone_to_region(zone)
-
-  gcloud_command = (
-      f"gcloud config set project {project}",
-      "sudo chown -R airflow:airflow /home/airflow/composer_kube_config",
-      f"gcloud container clusters get-credentials {cluster_name}"
-      f" --region {region}",
-  )
-  return gcloud_command
-
-
-def resize_a3_cluster(cluster_name: str, zone: str, num_nodes: int):
-  region = gke.zone_to_region(zone)
-  node_pool = f"{cluster_name}-np-0"
-
-  gcloud_command = (
-      f"gcloud container clusters resize {cluster_name}"
-      f" --quiet --region {region}"
-      f" --node-pool {node_pool}"
-      f" --num-nodes {num_nodes}",
-  )
-  return gcloud_command
-
-
-def wait_for_cluster_ready():
-  kubectl_command = (
-      "kubectl wait --for=condition=Ready nodes --all --timeout=5m",
-  )
-  return kubectl_command
-
-
-@task
-def scale_up_a3_cluster():
-  with tempfile.TemporaryDirectory() as tmpdir:
-    hook = SubprocessHook()
-
-    result = hook.run_command(
-        [
-            "bash",
-            "-c",
-            ";".join(
-                configure_project_and_cluster(
-                    Project.SUPERCOMPUTER_TESTING.value,
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                )
-                + resize_a3_cluster(
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                    A3_NUM_NODES,
-                )
-                + wait_for_cluster_ready()
-            ),
-        ],
-        cwd=tmpdir,
-    )
-    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
-
-
-@task
-def scale_down_a3_cluster():
-  with tempfile.TemporaryDirectory() as tmpdir:
-    hook = SubprocessHook()
-
-    result = hook.run_command(
-        [
-            "bash",
-            "-c",
-            ";".join(
-                configure_project_and_cluster(
-                    Project.SUPERCOMPUTER_TESTING.value,
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                )
-                + resize_a3_cluster(
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                    0,
-                )
-            ),
-        ],
-        cwd=tmpdir,
-    )
-    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
-
-
 def run_maxtext_tests(dag: models.DAG):
   test_name_prefix = "maxtext"
 
@@ -247,7 +157,7 @@ def run_maxtext_tests(dag: models.DAG):
     catchup=False,
 ) as dag:
   with TaskGroup(group_id="scale_up", dag=dag) as scale_up:
-    scale_up_a3_cluster()
+    scale_up_a3_cluster(A3_NUM_NODES)
 
   with TaskGroup(
       group_id="run_tests", dag=dag, prefix_group_id=False
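The helpers removed above all share one mechanism: each returns a tuple of shell commands, the tuples are concatenated with `+`, and the result is collapsed into a single `bash -c` invocation via `";".join(...)`. A minimal, self-contained sketch of that composition pattern (the helper names, cluster name, and region below are illustrative, not values from this repository):

    def get_credentials(cluster_name: str, region: str) -> tuple:
      # Returning a one-element tuple (note the trailing comma) rather than
      # a bare string lets command sequences be concatenated with `+`.
      return (
          f"gcloud container clusters get-credentials {cluster_name}"
          f" --region {region}",
      )

    def resize(cluster_name: str, region: str, num_nodes: int) -> tuple:
      return (
          f"gcloud container clusters resize {cluster_name}"
          f" --quiet --region {region} --num-nodes {num_nodes}",
      )

    commands = get_credentials("demo-cluster", "us-central1") + resize(
        "demo-cluster", "us-central1", 10
    )
    # ";" runs the commands sequentially in one bash process; unlike "&&" it
    # does not stop at the first failure, so only the final command's exit
    # code reaches the tasks' `assert result.exit_code == 0` check.
    print(";".join(commands))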
104 changes: 7 additions & 97 deletions dags/sparsity_diffusion_devx/maxtext_moe_gpu_e2e.py
@@ -16,17 +16,14 @@
 
 
 import datetime
-import tempfile
 
 from airflow import models
-from airflow.decorators import task
-from airflow.hooks.subprocess import SubprocessHook
 from airflow.utils.task_group import TaskGroup
 from dags import composer_env
 from dags.common import test_owner
-from dags.common.vm_resource import XpkClusters, DockerImage, Project
+from dags.common.vm_resource import XpkClusters, DockerImage
 from dags.multipod.configs import gke_config
-from xlml.utils import gke
+from xlml.utils.gpu import scale_up_a3_cluster, scale_down_a3_cluster
 
 
 # Run once a day at 11 am UTC (3 am PST)
@@ -39,107 +36,20 @@
 UNSCANNED_CKPT_PATH = "gs://ml-auto-solutions/output/sparsity_diffusion_devx/maxtext/chained_tests_mixtral-8x7b_nightly-2025-01-09-05-00-18//unscanned_ckpt/checkpoints/0/items"
 
 
-def configure_project_and_cluster(project: str, cluster_name: str, zone: str):
-  region = gke.zone_to_region(zone)
-
-  gcloud_command = (
-      f"gcloud config set project {project}",
-      "sudo chown -R airflow:airflow /home/airflow/composer_kube_config",
-      f"gcloud container clusters get-credentials {cluster_name}"
-      f" --region {region}",
-  )
-  return gcloud_command
-
-
-def resize_a3_cluster(cluster_name: str, zone: str, num_nodes: int):
-  region = gke.zone_to_region(zone)
-  node_pool = f"{cluster_name}-np-0"
-
-  gcloud_command = (
-      f"gcloud container clusters resize {cluster_name}"
-      f" --quiet --region {region}"
-      f" --node-pool {node_pool}"
-      f" --num-nodes {num_nodes}",
-  )
-  return gcloud_command
-
-
-def wait_for_cluster_ready():
-  kubectl_command = (
-      "kubectl wait --for=condition=Ready nodes --all --timeout=5m",
-  )
-  return kubectl_command
-
-
-@task
-def scale_up_a3_cluster():
-  with tempfile.TemporaryDirectory() as tmpdir:
-    hook = SubprocessHook()
-
-    result = hook.run_command(
-        [
-            "bash",
-            "-c",
-            ";".join(
-                configure_project_and_cluster(
-                    Project.SUPERCOMPUTER_TESTING.value,
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                )
-                + resize_a3_cluster(
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                    A3_NUM_NODES,
-                )
-                + wait_for_cluster_ready()
-            ),
-        ],
-        cwd=tmpdir,
-    )
-    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
-
-
-@task
-def scale_down_a3_cluster():
-  with tempfile.TemporaryDirectory() as tmpdir:
-    hook = SubprocessHook()
-
-    result = hook.run_command(
-        [
-            "bash",
-            "-c",
-            ";".join(
-                configure_project_and_cluster(
-                    Project.SUPERCOMPUTER_TESTING.value,
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                )
-                + resize_a3_cluster(
-                    XpkClusters.GPU_A3_CLUSTER.name,
-                    XpkClusters.GPU_A3_CLUSTER.zone,
-                    0,
-                )
-            ),
-        ],
-        cwd=tmpdir,
-    )
-    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
-
-
-def run_maxtext_tests(dag: models.DAG):
+def run_maxtext_tests():
   test_name_prefix = "maxtext"
 
   test_models_gpu = {
       "mixtral-8x7b-1node": (
           f"SCANNED_CHECKPOINT={SCANNED_CHECKPOINT} \
           UNSCANNED_CKPT_PATH={UNSCANNED_CKPT_PATH} \
-          bash end_to_end/gpu/test_mixtral.sh",
+          bash end_to_end/gpu/mixtral/test_8x7b.sh",
           1,
       ),
       "mixtral-8x7b-2node": (
          f"SCANNED_CHECKPOINT={SCANNED_CHECKPOINT} \
           UNSCANNED_CKPT_PATH={UNSCANNED_CKPT_PATH} \
-          bash end_to_end/gpu/test_mixtral.sh",
+          bash end_to_end/gpu/mixtral/test_8x7b.sh",
           2,
       ),
   }
@@ -199,12 +109,12 @@ def run_maxtext_tests(dag: models.DAG):
     catchup=False,
 ) as dag:
   with TaskGroup(group_id="scale_up", dag=dag) as scale_up:
-    scale_up_a3_cluster()
+    scale_up_a3_cluster(A3_NUM_NODES)
 
   with TaskGroup(
       group_id="run_tests", dag=dag, prefix_group_id=False
   ) as run_tests:
-    run_maxtext_tests(dag)
+    run_maxtext_tests()
 
   with TaskGroup(group_id="scale_down", dag=dag) as scale_down:
     scale_down_a3_cluster()
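One small API consequence is visible in the last hunk: `run_maxtext_tests` no longer takes a `dag` argument. Tasks and task groups instantiated inside a `with models.DAG(...) as dag:` block attach to that DAG through Airflow's active-DAG context, so threading `dag` through helper functions is unnecessary. A minimal sketch of that behavior (the `dag_id` and dates are illustrative, not from this commit):

    import datetime

    from airflow import models
    from airflow.decorators import task

    @task
    def my_step():
      print("runs in whichever DAG context is active")

    with models.DAG(
        dag_id="implicit_binding_demo",  # illustrative id
        schedule_interval=None,
        start_date=datetime.datetime(2025, 2, 3),
        catchup=False,
    ) as dag:
      # No dag=... argument needed: the task is registered on `dag`
      # because it is created inside the context manager.
      my_step()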
79 changes: 78 additions & 1 deletion xlml/utils/gpu.py
@@ -20,17 +20,21 @@
 from absl import logging
 import airflow
 from airflow.decorators import task, task_group
+from airflow.hooks.subprocess import SubprocessHook
 import datetime
 import fabric
 from google.cloud import compute_v1
 import io
 import paramiko
 import re
+import tempfile
 import time
 from typing import Dict, Iterable
 import uuid
 from xlml.apis import gcp_config, test_config
-from xlml.utils import ssh
+from xlml.utils import ssh, gke
+from dags.common.vm_resource import XpkClusters
+from dags.map_reproducibility.utils.common_utils import configure_project_and_cluster
 
 
 def get_image_from_family(project: str, family: str) -> compute_v1.Image:
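Note the import swap in this hunk: the DAGs previously built their own `configure_project_and_cluster(project, cluster_name, zone)`, while the version now imported from `dags.map_reproducibility.utils.common_utils` is called below with just `(cluster_name, region)`; the zone-to-region conversion moves to the call site via `gke.zone_to_region`. The shared helper's body is not part of this diff; a hypothetical stand-in, consistent only with how it is called here, might look like:

    def configure_project_and_cluster(cluster_name: str, region: str) -> tuple:
      # Hypothetical sketch: the real implementation lives in
      # dags/map_reproducibility/utils/common_utils.py and is not
      # shown in this commit.
      return (
          "sudo chown -R airflow:airflow /home/airflow/composer_kube_config",
          f"gcloud container clusters get-credentials {cluster_name}"
          f" --region {region}",
      )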
@@ -553,3 +557,76 @@ def wait_for_resource_deletion(operation_name: airflow.XComArg):
 
   op = delete_resource_request(instance_name, project_id, zone)
   wait_for_resource_deletion(op)
+
+
+def resize_a3_cluster(cluster_name: str, zone: str, num_nodes: int):
+  region = gke.zone_to_region(zone)
+  node_pool = f"{cluster_name}-np-0"
+
+  gcloud_command = (
+      f"gcloud container clusters resize {cluster_name}"
+      f" --quiet --region {region}"
+      f" --node-pool {node_pool}"
+      f" --num-nodes {num_nodes}",
+  )
+  return gcloud_command
+
+
+def wait_for_cluster_ready():
+  kubectl_command = (
+      "kubectl wait --for=condition=Ready nodes --all --timeout=5m",
+  )
+  return kubectl_command
+
+
+@task
+def scale_up_a3_cluster(num_nodes):
+  with tempfile.TemporaryDirectory() as tmpdir:
+    hook = SubprocessHook()
+
+    result = hook.run_command(
+        [
+            "bash",
+            "-c",
+            ";".join(
+                configure_project_and_cluster(
+                    XpkClusters.GPU_A3_CLUSTER.name,
+                    gke.zone_to_region(XpkClusters.GPU_A3_CLUSTER.zone),
+                )
+                + resize_a3_cluster(
+                    XpkClusters.GPU_A3_CLUSTER.name,
+                    XpkClusters.GPU_A3_CLUSTER.zone,
+                    num_nodes,
+                )
+                + wait_for_cluster_ready()
+            ),
+        ],
+        cwd=tmpdir,
+    )
+    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
+
+
+@task
+def scale_down_a3_cluster():
+  with tempfile.TemporaryDirectory() as tmpdir:
+    hook = SubprocessHook()
+
+    result = hook.run_command(
+        [
+            "bash",
+            "-c",
+            ";".join(
+                configure_project_and_cluster(
+                    XpkClusters.GPU_A3_CLUSTER.name,
+                    gke.zone_to_region(XpkClusters.GPU_A3_CLUSTER.zone),
+                )
+                + resize_a3_cluster(
+                    XpkClusters.GPU_A3_CLUSTER.name,
+                    XpkClusters.GPU_A3_CLUSTER.zone,
+                    0,
+                )
+            ),
+        ],
+        cwd=tmpdir,
+    )
+    assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
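After this commit, a DAG that needs the A3 cluster just imports the two tasks and passes its own node count, since `scale_up_a3_cluster` is now parameterized by `num_nodes` instead of reading a module-level constant. A condensed sketch of the resulting call sites (the DAG boilerplate values and the explicit `>>` ordering are illustrative assumptions; the imports and task calls match the diff):

    import datetime

    from airflow import models
    from airflow.utils.task_group import TaskGroup
    from xlml.utils.gpu import scale_up_a3_cluster, scale_down_a3_cluster

    A3_NUM_NODES = 10  # each DAG now chooses its own target size

    with models.DAG(
        dag_id="example_consumer_dag",  # illustrative id
        schedule_interval=None,
        start_date=datetime.datetime(2025, 2, 3),
        catchup=False,
    ) as dag:
      with TaskGroup(group_id="scale_up", dag=dag) as scale_up:
        scale_up_a3_cluster(A3_NUM_NODES)

      with TaskGroup(group_id="scale_down", dag=dag) as scale_down:
        scale_down_a3_cluster()

      # Assumed ordering; in the real DAGs the test groups run in between.
      scale_up >> scale_down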
