Tomoya Oda
PyTorch Distributed Data Parallel (DDP) using Hugging Face Accelerate

This article is translated from my Japanese tech blog.
https://tmyoda.hatenablog.com/entry/20210314/1615712115

Introduction

Recently, PyTorch has been recommending DDP (Distributed Data Parallel). However, it requires multiple changes to the source code.

So, I'll introduce a DDP implementation using Hugging Face Accelerate.
It's very handy, and I think it's the best DDP library so far.

Preparation

I used this image dataset.
https://www.kaggle.com/c/dogs-vs-cats-redux-kernels-edition/overview

Data -> Download All
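The Kaggle archive contains only the image files (named like cat.0.jpg and dog.0.jpg); the train.csv that the training code reads has to be generated. A possible sketch — the filename,label CSV format and the cat=0 / dog=1 encoding are my assumptions, chosen to match the code below:

```python
import csv
import os


def label_from_filename(filename):
    # Kaggle files are named like "cat.123.jpg" / "dog.456.jpg";
    # encode cat as 0 and dog as 1 (an assumed convention).
    return 0 if filename.startswith('cat') else 1


def write_train_csv(image_dir, csv_path):
    # One "filename,label" row per image, no header.
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        for name in sorted(os.listdir(image_dir)):
            if name.endswith('.jpg'):
                writer.writerow([name, label_from_filename(name)])

# Usage: write_train_csv('./data/train', './data/train.csv')
```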

Training Code

I used the VGG16 model from torchvision.
The train_model function here is modified for DDP (Distributed Data Parallel).

https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html

I referred to this kernel for the DataLoader.
https://www.kaggle.com/alpaca0984/dog-vs-cat-with-pytorch#Generate-submittion.csv
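The DogCatDataset imported below lives in src/datasets.py, which the post does not show. A minimal sketch of what it might look like, assuming a header-less CSV of filename,label rows (the CSV layout and label encoding are my assumptions):

```python
import csv
import os

from PIL import Image
from torch.utils.data import Dataset


class DogCatDataset(Dataset):
    """Map-style dataset reading (filename, label) pairs from a CSV file.

    Hypothetical sketch; the original post's src/datasets.py is not shown.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        with open(csv_file, newline='') as f:
            # Assumed format: one "filename,label" row per image, no header.
            self.samples = [(row[0], int(row[1])) for row in csv.reader(f)]
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        filename, label = self.samples[idx]
        image = Image.open(os.path.join(self.root_dir, filename)).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label
```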

nn.DataParallel

The code below is the single-GPU baseline; uncommenting the nn.DataParallel line switches it to multi-GPU data parallelism.

import time
import copy
from tqdm import tqdm
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torchvision.models as models
import matplotlib.pyplot as plt

from src.datasets import DogCatDataset

# Config
IMAGE_SIZE = 224
NUM_CLASSES = 2
BATCH_SIZE = 50
NUM_EPOCH = 1

# Seed
torch.manual_seed(42)
np.random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_dir = './data/train'
test_dir = './data/test'

# Data preprocessing
transform = transforms.Compose(
    [
        # Resize
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # To tensor
        transforms.ToTensor(),
        # Std
        transforms.Normalize(mean=[0.4883, 0.4551, 0.4170],
                             std=[0.2257, 0.2211, 0.2214])
    ]
)


# Get training data
train_dataset = DogCatDataset(
    csv_file="./data/train.csv",
    root_dir=train_dir,
    transform=transform
)

# Split train and val
n_samples = len(train_dataset)  # n_samples is 25000
train_size = int(len(train_dataset) * 0.8)  # train_size is 20000
val_size = n_samples - train_size  # val_size is 5000

train_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset, [train_size, val_size])
datasets = {'train': train_dataset, 'val': val_dataset}

# Create training and validation dataloaders
dataloaders = {
    x: torch.utils.data.DataLoader(
        datasets[x],
        batch_size=BATCH_SIZE,
        shuffle=True,
        pin_memory=True,
        num_workers=mp.cpu_count()) for x in ['train', 'val']
}


# Fine-tuning
model = models.vgg16(pretrained=True, progress=True)
model.classifier[6] = nn.Linear(4096, NUM_CLASSES)
# Transfer to GPU
model = model.to(device)
# Use multi-GPU
# model = nn.DataParallel(model)

# optimizer SGD
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Loss function
criterion = nn.CrossEntropyLoss()

# Start training 
# https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
since = time.time()
history = {'accuracy': [],
           'val_accuracy': [],
           'loss': [],
           'val_loss': []}


best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

for epoch in range(NUM_EPOCH):
    print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1))
    print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()  # Set model to training mode
        else:
            model.eval()   # Set model to evaluate mode

        running_loss = 0.0
        running_corrects = 0

        # Iterate over data.
        for inputs, labels in tqdm(dataloaders[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            # track history if only in train
            with torch.set_grad_enabled(phase == 'train'):
                # Get model outputs and calculate loss
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

            # statistics
            running_loss += loss.detach() * inputs.size(0)  # detach: don't retain the graph
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss.item() / len(dataloaders[phase].dataset)
        epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

        print(
            '{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase,
                epoch_loss,
                epoch_acc))

        # deep copy the model
        if phase == 'val' and epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        if phase == 'train':
            history['accuracy'].append(epoch_acc.item())
            history['loss'].append(epoch_loss)
        else:
            history['val_accuracy'].append(epoch_acc.item())
            history['val_loss'].append(epoch_loss) 

    print()

time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
    time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:.4f}'.format(best_acc))

# load best model weights
model.load_state_dict(best_model_wts)

model = model.to('cpu')
torch.save(model.state_dict(), './model/best.pth')


# plot
acc = history['accuracy']
val_acc = history['val_accuracy']
loss = history['loss']
val_loss = history['val_loss']
epochs_range = range(NUM_EPOCH)

plt.figure(figsize=(24, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.savefig("training_results.png")

Hugging Face Accelerate

https://huggingface.co/docs/accelerate/

pip install accelerate

Source Code

import time
import copy
from tqdm import tqdm
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torchvision.models as models
import matplotlib.pyplot as plt
from accelerate import Accelerator

from src.datasets import DogCatDataset

# Config
IMAGE_SIZE = 224
NUM_CLASSES = 2
BATCH_SIZE = 50
NUM_EPOCH = 2

# Seed
torch.manual_seed(42)
np.random.seed(42)

# Device
accelerator = Accelerator()
device = accelerator.device

train_dir = './data/train'
test_dir = './data/test'

# Data preprocessing
transform = transforms.Compose(
    [
        # Resize
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        # To tensor
        transforms.ToTensor(),
        # Std
        transforms.Normalize(mean=[0.4883, 0.4551, 0.4170],
                             std=[0.2257, 0.2211, 0.2214])
    ]
)


# Get training data
train_dataset = DogCatDataset(
    csv_file="./data/train.csv",
    root_dir=train_dir,
    transform=transform
)

# Split train and val
n_samples = len(train_dataset)  # n_samples is 25000
train_size = int(len(train_dataset) * 0.8)  # train_size is 20000
val_size = n_samples - train_size  # val_size is 5000

train_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset, [train_size, val_size])
datasets = {'train': train_dataset, 'val': val_dataset}

# Create training and validation dataloaders
dataloaders = {
    x: torch.utils.data.DataLoader(
        datasets[x],
        batch_size=BATCH_SIZE,
        shuffle=True,
        pin_memory=True,
        drop_last=(x == 'train'),
        num_workers=mp.cpu_count()) for x in ['train', 'val']
}


# Fine-tuning
model = models.vgg16(pretrained=True, progress=True)
model.classifier[6] = nn.Linear(4096, NUM_CLASSES)

# optimizer SGD
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Loss function
criterion = nn.CrossEntropyLoss()

# Prepare everything
# There is no specific order to remember, we just need to unpack the objects in the same order we gave them to the
# prepare method.
model, optimizer, dataloaders['train'], dataloaders['val'] = accelerator.prepare(
    model, optimizer, dataloaders['train'], dataloaders['val'])

# Start training
# https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
since = time.time()
history = {'accuracy': [],
           'val_accuracy': [],
           'loss': [],
           'val_loss': []}


best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0

for epoch in range(NUM_EPOCH):
    # Use accelerator.print to print only on the main process.
    accelerator.print('Epoch {}/{}'.format(epoch, NUM_EPOCH - 1))
    accelerator.print('-' * 10)

    # Each epoch has a training and validation phase
    for phase in ['train', 'val']:
        if phase == 'train':
            model.train()  # Set model to training mode
        else:
            model.eval()   # Set model to evaluate mode

        running_loss = 0.0
        running_corrects = 0

        # Iterate over data.
        for inputs, labels in tqdm(dataloaders[phase]):
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            # track history if only in train
            with torch.set_grad_enabled(phase == 'train'):
                # Get model outputs and calculate loss
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)

                # backward + optimize only if in training phase
                if phase == 'train':
                    accelerator.backward(loss)
                    optimizer.step()

            # statistics
            running_loss += loss.detach() * inputs.size(0)  # detach: don't retain the graph
            running_corrects += torch.sum(preds == labels.data)

        all_running_loss = accelerator.gather(running_loss)
        all_running_corrects = accelerator.gather(running_corrects)

        if accelerator.is_local_main_process:
            epoch_loss = all_running_loss.sum().item() / len(dataloaders[phase].dataset)
            epoch_acc = all_running_corrects.sum().double() / len(dataloaders[phase].dataset)

            print(
                '{} Loss: {:.4f} Acc: {:.4f}'.format(
                    phase,
                    epoch_loss,
                    epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                unwrapped_model = accelerator.unwrap_model(model)
                best_model_wts = copy.deepcopy(unwrapped_model.state_dict())

            if phase == 'train':
                history['accuracy'].append(epoch_acc.item())
                history['loss'].append(epoch_loss)
            else:
                history['val_accuracy'].append(epoch_acc.item())
                history['val_loss'].append(epoch_loss)

    accelerator.print()

if accelerator.is_local_main_process:
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    torch.save(best_model_wts, './model/best.pth')

    # plot
    acc = history['accuracy']
    val_acc = history['val_accuracy']
    loss = history['loss']
    val_loss = history['val_loss']
    epochs_range = range(NUM_EPOCH)

    plt.figure(figsize=(24, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')
    plt.savefig("training_results.png")

Command for training

Example for a single node with 4 GPUs

$ accelerate config
In which compute environment are you running? ([0] This machine, [1] AWS (Amazon SageMaker)): 0
Which type of machine are you using? ([0] No distributed training, [1] multi-CPU, [2] multi-GPU, [3] TPU): 2
How many different machines will you use (use more than 1 for multi-node training)? [1]:1
Do you want to use DeepSpeed? [yes/NO]: NO
How many processes in total will you use? [1]:4
Do you wish to use FP16 (mixed precision)? [yes/NO]: NO
accelerate launch train.py

DDP (accelerate) Explanation

Specify the device as follows.

accelerator = Accelerator()
device = accelerator.device

Pass the model, optimizer, and dataloader to the prepare function.

model, optimizer, dataloaders['train'], dataloaders['val'] = accelerator.prepare(
    model, optimizer, dataloaders['train'], dataloaders['val'])

When aggregating information from the other processes (during evaluation, etc.), use the gather function as follows.

        all_running_loss = accelerator.gather(running_loss)
        all_running_corrects = accelerator.gather(running_corrects)

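To see why the gathered per-process sums are divided by the full dataset size, here is the arithmetic emulated in plain Python with made-up numbers for 4 processes (the values are illustrative only):

```python
# Hypothetical per-process statistics from 4 GPUs (illustrative numbers).
per_process_loss_sums = [120.0, 118.5, 121.3, 119.2]  # each = sum of loss * batch_size
per_process_corrects = [4850, 4872, 4861, 4855]       # each = correct predictions

dataset_size = 20000  # the FULL training set, not one process's shard

# accelerator.gather() concatenates the per-process tensors; summing the
# result and dividing by the full dataset size gives the global averages.
epoch_loss = sum(per_process_loss_sums) / dataset_size
epoch_acc = sum(per_process_corrects) / dataset_size
```

Dividing each process's own sum by the full dataset size would undercount by a factor of the world size, which is why the gather step matters.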

The model is saved as follows.

accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model.state_dict(), filename)

The following condition ensures that a block is executed only on the main process of each node.

if accelerator.is_local_main_process:

Time comparison

This is not a rigorous comparison, so take it as a reference.

  • GPU: 4
  • Batch size per 1 GPU: 50
  • Epoch: 1
  • Fine-tuning the pretrained VGG16 model from torchvision

Single GPU (cuda:0)

  • 4m 35s
train Loss: 0.0136 Acc: 0.9951
val Loss: 0.0240 Acc: 0.9912

Training complete in 4m 35s

nn.DataParallel

  • 1m 39s
train Loss: 0.0459 Acc: 0.9817
val Loss: 0.0235 Acc: 0.9908

Training complete in 1m 39s

DDP (accelerate)

  • 0m 42s
train Loss: 0.0750 Acc: 0.9682
val Loss: 0.0265 Acc: 0.9908

Training complete in 0m 42s

DDP is so much faster! Note that with 4 GPUs and a per-GPU batch size of 50 the effective batch size is 200, so each epoch runs a quarter as many optimizer steps; that likely also explains the lower training accuracy after one epoch.

Implementing DDP by hand is quite difficult, but Accelerate makes it easy. I highly recommend giving it a try.
