Implementing Multi-Node PyTorch Training

Training a model across multiple nodes and multiple processes

0. Introduction

This blog post focuses on implementing distributed data parallel (DDP) training across multiple nodes and multiple processes. Three launch methods are covered: torch.distributed.launch, torchrun, and mpirun. Using WandB to monitor the training process is also included.

1. Resources

The model and the dataset are from this blog:
https://leimao.github.io/blog/PyTorch-Distributed-Training
The training procedure follows this guide and repository:
https://lambdalabs.com/blog/multi-node-pytorch-distributed-training-guide
https://github.com/LambdaLabsML/examples/tree/main/pytorch/distributed/resnet

2. Training environment

[Table: training hardware, two nodes with 8 NVIDIA A40 GPUs each]

GPU driver: cuda-keyring_1.0-1
Virtual environment: conda 23.5.2
Requirements: python==3.10.2
wandb==0.15.8
tensorboard==2.12.3
tensorboard-data-server==0.7.1
datasets==1.16.1
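
A minimal sketch of creating such an environment with conda (the environment name ddp is arbitrary, and PyTorch itself should be installed following the official instructions for your CUDA version):

~$ conda create -n ddp python=3.10
~$ conda activate ddp
~$ pip install wandb==0.15.8 tensorboard==2.12.3 tensorboard-data-server==0.7.1 datasets==1.16.1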

3. Preparing the model and dataset

We will deploy the model on two machines as listed above; each node is equipped with 8 GPUs. First of all, we have to make sure that the two nodes are connected through SSH. We can set up the IP address using the following command on one machine:
~$ sudo ip address add 10.10.10.11/24 dev ens11f1
and on the other machine:
~$ sudo ip address add 10.10.10.12/24 dev ens11f1
You can use
~$ ip a
to check the IP address settings. Then we can set up SSH keys to enable password-less SSH connections between the two nodes, as sketched below.
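A minimal sketch of the key setup, assuming the default OpenSSH tools and that the other node is reachable as user@10.10.10.12 (adjust the user name and address to your cluster):

~$ ssh-keygen -t ed25519
~$ ssh-copy-id user@10.10.10.12
~$ ssh user@10.10.10.12 hostname

The last command should print the remote hostname without asking for a password; repeat the procedure in the other direction so that both nodes can reach each other.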
On each machine, download the Python script from this repo and the dataset as shown in Lei Mao's article.
To view information about the training process on WandB, we need to add a few pieces of code that upload the metrics we want. First, we have to initialize WandB in the code, but only in the main process on the main node:

if (LOCAL_RANK == 0 and WORLD_RANK == 0):
    wandb.init(project=project_name, config=argv)

Make sure that WandB is initialized only in the main process on the main node, and before the loop over the training epochs. LOCAL_RANK is the rank of the process (and of its GPU) within a node, running from 0 to 7 on each of our nodes; any local rank could be designated the main process, and we use 0 here. WORLD_RANK is the global rank of the process across all nodes; the main node hosts rank 0. In the code, project=project_name sets the project title displayed on WandB. The config argument is optional; we can pass hyperparameters of the model to it. For more information, check out the documentation of the wandb.init() function.

[Figure: setting up the WandB run from code]
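
As a small illustration of the config argument (a sketch, not taken from the original script), the parsed argparse namespace can be passed directly, or converted to a plain dictionary first:

# argv is the argparse.Namespace parsed in main(); wandb accepts it as-is,
# and vars(argv) simply turns it into a plain dict.
wandb.init(project=project_name, config=vars(argv))

The logged hyperparameters (batch size, learning rate, architecture, and so on) then appear in the run's config panel on WandB.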

Furthermore, we want to periodically upload metrics such as the training loss and test accuracy during training, so in the training loop we can add

if LOCAL_RANK == 0:
    accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
    torch.save(ddp_model.state_dict(), model_filepath)
    if WORLD_RANK == 0:
        wandb.log({'Epoch': epoch, 'accuracy': accuracy})

after Line 171 and

if (LOCAL_RANK == 0 and WORLD_RANK == 0):
    wandb.log({'lr': learning_rate, 'samples': count * batch_size, 'loss/train': loss.item()})

after Line 201.
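
Note that loss.item() logged here is the loss computed on the rank-0 process's shard of the batch only. If a loss averaged over all processes is preferred, one option is to all-reduce it before logging. This is a sketch, not part of the original script; ReduceOp.AVG needs a reasonably recent PyTorch build with the NCCL backend, otherwise reduce with SUM and divide by WORLD_SIZE:

# Average the per-process loss across all ranks before logging it.
loss_global = loss.detach().clone()
torch.distributed.all_reduce(loss_global, op=torch.distributed.ReduceOp.AVG)
if (LOCAL_RANK == 0 and WORLD_RANK == 0):
    wandb.log({'loss/train_global': loss_global.item()})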

The complete code looks like:

main.py
import torch
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import wandb
import argparse
import os
import random
import numpy as np
import time
import importlib

if 'LOCAL_RANK' in os.environ:
    # Environment variables set by torch.distributed.launch or torchrun
    LOCAL_RANK = int(os.environ['LOCAL_RANK'])
    WORLD_SIZE = int(os.environ['WORLD_SIZE'])
    WORLD_RANK = int(os.environ['RANK'])
elif 'OMPI_COMM_WORLD_LOCAL_RANK' in os.environ:
    # Environment variables set by mpirun
    LOCAL_RANK = int(os.environ['OMPI_COMM_WORLD_LOCAL_RANK'])
    WORLD_SIZE = int(os.environ['OMPI_COMM_WORLD_SIZE'])
    WORLD_RANK = int(os.environ['OMPI_COMM_WORLD_RANK'])
else:
    import sys
    sys.exit("Can't find the environment variables for local rank")

def set_random_seeds(random_seed=0):

    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)

def evaluate(model, device, test_loader):

    model.eval()

    correct = 0
    total = 0
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = correct / total

    return accuracy


def main():
    project_name = 'DDP_model'
    num_epochs_default = 10000
    batch_size_default = 256
    image_size_default = 224
    learning_rate_default = 0.1
    random_seed_default = 0
    model_dir_default = "saved_models"
    model_filename_default = "resnet_distributed.pth"
    steps_syn_default = 20

    # Each process runs on 1 GPU device specified by the local_rank argument.
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local-rank", type=int, help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument("--num_epochs", type=int, help="Number of training epochs.", default=num_epochs_default)
    parser.add_argument("--batch_size", type=int, help="Training batch size for one process.", default=batch_size_default)
    parser.add_argument("--image_size", type=int, help="Size of input image.", default=image_size_default)
    parser.add_argument("--learning_rate", type=float, help="Learning rate.", default=learning_rate_default)
    parser.add_argument("--random_seed", type=int, help="Random seed.", default=random_seed_default)
    parser.add_argument("--model_dir", type=str, help="Directory for saving models.", default=model_dir_default)
    parser.add_argument("--model_filename", type=str, help="Model filename.", default=model_filename_default)
    parser.add_argument("--resume", action="store_true", help="Resume training from saved checkpoint.")
    parser.add_argument("--backend", type=str, help="Backend for distributed training.", default='nccl', choices=['nccl', 'gloo', 'mpi'])
    parser.add_argument("--arch", type=str, help="Model architecture.", default='resnet50', choices=['resnet50', 'resnet18', 'resnet101', 'resnet152'])
    parser.add_argument("--use_syn", action="store_true", help="Use synthetic data")
    parser.add_argument("--steps_syn", type=int, help="Steps per epoch for training with synthetic data", default=steps_syn_default)
    argv = parser.parse_args()

    local_rank = argv.local_rank
    num_epochs = argv.num_epochs
    batch_size = argv.batch_size
    learning_rate = argv.learning_rate
    random_seed = argv.random_seed
    model_dir = argv.model_dir
    model_filename = argv.model_filename
    resume = argv.resume
    backend = argv.backend
    use_syn = argv.use_syn
    w = argv.image_size
    h = argv.image_size
    c = 3
    steps_syn = argv.steps_syn

    # Create directories outside the PyTorch program
    # Do not create directory here because it is not multiprocess safe
    '''
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
    '''
    if (LOCAL_RANK == 0 and WORLD_RANK == 0):
        wandb.init(project=project_name, config=argv)

    model_filepath = os.path.join(model_dir, model_filename)

    # We need to use seeds to make sure that the models initialized in different processes are the same
    set_random_seeds(random_seed=random_seed)

    # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
    torch.distributed.init_process_group(backend=backend, rank=WORLD_RANK, world_size=WORLD_SIZE)

    # Encapsulate the model on the GPU assigned to the current process
    model = getattr(torchvision.models, argv.arch)(pretrained=False)
    device = torch.device("cuda:{}".format(LOCAL_RANK))

    model = model.to(device)
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)

    # We only save the model that uses device "cuda:0"
    # To resume, the device for the saved model would also be "cuda:0"
    if resume == True:
        map_location = {"cuda:0": "cuda:{}".format(LOCAL_RANK)}
        ddp_model.load_state_dict(torch.load(model_filepath, map_location=map_location))

    if use_syn:
        # Synthetic data
        inputs_syn = torch.rand((batch_size, c, w, h)).to(device)
        labels_syn = torch.zeros(batch_size, dtype=torch.int64).to(device)
    else:
        # Prepare dataset and dataloader
        transform = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ])

        # Data should be prefetched
        # Download should be set to be False, because it is not multiprocess safe
        train_set = torchvision.datasets.CIFAR10(root="data", train=True, download=False, transform=transform)
        test_set = torchvision.datasets.CIFAR10(root="data", train=False, download=False, transform=transform)

        # Restricts data loading to a subset of the dataset exclusive to the current process
        train_sampler = DistributedSampler(dataset=train_set)

        train_loader = DataLoader(dataset=train_set, batch_size=batch_size, sampler=train_sampler, num_workers=8)
        # Test loader does not have to follow distributed sampling strategy
        test_loader = DataLoader(dataset=test_set, batch_size=128, shuffle=False, num_workers=8)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=1e-5)

    # Loop over the dataset multiple times
    times = []
    for epoch in range(num_epochs):

        print("Local Rank: {}, Epoch: {}, Training ...".format(LOCAL_RANK, epoch))

        # Save and evaluate model routinely
        if not use_syn:
            if epoch % 10 == 0:
                if LOCAL_RANK == 0:
                    accuracy = evaluate(model=ddp_model, device=device, test_loader=test_loader)
                    torch.save(ddp_model.state_dict(), model_filepath)
                    print("-" * 75)
                    print("Epoch: {}, Accuracy: {}".format(epoch, accuracy))
                    print("-" * 75)
                    if WORLD_RANK == 0:
                        wandb.log({'Epoch': epoch, 'accuracy': accuracy})

        ddp_model.train()

        if use_syn:
            start_epoch = time.time()
            for count in range(steps_syn):
                optimizer.zero_grad()
                outputs = ddp_model(inputs_syn)
                loss = criterion(outputs, labels_syn)

                # print("***"*10)
                # print(loss.item())
                # print(type(loss.item()))
                # print("***"*10)
                if (LOCAL_RANK == 0 and WORLD_RANK == 0):
                    wandb.log({'lr': learning_rate, 'samples': count * batch_size,
                               'epoch': epoch, 'loss/train': loss.item()})
                loss.backward()
                optimizer.step()
            torch.cuda.synchronize()
            end_epoch = time.time()
            elapsed = end_epoch - start_epoch

            if epoch > 0:
                times.append(elapsed)
                print('num_steps_per_gpu: {}, avg_step_time: {:.4f}'.format(count, elapsed / count))
                if (LOCAL_RANK == 0 and WORLD_RANK == 0):
                    wandb.log({'epoch': epoch, 'num_steps_per_gpu': count, 'avg_step_time': elapsed / count})
        else:
            train_loader.sampler.set_epoch(epoch)
            start_epoch = time.time()
            count = 0
            for data in train_loader:
                inputs, labels = data[0].to(device), data[1].to(device)
                optimizer.zero_grad()
                outputs = ddp_model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                count += 1
                if (LOCAL_RANK == 0 and WORLD_RANK == 0):
                    wandb.log({'lr': learning_rate, 'samples': count * batch_size,
                               'epoch': epoch, 'loss/train': loss.item()})
            torch.cuda.synchronize()
            end_epoch = time.time()
            elapsed = end_epoch - start_epoch

            if epoch > 0:
                times.append(elapsed)
                print('num_steps_per_gpu: {}, avg_step_time: {:.4f}'.format(count, elapsed / count))
                if (LOCAL_RANK == 0 and WORLD_RANK == 0):
                    wandb.log({'epoch': epoch, 'num_steps_per_gpu': count, 'avg_step_time': elapsed / count})

    avg_time = sum(times) / (num_epochs - 1)

    print("Average epoch time: {}".format(avg_time))

if __name__ == "__main__":

    main()

4. Training the model using Distributed Data Parallelism

  • Using torch.distributed.launch

To use torch.distributed.launch, we need to run the following commands on the two machines, respectively.

On the main node:
python3 -m torch.distributed.launch --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr=10.10.10.11 --master_port=1234 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50
On the worker node:
python3 -m torch.distributed.launch --nproc_per_node=8 --nnodes=2 --node_rank=1 --master_addr=10.10.10.11 --master_port=1234 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50

--nproc_per_node defines the number of worker processes on each node; it should equal the number of GPUs per node. --nnodes defines the number of nodes. The only difference between the two commands is --node_rank. The --use_syn flag trains on synthetic data; remove it to train on the real dataset.
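
To see exactly what the launcher hands to each process, a tiny throwaway script can be launched the same way. This is only a sanity check, not part of the training code, and check_env.py is a hypothetical file name:

check_env.py
import os

# Print the distributed environment variables seen by this process.
for key in ("RANK", "LOCAL_RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT"):
    print(key, "=", os.environ.get(key))

Launching it with the same command (main.py replaced by check_env.py) should print one block per process, with RANK running from 0 to 15 and LOCAL_RANK from 0 to 7 on each node. Depending on the PyTorch version, torch.distributed.launch may also pass the local rank as a --local-rank command-line argument, which is relevant to the first problem below.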

Below are some problems you may encounter:

  1. “main.py: error: unrecognized arguments: --local-rank=7”
    The log reports that the argument specifying the local rank is unrecognized. The launcher passes it as --local-rank, while the script registers --local_rank. We can solve this problem by changing “--local_rank” to “--local-rank” in the parser.add_argument() call in main.py.
  2. “torch.cuda.OutOfMemoryError: CUDA out of memory. Tried to allocate 6.12
    GiB (GPU 6; 44.35 GiB total capacity; 7.46 GiB already allocated; 5.80 GiB free; 7.48 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF” on each GPU.

    The error reports that the GPUs have run out of memory. First, we may want to check whether other running processes are occupying GPU memory:
    ~$ nvidia-smi
    [Figure: nvidia-smi output showing GPUs occupied by other processes]
    If there are irrelevant processes running on the GPUs like above, we can kill them with
    ~$ sudo killall python
    Or terminate the processes using PID:
    ~$ sudo kill -9 [PID]
    Sometimes when the previous attempt to train the model didn’t terminate properly, the terminal would report that the port is already in use when we are trying to start another attempt. In this case we can use
    ~$ sudo netstat -lnput
    to check the process that occupies the port and terminate the process.
    If the GPUs still run out of memory even after clearing these processes, we have to consider adjusting the training parameters, for example lowering the batch size or using a smaller model. Setting --batch_size=256 and --arch=resnet50 works for the A40 GPUs in this case; the allocator hint from the error message can also be tried, as sketched below.
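    For completeness, the allocator hint mentioned in the error message can be set before relaunching; a sketch, where the value is a starting point to experiment with rather than a measured recommendation:
    ~$ export PYTORCH_CUDA_ALLOC_CONF=max_split_size_mb:128
    ~$ python3 -m torch.distributed.launch --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr=10.10.10.11 --master_port=1234 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50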
  • Using torchrun

Torchrun works similarly to torch.distributed.launch:
master:
torchrun --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr=10.10.10.11 --master_port=1234 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50
worker:
torchrun --nproc_per_node=8 --nnodes=2 --node_rank=1 --master_addr=10.10.10.11 --master_port=1234 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50
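
torchrun provides the local rank through the LOCAL_RANK environment variable, which the script above already reads. It also supports a rendezvous-based invocation that avoids hard-coding --node_rank on each machine; a sketch, assuming both nodes can reach 10.10.10.11:1234 and using an arbitrary job id:

torchrun --nproc_per_node=8 --nnodes=2 --rdzv_backend=c10d --rdzv_endpoint=10.10.10.11:1234 --rdzv_id=ddp_resnet main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50

The same command is run on both nodes, and the c10d rendezvous assigns node ranks automatically.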

  • Using mpirun

Although the above methods work well for DDP on two nodes, they require running the command on each node, which becomes inconvenient as the number of nodes grows. By contrast, with mpirun we can launch the DDP training by typing a single command on the master node. You can refer to the mpirun section of the guide Multi Node PyTorch Distributed Training Guide For People In A Hurry for the installation of OpenMPI and NCCL. Before training, we have to make sure that OpenMPI and NCCL are installed on all of the nodes and that they all use matching virtual environments. To start the DDP training using mpirun, we run the following command on the main node only:
mpirun -np 16 -H 10.10.10.11:8,10.10.10.12:8 -x MASTER_ADDR=10.10.10.11 -x MASTER_PORT=1234 -x PATH -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib python3 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50
Here -np 16 launches 16 processes in total and -H 10.10.10.11:8,10.10.10.12:8 places 8 of them on each node. The -x flags export environment variables to the launched processes: MASTER_ADDR and MASTER_PORT are required by init_process_group, and forwarding PATH ensures that the script runs in the same Python environment on every node.
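
If a node has more than one network interface, NCCL may pick the wrong one and the processes can hang during initialization. A possible workaround (assuming ens11f1 is the interface configured earlier) is to pin the interface explicitly by exporting one more variable through mpirun:

mpirun -np 16 -H 10.10.10.11:8,10.10.10.12:8 -x MASTER_ADDR=10.10.10.11 -x MASTER_PORT=1234 -x PATH -x NCCL_SOCKET_IFNAME=ens11f1 -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib python3 main.py --backend=nccl --use_syn --batch_size=256 --arch=resnet50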

Author: Jiangshan Gong
Posted on 2023-08-19, updated on 2024-01-05
