import torch
import os
import tempfile
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
# Model-parallel toy model: its two layers are placed on different GPUs.
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
def setup(rank, world_size):
"""
分布式训练初始化
Parameters
----------
rank: 当前进程中GPU的编号
world_size: 总共有多少个GPU
"""
# 确定可用的GPU,注意这句话一定要放在任何对CUDA的操作之前(和别人公用服务器时使用)
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1'
# 设置两个环境变量,localhost是本地的ip地址,12355是用来通信的端口
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# 初始化进程组
dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
# 实现GPU的负载均衡
torch.cuda.set_device(rank)
def cleanup():
"""
    Destroy the process group once all work has finished.
"""
dist.destroy_process_group()
def run_parallel(parallel_fn, world_size):
"""
多进程产生函数。不用这个的话需要在运行训练代码的时候,用
'python-m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE train.py'才能启动。
Parameters
----------
parallel_fn: 分布式的函数
world_size: GPU数量
Returns
-------
"""
mp.spawn(parallel_fn, args=(world_size,), nprocs=world_size, join=True)
def main_train_basic(rank, world_size):
"""
基本的训练模板,需要指定device_ids让模型运行在不同的GPU上
Parameters
----------
rank: 当前进程中GPU的编号
world_size: 总共有多少个GPU
"""
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
def main_train_basic_plus(rank, world_size):
"""
带Checkpoint保存和读取的训练过程,需要指定device_ids让模型运行在不同的GPU上
Parameters
----------
rank: 当前进程中GPU的编号
world_size: 总共有多少个GPU
"""
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
    # Create the model and move it to this process's GPU.
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    # Save the checkpoint.
    if rank == 0:
        # The parameters are identical on every GPU, so saving them from a
        # single rank is enough.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
    # Wait until rank 0 has finished writing the checkpoint.
    dist.barrier()
    # Map the checkpoint from cuda:0 to this rank's GPU. Without map_location,
    # every process would load the tensors onto GPU 0 and the remaining GPUs
    # would sit idle.
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
checkpoint = torch.load(CHECKPOINT_PATH, map_location=map_location)
ddp_model.load_state_dict(checkpoint)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
    # No dist.barrier() is needed before deleting the file: the AllReduce ops
    # in DDP's backward pass have already synchronized all processes.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
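# Note (not from the original file): because DDP wraps the network, the keys in
# ddp_model.state_dict() are prefixed with "module.". If the checkpoint should
# also load into a plain, non-DDP ToyModel, a common variant is to save and
# restore the unwrapped module instead, e.g.:
#     if rank == 0:
#         torch.save(ddp_model.module.state_dict(), CHECKPOINT_PATH)
#     dist.barrier()
#     state = torch.load(CHECKPOINT_PATH, map_location=map_location)
#     ddp_model.module.load_state_dict(state)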
def main_train_basic_parallel(rank, world_size):
"""
带分布式模型的的训练过程,不需要指定device_ids,因为在哪个设备上运行已经在模型中声明
Parameters
----------
rank
world_size
"""
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
    # Set up the model and its devices: each process owns two consecutive GPUs.
    dev0 = rank * 2
    dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
    # forward() returns its output on dev1.
outputs = ddp_mp_model(torch.randn(20, 10))
    # Create the labels on dev1 as well.
labels = torch.randn(20, 5).to(dev1)
    # The loss is therefore computed on dev1.
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
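# --- Illustrative sketch, not part of the original script --------------------
# The commented-out lines in __main__ below only hint at how DistributedSampler
# assigns each process its own slice of the data. The function below is a
# minimal sketch of how a sampler-driven loop would plug into the DDP template
# above; the name main_train_with_sampler, the random TensorDataset, and the
# hyperparameters (batch size, epoch count, lr) are assumptions for illustration.
def main_train_with_sampler(rank, world_size):
    setup(rank, world_size)
    # Toy dataset with the same feature/label shapes that ToyModel expects.
    dataset = torch.utils.data.TensorDataset(torch.randn(64, 10), torch.randn(64, 5))
    # Each rank reads a disjoint shard of the dataset.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    for epoch in range(2):
        # Reshuffle across epochs; without set_epoch every epoch yields the
        # same ordering on every rank.
        sampler.set_epoch(epoch)
        for inputs, labels in loader:
            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss_fn(outputs, labels.to(rank)).backward()
            optimizer.step()
    cleanup()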
if __name__ == '__main__':
    # # To tell each GPU which slice of the data to load, wrap your dataset in
    # # a DistributedSampler and pass it to the DataLoader (see the
    # # main_train_with_sampler sketch above):
    # trainsampler = torch.utils.data.distributed.DistributedSampler(trainset)
    # trainloader = DataLoader(..., sampler=trainsampler)  # plus your usual DataLoader arguments
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"At least 2 GPUs are required, but only {n_gpus} are available."
    world_size = n_gpus
    # run_parallel(main_train_basic, world_size)
    # run_parallel(main_train_basic_plus, world_size)
    # The model-parallel demo uses two GPUs per process.
    run_parallel(main_train_basic_parallel, n_gpus // 2)