From 138c8a564add9caf79f7dd2dc15c91b8b78d7d6f Mon Sep 17 00:00:00 2001
From: Chi Zhang
Date: Thu, 9 Jan 2025 05:14:51 +0800
Subject: [PATCH] [misc] chore: remove useless files (#86)

* remove useless files

* modify APIs
---
 examples/ray/tutorial.ipynb             |  6 +-
 tests/ray/detached_worker/server.py     |  2 +-
 tests/ray/test_ray_local_envs.py        |  2 +-
 tests/ray/test_worker_group_basics.py   |  2 +-
 verl/single_controller/base/dp.py       | 47 ----------
 verl/single_controller/ray/decorator.py | 65 -------------
 .../ray/dist_data_pass_protocol.py      | 23 -----
 verl/single_controller/ray/dp.py        | 93 -------------------
 8 files changed, 6 insertions(+), 234 deletions(-)
 delete mode 100644 verl/single_controller/base/dp.py
 delete mode 100644 verl/single_controller/ray/decorator.py
 delete mode 100644 verl/single_controller/ray/dist_data_pass_protocol.py
 delete mode 100644 verl/single_controller/ray/dp.py

diff --git a/examples/ray/tutorial.ipynb b/examples/ray/tutorial.ipynb
index f270cd9..37784f6 100644
--- a/examples/ray/tutorial.ipynb
+++ b/examples/ray/tutorial.ipynb
@@ -437,7 +437,7 @@
    },
    "outputs": [],
    "source": [
-    "from verl.single_controller.ray.decorator import register, Dispatch, Execute"
+    "from verl.single_controller.base.decorator import register, Dispatch, Execute"
    ]
   },
   {
@@ -518,7 +518,7 @@
    },
    "outputs": [],
    "source": [
-    "from verl.single_controller.ray.decorator import register, Dispatch, collect_all_to_all, Execute"
+    "from verl.single_controller.base.decorator import register, Dispatch, collect_all_to_all, Execute"
    ]
   },
   {
@@ -723,7 +723,7 @@
    },
    "outputs": [],
    "source": [
-    "from verl.single_controller.ray.decorator import register, Dispatch, Execute\n",
+    "from verl.single_controller.base.decorator import register, Dispatch, Execute\n",
     "from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup\n",
     "from verl.single_controller.base.megatron.worker import MegatronWorker\n",
     "from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup\n",
diff --git a/tests/ray/detached_worker/server.py b/tests/ray/detached_worker/server.py
index c8057e3..2108f1e 100644
--- a/tests/ray/detached_worker/server.py
+++ b/tests/ray/detached_worker/server.py
@@ -28,7 +28,7 @@
 from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool
 from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
 from verl.single_controller.base.megatron.worker import MegatronWorker
-from verl.single_controller.ray.decorator import register, Dispatch
+from verl.single_controller.base.decorator import register, Dispatch
 
 from verl import DataProto
 from verl.models.llama.megatron import ParallelLlamaForCausalLMRmPadPP
diff --git a/tests/ray/test_ray_local_envs.py b/tests/ray/test_ray_local_envs.py
index 542d536..63102d0 100644
--- a/tests/ray/test_ray_local_envs.py
+++ b/tests/ray/test_ray_local_envs.py
@@ -19,7 +19,7 @@
 from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup
 from verl.single_controller.base.worker import Worker
-from verl.single_controller.ray.decorator import register, Dispatch, collect_all_to_all, Execute
+from verl.single_controller.base.decorator import register, Dispatch, collect_all_to_all, Execute
 
 
 @ray.remote
diff --git a/tests/ray/test_worker_group_basics.py b/tests/ray/test_worker_group_basics.py
index fa18e9b..b4b6339 100644
--- a/tests/ray/test_worker_group_basics.py
+++ b/tests/ray/test_worker_group_basics.py
@@ -20,7 +20,7 @@
 from verl.single_controller.ray.base import RayResourcePool, RayClassWithInitArgs, RayWorkerGroup
 from verl.single_controller.base.worker import Worker
-from verl.single_controller.ray.decorator import register, Dispatch, collect_all_to_all, Execute
+from verl.single_controller.base.decorator import register, Dispatch, collect_all_to_all, Execute
 
 
 def two_to_all_dispatch_fn(worker_group, *args, **kwargs):
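All four hunks above retarget the same import: the `register`/`Dispatch`/`Execute` decorator API now lives in `verl.single_controller.base.decorator`. For orientation, here is a minimal sketch of how that API is used on a worker; the `EchoWorker` class and its method are illustrative, not part of this patch:

```python
import ray

from verl.single_controller.base.worker import Worker
from verl.single_controller.base.decorator import register, Dispatch


@ray.remote
class EchoWorker(Worker):
    """Toy worker; a RayWorkerGroup instantiates one per process."""

    @register(dispatch_mode=Dispatch.ONE_TO_ALL)
    def echo(self, x):
        # ONE_TO_ALL broadcasts the driver's argument to every worker;
        # the group collects one result per rank.
        return f"rank {self.rank} got {x}"
```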
diff --git a/verl/single_controller/base/dp.py b/verl/single_controller/base/dp.py
deleted file mode 100644
index 2d19188..0000000
--- a/verl/single_controller/base/dp.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2024 Bytedance Ltd. and/or its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from verl.single_controller.base.worker import Worker
-
-
-class DPEngineWorker(Worker):
-
-    def __init__(self, *args, **kwargs):
-        # todo: extract _world_size etc. from kwargs and inject in super().__init__()
-        Worker.__init__(self, *args, **kwargs)
-
-    def init(self):
-        raise NotImplementedError
-
-    def add_engine(self, model, dp_config):
-        raise NotImplementedError
-
-    def execute_engine(self, method_name, *args, **kwargs):
-        print(f"execute_engine called with method={method_name}")
-        func = getattr(self._engine, method_name)
-        return func(*args, **kwargs)
-
-    def execute_module(self, method_name, *args, **kwargs):
-        print(f"execute_module called with method={method_name}")
-        func = getattr(self._engine.module, method_name)
-        return func(*args, **kwargs)
-
-    def get_model_size_on_rank_zero(self):
-        import torch
-        from verl.utils.model import get_model_size
-        if torch.distributed.get_rank() == 0:
-            # print("model print on rank 0: ", self._model)
-            module_size, module_size_scale = get_model_size(self._model)
-            return module_size, module_size_scale
-        return None
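The `DPEngineWorker` deleted above was a thin delegation shim: `execute_engine` and `execute_module` look up a method by name on a held engine object and call it. A self-contained sketch of that `getattr` delegation pattern; the `ToyEngine` class is illustrative, not verl code:

```python
class ToyEngine:
    """Stand-in for a wrapped training engine."""

    def train_step(self, batch):
        return f"trained on {batch}"


class EngineWorker:
    def __init__(self, engine):
        self._engine = engine

    def execute_engine(self, method_name, *args, **kwargs):
        # Resolve the requested method on the engine at call time,
        # so any engine method can be invoked by name.
        func = getattr(self._engine, method_name)
        return func(*args, **kwargs)


worker = EngineWorker(ToyEngine())
print(worker.execute_engine("train_step", "batch-0"))  # trained on batch-0
```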
diff --git a/verl/single_controller/ray/decorator.py b/verl/single_controller/ray/decorator.py
deleted file mode 100644
index 1de452f..0000000
--- a/verl/single_controller/ray/decorator.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2024 Bytedance Ltd. and/or its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import functools
-import json
-import os
-
-import ray
-
-# compatibility concern
-from verl.single_controller.base.decorator import *
-
-
-def maybe_remote(main):
-    """Schedule the main function as a Ray remote task if VERL_DRIVER_NUM_GPUS or VERL_DRIVER_RESOURCES is set.
-
-    - VERL_DRIVER_NUM_GPUS: number of GPUs for the driver task.
-    - VERL_DRIVER_RESOURCES: custom resources for the driver task, e.g. {"verl_driver": 1.0}.
-
-    For job submission to a Ray cluster, you can specify these two environment variables in runtime.yaml:
-    ```yaml
-    working_dir: "."
-    env_vars:
-        VERL_DRIVER_NUM_GPUS: "1"
-        VERL_DRIVER_RESOURCES: '{"verl_driver": 1.0}'
-    ```
-
-    ray job submit --runtime-env=runtime.yaml -- python3 test.py
-
-    Args:
-        main (Callable): main function to be scheduled.
-    """
-
-    num_gpus = 0
-    resources = {}
-    env_num_gpus = os.getenv("VERL_DRIVER_NUM_GPUS")
-    if env_num_gpus:
-        num_gpus = int(env_num_gpus)
-    env_resources = os.getenv("VERL_DRIVER_RESOURCES")
-    if env_resources:
-        resources = json.loads(env_resources)
-    print(f"verl driver num_gpus: {num_gpus}, resources={resources}")
-    assert isinstance(resources, dict), f"resources must be a dict, got {type(resources)}"
-
-    @functools.wraps(main)
-    def _main(*args, **kwargs):
-        # Run the main function locally.
-        if num_gpus == 0 and len(resources) == 0:
-            return main(*args, **kwargs)
-
-        # Run the main function remotely as a Ray task.
-        f = ray.remote(num_gpus=num_gpus, resources=resources)(main)
-        return ray.get(f.remote(*args, **kwargs))
-
-    return _main
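The docstring above shows the intended use of the removed `maybe_remote` helper: wrap the driver entry point, then opt into remote scheduling through the two environment variables. A sketch of how it was applied, under the import path that existed before this patch; the `main` body is illustrative:

```python
from verl.single_controller.ray.decorator import maybe_remote  # path removed by this patch


@maybe_remote
def main():
    # Runs locally by default. With VERL_DRIVER_NUM_GPUS or
    # VERL_DRIVER_RESOURCES set, maybe_remote reschedules this
    # function as a Ray task with the requested resources.
    print("driver running")


if __name__ == "__main__":
    main()
```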
diff --git a/verl/single_controller/ray/dist_data_pass_protocol.py b/verl/single_controller/ray/dist_data_pass_protocol.py
deleted file mode 100644
index 59202b1..0000000
--- a/verl/single_controller/ray/dist_data_pass_protocol.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2024 Bytedance Ltd. and/or its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from tensordict import TensorDict
-import ray
-
-from verl import DataProto
-
-
-class DistDataProto(DataProto, ray.ObjectRef):
-    ...
-    # skip for prototype, assuming dp size kept among all Roles
diff --git a/verl/single_controller/ray/dp.py b/verl/single_controller/ray/dp.py
deleted file mode 100644
index b53d4b9..0000000
--- a/verl/single_controller/ray/dp.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2024 Bytedance Ltd. and/or its affiliates
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import ray
-
-from verl.single_controller.ray.base import RayWorkerGroup, RayResourcePool, RayClassWithInitArgs
-
-
-@ray.remote
-class RefBasicRayActor:
-    ...
-
-
-class DPEngineRayWorkerGroup(RayWorkerGroup):
-
-    class DummyModule:
-
-        def __init__(self, core, methods_names) -> None:
-            self.core = core
-
-            def func_generator(method_name):
-
-                def func(*args, **kwargs):
-                    return self.core.execute_all_async("execute_engine", method_name, *args, **kwargs)
-
-                return func
-
-            for method_name in methods_names:
-                setattr(self, method_name, func_generator(method_name))
-
-    def __init__(self, name_prefix, process_dispatch_scheme, use_gpu, engine_type, *args, **kwargs) -> None:
-        from torch import nn
-
-        # print(f"in DataParallelEngineWrapper, name_prefix = {name_prefix}")
-        if isinstance(process_dispatch_scheme, RayResourcePool):
-            rpdc = process_dispatch_scheme
-        else:
-            rpdc = RayResourcePool(process_on_nodes=process_dispatch_scheme,
-                                   use_gpu=use_gpu,
-                                   name_prefix=name_prefix,
-                                   max_colocate_count=1)
-        rcia = RayClassWithInitArgs(cls=engine_type, *args, **kwargs)
-
-        self._engine_type = engine_type
-
-        super().__init__(rpdc, rcia)
-
-        nn_module_methods = [
-            method_name for method_name in dir(nn.Module)
-            if callable(getattr(nn.Module, method_name)) and not method_name.startswith("__")
-        ]
-        nn_module_methods += ["__call__"]
-
-        def func_generator(method_name):
-
-            def func(*args, **kwargs):
-                return self.execute_all_async(method_name, *args, **kwargs)
-
-            return func
-
-        print(f"{engine_type} has methods: {dir(engine_type)}")
-        for method_name in dir(engine_type):
-            try:
-                is_callable = callable(getattr(engine_type, method_name))
-            except Exception as _:
-                pass
-            else:
-                if is_callable and method_name not in dir(RefBasicRayActor):
-                    print(f"register method: {method_name}")
-                    setattr(self, method_name, func_generator(method_name))
-
-        self.module = DPEngineRayWorkerGroup.DummyModule(self, nn_module_methods)
-
-    @property
-    def engine(self):
-        return self.module
-
-    def get_model_size_on_rank_zero(self):
-        results = ray.get([worker.get_model_size_on_rank_zero.remote() for worker in self._workers])
-
-        for result in results:
-            if result is not None:
-                return result
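Both `func_generator` definitions in the deleted `dp.py` rely on the same closure-factory idiom: each generated proxy method must capture its own `method_name`, because closures created directly inside a `for` loop would all see the loop variable's final value. A standalone sketch of the idiom; the class names are illustrative, not verl APIs:

```python
class MethodProxy:
    """Attach one forwarding method per name; the factory pins each name."""

    def __init__(self, target, method_names):
        self.target = target

        def func_generator(method_name):
            # Without this factory, every generated func would close over
            # the same loop variable and dispatch to the last name in the list.
            def func(*args, **kwargs):
                return getattr(self.target, method_name)(*args, **kwargs)

            return func

        for method_name in method_names:
            setattr(self, method_name, func_generator(method_name))


class Target:
    def ping(self):
        return "pong"

    def add(self, a, b):
        return a + b


proxy = MethodProxy(Target(), ["ping", "add"])
print(proxy.ping())     # pong
print(proxy.add(2, 3))  # 5
```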