{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": "true" }, "source": [ "

Table of Contents

\n", "
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# code for loading the format for the notebook\n", "import os\n", "\n", "# path : store the current path to convert back to it later\n", "path = os.getcwd()\n", "os.chdir(os.path.join('..', 'notebook_format'))\n", "\n", "from formats import load_style\n", "load_style(plot_style=False)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Ethen 2018-01-20 11:37:13 \n", "\n", "CPython 3.5.2\n", "IPython 6.2.1\n", "\n", "joblib 0.11\n", "requests 2.18.4\n" ] } ], "source": [ "os.chdir(path)\n", "\n", "# 1. magic to print version\n", "# 2. magic so that the notebook will reload external python modules\n", "%load_ext watermark\n", "%load_ext autoreload\n", "%autoreload 2\n", "\n", "import math\n", "import time\n", "import logging\n", "import requests\n", "import threading\n", "import multiprocessing\n", "import concurrent.futures\n", "from joblib import Parallel, delayed\n", "\n", "%watermark -a 'Ethen' -d -t -v -p joblib,requests" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel Programming in Python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The essence of parallel programming is this: if we have two tasks on our hands, task A and task B, that do not depend on each other, we can run them simultaneously instead of waiting for task A to finish before running task B. In this tutorial, we're going to take a look at parallel computing in Python and go through the following:\n", "\n", "- Why parallelism is tricky in Python (hint: it's because of the GIL, the global interpreter lock).\n", "- Threads vs. Processes: Different ways of achieving parallelism. When to use one over the other?\n", "- Parallel vs.
Concurrent: Why in some cases we can settle for concurrency rather than parallelism.\n", "- Building a simple but practical example using the various techniques discussed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Global Interpreter Lock" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The **Global Interpreter Lock (GIL)** is one of the most controversial subjects in the Python world. In CPython, the most popular implementation of Python, the GIL is a [mutex (mutual exclusion lock)](https://stackoverflow.com/questions/34524/what-is-a-mutex) that allows only one thread at a time to execute Python bytecode, which keeps the interpreter's internals thread-safe. The GIL makes it easy to integrate with external libraries that are not thread-safe, and it makes single-threaded code faster. This comes at a cost, though. Due to the GIL, we can't achieve true parallelism via multithreading: two native threads of the same process can't run Python code at the same time.\n", "\n", "Things are not that bad, though, and here's why: work that happens outside the GIL's realm is free to run in parallel. This category includes long-running tasks like I/O and libraries like NumPy (many NumPy operations run in compiled C code that releases the GIL while it works)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Threads vs. Processes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From the last section, we learned that, unlike many other programming languages, CPython is not truly multithreaded: because of the GIL, it can't run multiple threads of Python code simultaneously on multiple cores/CPUs. Before we move on, let's take a step back and understand what a thread is, what a process is, and how the two differ.\n", "\n", "- A **process** is a program in execution, in other words, code that is running (e.g. Jupyter notebook, Google Chrome, the Python interpreter).
Multiple processes are always running on a computer, and the operating system can execute them in parallel on different cores.\n", "- A **process can spawn multiple threads** (lightweight units of execution) to handle subtasks. Threads live inside a process and share its memory space (they can read and write the same variables). Ideally they run in parallel, but not necessarily. Processes alone aren't enough because a single application needs to stay responsive and listen for user actions while it updates the display and saves a file.\n", "\n", "Take Microsoft Word as an example. When we open up Word, we're essentially creating a process (an instance of the program). When we start typing, the process spawns a number of threads: one to read keystrokes, another to display text on the screen, a thread to autosave our file, and yet another to highlight spelling mistakes. By spawning multiple threads, Microsoft takes advantage of \"wasted CPU time\" (waiting for our keystrokes or waiting for a file to save) to provide a smoother user interface and make us more productive.\n", "\n", "Here's a quick comparison table:\n", "\n", "| Processes | Threads |\n", "|------------------------------------------------|------------------------------------------------------------------------------|\n", "| Processes don't share memory | Threads share memory |\n", "| Spawning/switching processes is more expensive | Spawning/switching threads requires fewer resources |\n", "| No memory synchronization needed | Needs synchronization mechanisms to ensure we're correctly handling the data |\n", "\n", "There isn't a one-size-fits-all solution.
Choosing one depends greatly on the context and the task we are trying to accomplish.\n", "\n", "- **Multiprocessing** can speed up CPU-intensive Python operations because separate processes run on multiple cores/CPUs and each has its own interpreter and its own GIL.\n", "- **Multithreading** brings no benefit for CPU-intensive tasks because of the GIL (a limitation of CPython's implementation, not of the Python language). However, it is often better than multiprocessing for I/O, network operations, or other tasks that wait on external systems: threads live in the same memory space and can combine their work cheaply, while multiprocessing has to pickle the results and send them back to the parent process.\n", "\n", "> Side note: do not mistake parallel for concurrent. Only the parallel approach takes advantage of multi-core processors. Concurrent programming instead schedules tasks intelligently, so that while one piece of code is waiting on a long-running operation, a different, independent piece of code can run and keep the CPU busy. In CPython, only processes achieve true parallelism for Python code." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python's Parallel Programming Ecosystem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Python has built-in libraries for doing parallel programming. Here, we'll cover the most popular ones:\n", "\n", "- **threading**: The standard way of working with threads in Python. It is a higher-level API wrapper over the functionality exposed by the `_thread` module, which is a low-level interface over the operating system's thread implementation.\n", "- **multiprocessing**: Offers a very similar interface to the threading module but uses processes instead of threads.\n", "- **concurrent.futures**: A standard library module that provides an even higher-level abstraction layer over threading/multiprocessing."
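, "\n",
"\n",
"As a small illustration of the **threading** module, and of the earlier point that threads share memory and therefore need synchronization, here is a minimal sketch. The names `counter`, `lock` and `add_many` are hypothetical and chosen only for this example; the `with lock` block ensures that only one thread updates the shared variable at a time.\n",
"\n",
"```python\n",
"import threading\n",
"\n",
"counter = 0              # shared by every thread in this process\n",
"lock = threading.Lock()  # guards updates to the shared counter\n",
"\n",
"\n",
"def add_many(n_times):\n",
"    # hypothetical helper: increment the shared counter n_times\n",
"    global counter\n",
"    for _ in range(n_times):\n",
"        with lock:\n",
"            counter += 1\n",
"\n",
"\n",
"threads = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]\n",
"for thread in threads:\n",
"    thread.start()\n",
"\n",
"for thread in threads:\n",
"    thread.join()\n",
"\n",
"print(counter)  # 40000\n",
"```\n",
"\n",
"All four threads update the same `counter` variable, which works because they live in the same process; separate processes would each get their own copy and would have to send results back explicitly."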
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### threading" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll first take a look at the threading API. Before we create a thread, we define a simple function that sleeps for a specified amount of time." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def sleeper(n_time):\n", "    name = threading.current_thread().name\n", "    print('I am {}. Going to sleep for {} seconds'.format(name, n_time))\n", "    time.sleep(n_time)\n", "    print('{} has woken up from sleep'.format(name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then initialize our thread with the `Thread` class from the `threading` module.\n", "\n", "- `target`: the function that the thread is going to execute.\n", "- `name`: a name for the thread; this lets us easily tell threads apart when we have several of them.\n", "- `args`: a tuple of arguments to pass to our function." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I am thread1. Going to sleep for 2 seconds\n", "thread1 has woken up from sleep\n" ] } ], "source": [ "# we call .start to start executing the function from the thread\n", "n_time = 2\n", "thread = threading.Thread(target = sleeper, name = 'thread1', args = (n_time,))\n", "thread.start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When a program sleeps for a few seconds, we normally have to wait for that portion to wake up before continuing with the rest of the program; running the sleep in a separate thread avoids this. If we think of the main program as the main thread and our thread as a separate one, the code chunk below demonstrates concurrency: the main thread doesn't have to wait for the spawned thread to finish before running the rest of our program."
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I am thread2. Going to sleep for 2 seconds\n", "hello\n", "\n", "thread2 has woken up from sleep\n" ] } ], "source": [ "# hello is printed \"before\" the wake up message from the function\n", "thread = threading.Thread(target = sleeper, name = 'thread2', args = (n_time,))\n", "thread.start()\n", "\n", "print()\n", "print('hello')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sometimes, we don't want the main thread to continue until the thread we defined has finished executing its function. To do this, we call the thread's `.join` method, which is a blocking call: it blocks the calling thread (here, the main program) until the thread finishes its task." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I am thread3. Going to sleep for 2 seconds\n", "thread3 has woken up from sleep\n", "\n", "hello\n" ] } ], "source": [ "# hello is printed \"after\" the wake up message from the function\n", "thread = threading.Thread(target = sleeper, name = 'thread3', args = (n_time,))\n", "thread.start()\n", "thread.join()\n", "\n", "print()\n", "print('hello')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following code chunk showcases how to initialize and use multiple threads." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "I am thread0. Going to sleep for 2 seconds\n", "I am thread1. Going to sleep for 2 seconds\n", "I am thread2. Going to sleep for 2 seconds\n", "I am thread3. Going to sleep for 2 seconds\n", "I am thread4.
Going to sleep for 2 seconds\n", "thread1 has woken up from sleepthread0 has woken up from sleep\n", "thread3 has woken up from sleep\n", "thread2 has woken up from sleep\n", "\n", "thread4 has woken up from sleep\n", "\n", "Elapse time: 2.004049062728882\n" ] } ], "source": [ "n_time = 2\n", "n_threads = 5\n", "start = time.time()\n", "\n", "# create n_threads number of threads and store them in a list\n", "threads = []\n", "for i in range(n_threads):\n", "    name = 'thread{}'.format(i)\n", "    thread = threading.Thread(target = sleeper, name = name, args = (n_time,))\n", "    threads.append(thread)\n", "    # we can start the thread while we're creating it, or move\n", "    # this to its own loop (as shown below)\n", "    thread.start()\n", "\n", "# we could instead start the thread in a separate loop\n", "# for thread in threads:\n", "#     thread.start()\n", "\n", "# ensure all threads have finished before executing main program\n", "for thread in threads:\n", "    thread.join()\n", "\n", "elapse = time.time() - start\n", "print()\n", "print('Elapse time: ', elapse)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From the result above, we can see from the elapsed time that finishing all the tasks doesn't take n_threads * n_time seconds (the number of threads times the time we told each one to sleep), which is pretty neat!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### concurrent.futures" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As mentioned previously, the concurrent.futures module is part of the standard library and provides a high-level API for launching asynchronous tasks. The module features the `Executor` class, an abstract class that cannot be used directly; however, it has two very useful concrete subclasses: `ThreadPoolExecutor` and `ProcessPoolExecutor`. As their names suggest, one uses multithreading and the other uses multiprocessing as its backend.
In both cases, we get a pool of threads or processes and can submit tasks to it. The pool assigns tasks to the available resources (threads or processes) and schedules them to run.\n", "\n", "Both executors have a common method, `map()`. Like the built-in function, the map method calls a provided function once for each item of an iterable. Except, in this case, the calls are made concurrently.\n", "\n", "From the [documentation](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor): With `ProcessPoolExecutor`, this iterable is broken into chunks and each chunk is submitted to the pool as a separate task. We can control the chunk size with the `chunksize` keyword argument, which defaults to 1. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor, chunksize has no effect." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "112272535095293 is prime: True\n", "112582705942171 is prime: True\n", "112272535095293 is prime: True\n", "115280095190773 is prime: True\n", "115797848077099 is prime: True\n", "1099726899285419 is prime: False\n" ] } ], "source": [ "# example from the documentation page\n", "# https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example\n", "def is_prime(n):\n", "    \"\"\"\n", "    References\n", "    ----------\n", "    https://math.stackexchange.com/questions/1343171/why-only-square-root-approach-to-check-number-is-prime\n", "    \"\"\"\n", "    # handle the small cases before the even-number shortcut\n", "    if n < 2:\n", "        return False\n", "    if n == 2:\n", "        return True\n", "    if n % 2 == 0:\n", "        return False\n", "\n", "    sqrt_n = int(math.floor(math.sqrt(n)))\n", "    for i in range(3, sqrt_n + 1, 2):\n", "        if n % i == 0:\n", "            return False\n", "\n", "    return True\n", "\n", "\n", "PRIMES = [\n", "    112272535095293,\n", "    112582705942171,\n", "    112272535095293,\n", "
115280095190773,\n", "    115797848077099,\n", "    1099726899285419]\n", "\n", "with concurrent.futures.ProcessPoolExecutor() as executor:\n", "    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):\n", "        print('{} is prime: {}'.format(number, prime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This quick introduction should do it for now. Next, we'll define both an I/O-bound task (such as reading a file, making API calls, or scraping URLs; simulated here with a sleep) and a CPU-intensive task, and then benchmark the two by running them serially, with multithreading, and with multiprocessing." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def only_sleep():\n", "    \"\"\"Wait for a timer to expire\"\"\"\n", "    process_name = multiprocessing.current_process().name\n", "    thread_name = threading.current_thread().name\n", "    print('Process Name: {}, Thread Name: {}'.format(\n", "        process_name, thread_name))\n", "\n", "    time.sleep(4)\n", "\n", "\n", "def crunch_numbers():\n", "    \"\"\"Do some computations \"\"\"\n", "    process_name = multiprocessing.current_process().name\n", "    thread_name = threading.current_thread().name\n", "    print('Process Name: {}, Thread Name: {}'.format(\n", "        process_name, thread_name))\n", "\n", "    x = 0\n", "    while x < 10000000:\n", "        x += 1\n", "\n", "\n", "def experiment(target, n_workers):\n", "    \"\"\"\n", "    run the target function serially, using threads,\n", "    using process and output the run time\n", "    \"\"\"\n", "    # Run tasks serially\n", "    start_time = time.time()\n", "    for _ in range(n_workers):\n", "        target()\n", "\n", "    end_time = time.time()\n", "    print(\"Serial time=\", end_time - start_time)\n", "    print()\n", "\n", "    # Run tasks using processes\n", "    start_time = time.time()\n", "    processes = [multiprocessing.Process(target = target) for _ in range(n_workers)]\n", "    for process in processes:\n", "        process.start()\n", "\n", "    for process in processes:\n", "        process.join()\n", "\n", "    end_time = time.time()\n", "    print(\"Parallel time=\", end_time - start_time)\n", "    print()\n", "\n", "    # Run tasks using threads\n", "    start_time = time.time()\n", "    threads = [threading.Thread(target = target) for _ in range(n_workers)]\n", "    for thread in threads:\n", "        thread.start()\n", "\n", "    for thread in threads:\n", "        thread.join()\n", "\n", "    end_time = time.time()\n", "    print(\"Threads time=\", end_time - start_time)" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Serial time= 16.012150049209595\n", "\n", "Process Name: Process-9, Thread Name: MainThread\n", "Process Name: Process-10, Thread Name: MainThread\n", "Process Name: Process-11, Thread Name: MainThread\n", "Process Name: Process-12, Thread Name: MainThread\n", "Parallel time= 4.024165868759155\n", "\n", "Process Name: MainProcess, Thread Name: Thread-5\n", "Process Name: MainProcess, Thread Name: Thread-6\n", "Process Name: MainProcess, Thread Name: Thread-7\n", "Process Name: MainProcess, Thread Name: Thread-8\n", "Threads time= 4.00553297996521\n" ] } ], "source": [ "n_workers = 4\n", "experiment(target = only_sleep, n_workers = n_workers)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here are some observations:\n", "\n", "- In the case of the **serial approach**, things are pretty obvious. We're running the tasks one after the other. All four runs are executed by the same thread of the same process.\n", "- Using **processes** we cut the execution time down to a quarter of the original time, simply because the tasks are executed in parallel.
Notice how each task is performed in a different process and on the MainThread of that process.\n", "- Using **threads** we take advantage of the fact that the tasks can be executed concurrently. The execution time is also cut down to a quarter, even though nothing is running in parallel. Here's how that goes: we spawn the first thread and it starts waiting for its timer to expire. While it waits, its execution is paused and we spawn the second thread. We repeat this for all the threads. When the first thread's timer expires, execution switches back to it and it finishes. The same happens for the second and all the other threads. In the end, the result is as if things were run in parallel. Also notice that the different threads all live inside the same process: MainProcess.\n", "\n", "You may even notice that the threaded approach is slightly quicker than the truly parallel one. That's due to the overhead of spawning processes. As we noted previously, spawning and switching processes is more expensive and requires more resources.\n", "\n", "Let's perform the same routine, but this time on the crunch_numbers function."
] }, { "cell_type": "code", "execution_count": 11, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Process Name: MainProcess, Thread Name: MainThread\n", "Serial time= 2.1557538509368896\n", "\n", "Process Name: Process-13, Thread Name: MainThread\n", "Process Name: Process-14, Thread Name: MainThread\n", "Process Name: Process-15, Thread Name: MainThread\n", "Process Name: Process-16, Thread Name: MainThread\n", "Parallel time= 0.6022598743438721\n", "\n", "Process Name: MainProcess, Thread Name: Thread-9\n", "Process Name: MainProcess, Thread Name: Thread-10\n", "Process Name: MainProcess, Thread Name: Thread-11\n", "Process Name: MainProcess, Thread Name: Thread-12\n", "Threads time= 2.8286261558532715\n" ] } ], "source": [ "n_workers = 4\n", "experiment(target = crunch_numbers, n_workers = n_workers)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The main difference here is in the result of the multithreaded approach. This time it performs very similarly to the serial approach, and here's why: since it performs computations and Python doesn't perform real parallelism, the threads are basically running one after the other until they all finish. In fact it might even be slower, as we need to take into account the overhead of launching multiple threads." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Practical Application" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section, we're going to walk through all the different paradigms by building a classic application that checks the uptime of websites. The purpose of these apps is to notify us when our website is down so that we can quickly take action. 
Here's how they work:\n", "\n", "- The application goes over a list of website URLs very frequently and checks whether those websites are up.\n", "- Every website should be checked every 5-10 minutes so that the downtime is not significant.\n", "- Instead of performing a classic HTTP GET request, it performs a [HEAD request](https://ochronus.com/http-head-request-good-uses/) so that it does not affect our traffic significantly.\n", "- If the HTTP status is in the danger ranges (400+, 500+), the owner is notified by email, text message, or push notification.\n", "\n", "Here's why it's essential to take a parallel/concurrent approach to the problem. As the list of websites grows, going through the list serially no longer guarantees that every website is checked every five minutes or so. The websites could be down for hours, and the owner won't be notified.\n", "\n", "Let's start by writing some utilities:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "def check_website(address):\n", "    \"\"\"Utility function: check if a website is down, if so, notify the user\"\"\"\n", "    try:\n", "        ping_website(address)\n", "    except WebsiteDownException:\n", "        notify_owner(address)\n", "\n", "\n", "class WebsiteDownException(Exception):\n", "    \"\"\"Exception if the website is down\"\"\"\n", "    pass\n", "\n", "\n", "def ping_website(address, timeout = 20):\n", "    \"\"\"\n", "    Check if a website is down.
A website is considered down \n", "    if either the status_code >= 400 or if the timeout expires\n", "\n", "    Throw a WebsiteDownException if any of the website down conditions are met\n", "    \"\"\"\n", "    try:\n", "        response = requests.head(address, timeout = timeout)\n", "        if response.status_code >= 400:\n", "            logging.warning('Website {} returned status code={}'.format(address, response.status_code))\n", "            raise WebsiteDownException()\n", "    except requests.exceptions.RequestException:\n", "        # any failed request (timeout, DNS or connection error) is treated as down\n", "        logging.warning('Timeout expired for website {}'.format(address))\n", "        raise WebsiteDownException()\n", "\n", "\n", "def notify_owner(address):\n", "    \"\"\" \n", "    Send the owner of the address a notification that their website is down \n", "\n", "    For now, we're just going to sleep for 0.5 seconds but this is where \n", "    you would send an email, push notification or text-message\n", "    \"\"\"\n", "    logging.info('Notifying the owner of {} website'.format(address))\n", "    time.sleep(0.5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we need some actual websites to try our system out. Create your own list or start with the one in the next code chunk. Normally, we'd keep this list in a database along with the owners' contact information so that we can reach them. Since this is not the main topic of this tutorial, and for the sake of simplicity, we're just going to use this Python list.\n", "\n", "You might notice two really long domains in the list that are not valid websites. Those domains were added on purpose to be sure we have some websites down on every run."
] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "WEBSITE_LIST = [\n", " 'http://envato.com',\n", " 'http://amazon.co.uk',\n", " 'http://amazon.com',\n", " 'http://facebook.com',\n", " 'http://google.com',\n", " 'http://google.fr',\n", " 'http://google.es',\n", " 'http://google.co.uk',\n", " 'http://internet.org',\n", " 'http://gmail.com',\n", " 'http://stackoverflow.com',\n", " 'http://github.com',\n", " 'http://heroku.com',\n", " 'http://really-cool-available-domain.com',\n", " 'http://djangoproject.com',\n", " 'http://rubyonrails.org',\n", " 'http://basecamp.com',\n", " 'http://trello.com',\n", " 'http://yiiframework.com',\n", " 'http://shopify.com',\n", " 'http://another-really-interesting-domain.co',\n", " 'http://airbnb.com',\n", " 'http://instagram.com',\n", " 'http://snapchat.com',\n", " 'http://youtube.com',\n", " 'http://baidu.com',\n", " 'http://yahoo.com',\n", " 'http://live.com',\n", " 'http://linkedin.com',\n", " 'http://yandex.ru',\n", " 'http://netflix.com',\n", " 'http://wordpress.com',\n", " 'http://bing.com']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we'll try the serial approach and use this as our baseline." 
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Timeout expired for website http://really-cool-available-domain.com\n", "WARNING:root:Timeout expired for website http://another-really-interesting-domain.co\n", "WARNING:root:Website http://live.com returned status code=405\n", "WARNING:root:Website http://netflix.com returned status code=405\n", "WARNING:root:Website http://bing.com returned status code=405\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Time for serial: 23.260082006454468 secs\n" ] } ], "source": [ "start_time = time.time()\n", "for address in WEBSITE_LIST:\n", " check_website(address)\n", "\n", "end_time = time.time()\n", "print('Time for serial: {} secs'.format(end_time - start_time))" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Timeout expired for website http://really-cool-available-domain.com\n", "WARNING:root:Timeout expired for website http://another-really-interesting-domain.co\n", "WARNING:root:Website http://live.com returned status code=405\n", "WARNING:root:Website http://bing.com returned status code=405\n", "WARNING:root:Website http://netflix.com returned status code=405\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Time for multithreading: 5.152196884155273 secs\n" ] } ], "source": [ "n_workers = 4\n", "start_time = time.time()\n", "with concurrent.futures.ThreadPoolExecutor(max_workers = n_workers) as executor:\n", " futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}\n", " # more detailed explanation of the wait command\n", " # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait\n", " _ = concurrent.futures.wait(futures)\n", "\n", "end_time = time.time()\n", "print('Time for multithreading: {} secs'.format(end_time - start_time))" ] }, { 
"cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Timeout expired for website http://really-cool-available-domain.com\n", "WARNING:root:Timeout expired for website http://another-really-interesting-domain.co\n", "WARNING:root:Website http://live.com returned status code=405\n", "WARNING:root:Website http://bing.com returned status code=405\n", "WARNING:root:Website http://netflix.com returned status code=405\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Time for multiprocessing: 7.188531875610352 secs\n" ] } ], "source": [ "# process does not result in the same performance gain as thread\n", "start_time = time.time()\n", "with concurrent.futures.ProcessPoolExecutor(max_workers = n_workers) as executor:\n", " futures = {executor.submit(check_website, address) for address in WEBSITE_LIST}\n", " _ = concurrent.futures.wait(futures)\n", "\n", "end_time = time.time() \n", "print('Time for multiprocessing: {} secs'.format(end_time - start_time))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another library for performing parallel programming is `joblib`. This is my personal favorite and it is also used by the machine learning package scikit-learn to perform [hyperparameter search](http://nbviewer.jupyter.org/github/ethen8181/machine-learning/blob/master/model_selection/model_selection.ipynb)." 
] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Timeout expired for website http://really-cool-available-domain.com\n", "WARNING:root:Timeout expired for website http://another-really-interesting-domain.co\n", "WARNING:root:Website http://live.com returned status code=405\n", "WARNING:root:Website http://bing.com returned status code=405\n", "WARNING:root:Website http://netflix.com returned status code=405\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Time for joblib threading: 5.940433979034424 secs\n" ] } ], "source": [ "start_time = time.time()\n", "\n", "# we start off by creating a Parallel object;\n", "# in joblib 0.11 (the version used here) the default backend is multiprocessing,\n", "# here we change it to threading as the task is I/O bound;\n", "# setting n_jobs to -1 would use all the available cores\n", "parallel = Parallel(n_jobs = n_workers, backend = 'threading')\n", "\n", "# we wrap our function with delayed and pass in our list of parameters\n", "result = parallel(delayed(check_website)(address) for address in WEBSITE_LIST)\n", "end_time = time.time() \n", "print('Time for joblib threading: {} secs'.format(end_time - start_time))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is the end of our journey into the world of parallel programming in Python. Here are some conclusions we can draw:\n", "\n", "- There are several paradigms that help us achieve high-performance computing in Python.\n", "- Only processes achieve true parallelism, but they are more expensive to create.\n", "- In Python, use processes for CPU-bound tasks and threads for I/O-bound tasks.\n", "\n", "For a more in-depth introduction to `joblib`, check out the following link.
[Blog: A Library for Many Jobs](http://www.admin-magazine.com/HPC/Articles/Parallel-Python-with-Joblib)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Synchronous Versus Asynchronous" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another term we often encounter in the parallel programming space is asynchronous.\n", "\n", "In synchronous programming, tasks are assigned to a thread one after another, as depicted in the diagram below:\n", "\n", "\n", "\n", "In a multi-threaded environment, we can take up these tasks in parallel.\n", "\n", "\n", "\n", "In contrast to synchronous programming, in asynchronous programming a thread that has started executing a task can pause it midway, save its current state, and start executing another task.\n", "\n", "\n", "\n", "If we look at the diagram above, we can see that a single thread is still responsible for completing all the tasks, but the tasks can interleave with one another.\n", "\n", "Asynchronous programming becomes even more interesting in a multi-threaded environment. Tasks can be interleaved on different threads.\n", "\n", "\n", "\n", "As we can see, T4 was started on Thread 1 and completed by Thread 2.\n", "\n", "When deciding whether to use multiprocessing, multithreading, or asynchronous programming, we can use this cheat sheet.\n", "\n", "```python\n", "if io_bound:\n", "    if io_very_slow:\n", "        print(\"Use Asyncio\")\n", "    else:\n", "        print(\"Use Threads\")\n", "else:\n", "    print(\"Multi Processing\")\n", "```\n", "\n", "- CPU Bound => Multi Processing.\n", "- I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading.\n", "- I/O Bound, Slow I/O, Many connections => Asyncio (Asynchronous library in Python, which we have not covered in this post)."
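, "\n",
"\n",
"To give a rough feel for the asyncio branch of the cheat sheet, here is a minimal sketch rather than a full treatment. It assumes Python 3.7+ (for `asyncio.run`); the coroutine names `fake_request` and `main` are hypothetical, and `asyncio.sleep` merely stands in for a slow network call.\n",
"\n",
"```python\n",
"import asyncio\n",
"import time\n",
"\n",
"\n",
"async def fake_request(name, delay):\n",
"    # hypothetical stand-in for slow I/O: while this coroutine waits,\n",
"    # the event loop is free to run the other coroutines\n",
"    await asyncio.sleep(delay)\n",
"    return name\n",
"\n",
"\n",
"async def main():\n",
"    # schedule three coroutines concurrently on a single thread;\n",
"    # gather returns the results in the order the awaitables were passed\n",
"    return await asyncio.gather(\n",
"        fake_request('a', 0.2),\n",
"        fake_request('b', 0.2),\n",
"        fake_request('c', 0.2))\n",
"\n",
"\n",
"start = time.time()\n",
"results = asyncio.run(main())\n",
"elapsed = time.time() - start\n",
"print(results, elapsed)\n",
"```\n",
"\n",
"The three waits overlap, so the total time should be close to a single delay rather than the sum of all three. Inside a Jupyter notebook an event loop is typically already running, in which case `await main()` in a cell would be used instead of `asyncio.run(main())`."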
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Reference\n", "\n", "- [Youtube: Python Threading - Multithreading Playlist](https://www.youtube.com/playlist?list=PLGKQkV4guDKEv1DoK4LYdo2ZPLo6cyLbm)\n", "- [Forum: Python Parallel Processing - Tips and Applications](http://forums.fast.ai/t/python-parallel-processing-tips-and-applications/2092)\n", "- [Blog: A Library for Many Jobs](http://www.admin-magazine.com/HPC/Articles/Parallel-Python-with-Joblib)\n", "- [Blog: Python: A Quick Introduction To The concurrent.futures Module](http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html)\n", "- [Blog: Introduction to Parallel and Concurrent Programming in Python](https://code.tutsplus.com/articles/introduction-to-parallel-and-concurrent-programming-in-python--cms-28612)\n", "- [Blog: Concurrency vs Multi-threading vs Asynchronous Programming : Explained](https://codewala.net/2015/07/29/concurrency-vs-multi-threading-vs-asynchronous-programming-explained/)\n", "- [Blog: Async Python: The Different Forms of Concurrency](http://masnun.rocks/2016/10/06/async-python-the-different-forms-of-concurrency/)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4" }, "toc": { "nav_menu": { "height": "171px", "width": "252px" }, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": true, "toc_position": {}, "toc_section_display": "block", "toc_window_display": true }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": 
"print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }