(lightgbm-example-ref)=

# Training a model with distributed LightGBM
In this example we will train a model in Ray Train using distributed LightGBM.

Let's start with installing our dependencies:

In [1]:
!pip install -qU "ray[data,train]"


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m23.1.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


Then we need some imports:

In [2]:
from typing import Tuple

import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import Categorizer, StandardScaler
from ray.train.lightgbm import LightGBMTrainer
from ray.train import Result, ScalingConfig

  from .autonotebook import tqdm as notebook_tqdm
2023-07-07 14:34:14,951	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-07-07 14:34:15,892	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


Next we define a function to load our train, validation, and test datasets.

In [3]:
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer_with_categorical.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(cols=["target"])
    return train_dataset, valid_dataset, test_dataset

The following function will create a LightGBM trainer, train it, and return the result.

In [4]:
def train_lightgbm(num_workers: int, use_gpu: bool = False) -> Result:
    train_dataset, valid_dataset, _ = prepare_data()

    # Scale some random columns, and categorify the categorical_column,
    # allowing LightGBM to use its built-in categorical feature support
    scaler = StandardScaler(columns=["mean radius", "mean texture"])
    categorizer = Categorizer(["categorical_column"])

    train_dataset = categorizer.fit_transform(scaler.fit_transform(train_dataset))
    valid_dataset = categorizer.transform(scaler.transform(valid_dataset))

    # LightGBM specific params
    params = {
        "objective": "binary",
        "metric": ["binary_logloss", "binary_error"],
    }

    trainer = LightGBMTrainer(
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        label_column="target",
        params=params,
        datasets={"train": train_dataset, "valid": valid_dataset},
        num_boost_round=100,
        metadata = {"scaler_pkl": scaler.serialize(), "categorizer_pkl": categorizer.serialize()}
    )
    result = trainer.fit()
    print(result.metrics)

    return result

Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this.

In [5]:
import pandas as pd
from ray.train import Checkpoint


class Predict:

    def __init__(self, checkpoint: Checkpoint):
        self.model = LightGBMTrainer.get_model(checkpoint)
        self.scaler = Preprocessor.deserialize(checkpoint.get_metadata()["scaler_pkl"])
        self.categorizer = Preprocessor.deserialize(checkpoint.get_metadata()["categorizer_pkl"])

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        preprocessed_batch = self.categorizer.transform_batch(self.scaler.transform_batch(batch))
        return {"predictions": self.model.predict(preprocessed_batch)}


def predict_lightgbm(result: Result):
    _, _, test_dataset = prepare_data()

    scores = test_dataset.map_batches(
        Predict, 
        fn_constructor_args=[result.checkpoint], 
        concurrency=1, 
        batch_format="pandas"
    )
    
    predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    print(f"PREDICTED LABELS")
    predicted_labels.show()

Now we can run the training:

In [6]:
result = train_lightgbm(num_workers=2, use_gpu=False)

0,1
Current time:,2023-07-07 14:34:34
Running for:,00:00:06.06
Memory:,12.2/64.0 GiB

Trial name,status,loc,iter,total time (s),train-binary_logloss,train-binary_error,valid-binary_logloss
LightGBMTrainer_0c5ae_00000,TERMINATED,127.0.0.1:10027,101,4.5829,0.000202293,0,0.130232


[2m[36m(LightGBMTrainer pid=10027)[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.
[2m[36m(LightGBMTrainer pid=10027)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(get_pd_value_counts)]
[2m[36m(LightGBMTrainer pid=10027)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(LightGBMTrainer pid=10027)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(LightGBMTrainer pid=10027)[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
[2m[36m(LightGBMTrainer pid=10027)[0m Executi

[2m[36m(_RemoteRayLightGBMActor pid=10063)[0m [LightGBM] [Info] Trying to bind port 51134...
[2m[36m(_RemoteRayLightGBMActor pid=10063)[0m [LightGBM] [Info] Binding port 51134 succeeded
[2m[36m(_RemoteRayLightGBMActor pid=10063)[0m [LightGBM] [Info] Listening...
[2m[36m(_RemoteRayLightGBMActor pid=10063)[0m [LightGBM] [Info] Connected to rank 0
[2m[36m(_RemoteRayLightGBMActor pid=10063)[0m [LightGBM] [Info] Local rank: 1, total number of machines: 2


2023-07-07 14:34:34,087	INFO tune.py:1148 -- Total run time: 7.18 seconds (6.05 seconds for the tuning loop).


{'train-binary_logloss': 0.00020229312743896637, 'train-binary_error': 0.0, 'valid-binary_logloss': 0.13023245107091222, 'valid-binary_error': 0.023529411764705882, 'time_this_iter_s': 0.021785974502563477, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '0c5ae_00000', 'date': '2023-07-07_14-34-34', 'timestamp': 1688765674, 'time_total_s': 4.582904100418091, 'pid': 10027, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.582904100418091, 'iterations_since_restore': 101, 'experiment_tag': '0'}


And perform inference on the obtained model:

In [7]:
predict_lightgbm(result)

2023-07-07 14:34:36,769	INFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.
2023-07-07 14:34:38,668	INFO dataset.py:2180 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-07-07 14:34:38,674	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(<lambda>)->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-07-07 14:34:38,674	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-07 14:34:38,676	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-07-07 14:34:38,701	INFO actor_

PREDICTED LABELS


                                                                                                                        

{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}


