{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parsl WorkQueueExecutor tutorial\n", "\n", "\n", " \n", " \n", "
\n", "\n", " \n", "## WorkQueue installation\n", "\n", "To use WQEX in Parsl, please install the full CCTools software package within an appropriate Anaconda or Miniconda environment.\n", "\n", "1. Install Parsl on Github master\n", "2. conda install -y -c conda-forge ndcctools\n", "\n", "\n", "## Fine-grained resource management\n", "\n", "In this tutorial we present three typical uses of WorkQueueExecutor (**WQEX**) in Parsl, including the **default**, **autolabel** and **per-task resource specification**.\n", "\n", "1. **Default** is suitable for apps that need to utilize the resource of the entire node, e.g., an app itself already does some parallelism internally.\n", "2. **Autolabel** is to enable Work Queue to automatically figure out the resource needed for every category of tasks, and then automatically allocate the tasks to use the computing resource efficiently.\n", "3. **Per-task resource specification** allows a user to specify the exact resource needed of an app.\n", "\n", "We will go through each of these use cases in the following." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Imports for this notebook\n", "from parsl.config import Config\n", "from parsl.executors import WorkQueueExecutor\n", "from parsl.providers import LocalProvider\n", "\n", "import parsl\n", "from parsl.app.app import python_app\n", "\n", "import time\n", "import multiprocessing\n", "import glob\n", "import pandas as pd\n", "import json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Default configuration\n", "\n", "Now we define an initial WQEX configuration. Like the configuration of other executors (e.g. HTEX), we need to define a provider for WQEX. This launches **ONE** Work Queue worker on the local node." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Default configuration of WQEX\n", "config = Config(\n", " executors=[\n", " WorkQueueExecutor(port=50055,\n", " provider=LocalProvider()\n", " )\n", " ]\n", ")\n", "\n", "dfk = parsl.load(config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we define a simple sleeper app that sleeps for a short duration. We know this app uses minimal resource." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "@python_app\n", "def sleeper(dur=1):\n", " \"\"\" \n", " An app that sleeps for certain duration\n", " \"\"\"\n", " import time\n", " time.sleep(dur)\n", " return dur" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we invoke the sleeper app for multiple times. With this default configuration, each worker can only run one task at a time. So the following `sleeper` tasks are essentially run in serial." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 1, 1, 1, 1]\n", "Task finished in 28.411840200424194 seconds\n" ] } ], "source": [ "start = time.time()\n", "tasks = [sleeper() for i in range(5)]\n", "res = [t.result() for t in tasks]\n", "print(res)\n", "print(f\"Task finished in {time.time() - start} seconds\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we see below, there are multiple cores on the local node. This default config is not ideal for running tasks like `sleeper` that uses only minimal resource. This default config is suitable for apps that need to use the entire computing node." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "56" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "multiprocessing.cpu_count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Clean up the loaded Parsl WQEX default config" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "dfk.cleanup()\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# How can we use WQEX for apps that do not need the entire node?\n", "\n", "1. Autolabel\n", "2. Per-task resource specification\n", "\n", "\n", "## Autolabel\n", "\n", "Let us enable the autolabelin this new configuration. To enable autolabel, we just need to set `autolabel=True`. Note here that we also set `autocateogory=True`. **Autolabel** in WQEX automaically monitors the resource pattern of apps per category. By default, we have `autocateogory=False`, which means that WQEX categorizes **ALL** Parsl apps into one category, `parsl-default`. However, different apps may present various resource patterns. So it is better to categorize apps in a finer-grained. Setting `autocateogory=True` enables WorkQueue to automatically categorize different parsl apps based on the function name. " ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Configuration with autolabel and autocategory enabled\n", "config = Config(\n", " executors=[\n", " WorkQueueExecutor(port=50055,\n", " autolabel=True,\n", " autocategory=True,\n", " provider=LocalProvider()\n", " )\n", " ]\n", ")\n", "\n", "dfk = parsl.load(config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we define an app called `mp_task` that can stress multiple cores. In this app, we use the `multiprocessing` module to implement it. We then run five `mp_task` tasks. By default, each `mp_task` uses 2 CPU cores and lasts for around 2 seconds." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']\n" ] } ], "source": [ "@python_app\n", "def mp_task(cores=2):\n", " \"\"\"\n", " An app that use multiprocessing to mimic an app that uses multiple cores\n", " \"\"\"\n", " from multiprocessing import Process\n", " import time\n", " def stress(dur=2):\n", " start = time.time()\n", " while time.time() - start < dur:\n", " continue\n", " \n", " processes = []\n", " for i in range(cores):\n", " p = Process(target=stress, args=(2,))\n", " p.start()\n", " processes.append(p)\n", " for p in processes:\n", " p.join()\n", " return f'Done with {cores} cores'\n", "\n", "\n", "# Submit 5 mp_task app\n", "tasks = []\n", "for i in range(5):\n", " fut = mp_task()\n", " tasks.append(fut)\n", "res = [t.result() for t in tasks]\n", "print(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "WorkQueue automatically monitors the resource usage of this category (based on the function name of app). Let us take a deeper look at what happens behind the scene. We define a function to parse the logs to get the resource allocated to all the tasks." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def parse_logs():\n", " \"\"\"\n", " Parse the resource assignment of Work Queue from the runinfo logs\n", " \"\"\"\n", " dirs = glob.glob(\"runinfo/*\")\n", " log = \"{}/WorkQueueExecutor/transaction_log\".format(sorted(dirs)[-1])\n", " with open(log) as f:\n", " lines = f.readlines()\n", "\n", " resources = ['task_id', 'task_type', 'cores', 'memory', 'disk']\n", " df = pd.DataFrame(columns=resources)\n", " task_ids = {}\n", " for line in lines:\n", " if \"WAITING\" in line and \"WAITING_RETRIEVAL\" not in line and 'taskid' not in line:\n", " line = line.split()\n", " task_id = line[3]\n", " task_category = line[5]\n", " task_ids[task_id] = task_category\n", "\n", " # timestamp master-pid TASK id (continue next line)\n", " # DONE SUCCESS exit-code exceeded measured\n", " if \"RUNNING\" in line and 'taskid' not in line:\n", " line = line.split()\n", " task_id = line[3]\n", " s = json.loads(line[-1])\n", "\n", " # append the new resources to the panda's data frame.\n", " # Resources are represented in a json array as\n", " # [value, \"unit\", such as [1000, \"MB\"],\n", " # so we only take the first element of the array:\n", " df.loc[len(df)] = [task_id, task_ids[task_id]] + list(float(s[r][0]) for r in resources[2:])\n", " return df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the resource for each `mp_task` task assigned by WorkQueue. \n", "\n", "As we see, the first `mp_task` task was allocated 56 cores, 257GB memory, and 112GB disk. This is because WorkQueue assigns all the resource on the local node to the task, in order to profile the resource utilization for the first task. After that, WorkQueue updates the resource for this category (based on function name) accordingly. Thus, the later tasks use only 2 cores, 57MB memory, and 2MB disk.\n", "\n", "In summary, in autolabel, for each category, Work Queue monitors the resources of some tasks in this category first (the number is tuned by `autolabel_window`). Then Work Queue allocates resources to later tasks based on the monitoring for high throughput. If a task fails because of lacking of resource, Work Queue automatically retries the task with more resource." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
task_idtask_typecoresmemorydisk
01mp_task56.0257036.0118788.0
12mp_task2.057.02.0
23mp_task2.057.02.0
34mp_task2.057.02.0
45mp_task2.057.02.0
\n", "
" ], "text/plain": [ " task_id task_type cores memory disk\n", "0 1 mp_task 56.0 257036.0 118788.0\n", "1 2 mp_task 2.0 57.0 2.0\n", "2 3 mp_task 2.0 57.0 2.0\n", "3 4 mp_task 2.0 57.0 2.0\n", "4 5 mp_task 2.0 57.0 2.0" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = parse_logs()\n", "df[df['task_type'] == 'mp_task'].head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above shows how **autolabel** works. This is good for the use case where one is not quite clear about the resource requirements of the apps.\n", "\n", "## Per-task resource specification\n", "\n", "Besides **autolabel**, WQEX supports to specify the resource requirement of a Parsl app by using a specific kwarg **parsl_resource_specification**. There are two ways you can specify the resource requirements for each app.\n", "\n", "### Declare resource requirements in the kwargs when you define an app\n", "This declaration is the default resource requirement for `mp_task_spec` app. Currently, WQEX supports specification of three types of resources: **cores, memory, and disk**." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "@python_app\n", "def mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'memory': 100, 'disk': 100}):\n", " from multiprocessing import Process\n", " import time\n", " def stress(dur=2):\n", " start = time.time()\n", " while time.time() - start < dur:\n", " continue\n", " \n", " processes = []\n", " for i in range(cores):\n", " p = Process(target=stress, args=(2,))\n", " p.start()\n", " processes.append(p)\n", " for p in processes:\n", " p.join()\n", " return f'Done with {cores} cores'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we invoke an app without further specifying the resource requirements, WQEX allocates each invocation the default resource when we define the app." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['Done with 2 cores', 'Done with 2 cores', 'Done with 2 cores']\n" ] } ], "source": [ "tasks = []\n", "for i in range(3):\n", " # Invoke the app using the default resource specification\n", " fut = mp_task_spec(cores=2)\n", " tasks.append(fut)\n", "res = [t.result() for t in tasks]\n", "print(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the resource for each `mp_task_spec` task allocated by WorkQueue. As we see, all invocations of `mp_task_spec` are allocated by the same resource, which is 3 cores, 100MB memory, and 100MB disk." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
task_idtask_typecoresmemorydisk
56mp_task_spec2.0100.0100.0
67mp_task_spec2.0100.0100.0
78mp_task_spec2.0100.0100.0
\n", "
" ], "text/plain": [ " task_id task_type cores memory disk\n", "5 6 mp_task_spec 2.0 100.0 100.0\n", "6 7 mp_task_spec 2.0 100.0 100.0\n", "7 8 mp_task_spec 2.0 100.0 100.0" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = parse_logs()\n", "df[df['task_type'] == 'mp_task_spec'].head(n=10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In addition to using the default resource specification, we can also change the resource requirement when we invoke the app" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['Done with 1 cores', 'Done with 2 cores', 'Done with 3 cores']\n" ] } ], "source": [ "tasks = []\n", "for i in range(3):\n", " # Vary the resource specification per invocation\n", " fut = mp_task_spec(cores=i+1,\n", " parsl_resource_specification={'cores': i+1, 'memory': 100, 'disk': 100})\n", " tasks.append(fut)\n", "res = [t.result() for t in tasks]\n", "print(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is the resource for each mp_task_spec task assigned by WorkQueue. As we see, all invocations of `mp_task_spec` use different resources." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
task_idtask_typecoresmemorydisk
89mp_task_spec1.0100.0100.0
910mp_task_spec2.0100.0100.0
1011mp_task_spec3.0100.0100.0
\n", "
" ], "text/plain": [ " task_id task_type cores memory disk\n", "8 9 mp_task_spec 1.0 100.0 100.0\n", "9 10 mp_task_spec 2.0 100.0 100.0\n", "10 11 mp_task_spec 3.0 100.0 100.0" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = parse_logs()\n", "df[df['task_type'] == 'mp_task_spec'].tail(n=3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is worth mentioning that any resource specification overrides what Work Queue **autolabel** infers.\n", "\n", "1. With autolabel **enabled**, we do not have to specify all three types of resources for an app. We can choose to specify some of them. For example, in the below, we only specify cores and disk, but autolabel automatically infers what is needed for memory and fill that gap.\n", "\n", "2. With autolabel **disabled**, if we do not specify all three types of resources, cores, memory, and disk, Work Queue alloates all the resource of a worker (i.e., a compute node) to run the app. This is not a proper configuration we should use." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
task_idtask_typecoresmemorydisk
1112mp_task_spec2.060.050.0
\n", "
" ], "text/plain": [ " task_id task_type cores memory disk\n", "11 12 mp_task_spec 2.0 60.0 50.0" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fut = mp_task_spec(cores=2, parsl_resource_specification={'cores': 2, 'disk': 50})\n", "fut.result()\n", "df = parse_logs()\n", "df[df['task_type'] == 'mp_task_spec'].tail(n=1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python (parsl-issue)", "language": "python", "name": "parsl-issue" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 2 }