# Copyright 2014 Brett Slatkin, Pearson Education Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Preamble to mimic book environment
import logging
from pprint import pprint
from sys import stdout as STDOUT


# Example 1
def download(item):
    """Placeholder for the download stage; returns ``item`` unchanged."""
    return item


def resize(item):
    """Placeholder for the resize stage; returns ``item`` unchanged."""
    return item


def upload(item):
    """Placeholder for the upload stage; returns ``item`` unchanged."""
    return item


# Example 2
from threading import Lock
from collections import deque


class MyQueue(object):
    """Lock-protected FIFO used to hand items from one stage to the next.

    ``get`` never blocks: on an empty queue ``deque.popleft`` raises
    ``IndexError``, so consumers have to poll.
    """

    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    # Example 3
    def put(self, item):
        """Append ``item`` to the tail of the queue while holding the lock."""
        with self.lock:
            self.items.append(item)

    # Example 4
    def get(self):
        """Remove and return the oldest item.

        Raises:
            IndexError: if the queue is currently empty.
        """
        with self.lock:
            return self.items.popleft()


# Example 5
from threading import Thread
from time import sleep


class Worker(Thread):
    """Thread that applies ``func`` to items from ``in_queue`` and puts the
    results on ``out_queue``.

    Attributes:
        polled_count: number of loop iterations, i.e. attempts to get work.
        work_done: number of items processed and forwarded.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    # Example 6
    def run(self):
        """Poll ``in_queue`` until the exit signal is observed.

        An ``IndexError`` from ``get`` means the queue is empty, so the
        worker sleeps briefly and retries. An ``AttributeError`` from the
        ``get`` call (the driver below replaces ``in_queue`` with ``None``)
        ends the loop.
        """
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do
            except AttributeError:
                # The magic exit signal
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1


# Example 7
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

# Example 8
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())

# Example 9
import time
while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their
# run methods.
for thread in threads:
    thread.in_queue = None
# Wait for every worker to actually exit; otherwise the counters read
# below could still be changing while they are being summed.
for thread in threads:
    thread.join()

# Example 10
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
      polled, 'times')
# Example 11
from queue import Queue
queue = Queue()


def consumer():
    """Block on ``queue.get()`` until the producer puts an item."""
    print('Consumer waiting')
    queue.get()                # Runs after put() below
    print('Consumer done')


thread = Thread(target=consumer)
thread.start()

# Example 12
print('Producer putting')
queue.put(object())            # Runs before get() above
thread.join()
print('Producer done')

# Example 13
queue = Queue(1)               # Buffer size of 1


def consumer():
    """Drain two items from the size-1 queue after a short delay."""
    time.sleep(0.1)            # Wait
    queue.get()                # Runs second
    print('Consumer got 1')
    queue.get()                # Runs fourth
    print('Consumer got 2')


thread = Thread(target=consumer)
thread.start()

# Example 14
queue.put(object())            # Runs first
print('Producer put 1')
queue.put(object())            # Runs third
print('Producer put 2')
thread.join()
print('Producer done')

# Example 15
in_queue = Queue()


def consumer():
    """Take one item from ``in_queue`` and mark it done once handled."""
    print('Consumer waiting')
    work = in_queue.get()      # Done second
    print('Consumer working')
    # Doing work
    print('Consumer done')
    in_queue.task_done()       # Done third


Thread(target=consumer).start()

# Example 16
in_queue.put(object())         # Done first
print('Producer waiting')
in_queue.join()                # Done fourth
print('Producer done')


# Example 17
class ClosableQueue(Queue):
    """``Queue`` that can be closed with a sentinel and iterated over.

    Iteration yields items until ``SENTINEL`` is received and calls
    ``task_done()`` once for every item taken off the queue, the sentinel
    included.
    """

    SENTINEL = object()

    def close(self):
        """Enqueue the sentinel that tells an iterating consumer to stop."""
        self.put(self.SENTINEL)

    # Example 18
    def __iter__(self):
        """Yield items in order until the sentinel is reached."""
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                # Runs when the consumer resumes the generator after
                # handling ``item``, and also on the sentinel return.
                self.task_done()


# Example 19
class StoppableWorker(Thread):
    """Thread that applies ``func`` to every item iterated from ``in_queue``
    and puts each result on ``out_queue``; exits when the iteration ends.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        """Process items until ``in_queue`` stops yielding."""
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


# Example 20
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

# Example 21
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()

# Example 22
# Close each stage only after the previous one has fully drained, so a
# sentinel is never queued ahead of items still in flight.
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
# Each worker has consumed its sentinel by now; join them so no worker
# thread is left running when the result is reported.
for thread in threads:
    thread.join()
print(done_queue.qsize(), 'items finished')