# PyCaret Fugue Integration

[Fugue](https://github.com/fugue-project/fugue) is a low-code unified interface for different computing frameworks such as Spark, Dask and Pandas. PyCaret uses Fugue to support distributed computing scenarios.

## Hello World

### Classification

Let's start with the most standard example. The code is exactly the same as the local version; there is no magic.

```python
from pycaret.datasets import get_data
from pycaret.classification import *

setup(data=get_data("juice", verbose=False), target='Purchase', n_jobs=1)

test_models = models().index.tolist()[:5]
```

| Description | Value |
|---|---|
| Session id | 4292 |
| Target | Purchase |
| Target type | Binary |
| Target mapping | CH: 0, MM: 1 |
| Original data shape | (1070, 19) |
| Transformed data shape | (1070, 19) |
| Transformed train set shape | (748, 19) |
| Transformed test set shape | (322, 19) |
| Ordinal features | 1 |
| Numeric features | 17 |


`compare_models` is also exactly the same as before if you don't want to use a distributed system:

```python
compare_models(include=test_models, n_select=2)
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| lr | Logistic Regression | 0.833 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.327 |
| dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.078 |
| nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.078 |
| knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.662 | 0.4782 | 0.4856 | 0.108 |
| svm | SVM - Linear Kernel | 0.4881 | 0.0 | 0.759 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.059 |



```
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_samples_leaf=1,
                        min_samples_split=2, min_weight_fraction_leaf=0.0,
                        random_state=4292, splitter='best')]
```

Now let's make it distributed on Dask, as a toy case. The only change is the additional `parallel` parameter:

```python
from pycaret.parallel import FugueBackend

compare_models(include=test_models, n_select=2, parallel=FugueBackend("dask"))
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| lr | Logistic Regression | 0.833 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.214 |
| dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.078 |
| nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.209 |
| knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.662 | 0.4782 | 0.4856 | 0.134 |
| svm | SVM - Linear Kernel | 0.4881 | 0.0 | 0.759 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.058 |

```
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_samples_leaf=1,
                        min_samples_split=2, min_weight_fraction_leaf=0.0,
                        random_state=4292, splitter='best')]
```

To use Spark as the execution engine, you need access to a Spark cluster and a `SparkSession`. For this demo, let's initialize a local Spark session:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```

Now just pass this session object to `FugueBackend` to run on Spark. Keep in mind that this is a toy case: in a real situation, you need a SparkSession pointing to a real Spark cluster to benefit from Spark's power.

```python
compare_models(include=test_models, n_select=2, parallel=FugueBackend(spark))
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| lr | Logistic Regression | 0.833 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.678 |
| dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.208 |
| nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.213 |
| knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.662 | 0.4782 | 0.4856 | 0.573 |
| svm | SVM - Linear Kernel | 0.4881 | 0.0 | 0.759 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.059 |

```
[LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=4292, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False),
 DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_samples_leaf=1,
                        min_samples_split=2, min_weight_fraction_leaf=0.0,
                        random_state=4292, splitter='best')]
```

Finally, you can call `pull` to get the metrics table:

```python
pull()
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| lr | Logistic Regression | 0.833 | 0.8975 | 0.7532 | 0.8097 | 0.7791 | 0.6451 | 0.6475 | 0.678 |
| dt | Decision Tree Classifier | 0.7715 | 0.7625 | 0.7224 | 0.7058 | 0.7106 | 0.5224 | 0.5256 | 0.208 |
| nb | Naive Bayes | 0.7608 | 0.8337 | 0.7802 | 0.6693 | 0.7179 | 0.5129 | 0.5206 | 0.213 |
| knn | K Neighbors Classifier | 0.7594 | 0.7989 | 0.6093 | 0.7323 | 0.662 | 0.4782 | 0.4856 | 0.573 |
| svm | SVM - Linear Kernel | 0.4881 | 0.0 | 0.759 | 0.3346 | 0.4628 | 0.0615 | 0.1061 | 0.059 |


### Regression

It follows the same pattern as classification.

```python
from pycaret.datasets import get_data
from pycaret.regression import *

setup(data=get_data("insurance", verbose=False), target='charges', n_jobs=1)

test_models = models().index.tolist()[:5]
```

| Description | Value |
|---|---|
| Session id | 3514 |
| Target | charges |
| Target type | Regression |
| Data shape | (1338, 10) |
| Train data shape | (936, 10) |
| Test data shape | (402, 10) |
| Ordinal features | 2 |
| Numeric features | 3 |
| Categorical features | 3 |
| Preprocess | True |


`compare_models` is also exactly the same as before if you don't want to use a distributed system:

```python
compare_models(include=test_models, n_select=2, sort="MAE")
```

| ID | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec) |
|---|---|---|---|---|---|---|---|---|
| lar | Least Angle Regression | 4215.375 | 36942784.9091 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.054 |
| lr | Linear Regression | 4216.0692 | 36946939.1774 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.154 |
| lasso | Lasso Regression | 4216.0766 | 36944721.4684 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.059 |
| ridge | Ridge Regression | 4226.7264 | 36949983.8412 | 6057.125 | 0.7413 | 0.5923 | 0.4319 | 0.055 |
| en | Elastic Net | 7260.0035 | 90321787.1218 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.054 |



```
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
      jitter=None, n_nonzero_coefs=500, normalize='deprecated',
      precompute='auto', random_state=3514, verbose=False),
 LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
                  normalize='deprecated', positive=False)]
```

Now let's make it distributed on Dask, as a toy case. The only change is the additional `parallel` parameter:

```python
from pycaret.parallel import FugueBackend

compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend("dask"))
```

| ID | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec) |
|---|---|---|---|---|---|---|---|---|
| lar | Least Angle Regression | 4215.375 | 36942780.0 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.055 |
| lr | Linear Regression | 4216.0692 | 36946940.0 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.054 |
| lasso | Lasso Regression | 4216.0766 | 36944720.0 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.056 |
| ridge | Ridge Regression | 4226.7264 | 36949980.0 | 6057.125 | 0.7413 | 0.5923 | 0.4319 | 0.111 |
| en | Elastic Net | 7260.0035 | 90321790.0 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.236 |

```
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
      jitter=None, n_nonzero_coefs=500, normalize='deprecated',
      precompute='auto', random_state=3514, verbose=False),
 LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
                  normalize='deprecated', positive=False)]
```

To use Spark as the execution engine, you need access to a Spark cluster and a `SparkSession`. For this demo, let's initialize a local Spark session:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```

Now just pass this session object to `FugueBackend` to run on Spark. Keep in mind that this is a toy case: in a real situation, you need a SparkSession pointing to a real Spark cluster to benefit from Spark's power.

```python
compare_models(include=test_models, n_select=2, sort="MAE", parallel=FugueBackend(spark))
```

| ID | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec) |
|---|---|---|---|---|---|---|---|---|
| lar | Least Angle Regression | 4215.375 | 36942780.0 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.098 |
| lr | Linear Regression | 4216.0692 | 36946940.0 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.1 |
| lasso | Lasso Regression | 4216.0766 | 36944720.0 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.094 |
| ridge | Ridge Regression | 4226.7264 | 36949980.0 | 6057.125 | 0.7413 | 0.5923 | 0.4319 | 0.053 |
| en | Elastic Net | 7260.0035 | 90321790.0 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.092 |

```
[Lars(copy_X=True, eps=2.220446049250313e-16, fit_intercept=True, fit_path=True,
      jitter=None, n_nonzero_coefs=500, normalize='deprecated',
      precompute='auto', random_state=3514, verbose=False),
 LinearRegression(copy_X=True, fit_intercept=True, n_jobs=1,
                  normalize='deprecated', positive=False)]
```

Finally, you can call `pull` to get the metrics table:

```python
pull()
```

| ID | Model | MAE | MSE | RMSE | R2 | RMSLE | MAPE | TT (Sec) |
|---|---|---|---|---|---|---|---|---|
| lar | Least Angle Regression | 4215.375 | 36942780.0 | 6056.6512 | 0.7412 | 0.5944 | 0.4301 | 0.098 |
| lr | Linear Regression | 4216.0692 | 36946940.0 | 6057.0115 | 0.7412 | 0.5956 | 0.4303 | 0.1 |
| lasso | Lasso Regression | 4216.0766 | 36944720.0 | 6056.8051 | 0.7412 | 0.5943 | 0.4303 | 0.094 |
| ridge | Ridge Regression | 4226.7264 | 36949980.0 | 6057.125 | 0.7413 | 0.5923 | 0.4319 | 0.053 |
| en | Elastic Net | 7260.0035 | 90321790.0 | 9448.8041 | 0.3861 | 0.7217 | 0.8981 | 0.092 |


As you can see, the results from the distributed versions can differ slightly from the local versions (for example, in the MSE column). In the later sections, we will show how to make them identical.
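One way to check how far a distributed run drifts from the local run is to compare metrics with a relative tolerance instead of exact equality. The sketch below uses only the MSE values copied from the local and Dask regression tables above; `compare_metric` and the `1e-6` tolerance are illustrative choices, not part of PyCaret.

```python
import math

# MSE values copied from the local and Dask regression tables above
local_mse = {"lar": 36942784.9091, "lr": 36946939.1774, "lasso": 36944721.4684,
             "ridge": 36949983.8412, "en": 90321787.1218}
dask_mse = {"lar": 36942780.0, "lr": 36946940.0, "lasso": 36944720.0,
            "ridge": 36949980.0, "en": 90321790.0}


def compare_metric(local, remote, rel_tol=1e-6):
    """Map each model ID to (exactly equal?, equal within rel_tol?)."""
    return {
        model_id: (local[model_id] == remote[model_id],
                   math.isclose(local[model_id], remote[model_id], rel_tol=rel_tol))
        for model_id in local
    }


for model_id, (exact, close) in compare_metric(local_mse, dask_mse).items():
    print(f"{model_id:6s} exact={exact} close={close}")
```

Here none of the MSE values match exactly, but all of them agree to within one part in a million.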

### Time Series

It follows the same pattern as classification.


```python
from pycaret.datasets import get_data
from pycaret.time_series import *

exp = TSForecastingExperiment()
exp.setup(data=get_data('airline', verbose=False), fh=12, fold=3, fig_kwargs={'renderer': 'notebook'}, session_id=42)

test_models = exp.models().index.tolist()[:5]
```

| Description | Value |
|---|---|
| session_id | 42 |
| Target | Number of airline passengers |
| Approach | Univariate |
| Exogenous Variables | Not Present |
| Original data shape | (144, 1) |
| Transformed data shape | (144, 1) |
| Transformed train set shape | (132, 1) |
| Transformed test set shape | (12, 1) |
| Rows with missing values | 0.0% |
| Fold Generator | ExpandingWindowSplitter |


```python
best_baseline_models = exp.compare_models(include=test_models, n_select=3)
best_baseline_models
```

| ID | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| arima | ARIMA | 0.683 | 0.6735 | 20.0069 | 22.2199 | 0.0501 | 0.0507 | 0.8677 | 0.32 |
| snaive | Seasonal Naive Forecaster | 1.1479 | 1.0945 | 33.3611 | 35.9139 | 0.0832 | 0.0879 | 0.6072 | 0.02 |
| polytrend | Polynomial Trend Forecaster | 1.6523 | 1.9202 | 48.6301 | 63.4299 | 0.117 | 0.1216 | -0.0784 | 0.0167 |
| naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 1.06 |
| grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 1.27 |



```
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0,
       scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12),
       with_intercept=True),
 NaiveForecaster(sp=12, strategy='last', window_length=None),
 PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
```

```python
from pycaret.parallel import FugueBackend

best_baseline_models = exp.compare_models(include=test_models, n_select=3, parallel=FugueBackend("dask"))
best_baseline_models
```

| ID | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| arima | ARIMA | 0.683 | 0.6735 | 20.0069 | 22.2199 | 0.0501 | 0.0507 | 0.8677 | 0.1267 |
| snaive | Seasonal Naive Forecaster | 1.1479 | 1.0945 | 33.3611 | 35.9139 | 0.0832 | 0.0879 | 0.6072 | 0.0367 |
| polytrend | Polynomial Trend Forecaster | 1.6523 | 1.9202 | 48.6301 | 63.4299 | 0.117 | 0.1216 | -0.0784 | 0.0133 |
| naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 0.02 |
| grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 0.0233 |

```
[ARIMA(maxiter=50, method='lbfgs', order=(1, 0, 0), out_of_sample_size=0,
       scoring='mse', scoring_args=None, seasonal_order=(0, 1, 0, 12),
       with_intercept=True),
 NaiveForecaster(sp=12, strategy='last', window_length=None),
 PolynomialTrendForecaster(degree=1, regressor=None, with_intercept=True)]
```

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```

```python
from pycaret.parallel import FugueBackend

best_baseline_models = exp.compare_models(include=test_models[:2], n_select=3, parallel=FugueBackend(spark))
best_baseline_models
```

| ID | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 2.56 |
| grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 2.5267 |

```
[NaiveForecaster(sp=1, strategy='last', window_length=None),
 NaiveForecaster(sp=1, strategy='mean', window_length=None)]
```

```python
exp.pull()
```

| ID | Model | MASE | RMSSE | MAE | RMSE | MAPE | SMAPE | R2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|
| naive | Naive Forecaster | 2.3599 | 2.7612 | 69.0278 | 91.0322 | 0.1569 | 0.1792 | -1.2216 | 2.56 |
| grand_means | Grand Means Forecaster | 5.5306 | 5.2596 | 162.4117 | 173.6492 | 0.4 | 0.5075 | -7.0462 | 2.5267 |


## A more practical case

The examples above are pure toys. To make things work well in a distributed system, you must be careful about a few things.

### Use a lambda instead of a dataframe in setup

If you pass a dataframe directly to `setup`, that dataset must be sent to every worker node. If the dataframe is 1 GB and you have 100 workers, your driver machine may need to send out up to 100 GB of data (depending on the framework's implementation), and this data transfer becomes a bottleneck itself. If you instead provide a lambda function (via `data_func`), the local compute scenario is unchanged, but the driver only sends the function reference to the workers, and each worker loads the data by itself, so there is no heavy traffic on the driver side.
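To get a feel for the difference, the sketch below compares the bytes a driver would ship to 100 workers when it sends the materialized data versus only a loader function. `load_data` is a hypothetical stand-in for something like `lambda: get_data(...)`, and `marshal` is used only as a rough size estimate for the lambda's code object; frameworks such as Spark and Dask serialize functions with cloudpickle, which ships lambdas by value.

```python
import marshal
import pickle

N_WORKERS = 100  # matches the 100-worker example in the text

# Hypothetical loader standing in for something like
# lambda: get_data("juice", verbose=False, profile=False)
load_data = lambda: list(range(1_000_000))


def payload_bytes_per_worker():
    """Return (bytes to ship the data itself, rough bytes to ship the loader)."""
    data_payload = len(pickle.dumps(load_data()))
    # cloudpickle ships a lambda's code by value; marshal-ing the code
    # object gives a rough idea of that size.
    func_payload = len(marshal.dumps(load_data.__code__))
    return data_payload, func_payload


data_b, func_b = payload_bytes_per_worker()
print(f"ship data:   {data_b * N_WORKERS / 1e6:.1f} MB leave the driver")
print(f"ship lambda: {func_b * N_WORKERS / 1e3:.1f} KB leave the driver")
```

The function payload stays the same size no matter how large the dataset it produces is; the cost of loading moves to the workers.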

### Be deterministic

You should always use `session_id` to make the distributed compute deterministic.
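Because `setup` runs again for each batch on the workers, any randomness in it (such as the train/test split) needs a fixed seed; otherwise different workers could evaluate models on different splits. The sketch below illustrates this with a hypothetical `split_rows` helper built on Python's `random.Random`; it is not PyCaret's actual split logic.

```python
import random


def split_rows(n_rows, test_fraction, seed=None):
    """Hypothetical stand-in for the random train/test split done in setup."""
    rng = random.Random(seed)  # seed=None seeds from system entropy / time
    rows = list(range(n_rows))
    rng.shuffle(rows)
    n_test = round(n_rows * test_fraction)
    return sorted(rows[n_test:]), sorted(rows[:n_test])


# Two workers re-running the split with the same session_id agree exactly...
worker_a = split_rows(1070, 0.3, seed=0)
worker_b = split_rows(1070, 0.3, seed=0)
print(worker_a == worker_b)  # True

# ...while a different seed gives a different split.
print(worker_a == split_rows(1070, 0.3, seed=1))  # False
```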

### Set n_jobs

It is important to set `n_jobs` explicitly when you run something distributed, so that it does not overuse local or remote resources. This also avoids resource contention and makes the compute faster.
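A rough rule of thumb, assuming each concurrently running worker trains with its own `n_jobs` threads: keep `workers per machine × n_jobs` at or below the machine's core count. The hypothetical helpers below only encode that arithmetic; they are not PyCaret functions.

```python
def max_n_jobs(cores_per_machine, workers_per_machine):
    """Largest n_jobs that keeps workers * n_jobs within the core count (at least 1)."""
    return max(1, cores_per_machine // workers_per_machine)


def oversubscription(cores_per_machine, workers_per_machine, n_jobs):
    """Ratio of requested threads to available cores; > 1 means contention."""
    return workers_per_machine * n_jobs / cores_per_machine


# 8 concurrent workers on an 8-core machine: n_jobs=1, as in the examples here.
print(max_n_jobs(8, 8))           # 1
# Letting each of those workers use all 8 cores requests 64 threads on 8 cores.
print(oversubscription(8, 8, 8))  # 8.0
```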

```python
from pycaret.datasets import get_data
from pycaret.classification import *

setup(data_func=lambda: get_data("juice", verbose=False, profile=False), target='Purchase', session_id=0, n_jobs=1);
```

| Description | Value |
|---|---|
| Session id | 0 |
| Target | Purchase |
| Target type | Binary |
| Target mapping | CH: 0, MM: 1 |
| Original data shape | (1070, 19) |
| Transformed data shape | (1070, 19) |
| Transformed train set shape | (748, 19) |
| Transformed test set shape | (322, 19) |
| Ordinal features | 1 |
| Numeric features | 17 |


### Set the appropriate batch_size

The `batch_size` parameter trades off load balance against overhead. `setup` is called only once per batch, so:

| Choice |Load Balance|Overhead|Best Scenario|
|---|---|---|---|
|Smaller batch size|Better|Worse|`training time >> data loading time` or `models ~= workers`|
|Larger batch size|Worse|Better|`training time << data loading time` or `models >> workers`|

The default value is set to `1`, meaning we want the best load balance.
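The trade-off in the table can be seen with a toy scheduling model. The sketch below is a simplification, not FugueBackend's actual scheduler: it assumes each batch pays a fixed `setup_cost` once, that batches go to whichever worker frees up first, and that `makespan` (a hypothetical helper) should report the time until all models finish plus the number of `setup` calls.

```python
import heapq


def makespan(train_times, batch_size, n_workers, setup_cost):
    """Toy model: (time until all models finish, number of setup calls)."""
    batches = [train_times[i:i + batch_size]
               for i in range(0, len(train_times), batch_size)]
    finish_times = [0.0] * n_workers  # when each worker becomes free
    heapq.heapify(finish_times)
    for batch in batches:
        start = heapq.heappop(finish_times)  # earliest-free worker takes the batch
        heapq.heappush(finish_times, start + setup_cost + sum(batch))
    return max(finish_times), len(batches)


# Long training, cheap setup: small batches balance the load better.
print(makespan([4, 4, 1, 1, 1, 1], batch_size=1, n_workers=2, setup_cost=0.5))  # (7.5, 6)
print(makespan([4, 4, 1, 1, 1, 1], batch_size=3, n_workers=2, setup_cost=0.5))  # (9.5, 2)
# Cheap training, expensive setup: large batches amortize the overhead.
print(makespan([0.5] * 6, batch_size=1, n_workers=2, setup_cost=5.0))  # (16.5, 6)
print(makespan([0.5] * 6, batch_size=3, n_workers=2, setup_cost=5.0))  # (6.5, 2)
```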

### Display progress

During development, you can enable the progress display with `display_remote=True`, but you must then also enable [Fugue Callback](https://fugue-tutorials.readthedocs.io/tutorials/advanced/rpc.html) so that the driver can monitor worker progress. We recommend turning off the display in production.
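Conceptually, the callback lets code running on workers invoke a function that lives on the driver, which is how progress can show up in the driver's notebook. The sketch below mimics that flow locally, with threads standing in for workers and a plain method call standing in for the RPC hop; `ProgressTracker` and `run_batch` are hypothetical and use no Fugue or PyCaret APIs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ProgressTracker:
    """Driver-side progress state (hypothetical, for illustration only)."""

    def __init__(self, total):
        self.total = total
        self.done = []
        self._lock = threading.Lock()  # workers may report concurrently

    def report(self, model_id):
        # With Fugue, a worker's call would travel over RPC to the driver.
        with self._lock:
            self.done.append(model_id)
            print(f"[{len(self.done)}/{self.total}] {model_id} finished")


def run_batch(batch, report):
    """Worker side: train each model in the batch, then report back."""
    for model_id in batch:
        # ... train and score model_id here ...
        report(model_id)


models = ["lr", "dt", "nb", "knn", "svm"]
tracker = ProgressTracker(total=len(models))
batches = [models[i:i + 2] for i in range(0, len(models), 2)]
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(run_batch, batch, tracker.report) for batch in batches]
    for future in futures:
        future.result()  # surface any worker error on the driver
```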

```python
from pycaret.parallel import FugueBackend

fconf = {
    "fugue.rpc.server": "fugue.rpc.flask.FlaskRPCServer",  # keep this value
    "fugue.rpc.flask_server.host": "0.0.0.0",  # the driver IP address that workers can access
    "fugue.rpc.flask_server.port": "3333",  # an open port on the driver
    "fugue.rpc.flask_server.timeout": "2 sec",  # the timeout for workers to talk to the driver
}

be = FugueBackend("dask", fconf, display_remote=True, batch_size=3, top_only=False)
compare_models(n_select=2, parallel=be)
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| ridge | Ridge Classifier | 0.8383 | 0.0 | 0.7802 | 0.8085 | 0.7896 | 0.6585 | 0.6637 | 0.0 | 0.0 | 0.099 |
| lda | Linear Discriminant Analysis | 0.8329 | 0.8986 | 0.7701 | 0.8044 | 0.7824 | 0.6472 | 0.6522 | 0.0 | 1.0 | 0.132 |
| lr | Logistic Regression | 0.8303 | 0.8959 | 0.753 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.271 |
| gbc | Gradient Boosting Classifier | 0.8195 | 0.8982 | 0.7562 | 0.787 | 0.7656 | 0.6193 | 0.626 | 0.0 | 1.0 | 0.263 |
| lightgbm | Light Gradient Boosting Machine | 0.8047 | 0.8828 | 0.7492 | 0.7585 | 0.7482 | 0.5893 | 0.595 | 0.0 | 1.0 | 0.128 |
| ada | Ada Boost Classifier | 0.7968 | 0.8789 | 0.7326 | 0.7499 | 0.7388 | 0.5727 | 0.5751 | 0.0 | 1.0 | 0.178 |
| rf | Random Forest Classifier | 0.7955 | 0.8731 | 0.7256 | 0.75 | 0.7338 | 0.5682 | 0.5727 | 0.0 | 1.0 | 0.243 |
| dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.082 |
| et | Extra Trees Classifier | 0.7714 | 0.8479 | 0.6951 | 0.7213 | 0.7038 | 0.5183 | 0.5225 | 0.0 | 1.0 | 0.214 |
| nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.08 |



```
[RidgeClassifier(alpha=1.0, class_weight=None, copy_X=True, fit_intercept=True,
                 max_iter=None, normalize='deprecated', positive=False,
                 random_state=0, solver='auto', tol=0.001),
 LinearDiscriminantAnalysis(covariance_estimator=None, n_components=None,
                            priors=None, shrinkage=None, solver='svd',
                            store_covariance=False, tol=0.0001)]
```

### Custom Metrics

You can add custom metrics as before. However, to make the scorer distributable, it must be serializable. A plain function is usually fine, but if the function uses global variables that are not serializable (for example, an `RLock` object), it can cause issues. So try to keep custom metric functions independent of global variables.
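A quick way to catch this before distributing is to try pickling the globals a metric function refers to. The `unpicklable_globals` helper below is a rough, hypothetical heuristic built on the standard `pickle` module: it only looks at names the function references directly, skips modules, functions and classes (which cloudpickle-style serializers handle by reference or by value), and reports any other referenced global that fails to pickle.

```python
import pickle
import threading
import types

_SKIP = (types.ModuleType, types.FunctionType, types.BuiltinFunctionType, type)


def unpicklable_globals(func):
    """Names of globals referenced by func whose values the stdlib pickle rejects."""
    bad = []
    for name in func.__code__.co_names:
        if name not in func.__globals__:
            continue  # attribute names, builtins, etc.
        value = func.__globals__[name]
        if isinstance(value, _SKIP):
            continue  # modules, functions and classes are not checked here
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(name)
    return bad


LOCK = threading.RLock()  # not serializable
THRESHOLD = 0.5           # serializable


def score_with_lock(y_true, y_pred, axis=0):
    with LOCK:  # depends on a non-serializable global
        return 0.0


def score_plain(y_true, y_pred, axis=0):
    return THRESHOLD  # only depends on a plain float


print(unpicklable_globals(score_with_lock))  # ['LOCK']
print(unpicklable_globals(score_plain))      # []
```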

```python
def score_dummy(y_true, y_pred, axis=0):
    return 0.0

add_metric(
    id='mydummy',
    name='DUMMY',
    score_func=score_dummy,
    target='pred',
    greater_is_better=False,
)
```

```
Name                                                             DUMMY
Display Name                                                     DUMMY
Score Function                <function score_dummy at 0x7f8aa0dc0ca0>
Scorer               make_scorer(score_dummy, greater_is_better=False)
Target                                                            pred
Args                                                                {}
Greater is Better                                                False
Multiclass                                                        True
Custom                                                            True
Name: mydummy, dtype: object
```

Using a method of a class instance as the score function is also fine, but make sure all member variables of the instance are serializable.
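For the class-based approach, the same idea applies to instance attributes: pickling an instance pickles its member variables. The hypothetical `unpicklable_members` helper below checks each attribute separately so that the offending one is named; `WeightedScores` is an illustrative class, not part of PyCaret.

```python
import pickle
import threading


def unpicklable_members(obj):
    """Names of instance attributes that the stdlib pickle cannot serialize."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(name)
    return bad


class WeightedScores:
    """Illustrative scorer holder; not part of PyCaret."""

    def __init__(self, weight, lock=None):
        self.weight = weight
        self.lock = lock

    def score(self, y_true, y_prob, axis=0):
        return 1.0 * self.weight


print(unpicklable_members(WeightedScores(1.0)))                     # []
print(unpicklable_members(WeightedScores(1.0, threading.RLock())))  # ['lock']
```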

```python
test_models = models().index.tolist()[:5]
compare_models(include=test_models, n_select=2, sort="DUMMY", parallel=FugueBackend("dask"))
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|---|
| dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 0.24 |
| lr | Logistic Regression | 0.8303 | 0.8959 | 0.753 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 0.306 |
| nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 0.13 |
| knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.477 | 0.0 | 0.097 |
| svm | SVM - Linear Kernel | 0.5677 | 0.0 | 0.269 | 0.2077 | 0.1901 | 0.029 | 0.0396 | 0.0 | 0.102 |

```
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_samples_leaf=1,
                        min_samples_split=2, min_weight_fraction_leaf=0.0,
                        random_state=0, splitter='best'),
 LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False)]
```

```python
pull()
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|---|
| dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 0.24 |
| lr | Logistic Regression | 0.8303 | 0.8959 | 0.753 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 0.306 |
| nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 0.13 |
| knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.477 | 0.0 | 0.097 |
| svm | SVM - Linear Kernel | 0.5677 | 0.0 | 0.269 | 0.2077 | 0.1901 | 0.029 | 0.0396 | 0.0 | 0.102 |


```python
class Scores:
    def score_dummy2(self, y_true, y_prob, axis=0):
        return 1.0

scores = Scores()

add_metric(
    id='mydummy2',
    name='DUMMY2',
    score_func=scores.score_dummy2,
    target='pred_proba',
    greater_is_better=True,
)
```

```
Name                                                            DUMMY2
Display Name                                                    DUMMY2
Score Function       <bound method Scores.score_dummy2 of <__main__...
Scorer               make_scorer(score_dummy2, needs_proba=True, er...
Target                                                      pred_proba
Args                                                                {}
Greater is Better                                                 True
Multiclass                                                        True
Custom                                                            True
Name: mydummy2, dtype: object
```

```python
compare_models(include=test_models, n_select=2, sort="DUMMY2", parallel=FugueBackend("dask"))
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.237 |
| lr | Logistic Regression | 0.8303 | 0.8959 | 0.753 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.399 |
| nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.077 |
| knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.477 | 0.0 | 1.0 | 0.082 |
| svm | SVM - Linear Kernel | 0.5677 | 0.0 | 0.269 | 0.2077 | 0.1901 | 0.029 | 0.0396 | 0.0 | 0.0 | 0.104 |

```
[DecisionTreeClassifier(ccp_alpha=0.0, class_weight=None, criterion='gini',
                        max_depth=None, max_features=None, max_leaf_nodes=None,
                        min_impurity_decrease=0.0, min_samples_leaf=1,
                        min_samples_split=2, min_weight_fraction_leaf=0.0,
                        random_state=0, splitter='best'),
 LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
                    intercept_scaling=1, l1_ratio=None, max_iter=1000,
                    multi_class='auto', n_jobs=None, penalty='l2',
                    random_state=0, solver='lbfgs', tol=0.0001, verbose=0,
                    warm_start=False)]
```

```python
pull()
```

| ID | Model | Accuracy | AUC | Recall | Prec. | F1 | Kappa | MCC | DUMMY | DUMMY2 | TT (Sec) |
|---|---|---|---|---|---|---|---|---|---|---|---|
| dt | Decision Tree Classifier | 0.7795 | 0.7711 | 0.7328 | 0.7168 | 0.7201 | 0.5389 | 0.5441 | 0.0 | 1.0 | 0.237 |
| lr | Logistic Regression | 0.8303 | 0.8959 | 0.753 | 0.8053 | 0.7748 | 0.6391 | 0.6433 | 0.0 | 1.0 | 0.399 |
| nb | Naive Bayes | 0.7621 | 0.8255 | 0.7255 | 0.6825 | 0.7009 | 0.5039 | 0.5074 | 0.0 | 1.0 | 0.077 |
| knn | K Neighbors Classifier | 0.7528 | 0.8053 | 0.6231 | 0.7208 | 0.6642 | 0.4703 | 0.477 | 0.0 | 1.0 | 0.082 |
| svm | SVM - Linear Kernel | 0.5677 | 0.0 | 0.269 | 0.2077 | 0.1901 | 0.029 | 0.0396 | 0.0 | 0.0 | 0.104 |


## Notes

### Spark settings

We highly recommend running only one task at a time on each Spark executor, so that the task can fully utilize all of the executor's CPUs (set `spark.task.cpus` to the number of executor cores). When you do this, you should also explicitly set `n_jobs` in `setup` to the number of CPUs of each executor.

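```python
from pyspark.sql import SparkSession
from pycaret.datasets import get_data
from pycaret.classification import setup, compare_models
from pycaret.parallel import FugueBackend

executor_cores = 4

# One task per executor, so each task can use all of the executor's cores
spark = SparkSession.builder.config("spark.task.cpus", executor_cores).config("spark.executor.cores", executor_cores).getOrCreate()

# n_jobs matches the number of cores each task gets
setup(data=get_data("juice", verbose=False, profile=False), target='Purchase', session_id=0, n_jobs=executor_cores)

compare_models(n_select=2, parallel=FugueBackend(spark))
```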

### Databricks

On Databricks, `spark` is a predefined variable holding a SparkSession. Usage is no different; you do exactly the same thing as before:

```python
compare_models(parallel=FugueBackend(spark))
```

On Databricks, however, visualization is difficult, so it may be a good idea to do two things:

* Set `verbose` to False in `setup`
* Set `display_remote` to False in `FugueBackend`

### Dask

Dask has pseudo-distributed modes, such as the default (multi-threaded) mode and the multi-process mode. The default mode works fine (although the tasks actually run sequentially), but the multi-process mode currently doesn't work with PyCaret because it interferes with PyCaret's global variables. Any Spark execution mode, on the other hand, works fine.

### Local Parallelization

For practical use with non-trivial data and models, local parallelization (the easiest way is to use local Dask as the backend, as shown above) usually has no performance advantage, because training can easily overload the CPUs and increase resource contention. The value of local parallelization is that it lets you verify the code and gain confidence that the distributed environment will produce the expected results while taking much less time.

### How to develop 

Distributed systems are powerful, but you must follow some good practices to use them:

1. **From small to large:** start with a small set of data. For example, in `compare_models`, limit the models you try to a small number of cheap models; once you verify they work, switch to a larger model collection.
2. **From local to distributed:** follow this sequence: verify small data locally, then verify small data on a distributed backend, and finally verify large data on a distributed backend. The current design makes the transition seamless: you can go through `parallel=None` -> `parallel=FugueBackend()` -> `parallel=FugueBackend(spark)`. In the second step, you can use a local SparkSession or local Dask instead.