In [48]:
import os
import numpy as np
import scipy.sparse as sp
import pandas as pd
from glob import glob

import dask
import dask.bag as db
import joblib

from distributed import Client
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:45838  Dashboard: http://127.0.0.1:8787,Cluster  Workers: 4  Cores: 4  Memory: 10.00 GB


In [37]:
rm -rf sparse_chunks/

In [38]:
folder = 'sparse_chunks'
n_features = int(1e5)
n_informative = int(1e4)

n_chunks = int(1e1)
chunk_size = int(1e2)

rng = np.random.RandomState(42)
true_coef = rng.randn(n_features)
true_coef[n_informative:] = 0


def make_chunk(n_samples, true_coef, chunk_idx, format='csr',
               density=1e-3, noise=1e-1):
    rng = np.random.RandomState(chunk_idx)
    n_features = true_coef.shape[0]
    input_data = sp.rand(n_samples, n_features, format=format,
                         density=density, random_state=rng)
    noise = rng.normal(loc=0, scale=noise, size=n_samples)
    target = input_data.dot(true_coef).ravel() + noise
    return chunk_idx, input_data, (target > 0).astype(np.int32)


def save_to_disk(chunk_idx, X, y, folder='sparse_chunks'):
    os.makedirs(folder, exist_ok=True)
    filename = "sparse_chunk_{:04d}.pkl".format(chunk_idx)
    joblib.dump((X, y), os.path.join(folder, filename))
    return filename


def load_from_disk(chunk_idx, filename):
    X, y = joblib.load(filename)
    return chunk_idx, X, y

In [49]:
if not os.path.exists(folder):
    print("Generating chunks of sparse data into", folder)
    b = db.from_sequence([(chunk_size, true_coef, i)
                          for i in range(n_chunks)])
    b = b.starmap(make_chunk).starmap(save_to_disk).compute()


    
print("Lazy loading chunks from", folder)
b = db.from_sequence(enumerate(sorted(glob('sparse_chunks/*.pkl'))))
b = b.starmap(load_from_disk)

Lazy loading chunks from sparse_chunks


In [50]:
%time b = b.persist()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 9.53 ms


In [51]:
len(b.compute())

10

In [52]:
%%time
chunk_idx, X_0, y_0 = b.take(1)[0]

CPU times: user 16 ms, sys: 4 ms, total: 20 ms
Wall time: 25.8 ms


In [53]:
chunk_idx

0

In [54]:
X_0

<100x100000 sparse matrix of type '<class 'numpy.float64'>'
	with 10000 stored elements in Compressed Sparse Row format>

In [55]:
np.mean((X_0.dot(true_coef).ravel() > 0) == y_0)

0.97999999999999998

## L1-penalized Logistic Regression with SGD

In [56]:
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split
from dask import delayed

CLASSES = np.array([0, 1])


def scan_fit(model, chunk):
    return model.partial_fit(*chunk, classes=CLASSES)


def score(model, chunk):
    return model.score(*chunk)


all_filenames = sorted(glob('sparse_chunks/*.pkl'))
train_filenames, test_filenames = train_test_split(
    all_filenames, random_state=0)

model = SGDClassifier(loss='log', alpha=1e-3, penalty='elasticnet', tol=0)

for i in range(20):
    for filename in train_filenames:
        chunk = delayed(joblib.load)(filename)
        model = delayed(scan_fit)(model, chunk)


scores = [delayed(score)(model, delayed(joblib.load)(filename))
          for filename in test_filenames]
    
%time scores, model = dask.compute(scores, model)
np.mean(scores), np.std(scores), np.mean(model.coef_ != 0)

CPU times: user 760 ms, sys: 64 ms, total: 824 ms
Wall time: 2.75 s


(0.53333333333333333, 0.032998316455372205, 0.46405999999999997)

In [57]:
model = SGDClassifier(loss='log', penalty='l1', max_iter=1)

def scan_fit(model, next_chunk):
    chunk_idx, X, y = next_chunk
    return model.partial_fit(X, y, classes=[0, 1])

b.accumulate(scan_fit, initial=model).to_delayed()[-1].compute(get=dask.get)[0]

TypeError: 'Future' object is not iterable