{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Parallelism\n", "\n", "이번 세션에는 데이터 병렬화 기법에 대해 알아보겠습니다.\n", "\n", "## 1. `torch.nn.DataParallel`\n", "가장 먼저 우리에게 친숙한 `torch.nn.DataParallel`의 동작 방식에 대해 알아봅시다. `torch.nn.DataParallel`은 single-node & multi-GPU에서 동작하는 multi-thread 모듈입니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1) Forward Pass\n", "\n", "1. 입력된 mini-batch를 **Scatter**하여 각 디바이스로 전송.\n", "2. GPU-1에 올라와 있는 모델의 파라미터를 GPU-2,3,4로 **Broadcast**.\n", "3. 각 디바이스로 복제된 모델로 **Forward**하여 Logits을 계산 함.\n", "4. 계산된 Logits을 **Gather**하여 GPU-1에 모음.\n", "5. Logits으로부터 **Loss**를 계산함. (with loss reduction)\n", "\n", "![](../images/dp_forward.png)\n", "\n", "
\n", "\n", "코드로 나타내면 아래와 같습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import torch.nn as nn\n", "\n", "\n", "def data_parallel(module, inputs, labels, device_ids, output_device):\n", " inputs = nn.parallel.scatter(inputs, device_ids)\n", " # 입력 데이터를 device_ids들에 Scatter함\n", "\n", " replicas = nn.parallel.replicate(module, device_ids)\n", " # 모델을 device_ids들에 복제함.\n", " \n", " logit = nn.parallel.parallel_apply(replicas, inputs)\n", " # 각 device에 복제된 모델이 각 device의 데이터를 Forward함.\n", "\n", " logits = nn.parallel.gather(outputs, output_device)\n", " # 모델의 logit을 output_device(하나의 device)로 모음\n", " \n", " return logits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2) Backward Pass\n", "\n", "1. 계산된 Loss를 각 디바이스에 **Scatter**함.\n", "2. 전달받은 Loss를 이용해서 각 디바이스에서 **Backward**를 수행하여 Gradients 계산.\n", "3. 계산된 모든 Gradient를 GPU-1로 **Reduce**하여 GPU-1에 전부 더함.\n", "4. 더해진 Gradients를 이용하여 GPU-1에 있는 모델을 업데이트.\n", "\n", "![](../images/dp_backward.png)\n", "\n", "\n", "#### 혹시나 모르시는 분들을 위해...\n", "- `loss.backward()`: 기울기를 미분해서 Gradient를 계산\n", "- `optimizer.step()`: 계산된 Gradient를 이용해서 파라미터를 업데이트\n", "- Computation cost는 `backward()` > `step()`.\n", "\n", "![](../images/backward_step.png)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "src/data_parallel.py\n", "\"\"\"\n", "\n", "from torch import nn\n", "from torch.optim import Adam\n", "from torch.utils.data import DataLoader\n", "from transformers import BertForSequenceClassification, BertTokenizer\n", "from datasets import load_dataset\n", "\n", "# 1. create dataset\n", "datasets = load_dataset(\"multi_nli\").data[\"train\"]\n", "datasets = [\n", " {\n", " \"premise\": str(p),\n", " \"hypothesis\": str(h),\n", " \"labels\": l.as_py(),\n", " }\n", " for p, h, l in zip(datasets[2], datasets[5], datasets[9])\n", "]\n", "data_loader = DataLoader(datasets, batch_size=128, num_workers=4)\n", "\n", "# 2. create model and tokenizer\n", "model_name = \"bert-base-cased\"\n", "tokenizer = BertTokenizer.from_pretrained(model_name)\n", "model = BertForSequenceClassification.from_pretrained(model_name, num_labels=3).cuda()\n", "\n", "# 3. make data parallel module\n", "# device_ids: 사용할 디바이스 리스트 / output_device: 출력값을 모을 디바이스\n", "model = nn.DataParallel(model, device_ids=[0, 1, 2, 3], output_device=0)\n", "\n", "# 4. create optimizer and loss fn\n", "optimizer = Adam(model.parameters(), lr=3e-5)\n", "loss_fn = nn.CrossEntropyLoss(reduction=\"mean\")\n", "\n", "# 5. start training\n", "for i, data in enumerate(data_loader):\n", " optimizer.zero_grad()\n", " tokens = tokenizer(\n", " data[\"premise\"],\n", " data[\"hypothesis\"],\n", " padding=True,\n", " truncation=True,\n", " max_length=512,\n", " return_tensors=\"pt\",\n", " )\n", "\n", " logits = model(\n", " input_ids=tokens.input_ids.cuda(),\n", " attention_mask=tokens.attention_mask.cuda(),\n", " return_dict=False,\n", " Logits에 비해 Loss는 Scalar이기 때문에 크기가 훨씬 작기 때문이죠. 이 작업은 [당근마켓 블로그](https://medium.com/daangn/pytorch-multi-gpu-%ED%95%99%EC%8A%B5-%EC%A0%9C%EB%8C%80%EB%A1%9C-%ED%95%98%EA%B8%B0-27270617936b)에 소개되었던 [PyTorch-Encoding](https://github.com/zhanghang1989/PyTorch-Encoding)의 `DataParallelCriterion`과 동일합니다. 블로그에 꽤나 복잡하게 설명되어 있는데, 복잡한 방법 대신 간단하게 **forward 함수를 오버라이드 하는 것** 만으로 동일 기능을 쉽게 구현 할 수 있습니다.\n", "\n", "![](../images/dp_forward_2.png)\n", "\n", "
\n", "\n", "## 2. `torch.nn.DataParallel`의 문제점\n", "\n", "\n", "### 1) 멀티쓰레드 모듈이기 때문에 Python에서 비효율적임.\n", "Python은 GIL (Global Interpreter Lock)에 의해 하나의 프로세스에서 동시에 여러개의 쓰레드가 작동 할 수 없습니다. 따라서 근본적으로 멀티 쓰레드가 아닌 **멀티 프로세스 프로그램**으로 만들어서 여러개의 프로세스를 동시에 실행하게 해야합니다.\n", "\n", "
\n", "\n", "### 2) 하나의 모델에서 업데이트 된 모델이 다른 device로 매 스텝마다 복제되어야 함.\n", "현재의 방식은 각 디바이스에서 계산된 Gradient를 하나의 디바이스로 모아서(Gather) 업데이트 하는 방식이기 때문에 업데이트된 모델을 매번 다른 디바이스들로 복제(Broadcast)해야 하는데, 이 과정이 꽤나 비쌉니다. 그러나 Gradient를 Gather하지 않고 각 디바이스에서 자체적으로 `step()`을 수행한다면 모델을 매번 복제하지 않아도 되겠죠. 어떻게 이 것을 구현 할 수 있을까요?\n", "\n", "
\n", "\n", "### Solution? ➝ All-reduce!! 👍\n", "![](../images/allreduce.png)\n", "\n", "정답은 앞서 배웠던 All-reduce 연산입니다. 각 디바이스에서 계산된 Gradients를 모두 더해서 모든 디바이스에 균일하게 뿌려준다면 각 디바이스에서 자체적으로 `step()`을 수행 할 수 있습니다. 그러면 매번 모델을 특정 디바이스로부터 복제해 올 필요가 없겠죠. 따라서 All-reduce를 활용하는 방식으로 기존 방식을 개선해야 합니다.\n", "\n", "
\n", "\n", "### 그러나... 🤔\n", "그러나 All-reduce는 매우 비용이 높은 연산에 속합니다. 왜 그럴까요? All-reduce의 세부 구현을 살펴봅시다.\n", "\n", "
\n", "\n", "### Reduce + Broadcast 구현 방식\n", "![](../images/allreduce_1.png)\n", "\n", "
\n", "\n", "### All to All 구현 방식\n", "![](../images/allreduce_2.png)\n", "\n", "

" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. `torch.nn.parallel.DistributedDataParallel` (이하 DDP)\n", "\n", "### Ring All-reduce 💍\n", "Ring All-reduce는 2017년에 바이두의 연구진이 개발한 새로운 연산입니다. 기존의 방식들에 비해 월등히 효율적인 성능을 보여줬기 때문에 DDP 개발의 핵심이 되었죠.\n", "\n", "- https://github.com/baidu-research/baidu-allreduce\n", "\n", "![](../images/ring_allreduce.gif)\n", "\n", "
\n", "\n", "![](../images/ring_allreduce.png)\n", "\n", "
\n", "\n", "### DDP란?\n", "DDP는 기존 DataParallel의 문제를 개선하기 위해 등장한 데이터 병렬처리 모듈이며 single/multi-node & multi-GPU에서 동작하는 multi-process 모듈입니다. All-reduce를 활용하게 되면서 마스터 프로세스의 개념이 없어졌기 때문에 학습 과정이 매우 심플하게 변합니다.\n", "\n", "![](../images/ddp.png)\n", "\n", "
\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "src/ddp.py\n", "\"\"\"\n", "\n", "import torch\n", "import torch.distributed as dist\n", "from torch.nn.parallel import DistributedDataParallel\n", "from torch.optim import Adam\n", "from torch.utils.data import DataLoader, DistributedSampler\n", "from transformers import BertForSequenceClassification, BertTokenizer\n", "from datasets import load_dataset\n", "\n", "# 1. initialize process group\n", "dist.init_process_group(\"nccl\")\n", "rank = dist.get_rank()\n", "world_size = dist.get_world_size()\n", "torch.cuda.set_device(rank)\n", "device = torch.cuda.current_device()\n", "\n", "# 2. create dataset\n", "datasets = load_dataset(\"multi_nli\").data[\"train\"]\n", "datasets = [\n", " {\n", " \"premise\": str(p),\n", " \"hypothesis\": str(h),\n", " \"labels\": l.as_py(),\n", " }\n", " for p, h, l in zip(datasets[2], datasets[5], datasets[9])\n", "]\n", "\n", "# 3. create DistributedSampler\n", "# DistributedSampler는 데이터를 쪼개서 다른 프로세스로 전송하기 위한 모듈입니다.\n", "sampler = DistributedSampler(\n", " datasets,\n", " num_replicas=world_size,\n", " rank=rank,\n", " shuffle=True,\n", ")\n", "data_loader = DataLoader(\n", " datasets,\n", " batch_size=32,\n", " num_workers=4,\n", " sampler=sampler,\n", " shuffle=False,\n", " pin_memory=True,\n", ")\n", "\n", "\n", "# 4. create model and tokenizer\n", "model_name = \"bert-base-cased\"\n", "tokenizer = BertTokenizer.from_pretrained(model_name)\n", "model = BertForSequenceClassification.from_pretrained(model_name, num_labels=3).cuda()\n", "# 5. make distributed data parallel module\n", "model = DistributedDataParallel(model, device_ids=[device], output_device=device)\n", "\n", "# 5. create optimizer\n", "optimizer = Adam(model.parameters(), lr=3e-5)\n", "\n", "# 6. start training\n", "for i, data in enumerate(data_loader):\n", " optimizer.zero_grad()\n", " tokens = tokenizer(\n", " data[\"premise\"],\n", " \n", "\n", "### 결과적으로 `backward()`와 `all-reduce`를 중첩시키는 것이 좋습니다.\n", "\n", "결과적으로 `backward()`와 `all-reduce`를 중첩시키는 것이 가장 효율적인 방식입니다. `all_reduce`는 네트워크 통신, `backward()`, `step()` 등은 GPU 연산이기 때문에 동시에 처리할 수 있죠. 이들을 중첩시키면 즉, computation과 communication이 최대한으로 overlap 되기 때문에 연산 효율이 크게 증가합니다.\n", "\n", "![](../images/ddp_analysis_2.png)\n", "\n", "
\n", "\n", "분석 결과 `backward()`와 `step()`을 비교해보면 `backward()`가 훨씬 무거운 연산이였습니다.\n", "\n", "![](../images/ddp_analysis_3.png)\n", "\n", "
\n", "\n", "당연히 더 무거운 연산을 중첩시킬 수록 전체 학습 과정을 수행하는 시간이 짧아집니다. 분석 결과 `backward()`가 끝날때 까지 기다리는 것 보다 `all-reduce`를 함께 수행하는 것이 훨씬 빨랐습니다.\n", "\n", "![](../images/ddp_analysis_4.png)\n", "\n", "
\n", "\n", "### 이 때, 생길 수 있는 궁금증들...\n", "- Q1: `backward()` 연산 중에 Gradient가 모두 계산되지 않았는데 어떻게 `all-reduce`를 수행합니까?\n", " - A1: `backward()`는 뒤쪽 레이어부터 순차적으로 이루어지기 때문에 계산이 끝난 레이어 먼저 전송하면 됩니다.\n", "\n", "
\n", "\n", "- Q2: 그렇다면 언제마다 `all-reduce`를 수행하나요? 레이어마다 이루어지나요?\n", " - A2: 아닙니다. Gradient Bucketing을 수행합니다. Bucket이 가득차면 All-reduce를 수행합니다.\n", "\n", "
\n", "\n", "### Gradient Bucekting\n", "Gradient Bucekting는 Gradient를 일정한 사이즈의 bucket에 저장해두고 가득차면 다른 프로세스로 전송하는 방식입니다. 가장 먼저 `backward()` 연산 도중 뒤쪽부터 계산된 Gradient들을 차례대로 bucket에 저장하다가 bucket의 용량이 가득차면 All-reduce를 수행해서 각 device에 Gradient의 합을 전달합니다. 그림 때문에 헷갈릴 수도 있는데, bucket에 저장되는 것은 모델의 파라미터가 아닌 해당 레이어에서 출력된 Gradient입니다. 모든 bucket은 일정한 사이즈를 가지고 있으며 `bucket_size_mb` 인자를 통해 mega-byte 단위로 용량을 설정 할 수 있습니다.\n", "\n", "![](../images/ddp_analysis_5.png)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }