{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dependencies beyond the DAG\n", "\n", "Ben Clifford benc@hawaga.org.uk\n", "\n", "This tutorial is about doing things with dependencies that are more advanced than those covered in introductory Parsl material. It is intended both for presentation at ParslFest2020 (https://parsl-project.org/parslfest2020.html) and as something you can work through independently.\n", "\n", "All of the use cases and apps in this tutorial are very simplified, but they all originate in the real DESC workflow that is the subject of another ParslFest session. The science has been removed, but the dependency structures, which are the topic of this talk, have been kept.\n", "\n", "Most of this tutorial will work with Parsl 1.0.0, the latest released version at the time of writing. However, one section needs an in-development feature, which you can get from the URL in that section's header." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Getting started\n", "First, I'll initialise Parsl, ready for the rest of the tutorial, by loading a local configuration with plenty of worker threads." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import parsl\n", "from parsl.config import Config\n", "parsl.clear()\n", "parsl.load(\n", " Config(\n", " executors=[parsl.ThreadPoolExecutor(max_threads=10)]\n", " ))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. 
Introduction to dependencies\n", "First, let's look at simple dependencies between apps in Parsl.\n", "\n", "Here's a simple app, which returns the number `10`, but also prints some messages (so we can see it run) and sleeps for a few seconds (to help us see things happening over time):" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# here's a simple app...\n", "@parsl.python_app\n", "def first():\n", " import time\n", " print(\"starting first\")\n", " time.sleep(5)\n", " print(\"ending first\")\n", " return 10" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This app looks like a normal Python function, but with a decorator to turn it into a Parsl app.\n", "\n", "That means when I invoke it, I won't get back the number 10... instead I'll get back a future..." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n" ] }, { "data": { "text/plain": [ "parsl.dataflow.futures.AppFuture" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" }, { "name": "stdout", "output_type": "stream", "text": [ "ending first\n" ] } ], "source": [ "fut = first()\n", "type(fut)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... and I can ask that future whether it is done yet (`True` or `False`), and wait for its result." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] }, { "data": { "text/plain": [ "10" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "print(fut.done())\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's one app: it doesn't take any input, and it produces some output.\n", "\n", "Here's another one, which takes an input value and doubles it, along with a simple demonstration:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executing double\n" ] }, { "data": { "text/plain": [ "8" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@parsl.python_app\n", "def double(n):\n", " print(\"executing double\")\n", " return n*2\n", "\n", "double(4).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far we haven't seen any dependency handling. But we can string these two apps together like this, and watch them execute. This executes pretty much like a regular Python expression, with Parsl converting the Future that comes out of `first` into the contained value as it gets passed into `double`." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "ending first\n", "executing double\n" ] }, { "data": { "text/plain": [ "20" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fut = double(first())\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far, I haven't shown anything that couldn't be written in regular Python, almost identically but without the app decorators.\n", "\n", "Here's another app, `total`, which will add up all values that it is passed:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Totalling...\n" ] }, { "data": { "text/plain": [ "13" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@parsl.python_app\n", "def total(*args):\n", " print(\"Totalling...\")\n", " accumulator = 0\n", " for x in args:\n", " accumulator += x\n", " return accumulator\n", "\n", "total(1,10,2).result()\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now if we run this with a couple of `first` invocations, we can see some more interesting parsl stuff happening:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "starting first\n", "total future is >\n", "ending first\n", "ending first\n", "Totalling...\n" ] }, { "data": { "text/plain": [ "20" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fut = total(first(), first())\n", "print(f\"total future is {fut}\")\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What happened is that both `first` calls ran at the same time, and then `total` ran after those two had completed.\n", "\n", "\n", "The two `first` calls produced 
futures, which are passed into `total`, which itself returns a future, stored in `fut`. We get that final future *right away*, without having to wait for any of the three apps to actually execute. So the rest of our main workflow code can keep executing without any big delay.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Remember: in a \"real\" workflow, those three apps might take a long time (days) to execute, but the `Future` is returned immediately.\n", "\n", "That's important in a more complicated workflow, because we might want the code to go off and do other things, for example, launch more apps in a `for`-loop." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Composing subworkflows using Python functions... `.result()` is your enemy\n", "If we weren't going to use Parsl dependencies, we could instead explicitly wait for the results of `first` and pass them to `total`:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "starting first\n", "ending first\n", "ending first\n", "Totalling...total future is >\n", "\n" ] }, { "data": { "text/plain": [ "20" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "a_fut = first()\n", "b_fut = first()\n", "a = a_fut.result()\n", "b = b_fut.result()\n", "fut = total(a,b)\n", "print(f\"total future is {fut}\")\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Although the example above has the same concurrency (two `first` apps run together, then `total` runs after), the final `total` Future is not returned until after the two `first` apps have run. Remember, this could be days later.\n", "\n", "If I want a more complicated workflow, perhaps with this code wrapped in a function, then some differences start to appear. 
Here are the two implementations above, wrapped in functions and called twice:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "starting first\n", "starting first\n", "starting first\n", "ending first\n", "ending first\n", "Totalling...\n", "ending first\n", "ending first\n", "Totalling...\n", "Totalling...\n" ] }, { "data": { "text/plain": [ "40" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def add_firsts_a():\n", " return total(first(), first())\n", "\n", "total(add_firsts_a(), add_firsts_a()).result()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "starting first\n", "ending first\n", "ending first\n", "Totalling...\n", "starting first\n", "starting first\n", "ending first\n", "ending first\n", "Totalling...\n", "Totalling...\n" ] }, { "data": { "text/plain": [ "40" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def add_firsts_b():\n", " a_fut = first()\n", " b_fut = first()\n", " a = a_fut.result()\n", " b = b_fut.result()\n", " return total(a,b)\n", "\n", "total(add_firsts_b(), add_firsts_b()).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With `add_firsts_a`, all four `first` invocations run at once. With `add_firsts_b`, only two of the `first` invocations run at first; the next two are not invoked until those two have finished, because the first call to `add_firsts_b` blocks in `.result()` before it returns. The same result comes out at the end, but the concurrency is unnecessarily limited.\n", "\n", "Rules of thumb:\n", "* `.result()` is your enemy. Avoid it except at the very end.\n", "* Return Futures as fast as you can so that other workflow code can run." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. 
Dynamically assembling dependencies in a more complicated workflow\n", "\n", "\n", "In the previous examples, the dependency structure was completely fixed by the way in which apps were invoked in the Python source code.\n", "\n", "In this section, I'm going to make a more complicated workflow: a Fantasy Sports League processor.\n", "\n", "The idea is that you can assemble fantasy teams of three real players, and the workflow will compute how well each team would have performed, based on analysis of the players' real performance.\n", "\n", "First, here is the list of available players:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "players = [\"Dugnutt\", \"Butch\", \"Bannister\", \"Jackson\", \"Ennis-Hill\", \"Abi\"]" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "start analyzing Abi\n", "end analyzing Abi\n" ] }, { "data": { "text/plain": [ "3" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@parsl.python_app\n", "def analyze_player(name):\n", " print(f\"start analyzing {name}\")\n", " import time\n", " n = len(name)\n", " time.sleep(n)\n", " print(f\"end analyzing {name}\")\n", " return n\n", "\n", "analyze_player(\"Abi\").result()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "def analyze_all_players() -> dict:\n", " player_futures = {}\n", " for p in players:\n", " player_futures[p] = analyze_player(p)\n", " return player_futures" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If I run `analyze_all_players()` in the cell below, I'll see all 6 player analyses start concurrently, and immediately get back futures for all 6 players in a `dict`. 
Then over the next 10 seconds (`analyze_player` sleeps for one second per character of the player's name, and the longest name, `Ennis-Hill`, has 10 characters), I'll see the analyses complete.\n", "\n", "Remember: returning futures right away is important for concurrency." ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "start analyzing Dugnutt\n", "start analyzing Butch\n", "start analyzing Bannister\n", "start analyzing Jackson\n", "start analyzing Ennis-Hill\n", "start analyzing Abi" ] }, { "data": { "text/plain": [ "{'Dugnutt': >,\n", " 'Butch': >,\n", " 'Bannister': >,\n", " 'Jackson': >,\n", " 'Ennis-Hill': >,\n", " 'Abi': >}" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" }, { "name": "stdout", "output_type": "stream", "text": [ "\n", "end analyzing Abi\n", "end analyzing Butch\n", "end analyzing Dugnutt\n", "end analyzing Jackson\n", "end analyzing Bannister\n", "end analyzing Ennis-Hill\n" ] } ], "source": [ "analyze_all_players()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's define some teams. Because this is a fantasy league, a player can appear in multiple teams." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "teams = [\n", " (\"Team A\", [\"Dugnutt\", \"Bannister\", \"Ennis-Hill\"]),\n", " (\"Team B\", [\"Abi\", \"Butch\", \"Dugnutt\"]),\n", " (\"Team C\", [\"Butch\", \"Jackson\", \"Dugnutt\"])\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So for each team, I want to compute a team score by combining the abilities of its three players. I'm going to use a simple sum, computed by the `total` app from earlier, to do this." 
] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "def analyze_teams(all_player_futs):\n", " final_results = []\n", " for (team_name, ps) in teams:\n", " player_futs = []\n", " for p in ps:\n", " player_futs.append(all_player_futs[p])\n", " final_results.append((team_name, total(*player_futs)))\n", " print(f\"prepared team {team_name}\")\n", " print(\"All teams prepared\")\n", " return final_results" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "start analyzing Dugnutt\n", "start analyzing Butch\n", "start analyzing Bannister\n", "start analyzing Jacksonstart analyzing Ennis-Hill\n", "\n", "start analyzing Abiprepared team Team A\n", "\n", "prepared team Team B\n", "prepared team Team C\n", "All teams prepared\n", "end analyzing Abi\n", "end analyzing Butch\n", "end analyzing Dugnutt\n", "Totalling...\n", "end analyzing Jackson\n", "Totalling...\n", "end analyzing Bannister\n", "end analyzing Ennis-Hill\n", "Totalling...\n" ] } ], "source": [ "all_player_futs = analyze_all_players()\n", "rs = analyze_teams(all_player_futs)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Team Team A has score 26\n", "Team Team B has score 15\n", "Team Team C has score 19\n" ] } ], "source": [ "for (n, f) in rs:\n", " if f.done():\n", " print(f\"Team {n} has score {f.result()}\")\n", " else:\n", " print(f\"Team {n} no score yet\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dependency-related features of this workflow:\n", "\n", " * Futures can be stored in more complicated Python data structures, such as dictionaries and lists.\n", " * A simple implementation might wait for all the player processing to complete before moving on to team assembly.\n", " The implementation above makes each team's assembly depend only on the relevant players.\n", " * This means 
it can compute each team's score as soon as the relevant player analyses have completed. That can improve resource usage, because more tasks are available to run sooner.\n", " * As a special case of that, even if some player analyses fail, the workflow can still compute all the teams which don't involve those players: computation can go \"deep\" into the workflow on the successful branches, even if it can't process the full \"width\". There may still be valuable final science results there, even if not everything is computed.\n", " \n" ] }, { "cell_type": "markdown", "metadata": { "scrolled": false }, "source": [ "## 4. @join_app (feature in development)\n", "Note: This section won't work with Parsl 1.0.0, but you can get https://github.com/Parsl/parsl/pull/1860 if you would like to try it.\n", "\n", "\n", "In earlier sections, I've used functions to split up pieces of the workflow, much as functions would be used in normal code. The important thing there is that when a function is called, it must return *fast*, returning Futures for anything that will take a long time.\n", "\n", "There are cases where this isn't enough, though. Python functions can take Futures as parameters and use those Futures to attach dependencies to any Parsl apps that they launch.\n", "\n", "But they can't inspect any value that will eventually appear inside any of those Futures without using `.result()`, our enemy.\n", "\n", "If functions can't inspect the value inside a Future, then they can't change what they do (for example, which apps are launched) based on that value. This is important when one stage of a workflow needs to decide what to do based on an earlier stage.\n", "\n", "To address that, there is a new kind of Parsl app in development, called a `@join_app`. 
This is a cross between a regular Python function (it will run in the submitting process, and can launch apps), and a python app (it is able to wait for dependencies before executing, and can see the values inside those dependencies).\n", "\n", "The example I will use here is recursive computation of the Fibonacci sequence." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's how I can write this using `@join_app`. See how the choice of apps to run is made based on the value of `n`, which might come as a dependency from an earlier workflow step rather than be immediately available:" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [], "source": [ "@parsl.join_app\n", "def fibonacci(n):\n", " if n == 0:\n", " return total()\n", " elif n == 1:\n", " return total(1)\n", " else:\n", " return total(fibonacci(n-1), fibonacci(n-2))" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n" ] }, { "data": { "text/plain": [ "2" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fibonacci(3).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now I can make this fibonacci computation depend on a value coming from an earlier workflow step, rather than a value that is available immediately, but even so the final result Future in `fut` is available immediately." 
] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "starting first\n", "Waiting for result\n", "ending first\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", 
"Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...Totalling...\n", "\n", "Totalling...Totalling...\n", "\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n", "Totalling...\n" ] }, { "data": { "text/plain": [ "55" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fut = fibonacci(first())\n", "print(\"Waiting for result\")\n", "fut.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, the important thing that has been enabled is that the fibonacci code is allowed to use an `if` statement to decide which apps it is going to launch (`total` in two cases, and `total` along with 2 x `fibonacci` in the third case)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. 
Raw `Future` objects\n", "Now I'm going to dig deeper into the mechanics of Futures.\n", "\n", "The Future objects that come from invoking a Parsl app have their values set eventually by the guts of Parsl, when the app completes... but they're actually instances of a subclass of the standard Python `concurrent.futures.Future` class: https://docs.python.org/3/library/concurrent.futures.html#future-objects\n", "\n", "(Futures also appear in the executor API, which you can see in Yadu's ParslFest talk.)\n", "\n", "Parsl apps can use *any* future as a dependency, not just futures from invoking other Parsl apps." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's a simple example of non-Parsl Futures: I'll create a Future object that's not attached to any executing code, and then I'll use it as a dependency for a simple app, `total`." ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "from concurrent.futures import Future" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f = Future()\n", "f" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now `f` contains a `Future` that will just sit there passively. Unless I do something to it, it will never get a value.\n", "\n", "I can use that as a dependency for the `total` app:" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "data": { "text/plain": [ ">" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f2 = total(1, f)\n", "f2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because nothing is running to make `f` complete, the `total` app will never run either..." 
] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n", "False\n" ] } ], "source": [ "print(f.done())\n", "print(f2.done())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... until something comes along and sets the result of the Future ..." ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Totalling...\n" ] } ], "source": [ "f.set_result(10)" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "11" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f2.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... at which point `total` ran (because its input dependency had a value), and so the result of `total` is now also available in `f2`.\n", "\n", "Parsl doesn't care how `f` got its value... just that it eventually did get one." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hacking more interesting dependency behaviour in futures\n", "\n", "This is a technique I've used to prototype different dependency behaviour without building it into the Parsl codebase: by making Futures that behave the way I want, I can prototype the behaviour in a few lines of rough code.\n", "\n", "As a trade-off, almost all of the other benefits that Parsl brings (remote execution, retries, monitoring information, checkpointing) are lost for this particular step, so it's definitely a prototyping exercise rather than solid Parsl functionality." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section, I'll try to solve a particular problem:\n", "\n", "Sometimes an app raises an Exception instead of completing successfully. With normal Parsl behaviour, that causes all dependent apps to fail too. 
But what I'd like in this case is for the dependent app to still run, and to get to see that some of its upstream apps might have failed.\n", "\n", "In this example, I would like an app that takes the average of the numbers returned by its dependencies, but if any dependency fails, that dependency is ignored rather than aborting the whole computation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's how it doesn't work right now. First, some apps to experiment with, and a check that `avg` works on plain numbers (it returns the average along with the count of values averaged):" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(55.0, 2)" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "@parsl.python_app\n", "def rand():\n", " import random\n", " return random.random()\n", "\n", "@parsl.python_app\n", "def fail():\n", " raise RuntimeError(\"Parsl demo failure\")\n", "\n", "@parsl.python_app\n", "def avg(*args):\n", " s = 0\n", " c = 0\n", " for x in args:\n", " c += 1\n", " s += x\n", " return (s/c, c)\n", "\n", "avg(50,60).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here's a small workflow with `rand` and `avg` put together:" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(0.5941547596900922, 3)" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "avg(rand(), rand(), rand()).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... 
but this fails (because there is a failing dependency):" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "ename": "DependencyError", "evalue": "Dependency failure for task 439 with failed dependencies from tasks [437]", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mDependencyError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mavg\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrand\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfail\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrand\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36mresult\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 423\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 424\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_state\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mFINISHED\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 425\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 426\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 427\u001b[0m 
\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_condition\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36m__get_result\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 382\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 384\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 385\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 386\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/parsl/src/parsl/parsl/dataflow/dflow.py\u001b[0m in \u001b[0;36mhandle_exec_update\u001b[0;34m(self, task_id, future)\u001b[0m\n\u001b[1;32m 270\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 271\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 272\u001b[0;31m \u001b[0mres\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_unwrap_remote_exception_wrapper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuture\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 273\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 274\u001b[0m 
\u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m~/parsl/src/parsl/parsl/dataflow/dflow.py\u001b[0m in \u001b[0;36m_unwrap_remote_exception_wrapper\u001b[0;34m(future)\u001b[0m\n\u001b[1;32m 417\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0mstaticmethod\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 418\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m_unwrap_remote_exception_wrapper\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfuture\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0mFuture\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m->\u001b[0m \u001b[0mAny\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 419\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 420\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRemoteExceptionWrapper\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 421\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36mresult\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 423\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 424\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_state\u001b[0m \u001b[0;34m==\u001b[0m 
\u001b[0mFINISHED\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 425\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 426\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 427\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_condition\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36m__get_result\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 382\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 384\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 385\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 386\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mDependencyError\u001b[0m: Dependency failure for task 439 with failed dependencies from tasks [437]" ] } ], "source": [ "avg(rand(), fail(), rand()).result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What I want is for that dependency failure not to happen, and
for `avg` to see the failure and be able to change its behaviour.\n", "\n", "I'm going to make an adapter: a new `Future` that wraps another one, passing normal results straight through but replacing an exception with a regular value. Then I'll wrap each of `avg`'s dependencies in that adapter." ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [], "source": [ "from concurrent.futures import Future\n", "\n", "class ExceptionHidingFuture(Future):\n", "    def __init__(self, parent: Future):\n", "        super().__init__()\n", "        parent.add_done_callback(self.cb)  # cb runs when parent completes\n", "\n", "    def cb(self, parent):\n", "        if parent.exception() is not None:\n", "            self.set_result(\"LOL\")  # hide the failure behind a placeholder string\n", "        else:\n", "            self.set_result(parent.result())  # pass normal results straight through" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, what happens without this adapter:" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [], "source": [ "f = Future()\n", "f.set_exception(RuntimeError(\"DEMO\"))" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "ename": "RuntimeError", "evalue": "DEMO", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mRuntimeError\u001b[0m Traceback (most recent call last)", "\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[0;32m----> 1\u001b[0;31m \u001b[0mf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36mresult\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 423\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 424\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_state\u001b[0m \u001b[0;34m==\u001b[0m 
\u001b[0mFINISHED\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 425\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 426\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 427\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_condition\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;32m/usr/lib/python3.7/concurrent/futures/_base.py\u001b[0m in \u001b[0;36m__get_result\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 382\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__get_result\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 383\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 384\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_exception\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 385\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 386\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_result\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", "\u001b[0;31mRuntimeError\u001b[0m: DEMO" ] } ], "source": [ "f.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And now the same again, but with the new adapter in the dependency chain (once with an exception showing the new exception handling code, and once with a 
normal result showing passthrough behaviour):" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [], "source": [ "f = Future()\n", "f2 = ExceptionHidingFuture(f)\n", "f.set_exception(RuntimeError(\"DEMO\"))" ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'LOL'" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f2.result()" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [], "source": [ "f = Future()\n", "f2 = ExceptionHidingFuture(f)\n", "f.set_result(3)" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "f2.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So now I can go back to my earlier attempt to average some random numbers and failures, this time wrapping an `ExceptionHidingFuture` around each of the dependencies.\n", "\n", "First, I'll change `avg` so that it skips failed dependencies, which now arrive as placeholder strings rather than as exceptions:" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [], "source": [ "@parsl.python_app\n", "def avg(*args):\n", "    s = 0\n", "    c = 0\n", "    for x in args:\n", "        # a failed dependency arrives as ExceptionHidingFuture's placeholder string: skip it\n", "        if not isinstance(x, str):\n", "            c += 1\n", "            s += x\n", "    # mean of the successful values (None if every dependency failed), and how many there were\n", "    return (s / c if c else None, c)\n" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "(0.7574856902166225, 2)" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "avg(ExceptionHidingFuture(rand()),\n", "    ExceptionHidingFuture(fail()),\n", "    ExceptionHidingFuture(rand())\n", "    ).result()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So with a 9-line class definition, I've prototyped some different dependency-handling logic...\n", "\n", "To become real Parsl functionality, this would probably need to be
wired more deeply into the Parsl dataflow kernel. Remember also that this prototype throws information away (`avg` never sees the original exception, only the placeholder string), and that it loses a bunch of Parsl's useful non-dependency functionality." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# The End!\n", "Summary:\n", "\n", "1. Basic dependencies\n", "2. `.result()` is your enemy\n", "3. More complicated dependency structures\n", "4. `@join_app`s - changing behaviour based on dependencies\n", "5. Messing around in futures\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Extra material" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## X1. What else is in a Parsl future?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A Parsl app's future also holds a reference to the task record for that app invocation, which can sometimes be interesting to poke at. For the purposes of this talk, the most interesting field is `depends`, which shows that this task had two dependencies, both of which returned an `int`: those are the `first` invocations." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut.task_def\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## X2. Dependencies put in place by file staging\n", "\n", "Sometimes Parsl will automatically add extra dependencies when an app is invoked, because of file staging.\n", "\n", "Here's an app that counts the lines in a file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@parsl.python_app\n", "def count_lines(file):\n", "    # open the file and count its lines; the with-block closes it afterwards\n", "    with open(file) as fh:\n", "        return len(fh.readlines())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the app is invoked on a local file, there are no dependencies..." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut = count_lines(parsl.File(\"/etc/passwd\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut.result()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut.task_def['depends']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... but if it is invoked on an HTTP URL, then a staging task is implicitly launched and added in as a dependency:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut2 = count_lines(parsl.File(\"http://www.cqx.ltd.uk/index.html\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut2.result()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fut2.task_def['depends']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In general, data staging providers can insert arbitrary sub-workflows before and after app execution." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }