# GPT-J-6B Fine-Tuning with Ray Train and DeepSpeed

This example showcases how to use Ray Train for **GPT-J fine-tuning**. GPT-J is a GPT-2-like causal language model trained on the Pile dataset. This particular model has 6 billion parameters. For more information, see [GPT-J](https://huggingface.co/docs/transformers/model_doc/gptj).

This example uses the Ray Train 🤗 Transformers integration and a pre-trained model from the Hugging Face Hub. You can adapt this example to other similar models.

This is an advanced example that focuses on the performance and distributed computing aspects of Ray Train. For a beginner-friendly introduction to the Ray Train 🤗 Transformers integration, see {ref}`Basic Example for HuggingFace Transformers <transformers_torch_trainer_basic_example>`.

Read [Ray Train Key Concepts](train-key-concepts) and [Ray Data Integration User Guides](data-ingest-torch) before starting this example.

```{note}
To run this example, make sure your Ray cluster has access to at least one GPU with 16 GB or more of memory. The required amount of memory depends on the model. This notebook was tested with 16 g4dn.4xlarge instances (including the head node).
```

This notebook has the following steps:
1. [Set up Ray](#gptj-setup)
2. [Load the dataset](#gptj-load)
3. [Preprocess the dataset with Ray Data](#gptj-preprocess)
4. [Run the training with Ray Train](#gptj-train)
5. [Generate text from prompt](#gptj-predict)

Run the following line to install all the necessary dependencies. This notebook was tested with `accelerate==0.18.0`, `transformers==4.26.0`, and `deepspeed==0.12.3`:

In [1]:
! pip install "datasets" "evaluate" "accelerate==0.18.0" "transformers==4.26.0" "torch>=1.12.0" "deepspeed==0.12.3"

In [1]:
import numpy as np
import pandas as pd
import os

(gptj-setup)=
## Set up Ray

First, let's set some global variables. We will use 16 workers, each being assigned 1 GPU and 8 CPUs.

In [3]:
model_name = "EleutherAI/gpt-j-6B"
use_gpu = True
num_workers = 16
cpus_per_worker = 8
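
Before launching, it can help to confirm that the cluster actually offers the resources these variables request. The following is a minimal sketch, not part of the original notebook: `fits_on_cluster` is a hypothetical helper, and the dictionary it receives is assumed to have the same shape as the one returned by `ray.cluster_resources()` (keys such as `"CPU"` and `"GPU"`). The cluster numbers are illustrative only.

```python
def fits_on_cluster(available, num_workers, cpus_per_worker, gpus_per_worker=1):
    """Return True if the reported CPUs and GPUs cover all requested workers."""
    needed_cpus = num_workers * cpus_per_worker
    needed_gpus = num_workers * gpus_per_worker
    return (
        available.get("CPU", 0) >= needed_cpus
        and available.get("GPU", 0) >= needed_gpus
    )


# Illustrative numbers: 16 nodes with 16 vCPUs and 1 GPU each (assumption)
example_cluster = {"CPU": 256.0, "GPU": 16.0}
print(fits_on_cluster(example_cluster, num_workers=16, cpus_per_worker=8))
```

On a live cluster, you could pass `ray.cluster_resources()` as `available` after calling `ray.init()`.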

We will use `ray.init()` to initialize a local cluster. By default, this cluster consists of only the machine you are running this notebook on. You can also run this notebook on an Anyscale cluster.

We define a {ref}`runtime environment <runtime-environments>` to ensure that the Ray workers have access to all the necessary packages. You can omit the `runtime_env` argument if you have all of the packages already installed on each node in your cluster.

In [None]:
import ray

ray.init(
    runtime_env={
        "pip": [
            "datasets",
            "evaluate",
            # The latest combination accelerate==0.25.0, transformers==4.36.0, deepspeed==0.12.4
            # has issues with DeepSpeed process group initialization,
            # and will result in a batch_size validation problem.
            # TODO(ml-team): get rid of the pins once the issue is fixed.
            "accelerate==0.18.0",
            "transformers==4.26.0",
            "torch>=1.12.0",
            "deepspeed==0.12.3",
        ],
    },
)

In [None]:
# THIS SHOULD BE HIDDEN IN DOCS AND ONLY RUN IN CI
# Download the model from our S3 mirror because it's faster

import ray
import subprocess
import ray.util.scheduling_strategies


def force_on_node(node_id: str, remote_func_or_actor_class):
    scheduling_strategy = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=node_id, soft=False
    )
    options = {"scheduling_strategy": scheduling_strategy}
    return remote_func_or_actor_class.options(**options)


def run_on_every_node(remote_func_or_actor_class, **remote_kwargs):
    refs = []
    for node in ray.nodes():
        if node["Alive"] and node["Resources"].get("GPU", None):
            refs.append(
                force_on_node(node["NodeID"], remote_func_or_actor_class).remote(
                    **remote_kwargs
                )
            )
    return ray.get(refs)


@ray.remote(num_gpus=1)
def download_model():
    from transformers.utils.hub import TRANSFORMERS_CACHE

    path = os.path.expanduser(
        os.path.join(TRANSFORMERS_CACHE, "models--EleutherAI--gpt-j-6B")
    )
    subprocess.run(["mkdir", "-p", os.path.join(path, "snapshots", "main")])
    subprocess.run(["mkdir", "-p", os.path.join(path, "refs")])
    if os.path.exists(os.path.join(path, "refs", "main")):
        return
    subprocess.run(
        [
            "aws",
            "s3",
            "sync",
            "--no-sign-request",
            "s3://large-dl-models-mirror/models--EleutherAI--gpt-j-6B/main/",
            os.path.join(path, "snapshots", "main"),
        ]
    )
    with open(os.path.join(path, "snapshots", "main", "hash"), "r") as f:
        f_hash = f.read().strip()
    with open(os.path.join(path, "refs", "main"), "w") as f:
        f.write(f_hash)
    os.rename(
        os.path.join(path, "snapshots", "main"), os.path.join(path, "snapshots", f_hash)
    )


_ = run_on_every_node(download_model)

(gptj-load)=
## Loading the dataset

We will fine-tune the model on the [`tiny_shakespeare` dataset](https://huggingface.co/datasets/tiny_shakespeare), which consists of 40,000 lines from a variety of Shakespeare's plays. The aim is to make the GPT-J model better at generating text in the style of Shakespeare.

In [None]:
from datasets import load_dataset

print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")
current_dataset

We will use [Ray Data](data) for distributed preprocessing and data ingestion. We can easily convert the dataset obtained from Hugging Face Hub to Ray Data by using {meth}`ray.data.from_huggingface`.

In [6]:
import ray.data

ray_datasets = {
    "train": ray.data.from_huggingface(current_dataset["train"]),
    "validation": ray.data.from_huggingface(current_dataset["validation"]),
}

ray_datasets

{'train': MaterializedDataset(num_blocks=1, num_rows=1, schema={text: string}),
 'validation': MaterializedDataset(num_blocks=1, num_rows=1, schema={text: string})}

(gptj-preprocess)=
Note that each split of the dataset is a single row containing one large string, so it needs some preprocessing. To do this, use the {meth}`~ray.data.Dataset.map_batches` API to apply transformation functions to batches of data.

The `split_text` function takes the single string and splits it into separate lines, removing empty lines and character names ending with ':' (e.g., 'ROMEO:'). The `tokenize` function takes the lines and tokenizes them using the 🤗 Tokenizer associated with the model, ensuring each entry has the same length (`block_size`) by padding and truncating. This preprocessing is necessary for training.

```{note}
This preprocessing can be done in other ways. A common pattern is to tokenize first, and then split the obtained tokens into equally-sized blocks.
```
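
The alternative mentioned in the note can be sketched as follows. This is an illustration rather than the notebook's method: `group_into_blocks` is a hypothetical helper, and plain lists of integers stand in for the token IDs a real tokenizer would produce.

```python
from itertools import chain


def group_into_blocks(tokenized_lines, block_size):
    """Concatenate token lists and cut them into blocks of exactly block_size.

    The trailing remainder that does not fill a whole block is dropped.
    """
    flat = list(chain.from_iterable(tokenized_lines))
    usable = (len(flat) // block_size) * block_size
    return [flat[i : i + block_size] for i in range(0, usable, block_size)]


# Toy token IDs standing in for real tokenizer output
toy_tokens = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
blocks = group_into_blocks(toy_tokens, block_size=4)
print(blocks)  # [[1, 2, 3, 4], [5, 6, 7, 8]]
```

Compared with padding each line to `block_size`, this grouping avoids spending compute on padding tokens, at the cost of blocks that may start or end mid-sentence.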

In [7]:
block_size = 512

In [8]:
from transformers import AutoTokenizer


def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    # Join all rows into one string, then split it into individual lines.
    text = list(batch["text"])
    flat_text = "".join(text)
    lines = [
        x.strip()
        for x in flat_text.split("\n")
        # Drop empty lines and character names such as "ROMEO:".
        if x.strip() and not x.strip().endswith(":")
    ]
    return pd.DataFrame(lines, columns=["text"])


def tokenize(batch: pd.DataFrame) -> dict:
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    tokenizer.pad_token = tokenizer.eos_token
    ret = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=block_size,
        padding="max_length",
        return_tensors="np",
    )
    ret["labels"] = ret["input_ids"].copy()
    return dict(ret)


processed_datasets = {
    key: (
        ds.map_batches(split_text, batch_format="pandas")
        .map_batches(tokenize, batch_format="pandas")
    )
    for key, ds in ray_datasets.items()
}
processed_datasets

{'train': MapBatches(tokenize)
 +- MapBatches(split_text)
    +- Dataset(num_blocks=1, num_rows=1, schema={text: string}),
 'validation': MapBatches(tokenize)
 +- MapBatches(split_text)
    +- Dataset(num_blocks=1, num_rows=1, schema={text: string})}

(gptj-train)=
## Fine-tuning the model with Ray Train

Configure Ray Train's {class}`~ray.train.torch.TorchTrainer` to perform distributed fine-tuning of the model. Specify a `train_loop_per_worker` function that defines the training logic. Ray distributes this function using Distributed Data Parallelism (DDP), which relies on the PyTorch Distributed backend internally. Each worker has its own copy of the model but operates on different data. At the end of each step, all the workers sync gradients.
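
To make the gradient synchronization step concrete, here is a toy sketch in plain Python. It is not how PyTorch implements DDP; `all_reduce_mean` and `sgd_step` are hypothetical helpers that only illustrate the idea that every worker ends a step with the same averaged gradient, so the model replicas stay identical.

```python
def all_reduce_mean(per_worker_grads):
    """Average gradients element-wise across workers."""
    num_workers = len(per_worker_grads)
    return [sum(values) / num_workers for values in zip(*per_worker_grads)]


def sgd_step(weights, grads, lr):
    """Apply one plain SGD update."""
    return [w - lr * g for w, g in zip(weights, grads)]


weights = [1.0, -2.0]
# Each worker computed gradients on a different shard of the data
grads_by_worker = [[0.5, 1.0], [1.5, 3.0]]
synced = all_reduce_mean(grads_by_worker)
# Every worker applies the same averaged gradient
replicas = [sgd_step(weights, synced, lr=0.5) for _ in grads_by_worker]
print(synced, replicas)
```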

Because GPT-J is a relatively large model, it may not fit on smaller GPU types (<=16 GB of GPU memory). To deal with that issue, this example uses [DeepSpeed](https://github.com/microsoft/DeepSpeed), a library that optimizes the training process and can offload and partition optimizer and parameter states, reducing GPU memory usage. Furthermore, DeepSpeed ZeRO Stage 3 can load large models without running out of memory.
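
A back-of-the-envelope estimate shows why partitioning matters. The sketch below is an approximation under stated assumptions, not a measurement: it uses the commonly cited figure of 16 bytes per parameter for mixed-precision Adam (2 for fp16 weights, 2 for fp16 gradients, 12 for fp32 weights, momentum, and variance), ignores activations and offloading, and assumes ZeRO Stage 3 splits model states evenly across GPUs. `model_state_gib` is a hypothetical helper.

```python
def model_state_gib(num_params, num_gpus=1, bytes_per_param=16):
    """Rough per-GPU memory for model states in GiB, excluding activations."""
    return num_params * bytes_per_param / num_gpus / 2**30


params = 6_050_000_000  # the training log reports 6.05B parameters
unpartitioned = model_state_gib(params)
partitioned = model_state_gib(params, num_gpus=16)
print(f"unpartitioned: {unpartitioned:.1f} GiB")
print(f"16-way ZeRO Stage 3: {partitioned:.1f} GiB per GPU")
```

Under these assumptions, the unpartitioned model states far exceed a 16 GB GPU, while a 16-way partition brings the per-GPU share within reach before activations are counted.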

🤗 Transformers and Ray Train's {ref}`integrations <train-transformers-integration>` allow you to easily configure and use DDP and DeepSpeed. All you need to do is specify the DeepSpeed configuration in the [`TrainingArguments`](https://huggingface.co/docs/transformers/en/main_classes/trainer#transformers.TrainingArguments) object.

```{tip}
There are many DeepSpeed settings that allow you to trade off speed for memory usage. The settings used below are tailored to the cluster setup used (16 g4dn.4xlarge nodes) and a per device batch size of 16. Some things to keep in mind:
- If your GPUs support bfloat16, use that instead of float16 mixed precision to get better performance and prevent overflows. Replace `fp16=True` with `bf16=True` in `TrainingArguments`.
- If you are running out of GPU memory, try reducing the batch size (defined in the cell below the next one) or set `"overlap_comm": False` in the DeepSpeed config.
- If you are running out of RAM, add more nodes to your cluster, use nodes with more RAM, set `"pin_memory": False` in the DeepSpeed config, reduce the batch size, or remove `"offload_param"` from the DeepSpeed config.

For more information on DeepSpeed configuration, refer to [Hugging Face documentation](https://huggingface.co/docs/transformers/main_classes/deepspeed) and [DeepSpeed documentation](https://www.deepspeed.ai/docs/config-json/).

Additionally, if you prefer a lower-level API, the logic below can be expressed as an [Accelerate training loop](https://github.com/huggingface/accelerate/blob/main/examples/by_feature/deepspeed_with_config_support.py) distributed by a Ray Train {class}`~ray.train.torch.torch_trainer.TorchTrainer`.
```
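
The memory-related adjustments from the tip can be expressed as a small function over the config dictionary. This is a sketch under assumptions: `apply_memory_tweaks` is a hypothetical helper, and the `base` dictionary is a reduced example config that includes an `"offload_param"` entry purely to show its removal.

```python
import copy


def apply_memory_tweaks(ds_config, low_gpu_memory=False, low_cpu_memory=False):
    """Return a copy of a DeepSpeed config dict with memory-saving changes."""
    cfg = copy.deepcopy(ds_config)
    zero = cfg.setdefault("zero_optimization", {})
    if low_gpu_memory:
        # Trade some speed for lower GPU memory usage
        zero["overlap_comm"] = False
    if low_cpu_memory:
        # Pinned host memory and parameter offloading both consume RAM
        if "offload_optimizer" in zero:
            zero["offload_optimizer"]["pin_memory"] = False
        zero.pop("offload_param", None)
    return cfg


base = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
    }
}
tweaked = apply_memory_tweaks(base, low_gpu_memory=True, low_cpu_memory=True)
print(tweaked["zero_optimization"])
```

Working on a deep copy keeps the original config intact, which makes it easier to compare runs with and without the changes.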

#### Training speed

As this example uses data parallelism, each worker operates on its own shard of the data. The batch size set in `train_ds.iter_torch_batches` is the **per device batch size** (per worker batch size). By changing the number of workers, you can change the **effective batch size** and thus the time needed for training to complete. Calculate the effective batch size as `per device batch size * number of workers * number of gradient accumulation steps`. As you add more workers, the effective batch size rises and thus less time is needed to complete a full epoch. While the speedup is not exactly linear due to extra communication overheads, in many cases it can be close to linear.

The preprocessed dataset has 1348 examples. We have set per device batch size to 16.

* With 16 g4dn.4xlarge nodes, the effective batch size was 256, which equals 85 steps per epoch. One epoch took **~2440 seconds** (including initialization time).

* With 32 g4dn.4xlarge nodes, the effective batch size was 512, which equals 43 steps per epoch. One epoch took **~1280 seconds** (including initialization time).
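
The effective batch size formula above can be checked with a few lines of Python. This is a minimal sketch: the helper names are hypothetical, the dataset size of 10,000 is a made-up number used only for illustration, and the steps calculation uses integer division in the same way as the notebook's later `steps_per_epoch` cell.

```python
def effective_batch_size(per_device_batch_size, num_workers, grad_accum_steps=1):
    """per device batch size * number of workers * gradient accumulation steps."""
    return per_device_batch_size * num_workers * grad_accum_steps


def steps_per_epoch(num_examples, effective_batch):
    """Number of full batches in one pass over the data."""
    return num_examples // effective_batch


hypothetical_examples = 10_000  # illustrative only, not the real dataset size
for workers in (16, 32):
    ebs = effective_batch_size(16, workers)
    print(workers, ebs, steps_per_epoch(hypothetical_examples, ebs))
```

Doubling the number of workers doubles the effective batch size and roughly halves the number of steps per epoch, which matches the timings reported above.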

In [None]:
import evaluate
import torch
from transformers import (
    Trainer,
    TrainingArguments,
    GPTJForCausalLM,
    AutoTokenizer,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar

from ray import train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback


def train_func(config):
    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        train.get_context().get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)
    steps_per_epoch = config.get("steps_per_epoch")

    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
            "hysteresis": 4,
            "consecutive_hysteresis": True,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        logging_steps=1,
        save_strategy="steps",
        save_steps=steps_per_epoch,
        max_steps=steps_per_epoch * epochs,
        per_device_train_batch_size=batch_size,
        gradient_accumulation_steps=1,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        push_to_hub=False,
        report_to="none",
        disable_tqdm=True,  # declutter the output a little
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    train_ds = train.get_dataset_shard("train")
    eval_ds = train.get_dataset_shard("validation")

    train_ds_iterable = train_ds.iter_torch_batches(
        batch_size=batch_size,
        local_shuffle_buffer_size=train.get_context().get_world_size() * batch_size,
    )
    eval_ds_iterable = eval_ds.iter_torch_batches(batch_size=batch_size)

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_ds_iterable,
        eval_dataset=eval_ds_iterable,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )

    # Add callback to report checkpoints to Ray Train
    trainer.add_callback(RayTrainReportCallback())
    trainer = prepare_trainer(trainer)
    trainer.train()

After defining the training function, instantiate the {class}`~ray.train.torch.TorchTrainer`. Aside from the function, set the `scaling_config` to control the number of workers and the amount of resources to use, and set `datasets` (the preprocessed Ray Datasets) to use for training and evaluation.

```{note}
Running with multiple nodes necessitates the persistence of checkpoints
and other outputs to some external storage for access after training has completed.
**You should set up cloud storage or NFS, then replace `storage_path` with your own cloud bucket URI or NFS path.**

See {ref}`Configuration and Persistent Storage<persistent-storage-guide>` for more details.
```

In [None]:
storage_path = "s3://your-bucket-here"  # TODO: Set up cloud storage
# storage_path="/mnt/path/to/nfs"     # TODO: Alternatively, set up NFS

In [10]:
import os, re

artifact_storage = os.environ.get("ANYSCALE_ARTIFACT_STORAGE", "artifact_storage")
user_name = re.sub(r"\s+", "__", os.environ.get("ANYSCALE_USERNAME", "user"))
storage_path = f"{artifact_storage}/{user_name}/gptj-deepspeed-finetune"

In [None]:
batch_size = 16
train_ds_size = processed_datasets["train"].count()
steps_per_epoch = train_ds_size // (batch_size * num_workers)

In [None]:
# SMOKE TEST SETTINGS FOR CI
steps_per_epoch = 10
num_workers = 8
batch_size = 1

In [12]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={
        "epochs": 1,
        "batch_size": batch_size,  # per device
        "steps_per_epoch": steps_per_epoch,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": 1, "CPU": cpus_per_worker},
    ),
    datasets=processed_datasets,
    run_config=RunConfig(storage_path=storage_path),
)

Finally, call the {meth}`~ray.train.torch.TorchTrainer.fit` method to start training with Ray Train. Save the {class}`~ray.train.Result` object to a variable to access metrics and checkpoints.
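
The object returned by `fit()` exposes attributes such as `metrics`, `checkpoint`, and `error`. The sketch below shows one way to pull out the fields that are usually inspected first. `summarize_result` is a hypothetical helper, and a `SimpleNamespace` with made-up values stands in for a real result so the snippet runs without a cluster.

```python
from types import SimpleNamespace


def summarize_result(result):
    """Collect commonly inspected fields from a training result object."""
    metrics = result.metrics or {}
    return {
        "loss": metrics.get("loss"),
        "epoch": metrics.get("epoch"),
        "checkpoint": result.checkpoint,
        "failed": result.error is not None,
    }


# Stand-in object; the values are invented for illustration
fake_result = SimpleNamespace(
    metrics={"loss": 0.1, "epoch": 1.0}, checkpoint=None, error=None
)
summary = summarize_result(fake_result)
print(summary)
```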

In [14]:
results = trainer.fit()

| | |
|---|---|
| Current time: | 2023-08-18 18:54:02 |
| Running for: | 00:44:50.37 |
| Memory: | 10.2/62.0 GiB |

| Trial name | status | loc | iter | total time (s) | loss | learning_rate | epoch |
|---|---|---|---|---|---|---|---|
| TorchTrainer_01ea5_00000 | TERMINATED | 10.0.60.59:8839 | 1 | 2663.78 | 0.069 | 2.38095e-07 | 1 |


(TrainTrainable pid=8839, ip=10.0.60.59) 2023-08-18 18:09:16.315108: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
(TrainTrainable pid=8839, ip=10.0.60.59) To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
(TrainTrainable pid=8839, ip=10.0.60.59) 2023-08-18 18:09:16.462944: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
(TrainTrainable pid=8839, ip=10.0.60.59) 2023-08-18 18:09:17.336229: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic

(RayTrainWorker pid=8885, ip=10.0.47.209) Preparing training arguments
(RayTrainWorker pid=36675, ip=10.0.13.222) Loading model
(autoscaler +3m53s) [workspace snapshot] New snapshot created successfully (size: 172.52 MB).
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:12:01,852] [INFO] [partition_parameters.py:454:__exit__] finished initializing model with 6.05B parameters
(RayTrainWorker pid=36675, ip=10.0.13.222) Preparing training arguments [repeated 15x across cluster]
(RayTrainWorker pid=8880, ip=10.0.63.99) Loading model [repeated 15x across cluster]
(RayTrainWorker pid=8851, ip=10.0.43.240) Model loaded


Downloading builder script: 100%|██████████| 4.20k/4.20k [00:00<00:00, 22.1MB/s]
(SplitCoordinator pid=8980, ip=10.0.60.59) 2023-08-18 18:09:27.424862: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64 [repeated 32x across cluster]
(RayTrainWorker pid=36311, ip=10.0.27.53) comet_ml is installed but `COMET_API_KEY` is not set. [repeated 15x across cluster]
(RayTrainWorker pid=36262, ip=10.0.52.191) -------------------------------------------------------------------------- [repeated 26x across cluster]
(RayTrainWorker pid=36262, ip=10.0.52.191)                  Aim collects anonymous usage analytics.                  [repeated 13x across cluster]


(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:12:36,256] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed info: version=0.9.2, git-hash=unknown, git-branch=unknown
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:12:36,373] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False


(RayTrainWorker pid=8858, ip=10.0.0.119) Using /home/ray/.cache/torch_extensions/py39_cu118 as PyTorch extensions root...
(RayTrainWorker pid=8858, ip=10.0.0.119) Creating extension directory /home/ray/.cache/torch_extensions/py39_cu118/cpu_adam...
Downloading builder script: 100%|██████████| 4.20k/4.20k [00:00<00:00, 19.8MB/s] [repeated 15x across cluster]
(RayTrainWorker pid=8857, ip=10.0.44.114) max_steps is given, it will override any value given in num_train_epochs [repeated 15x across cluster]
(RayTrainWorker pid=8857, ip=10.0.44.114) Using cuda_amp half precision backend [repeated 15x across cluster]
(RayTrainWorker pid=49329) Detected CUDA files, patching ldflags
(RayTrainWorker pid=49329) Emitting ninja build file /home/ray/.cache/torch_extensions/py39_cu118/cpu_adam/build.ninja...
(RayTrainWorker pid=49329) Building extension module cpu_ada

(RayTrainWorker pid=8858, ip=10.0.0.119) [1/3] /usr/local/cuda/bin/nvcc  -DTORCH_EXTENSION_NAME=cpu_adam -DTORCH_API_INCLUDE_EXTENSION_H -DPYBIND11_COMPILER_TYPE=\"_gcc\" -DPYBIND11_STDLIB=\"_libstdcpp\" -DPYBIND11_BUILD_ABI=\"_cxxabi1011\" -I/home/ray/anaconda3/lib/python3.9/site-packages/deepspeed/ops/csrc/includes -I/usr/local/cuda/include -isystem /home/ray/anaconda3/lib/python3.9/site-packages/torch/include -isystem /home/ray/anaconda3/lib/python3.9/site-packages/torch/include/torch/csrc/api/include -isystem /home/ray/anaconda3/lib/python3.9/site-packages/torch/include/TH -isystem /home/ray/anaconda3/lib/python3.9/site-packages/torch/include/THC -isystem /usr/local/cuda/include -isystem /home/ray/anaconda3/include/python3.9 -D_GLIBCXX_USE_CXX11_ABI=0 -D__CUDA_NO_HALF_OPERATORS__ -D__CUDA_NO_HALF_CONVERSIONS__ -D__CUDA_NO_BFLOAT16_CONVERSIONS__ -D__CUDA_NO_HALF2_OPERATORS__ --expt-relaxed-constexpr -gencode=arch=compute_75,code=compute_75 -gencode=arch=compute_75,code=

(RayTrainWorker pid=49329) Loading extension module cpu_adam...
(RayTrainWorker pid=36675, ip=10.0.13.222) Using /home/ray/.cache/torch_extensions/py39_cu118 as PyTorch extensions root... [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Creating extension directory /home/ray/.cache/torch_extensions/py39_cu118/cpu_adam... [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Detected CUDA files, patching ldflags [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Emitting ninja build file /home/ray/.cache/torch_extensions/py39_cu118/cpu_adam/build.ninja... [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Building extension module cpu_adam... [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Allowing ninja to set a default number of workers... (

(RayTrainWorker pid=49329) Adam Optimizer #0 is created with AVX512 arithmetic capability.
(RayTrainWorker pid=49329) Config: alpha=0.000020, betas=(0.900000, 0.999000), weight_decay=0.000000, adam_w=1


(RayTrainWorker pid=49329) Building extension module utils...


(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:13,196] [INFO] [logging.py:96:log_dist] [Rank 0] Using DeepSpeed Optimizer param name adamw as basic optimizer
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:13,212] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Basic Optimizer = DeepSpeedCPUAdam
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:13,212] [INFO] [utils.py:54:is_zero_supported_optimizer] Checking ZeRO support for optimizer=DeepSpeedCPUAdam type=<class 'deepspeed.ops.adam.cpu_adam.DeepSpeedCPUAdam'>
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:13,212] [INFO] [logging.py:96:log_dist] [Rank 0] Creating fp16 ZeRO stage 3 optimizer, MiCS is enabled False, Hierarchical params gather False
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:13,212] [INFO] [logging.py:96:log_dist] [Rank 0] Creating torch.float16 ZeRO stage 3 optimizer
(RayTrainWorker pi

(RayTrainWorker pid=49329) Loading extension module utils...
(RayTrainWorker pid=36675, ip=10.0.13.222) Loading extension module cpu_adam... [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Using /home/ray/.cache/torch_extensions/py39_cu118 as PyTorch extensions root... [repeated 16x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Creating extension directory /home/ray/.cache/torch_extensions/py39_cu118/utils... [repeated 16x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Emitting ninja build file /home/ray/.cache/torch_extensions/py39_cu118/utils/build.ninja... [repeated 16x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N) [repeated 16x across cluster]
(RayTrainWorker pid=36675, ip=10.0.

(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:29,490] [INFO] [utils.py:785:see_memory_usage] DeepSpeedZeRoOffload initialize [begin]
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:29,491] [INFO] [utils.py:786:see_memory_usage] MA 0.11 GB         Max_MA 0.11 GB         CA 1.54 GB         Max_CA 2 GB
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:29,491] [INFO] [utils.py:793:see_memory_usage] CPU Virtual Memory:  used = 8.96 GB, percent = 14.5%
(RayTrainWorker pid=8911, ip=10.0.60.59) Parameter Offload: Total persistent parameters: 811008 in 114 params
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:29,763] [INFO] [utils.py:785:see_memory_usage] DeepSpeedZeRoOffload initialize [end]
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:29,764] [INFO] [utils.py:786:see_memory_usage] MA 0.11 GB         Max_MA 0.11 GB         CA 1.54 GB         Max_CA 2 GB

(RayTrainWorker pid=36675, ip=10.0.13.222) Loading extension module utils... [repeated 15x across cluster]


(RayTrainWorker pid=36675, ip=10.0.13.222) [2/2] c++ flatten_unflatten.o -shared -L/home/ray/anaconda3/lib/python3.9/site-packages/torch/lib -lc10 -ltorch_cpu -ltorch -ltorch_python -o utils.so [repeated 15x across cluster]
(RayTrainWorker pid=36675, ip=10.0.13.222) Time to load utils op: 16.94431161880493 seconds [repeated 15x across cluster]
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:31,872] [INFO] [utils.py:785:see_memory_usage] After creating fp16 partitions: 1
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:31,873] [INFO] [utils.py:786:see_memory_usage] MA 0.11 GB         Max_MA 0.11 GB         CA 1.54 GB         Max_CA 2 GB
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:31,873] [INFO] [utils.py:793:see_memory_usage] CPU Virtual Memory:  used = 9.98 GB, percent = 16.1%
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:32,120] [INFO] [util

(RayTrainWorker pid=8830, ip=10.0.30.35) Using /home/ray/.cache/torch_extensions/py39_cu118 as PyTorch extensions root...
(RayTrainWorker pid=8830, ip=10.0.30.35) No modifications detected for re-loaded extension module utils, skipping build step...
(RayTrainWorker pid=8830, ip=10.0.30.35) Loading extension module utils...
(RayTrainWorker pid=9631, ip=10.0.57.153) Loading extension module utils...
(RayTrainWorker pid=9631, ip=10.0.57.153) ***** Running training *****
(RayTrainWorker pid=9631, ip=10.0.57.153)   Num examples = 10752
(RayTrainWorker pid=9631, ip=10.0.57.153)   Num Epochs = 9223372036854775807
(RayTrainWorker pid=9631, ip=10.0.57.153)   Instantaneous batch size per device = 8
(RayTrainWorker pid=9631, ip=10.0.57.153)   Total train batch size (w. parallel, distributed & accumulation) = 128
(RayTrainWorker pid=9631, ip=10.0.57.153)   Gradient Accu

(RayTrainWorker pid=8830, ip=10.0.30.35) Time to load utils op: 0.0005006790161132812 seconds
(RayTrainWorker pid=9631, ip=10.0.57.153) Time to load utils op: 0.0005137920379638672 seconds
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:40,692] [INFO] [utils.py:785:see_memory_usage] After initializing ZeRO optimizer
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:40,693] [INFO] [utils.py:786:see_memory_usage] MA 0.14 GB         Max_MA 0.91 GB         CA 1.54 GB         Max_CA 2 GB
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:40,693] [INFO] [utils.py:793:see_memory_usage] CPU Virtual Memory:  used = 17.3 GB, percent = 27.9%
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:40,694] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Final Optimizer = adamw
(RayTrainWorker pid=8911, ip=10.0.60.59) [2023-08-18 18:13:40,694] [INFO] [logging.py:96:log_dist]

(pid=8980, ip=10.0.60.59) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=8980, ip=10.0.60.59)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(split_text)->MapBatches(tokenize)] -> OutputSplitter[split(16, equal=True)]
[2m[36m(SplitCoordinator pid=8980, ip=10.0.60.59)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['6002ded0aaa53ce9a0351d22a72b344ef411a422919132f41d9f937a', 'd3bbd390b6fe73f26202f96d75998946cf3e8b457528d426db0c6e07', 'fe6aaf54317ee630a02d23e0d49581b57b5cd51316eaf769e28bb045', 'f7de4694a4f764c05a9c51a6a4bd40ac33f3fced3b25127b25cd4ac3', '42866a2fba4ce2ab4b6645c4d731d486b762e2b23ac24cafccba7096', '8a7272830662c7e756a656de0a9b433a3a1f9b990768f692b6fe11a7', 'bba62e8b57552509c62a6b6b7fd67c1a2280b9d81b3d9c41eb4d1b9b', 'b40764f303538c24bc439106f2e7b2144d382bfed6c9fdec15ab828e', 'd1de4d4b6d44eff93857026df4ef0f70e24e3dc91e15d87015f2ed32', '4d6a9dc1aa7bfc80cb73d9f66f4e28041807f12769391f5643bce

[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Time to load utils op: 0.0003864765167236328 seconds[32m [repeated 14x across cluster][0m
[2m[36m(RayTrainWorker pid=8851, ip=10.0.43.240)[0m {'loss': 12.1235, 'learning_rate': 1.9761904761904763e-05, 'epoch': 0.01}
[2m[36m(RayTrainWorker pid=8851, ip=10.0.43.240)[0m {'loss': 6.7834, 'learning_rate': 1.9523809523809524e-05, 'epoch': 0.02}[32m [repeated 16x across cluster][0m
[2m[36m(RayTrainWorker pid=8857, ip=10.0.44.114)[0m {'loss': 2.2151, 'learning_rate': 1.928571428571429e-05, 'epoch': 0.04}[32m [repeated 16x across cluster][0m
[2m[36m(RayTrainWorker pid=9631, ip=10.0.57.153)[0m {'loss': 0.1739, 'learning_rate': 1.904761904761905e-05, 'epoch': 0.05}[32m [repeated 16x across cluster][0m
[2m[1m[36m(autoscaler +8m53s)[0m [workspace snapshot] New snapshot created successfully (size: 172.58 MB).
[2m[36m(RayTrainWorker pid=8858, ip=10.0.0.119)[0m {'loss': 0.121, 'learning_rate': 1.880952380952381e-05, 'epoc

[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Saving model checkpoint to output/checkpoint-84
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Configuration saved in output/checkpoint-84/config.json
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Configuration saved in output/checkpoint-84/generation_config.json
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Using /home/ray/.cache/torch_extensions/py39_cu118 as PyTorch extensions root...[32m [repeated 15x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m No modifications detected for re-loaded extension module utils, skipping build step...[32m [repeated 15x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Loading extension module utils...[32m [repeated 14x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m ***** Running training *****[32m [repeated 15x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m   Num exam

[2m[36m(RayTrainWorker pid=49329)[0m [2023-08-18 18:52:12,213] [INFO] [torch_checkpoint_engine.py:33:commit] [Torch] Checkpoint global_step84 is ready now!
[2m[36m(RayTrainWorker pid=36249, ip=10.0.11.26)[0m {'loss': 0.069, 'learning_rate': 2.3809523809523811e-07, 'epoch': 1.0}[32m [repeated 15x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:12,213] [INFO] [logging.py:96:log_dist] [Rank 0] [Torch] Checkpoint global_step84 is about to be saved!
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:12,213] [INFO] [engine.py:3337:save_16bit_model] Saving model weights to output/checkpoint-84/pytorch_model.bin, tag: global_step84
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:12,213] [INFO] [torch_checkpoint_engine.py:21:save] [Torch] Saving output/checkpoint-84/pytorch_model.bin...




[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:27,660] [INFO] [torch_checkpoint_engine.py:23:save] [Torch] Saved output/checkpoint-84/pytorch_model.bin.
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:27,673] [INFO] [logging.py:96:log_dist] [Rank 0] [Torch] Checkpoint global_step84 is about to be saved!
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:27,684] [INFO] [logging.py:96:log_dist] [Rank 0] Saving model checkpoint: output/checkpoint-84/global_step84/zero_pp_rank_0_mp_rank_00_model_states.pt
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:27,685] [INFO] [torch_checkpoint_engine.py:21:save] [Torch] Saving output/checkpoint-84/global_step84/zero_pp_rank_0_mp_rank_00_model_states.pt...
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:27,660] [INFO] [torch_checkpoint_engine.py:33:commit] [Torch] Checkpoint global_step84 is ready now![32m [repeated 15x across clust

[2m[36m(RayTrainWorker pid=8885, ip=10.0.47.209)[0m 
[2m[36m(RayTrainWorker pid=8885, ip=10.0.47.209)[0m 
[2m[36m(RayTrainWorker pid=8885, ip=10.0.47.209)[0m Training completed. Do not forget to share your model on huggingface.co/models =)
[2m[36m(RayTrainWorker pid=8885, ip=10.0.47.209)[0m 
[2m[36m(RayTrainWorker pid=8885, ip=10.0.47.209)[0m 

To fix this issue, configure AIR to use either:
(1) Cloud storage: `RunConfig(storage_path='s3://your/bucket')`
(2) A network filesystem mounted on all nodes: `RunConfig(storage_path='/mnt/path/to/nfs_storage')`
See this Github issue for more details on transitioning to cloud storage/NFS as well as an explanation on why this functionality is being removed: https://github.com/ray-project/ray/issues/37177

Other temporary workarounds:
- Or, to re-enable the head node syncing behavior, set the environment variable RAY_AIR_REENABLE_DEPRECATED_SYNC_TO_HEAD_NODE=1
  - **Note that this functionality will tentatively be hard-deprecated in

[2m[36m(RayTrainWorker pid=36262, ip=10.0.52.191)[0m {'train_runtime': 2355.3551, 'train_samples_per_second': 4.565, 'train_steps_per_second': 0.036, 'train_loss': 0.32820896875290645, 'epoch': 1.0}
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [2023-08-18 18:52:36,012] [INFO] [engine.py:3228:_save_zero_checkpoint] zero checkpoint saved output/checkpoint-84/global_step84/zero_pp_rank_0_mp_rank_00_optim_states.pt[32m [repeated 15x across cluster][0m
[2m[36m(RayTrainWorker pid=8875, ip=10.0.0.80)[0m [2023-08-18 18:52:36,193] [INFO] [torch_checkpoint_engine.py:33:commit] [Torch] Checkpoint global_step84 is ready now![32m [repeated 15x across cluster][0m


[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m [32m [repeated 60x across cluster][0m
[2m[36m(RayTrainWorker pid=8911, ip=10.0.60.59)[0m Training completed. Do not forget to share your model on huggingface.co/models =)[32m [repeated 15x across cluster][0m
2023-08-18 18:54:02,594	INFO tune.py:1146 -- Total run time: 2691.03 seconds (2676.82 seconds for the tuning loop).


Use the returned {class}`~ray.train.Result` object to access metrics and the Ray Train {class}`~ray.train.Checkpoint` associated with the last iteration.

In [15]:
checkpoint = results.checkpoint
checkpoint

Checkpoint(filesystem=<pyarrow._s3fs.S3FileSystem object at 0x7f8c59d311b0>, path=anyscale-staging-data-cld-kvedzwag2qa8i5bjxuevf5i7/org_7c1Kalm9WcX2bNIjW53GUT/cld_kvedZWag2qA8i5BjxUevf5i7/artifact_storage/yunxuan__xiao/gptj-deepspeed-finetune/TorchTrainer_2023-08-18_18-09-11/TorchTrainer_01ea5_00000_0_2023-08-18_18-09-12/checkpoint_000000)
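
As a quick sanity check on the run, the throughput figures in the final training summary logged above can be cross-checked against the step count in the checkpoint name (`checkpoint-84`). The sketch below hard-codes values copied from that log line. `summarize_throughput` is a hypothetical helper, not part of Ray Train or 🤗 Transformers, and because the logged rates are rounded, the derived numbers are approximate.

```python
def summarize_throughput(train_runtime, samples_per_second, total_steps):
    """Derive approximate sample counts from a training summary line."""
    # Samples seen over the whole run, rounded because the logged rate is rounded.
    total_samples = round(train_runtime * samples_per_second)
    return {
        "total_samples": total_samples,
        "samples_per_step": total_samples / total_steps,
        "steps_per_second": total_steps / train_runtime,
    }


# Values copied from the final summary line in the training log above;
# total_steps comes from the "checkpoint-84" directory name.
summary = summarize_throughput(
    train_runtime=2355.3551,
    samples_per_second=4.565,
    total_steps=84,
)
print(summary)
```

The derived steps-per-second value agrees with the logged `train_steps_per_second` after rounding, and the samples-per-step figure gives an estimate of the effective global batch size across the 16 workers.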

(gptj-predict)=
### Generate text from prompt

First, download the persistent Ray Train checkpoint locally and load the fine-tuned model weights and tokenizer from the checkpoint. Then use 🤗 Transformers [`pipeline`](https://huggingface.co/docs/transformers/en/main_classes/pipelines) to generate predictions from the fine-tuned model.

```{tip}
For large scale batch inference, see {ref}`End-to-end: Offline Batch Inference <batch_inference_home>`.
```

In [16]:
!awsv2 configure set s3.max_concurrent_requests 32
!awsv2 configure set default.s3.preferred_transfer_client crt
!awsv2 configure set default.s3.target_bandwidth 100Gb/s
!awsv2 configure set default.s3.multipart_chunksize 8MB

In [None]:
import os

os.system(f"awsv2 s3 sync s3://{checkpoint.path} /mnt/local_storage/")
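
`os.system` returns the exit status without raising, so a failed sync can go unnoticed until the model fails to load. The following is a minimal sketch of a stricter variant, assuming the same `awsv2` CLI is available on the `PATH`. `build_sync_command` and `sync_checkpoint` are hypothetical helper names, and the bucket path in the example is a placeholder.

```python
import subprocess


def build_sync_command(checkpoint_path, local_dir="/mnt/local_storage/"):
    """Build the argument list for syncing an S3 checkpoint to local disk."""
    # In this run, checkpoint.path has no scheme, so prepend "s3://" if needed.
    if checkpoint_path.startswith("s3://"):
        uri = checkpoint_path
    else:
        uri = f"s3://{checkpoint_path}"
    return ["awsv2", "s3", "sync", uri, local_dir]


def sync_checkpoint(checkpoint_path, local_dir="/mnt/local_storage/"):
    """Run the sync and raise CalledProcessError if the CLI exits non-zero."""
    subprocess.run(build_sync_command(checkpoint_path, local_dir), check=True)


# Placeholder path for illustration; in the notebook, pass checkpoint.path.
print(build_sync_command("my-bucket/experiment/checkpoint_000000"))
```

Passing the arguments as a list also avoids shell quoting issues if the checkpoint path contains unusual characters.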

Set the `task` to `"text-generation"`, and set `device_map="auto"` so that 🤗 Accelerate automatically places the model on the available devices.

In [5]:
import torch
from transformers import pipeline, AutoTokenizer, GPTJForCausalLM

# Load the fine-tuned weights and tokenizer from the synced checkpoint directory.
model = GPTJForCausalLM.from_pretrained("/mnt/local_storage/checkpoint")
tokenizer = AutoTokenizer.from_pretrained("/mnt/local_storage/checkpoint")

pipe = pipeline(
    model=model,
    tokenizer=tokenizer,
    task="text-generation",
    torch_dtype=torch.float16,
    device_map="auto",
)

In [6]:
# Generate from prompts!
for sentence in pipe(
    ["Romeo and Juliet", "Romeo", "Juliet"], do_sample=True, min_length=20
):
    print(sentence)

[{'generated_text': 'Romeo and Juliet. This very night shall they come. A word with you, sir.'}]
[{'generated_text': 'Romeo! I know thee not. Lord Mercutio, is it you! Signior Montague.'}]
[{'generated_text': 'Juliet, look up in the vault, and there shalt find a grave; within the monument there is a table:'}]
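
As the output above shows, the pipeline returns one list per prompt, and each list holds dicts with a `generated_text` key. The following is a small sketch for flattening that structure into plain strings. `extract_texts` is a hypothetical helper, and the sample data is copied from the first two results above so that the snippet runs without loading the model.

```python
def extract_texts(pipeline_outputs):
    """Flatten [[{'generated_text': ...}], ...] into a list of strings."""
    return [
        candidate["generated_text"]
        for per_prompt in pipeline_outputs
        for candidate in per_prompt
    ]


# Sample data copied from the generation output above.
sample_outputs = [
    [{"generated_text": "Romeo and Juliet. This very night shall they come. A word with you, sir."}],
    [{"generated_text": "Romeo! I know thee not. Lord Mercutio, is it you! Signior Montague."}],
]

for text in extract_texts(sample_outputs):
    print(text)
```

In the notebook, the same helper could be applied to the list returned by `pipe(...)`, assuming the output keeps the shape shown above.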
