{ "cells": [ { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 4
  • \n", "
  • Cores: 4
  • \n", "
  • Memory: 10.00 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import os\n", "import numpy as np\n", "import scipy.sparse as sp\n", "import pandas as pd\n", "from glob import glob\n", "\n", "import dask\n", "import dask.bag as db\n", "import joblib\n", "\n", "from distributed import Client\n", "client = Client()\n", "client" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [], "source": [ "rm -rf sparse_chunks/" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "folder = 'sparse_chunks'\n", "n_features = int(1e5)\n", "n_informative = int(1e4)\n", "\n", "n_chunks = int(1e1)\n", "chunk_size = int(1e2)\n", "\n", "rng = np.random.RandomState(42)\n", "true_coef = rng.randn(n_features)\n", "true_coef[n_informative:] = 0\n", "\n", "\n", "def make_chunk(n_samples, true_coef, chunk_idx, format='csr',\n", " density=1e-3, noise=1e-1):\n", " rng = np.random.RandomState(chunk_idx)\n", " n_features = true_coef.shape[0]\n", " input_data = sp.rand(n_samples, n_features, format=format,\n", " density=density, random_state=rng)\n", " noise = rng.normal(loc=0, scale=noise, size=n_samples)\n", " target = input_data.dot(true_coef).ravel() + noise\n", " return chunk_idx, input_data, (target > 0).astype(np.int32)\n", "\n", "\n", "def save_to_disk(chunk_idx, X, y, folder='sparse_chunks'):\n", " os.makedirs(folder, exist_ok=True)\n", " filename = \"sparse_chunk_{:04d}.pkl\".format(chunk_idx)\n", " joblib.dump((X, y), os.path.join(folder, filename))\n", " return filename\n", "\n", "\n", "def load_from_disk(chunk_idx, filename):\n", " X, y = joblib.load(filename)\n", " return chunk_idx, X, y" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Lazy loading chunks from sparse_chunks\n" ] } ], "source": [ "if not os.path.exists(folder):\n", " print(\"Generating chunks of sparse data into\", folder)\n", " b = db.from_sequence([(chunk_size, true_coef, i)\n", " for i in range(n_chunks)])\n", " b = b.starmap(make_chunk).starmap(save_to_disk).compute()\n", "\n", "\n", " \n", "print(\"Lazy loading chunks from\", folder)\n", "b = db.from_sequence(enumerate(sorted(glob('sparse_chunks/*.pkl'))))\n", "b = b.starmap(load_from_disk)" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8 ms, sys: 0 ns, total: 8 ms\n", "Wall time: 9.53 ms\n" ] } ], "source": [ "%time b = b.persist()" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "10" ] }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(b.compute())" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 16 ms, sys: 4 ms, total: 20 ms\n", "Wall time: 25.8 ms\n" ] } ], "source": [ "%%time\n", "chunk_idx, X_0, y_0 = b.take(1)[0]" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0" ] }, "execution_count": 53, "metadata": {}, "output_type": "execute_result" } ], "source": [ "chunk_idx" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "<100x100000 sparse matrix of type ''\n", "\twith 10000 stored elements in Compressed Sparse Row format>" ] }, "execution_count": 54, "metadata": {}, "output_type": "execute_result" } ], "source": [ "X_0" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.97999999999999998" ] }, "execution_count": 55, "metadata": {}, "output_type": "execute_result" } ], "source": [ "np.mean((X_0.dot(true_coef).ravel() > 0) == y_0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## L1-penalized Logistic Regression with SGD" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 760 ms, sys: 64 ms, total: 824 ms\n", "Wall time: 2.75 s\n" ] }, { "data": { "text/plain": [ "(0.53333333333333333, 0.032998316455372205, 0.46405999999999997)" ] }, "execution_count": 56, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from sklearn.linear_model import SGDClassifier\n", "from sklearn.model_selection import train_test_split\n", "from dask import delayed\n", "\n", "CLASSES = np.array([0, 1])\n", "\n", "\n", "def scan_fit(model, chunk):\n", " return model.partial_fit(*chunk, classes=CLASSES)\n", "\n", "\n", "def score(model, chunk):\n", " return model.score(*chunk)\n", "\n", "\n", "all_filenames = sorted(glob('sparse_chunks/*.pkl'))\n", "train_filenames, test_filenames = train_test_split(\n", " all_filenames, random_state=0)\n", "\n", "model = SGDClassifier(loss='log', alpha=1e-3, penalty='elasticnet', tol=0)\n", "\n", "for i in range(20):\n", " for filename in train_filenames:\n", " chunk = delayed(joblib.load)(filename)\n", " model = delayed(scan_fit)(model, chunk)\n", "\n", "\n", "scores = [delayed(score)(model, delayed(joblib.load)(filename))\n", " for filename in test_filenames]\n", " \n", "%time scores, model = dask.compute(scores, model)\n", "np.mean(scores), np.std(scores), np.mean(model.coef_ != 0)" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [ { "ename": "TypeError", "evalue": "'Future' object is not iterable", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mmodel\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpartial_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mX\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mclasses\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 7\u001b[0;31m \u001b[0mb\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mscan_fit\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmodel\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mto_delayed\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdask\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m~/code/dask/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 96\u001b[0m \u001b[0mExtra\u001b[0m \u001b[0mkeywords\u001b[0m \u001b[0mto\u001b[0m \u001b[0mforward\u001b[0m \u001b[0mto\u001b[0m \u001b[0mthe\u001b[0m \u001b[0mscheduler\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m`\u001b[0m\u001b[0;31m`\u001b[0m\u001b[0mget\u001b[0m\u001b[0;31m`\u001b[0m\u001b[0;31m`\u001b[0m \u001b[0mfunction\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 97\u001b[0m \"\"\"\n\u001b[0;32m---> 98\u001b[0;31m \u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraverse\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 99\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 100\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 203\u001b[0m \u001b[0mdsk\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcollections_to_dsk\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mvariables\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moptimize_graph\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 204\u001b[0m \u001b[0mkeys\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mvar\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_keys\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mvar\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mvariables\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 205\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 206\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 207\u001b[0m \u001b[0mresults_iter\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0miter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresults\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mget_sync\u001b[0;34m(dsk, keys, **kwargs)\u001b[0m\n\u001b[1;32m 560\u001b[0m \"\"\"\n\u001b[1;32m 561\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'num_workers'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# if num_workers present, remove it\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 562\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mget_async\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mapply_sync\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 563\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 564\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mget_async\u001b[0;34m(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)\u001b[0m\n\u001b[1;32m 506\u001b[0m \u001b[0;31m# Seed initial tasks into the thread pool\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 507\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0mstate\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'ready'\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;32mand\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mstate\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m'running'\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m<\u001b[0m \u001b[0mnum_workers\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 508\u001b[0;31m \u001b[0mfire_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 509\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 510\u001b[0m \u001b[0;31m# Main loop, wait on tasks to finish, insert new ones\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mfire_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 502\u001b[0m args=(key, dumps((dsk[key], data)),\n\u001b[1;32m 503\u001b[0m dumps, loads, get_id, pack_exception),\n\u001b[0;32m--> 504\u001b[0;31m callback=queue.put)\n\u001b[0m\u001b[1;32m 505\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 506\u001b[0m \u001b[0;31m# Seed initial tasks into the thread pool\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mapply_sync\u001b[0;34m(func, args, kwds, callback)\u001b[0m\n\u001b[1;32m 549\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mapply_sync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwds\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m{\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcallback\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 550\u001b[0m \u001b[0;34m\"\"\" A naive synchronous version of apply_async \"\"\"\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 551\u001b[0;31m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwds\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 552\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcallback\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 553\u001b[0m \u001b[0mcallback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mres\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mexecute_task\u001b[0;34m(key, task_info, dumps, loads, get_id, pack_exception)\u001b[0m\n\u001b[1;32m 293\u001b[0m \u001b[0mfailed\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 294\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mBaseException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 295\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mpack_exception\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdumps\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 296\u001b[0m \u001b[0mfailed\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 297\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mkey\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfailed\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36mexecute_task\u001b[0;34m(key, task_info, dumps, loads, get_id, pack_exception)\u001b[0m\n\u001b[1;32m 288\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 289\u001b[0m \u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdata\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mloads\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask_info\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 290\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdata\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 291\u001b[0m \u001b[0mid\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mget_id\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 292\u001b[0m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdumps\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mid\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/local.py\u001b[0m in \u001b[0;36m_execute_task\u001b[0;34m(arg, cache, dsk)\u001b[0m\n\u001b[1;32m 269\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 270\u001b[0m \u001b[0margs2\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0ma\u001b[0m \u001b[0;32min\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 271\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 272\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mishashable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 273\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/code/dask/dask/bag/core.py\u001b[0m in \u001b[0;36maccumulate_part\u001b[0;34m(binop, seq, initial, is_first)\u001b[0m\n\u001b[1;32m 1273\u001b[0m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbinop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1274\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1275\u001b[0;31m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0maccumulate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mbinop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mseq\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0minitial\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1276\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mis_first\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1277\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;34m-\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mres\u001b[0m \u001b[0;32melse\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/.virtualenvs/py36/lib/python3.6/site-packages/toolz/itertoolz.py\u001b[0m in \u001b[0;36maccumulate\u001b[0;34m(binop, seq, initial)\u001b[0m\n\u001b[1;32m 56\u001b[0m \u001b[0mitertools\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0maccumulate\u001b[0m \u001b[0;34m:\u001b[0m \u001b[0mIn\u001b[0m \u001b[0mstandard\u001b[0m \u001b[0mitertools\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mPython\u001b[0m \u001b[0;36m3.2\u001b[0m\u001b[0;34m+\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 57\u001b[0m \"\"\"\n\u001b[0;32m---> 58\u001b[0;31m \u001b[0mseq\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0miter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 59\u001b[0m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mnext\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseq\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0minitial\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mno_default\u001b[0m \u001b[0;32melse\u001b[0m \u001b[0minitial\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 60\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mTypeError\u001b[0m: 'Future' object is not iterable" ] } ], "source": [ "model = SGDClassifier(loss='log', penalty='l1', max_iter=1)\n", "\n", "def scan_fit(model, next_chunk):\n", " chunk_idx, X, y = next_chunk\n", " return model.partial_fit(X, y, classes=[0, 1])\n", "\n", "b.accumulate(scan_fit, initial=model).to_delayed()[-1].compute(get=dask.get)[0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.1" } }, "nbformat": 4, "nbformat_minor": 2 }