# Fine-tune a PyTorch Lightning Text Classifier with Ray Data

:::{note}

This is an intermediate example demonstrates how to use [Ray Data](data) with PyTorch Lightning in Ray Train.

If you just want to quickly convert your existing PyTorch Lightning scripts into Ray Train, you can refer to the [Lightning Quick Start Guide](train-pytorch-lightning).

:::

This demo introduces how to fine-tune a text classifier on the [CoLA(The Corpus of Linguistic Acceptability)](https://nyu-mll.github.io/CoLA/) dataset using a pre-trained BERT model. In particular, it follows three steps:
- Preprocess the CoLA dataset with Ray Data.
- Define a training function with PyTorch Lightning.
- Launch distributed training with Ray Train's TorchTrainer.

Run the following line in order to install all the necessary dependencies:

In [1]:
SMOKE_TEST = True

In [2]:
!pip install numpy datasets "transformers>=4.19.1" "pytorch_lightning>=1.6.5"

Start by importing the needed libraries:

In [3]:
import ray
import torch
import numpy as np
import pytorch_lightning as pl
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset, load_metric

2023-08-14 16:45:51.059256: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-14 16:45:51.198481: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-14 16:45:52.005931: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /usr/local/nvidia/lib:/usr/local/nvidia/lib64
2023-08-14 16:45:52.006010: W tensorflow/

## Pre-process CoLA Dataset

CoLA is a dataset for binary sentence classification with 10.6K training examples. First, download the dataset and metrics using the Hugging Face datasets API, and create a Ray Dataset for each split accordingly.

In [None]:
dataset = load_dataset("glue", "cola")

train_dataset = ray.data.from_huggingface(dataset["train"])
validation_dataset = ray.data.from_huggingface(dataset["validation"])

Next, tokenize the input sentences and pad the ID sequence to length 128 using the `bert-base-uncased` tokenizer. The {meth}`map_batches <ray.data.Dataset.map_batches>` applies this preprocessing function on all data samples.

In [5]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

def tokenize_sentence(batch):
    outputs = tokenizer(
        batch["sentence"].tolist(),
        max_length=128,
        truncation=True,
        padding="max_length",
        return_tensors="np",
    )
    outputs["label"] = batch["label"]
    return outputs

train_dataset = train_dataset.map_batches(tokenize_sentence, batch_format="numpy")
validation_dataset = validation_dataset.map_batches(tokenize_sentence, batch_format="numpy")

## Define a PyTorch Lightning model

You don't have to make any changes to your `LightningModule` definition. Just copy and paste your code here:

In [6]:
class SentimentModel(pl.LightningModule):
    def __init__(self, lr=2e-5, eps=1e-8):
        super().__init__()
        self.lr = lr
        self.eps = eps
        self.num_classes = 2
        self.model = AutoModelForSequenceClassification.from_pretrained(
            "bert-base-cased", num_labels=self.num_classes
        )
        self.metric = load_metric("glue", "cola")
        self.predictions = []
        self.references = []

    def forward(self, batch):
        input_ids, attention_mask = batch["input_ids"], batch["attention_mask"]
        outputs = self.model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        return logits

    def training_step(self, batch, batch_idx):
        labels = batch["label"]
        logits = self.forward(batch)
        loss = F.cross_entropy(logits.view(-1, self.num_classes), labels)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        labels = batch["label"]
        logits = self.forward(batch)
        preds = torch.argmax(logits, dim=1)
        self.predictions.append(preds)
        self.references.append(labels)

    def on_validation_epoch_end(self):
        predictions = torch.concat(self.predictions).view(-1)
        references = torch.concat(self.references).view(-1)
        matthews_correlation = self.metric.compute(
            predictions=predictions, references=references
        )

        # self.metric.compute() returns a dictionary:
        # e.g. {"matthews_correlation": 0.53}
        self.log_dict(matthews_correlation, sync_dist=True)
        self.predictions.clear()
        self.references.clear()

    def configure_optimizers(self):
        return torch.optim.AdamW(self.parameters(), lr=self.lr, eps=self.eps)

## Define a training function

Define a [training function](train-overview-training-function) that includes all of your lightning training logic. {class}`TorchTrainer <ray.train.torch.TorchTrainer>` launches this function on each worker in parallel. 


In [7]:
import ray.train
from ray.train.lightning import (
    prepare_trainer,
    RayDDPStrategy,
    RayLightningEnvironment,
    RayTrainReportCallback,
)

train_func_config = {
    "lr": 1e-5,
    "eps": 1e-8,
    "batch_size": 16,
    "max_epochs": 5,
}

def train_func(config):
    # Unpack the input configs passed from `TorchTrainer(train_loop_config)`
    lr = config["lr"]
    eps = config["eps"]
    batch_size = config["batch_size"]
    max_epochs = config["max_epochs"]

    # Fetch the Dataset shards
    train_ds = ray.train.get_dataset_shard("train")
    val_ds = ray.train.get_dataset_shard("validation")

    # Create a dataloader for Ray Datasets
    train_ds_loader = train_ds.iter_torch_batches(batch_size=batch_size)
    val_ds_loader = val_ds.iter_torch_batches(batch_size=batch_size)

    # Model
    model = SentimentModel(lr=lr, eps=eps)

    trainer = pl.Trainer(
        max_epochs=max_epochs,
        accelerator="auto",
        devices="auto",
        strategy=RayDDPStrategy(),
        plugins=[RayLightningEnvironment()],
        callbacks=[RayTrainReportCallback()],
        enable_progress_bar=False,
    )

    trainer = prepare_trainer(trainer)

    trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader)


To enable distributed training with Ray Train, configure the Lightning Trainer with the following utilities:

- {class}`~ray.train.lightning.RayDDPStrategy`
- {class}`~ray.train.lightning.RayLightningEnvironment`
- {class}`~ray.train.lightning.RayTrainReportCallback`


To ingest Ray Data with Lightning Trainer, follow these three steps:

- Feed the full Ray dataset to Ray `TorchTrainer` (details in the next section).
- Use {meth}`ray.train.get_dataset_shard <ray.train.get_dataset_shard>` to fetch the sharded dataset on each worker.
- Use {meth}`ds.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` to create a Ray data loader for Lightning Trainer.

:::{seealso}

- {ref}`Lightning Quick Start Guide <train-pytorch-lightning>`
- {ref}`User Guides for Ray Data <data-ingest-torch>`

:::

In [8]:
if SMOKE_TEST:
    train_func_config["max_epochs"] = 2
    train_dataset = train_dataset.random_sample(0.1)
    validation_dataset = validation_dataset.random_sample(0.1)

## Distributed training with Ray TorchTrainer

Next, define a {class}`TorchTrainer <ray.train.torch.TorchTrainer>` to launch your training function on 4 GPU workers. 

You can pass the full Ray dataset to the `datasets` argument of ``TorchTrainer``. TorchTrainer automatically shards the datasets among multiple workers.

In [9]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig


# Save the top-2 checkpoints according to the evaluation metric
# The checkpoints and metrics are reported by `RayTrainReportCallback`
run_config = RunConfig(
    name="ptl-sent-classification",
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        checkpoint_score_attribute="matthews_correlation",
        checkpoint_score_order="max",
    ),
)

# Schedule four workers for DDP training (1 GPU/worker by default)
scaling_config = ScalingConfig(num_workers=4, use_gpu=True)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=train_func_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_dataset, "validation": validation_dataset}, # <- Feed the Ray Datasets here
)

In [10]:
result = trainer.fit()

0,1
Current time:,2023-08-14 16:51:48
Running for:,00:05:50.88
Memory:,34.5/186.6 GiB

Trial name,status,loc,iter,total time (s),train_loss,matthews_correlation,epoch
TorchTrainer_b723f_00000,TERMINATED,10.0.63.245:150507,5,337.748,0.0199119,0.577705,4


[2m[36m(TrainTrainable pid=150507)[0m 2023-08-14 16:46:02.166995: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
[2m[36m(TrainTrainable pid=150507)[0m To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2m[36m(TrainTrainable pid=150507)[0m 2023-08-14 16:46:02.306203: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[2m[36m(TrainTrainable pid=150507)[0m 2023-08-14 16:46:03.087593: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7:

(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=150822)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)] -> OutputSplitter[split(4, equal=True)]
[2m[36m(SplitCoordinator pid=150822)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['d4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150618)[0m - This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).[32m [repeated 3x across cluste

(pid=150822) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]



(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150618)[0m LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3][32m [repeated 3x across cluster][0m
[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)][32m [repeated 4x across cluster][0m
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)[32m [repeated 4x across cluster][0m
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`[32m [repeated 5x across cluster][0m


(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=150822)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)] -> OutputSplitter[split(4, equal=True)]
[2m[36m(SplitCoordinator pid=150822)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['d4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150618)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)][32m [repeated 3x across cluster][0m
[2m[36m(RayTrainWorker pid=150618)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory

(pid=150822) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=150621)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150621)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150621)[

(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[1m[36m(autoscaler +2m37s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.


[2m[36m(SplitCoordinator pid=150822)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)] -> OutputSplitter[split(4, equal=True)]
[2m[36m(SplitCoordinator pid=150822)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['d4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150618)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)][32m [repeated 2x across cluster][0m
[2m[36m(RayTrainWorker pid=150618)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory

(pid=150822) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=150621)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150621)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150621)[

(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=150822)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)] -> OutputSplitter[split(4, equal=True)]
[2m[36m(SplitCoordinator pid=150822)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['d4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150618)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)][32m [repeated 2x across cluster][0m
[2m[36m(RayTrainWorker pid=150618)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory

(pid=150822) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=150621)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150621)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150621)[

(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(SplitCoordinator pid=150822)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)] -> OutputSplitter[split(4, equal=True)]
[2m[36m(SplitCoordinator pid=150822)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=['d4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b', 'd4dd34cdb4b35e8b1e0f1ab4187b66ed900ab78de951f03e1125233b'], preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150618)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)][32m [repeated 2x across cluster][0m
[2m[36m(RayTrainWorker pid=150618)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory

(pid=150822) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150620) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(RayTrainWorker pid=150620)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150620)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150620)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(RayTrainWorker pid=150621)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(tokenize_sentence)]
[2m[36m(RayTrainWorker pid=150621)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=2000000000.0), locality_with_output=True, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(RayTrainWorker pid=150621)[

(pid=150621) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150619) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=150618) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

2023-08-14 16:51:48,299	INFO tune.py:1146 -- Total run time: 350.99 seconds (350.87 seconds for the tuning loop).


:::{note}
Note that this examples uses Ray Data for data ingestion for faster preprocessing, but you can also continue to use the native `PyTorch DataLoader` or `LightningDataModule`. See {doc}`Train a Pytorch Lightning Image Classifier <lightning_mnist_example>`. 

:::

In [11]:
result

Result(
  metrics={'train_loss': 0.019911885261535645, 'matthews_correlation': 0.577705364544777, 'epoch': 4, 'step': 670},
  path='/home/ray/ray_results/ptl-sent-classification/TorchTrainer_b723f_00000_0_2023-08-14_16-45-57',
  checkpoint=TorchCheckpoint(local_path=/home/ray/ray_results/ptl-sent-classification/TorchTrainer_b723f_00000_0_2023-08-14_16-45-57/checkpoint_000004)
)

[2m[1m[36m(autoscaler +50m28s)[0m Cluster is terminating (reason: user action).


## See also

* [Ray Train Examples](../../examples) for more use cases

* [Ray Train User Guides](train-user-guides) for how-to guides