Pipeline Parallelism in PyTorch

Matías Battocchia

As LLMs keep growing in size, I started wondering about the possibility of deploying a model across many machines. While training is done in a distributed way, inference rarely goes beyond one machine, as serving a model has far lower requirements.

When the model is big, many of us would request a single-GPU machine with enough resources to serve it. As a rule of thumb, you need at least twice as many gigabytes of GPU memory as the model has billions of parameters: at half precision, each parameter takes 2 bytes. For example, LLaMA 13B has 13 giga-parameters (a giga is a billion), hence it requires about 26 GB of GPU memory. Choose an NVIDIA A100 40GB and we are good to go.
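
As a quick sanity check, here is that rule of thumb as arithmetic (weights only; activations and framework overhead come on top):

params = 13e9          # LLaMA 13B: 13 billion parameters
bytes_per_param = 2    # half precision (float16/bfloat16)

gigabytes = params * bytes_per_param / 1e9
print(f"~{gigabytes:.0f} GB of GPU memory")  # ~26 GB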

If the model is too big, then a tool like Hugging Face Accelerate can help us run the model on a single machine with multiple GPUs. Most cloud providers let you attach up to 8 devices to one machine.
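
For reference, this is roughly what that looks like through the Transformers library, which hands the placement over to Accelerate when device_map="auto" is passed. The checkpoint name is only an example and the exact arguments depend on your setup.

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_id = "huggyllama/llama-13b"  # example checkpoint

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.float16,  # 2 bytes per parameter
    device_map="auto",          # Accelerate spreads the layers across the available GPUs
)

inputs = tokenizer("Hello", return_tensors="pt").to(model.device)
print(tokenizer.decode(model.generate(**inputs, max_new_tokens=8)[0]))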

Then there are projects like Alpa that enable the deployment of really big models like OPT 175B and BLOOM 176B on multi-machine, multi-device clusters. One drawback, though: the model needs to be supported by the project.

Doing further research I came across PyTorch's PiPPy project. It features automatic splitting of model code, which means that you do not have to modify the model code to make parallelism work.

While consulting PiPPy's docs and source code, I did the following exercise in order to grasp some elemental insights about this tool and about pipeline parallelism in general: run a model across two Docker containers.

The idea is to deploy heavyweight models, but for the sake of simplicity, let's use this toy model.

example.py

import os
import pippy
import torch
from torch.distributed import rpc
from torch import nn

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(128, 128)
        self.fc2 = nn.Linear(128,   8)

    def forward(self, x):
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.fc2(x)

        return x

net = Net()
net.eval()

Let's assume that we have a bunch of machines, all on the same network. The same script runs on all the nodes; there is one head node and the rest are worker nodes. Worker nodes should be able to reach the head node at a host:port address. Each node has a rank number, and the total count of nodes is the world size.

example.py (continuation)

RANK  = int(os.environ["RANK"])
WORLD = int(os.environ["WORLD"])
HOST  = os.environ["HOST"]
PORT  = os.environ["PORT"]
print(f"My rank is {RANK}")


# first thing to do is to init RPC
print("Waiting for all nodes...")
rpc.init_rpc(
    f"worker{RANK}", # just an identifier
    rank=RANK,
    world_size=WORLD,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=8,
        rpc_timeout=10, # seconds
        init_method=f"tcp://{HOST}:{PORT}", # head node's address and port
    )
)

# split the model, each process materializes its pipeline stage
driver, stage = pippy.all_compile(
    net,
    num_ranks=WORLD,
    num_chunks=WORLD, # microbatching
    schedule="FillDrain", # feed chunks through the pipeline sequentially
    split_policy=pippy.split_into_equal_size(WORLD), # split the model into specified number of equal-size stages
)
print(stage)

if RANK == 0:
    x = torch.randn(4, 128)
    y = driver(x) # only rank 0 is able to call the pipeline's driver
    print(y)

rpc.shutdown()
print("Bye!")

In a terminal:

$ docker build -t example .
$ docker network create rpc
$ docker run -e RANK=0 -e WORLD=2 -e HOST=head -e PORT=3000 \
  --net rpc --name head --rm -it example

Notice how the HOST address equals the head container's name: on a user-defined Docker network, containers resolve each other by name.

In another terminal:

$ docker run -e RANK=1 -e WORLD=2 -e HOST=head -e PORT=3000 \
  --net rpc --name worker --rm -it example

This container is named worker; its HOST points to the rank 0 container, head.

Head output, after all processes join.

My rank is 0
Waiting for all nodes...

PipeStageModule(
  (fc1): Linear(in_features=128, out_features=128, bias=True)
)

def forward(self, x):
    fc1 = self.fc1(x);  x = None
    return fc1

tensor([[-0.1043,  ..., -0.0093],
        ...
        [-0.1566,  ..., -0.0765]])
Bye!

Worker output.

My rank is 1
Waiting for all nodes...

PipeStageModule(
  (fc2): Linear(in_features=128, out_features=8, bias=True)
)

def forward(self, fc1):
    relu = nn.functional.relu(fc1, inplace = False); fc1 = None
    fc2 = self.fc2(relu);  relu = None
    return fc2

Bye!

Next steps in this topic are:

  • use a GPU (see the sketch after this list),
  • more than one GPU device per machine,
  • deploy on local Kubernetes,
  • deploy on Google Cloud GKE,
  • use some really big models.
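
For the GPU item, this is the direction I would take, assuming one GPU per node: TensorPipe RPC can move CUDA tensors between ranks once a device map is declared. Take it as a sketch built from the torch.distributed.rpc options; I have not run this variant end to end.

# sketch only: GPU variant of the RPC options, one GPU per node
options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=8,
    rpc_timeout=10,
    init_method=f"tcp://{HOST}:{PORT}",
    devices=["cuda:0"], # devices this rank exposes to RPC
)

# map this rank's cuda:0 to the other rank's cuda:0 (WORLD == 2)
options.set_device_map(f"worker{1 - RANK}", {"cuda:0": "cuda:0"})

rpc.init_rpc(f"worker{RANK}", rank=RANK, world_size=WORLD,
             rpc_backend_options=options)

net.to("cuda:0") # move the model to the GPU before compiling the pipeline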

requirements.txt — CPU-only PyTorch

packaging # missing dependency from PiPPy
numpy
--index-url https://download.pytorch.org/whl/cpu
torch

Dockerfile

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

RUN git clone https://github.com/pytorch/PiPPy.git \
    && cd PiPPy \
    && python setup.py install \
    && cd ..

COPY . .

ENTRYPOINT ["python", "example.py"]
