{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Plug your execution system into Parsl\n", "\n", "This notebook dives straight into a live coding demo of how you could plug your execution system into Parsl as a backend. An execution system in this context is a system that takes some code (a Python function) and executes it on a compute provider, e.g. a laptop, a GPU cluster, or a cloud vendor.\n", "\n", "Now, why would you want to do this?\n", "\n", "* Your system supports features our systems don't, e.g. resource-based task packing\n", "* Your system might offer faster, more reliable, or more efficient task execution\n", "* You want to create a mix-and-match system where tasks execute in different environments using systems that suit them, and you want Parsl to combine these systems\n", "\n", "\n", "In this notebook we will cover:\n", "\n", "1. The Parsl Executor API\n", "2. A Balsam overview\n", "3. How basic operations can be done with Balsam\n", "4. Writing a new Balsam executor that we'll test" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook environment\n", "\n", "1. We are in a Jupyter notebook hosted at ALCF, with access to the Cobalt scheduler\n", "2. This notebook has both the Parsl and Balsam libraries available from the active Conda environment\n", "3. We will not focus on the specifics of Balsam, but rather on how its task execution capabilities can be integrated into Parsl." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parsl Executors\n", "\n", "Parsl executors extend the `concurrent.futures.Executor` class. This gives us a widely used standard interface that at its core is very simple.\n", "\n", "![API Diagram](API_diagram.png)\n", "\n", "\n", "1. An executor has a `submit` method\n", "2. When `submit` is called with a function and its parameters, the executor executes the function\n", "3. 
The `submit` method immediately returns a Future rather than waiting for the function to finish executing\n", "4. The Future is updated asynchronously with the result when it becomes available." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### Balsam\n", "\n", "Balsam is an HPC workflow and edge computing system developed at ALCF. Balsam's execution model follows a pilot job model: a scheduler job is requested with a long walltime, and workers are launched inside it. These workers connect back to the main system and can run an arbitrary number of tasks. Balsam stores the task definitions in a Postgres database, which makes it robust in the face of both software and hardware failures.\n", "\n", "![Balsam](balsam.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's first import both Parsl and Balsam and confirm which versions we have available." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Balsam version : 0.3.8\n", "Parsl version : 1.0.0\n" ] } ], "source": [ "import balsam\n", "import parsl\n", "\n", "print(\"Balsam version :\", balsam.__version__)\n", "print(\"Parsl version :\", parsl.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Step 1: Batch Job submission\n", "\n", "Submit a batch job to the Cobalt scheduler on Theta using Balsam." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sys\n", "from argparse import Namespace\n", "\n", "# Point Balsam at the workflow database used in this demo.\n", "os.environ[\"BALSAM_DB_PATH\"] = '~/myWorkflow'\n", "os.environ[\"DJANGO_ALLOW_ASYNC_UNSAFE\"] = \"true\"\n", "\n", "from balsam.scripts.cli_commands import submitlaunch\n", "\n", "# Step 1 is to get a test job launched with Balsam.\n", "def balsam_submit_batch_job(nodes=1, walltime=10):\n", "    # Make the site-wide Balsam installation importable.\n", "    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/env/lib/python3.6/site-packages/')\n", "    sys.path.insert(0, '/soft/datascience/Balsam/0.3.8/')\n", "\n", "    # We also need the balsam and postgresql binaries to be on the PATH.\n", "    os.environ['PATH'] = '/soft/datascience/Balsam/0.3.8/env/bin/:' + os.environ['PATH']\n", "    os.environ['PATH'] += ':/soft/datascience/PostgreSQL/9.6.12/bin/'\n", "\n", "    # submitlaunch expects the same arguments the Balsam CLI would parse.\n", "    args = Namespace(nodes=nodes,\n", "                     time_minutes=walltime,\n", "                     job_mode='serial',\n", "                     project='CSC249ADCD01',\n", "                     wf_filter='',\n", "                     sched_flags='',\n", "                     queue='debug-cache-quad')\n", "    submitlaunch(args)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Step 2: Register Application Definition\n", "\n", "We need to create an application profile for the tasks we want Balsam to execute.\n", "In our case, we want Balsam to execute a task that will:\n", "\n", "![Sequence Diagram](executor_sequence.png)\n", "\n", " 1. Read a serialized function and parameters package\n", " 2. Deserialize it\n", " 3. Execute the function package\n", " 4. Serialize the results or exceptions. Let's assume our functions will never fail.\n", " 5. 
Write out the serialized results package for Parsl" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from balsam.core.models import BalsamJob, ApplicationDefinition\n", "\n", "def register_balsam_executor():\n", "    \"\"\"Register (or fetch, if already registered) the Balsam app that runs Parsl tasks.\"\"\"\n", "    return ApplicationDefinition.objects.get_or_create(\n", "        name='execute_parsl_task',  # Let's call it execute_parsl_task\n", "        envscript='/home/yadunand/setup_parsl_test_env.sh',  # This script activates the Parsl conda env\n", "        executable=os.path.abspath('execute_parsl_task.py'),  # This script does the 5 steps described above\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Step 3: Launch tasks via Balsam\n", "\n", "Balsam internally creates a database entry in Postgres for each job, and the returned job object\n", "can be used to check the state of the function execution." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from balsam.launcher.futures import FutureTask, wait\n", "\n", "def _cbk(*args, **kwargs):\n", "    # No-op completion callback; accepts whatever arguments Balsam passes to it.\n", "    pass\n", "\n", "def launch_balsam_task(func_name, task_id, fn_package, result_package):\n", "    job = BalsamJob(name=f\"{func_name}.{task_id}\",\n", "                    workflow=\"parsl-balsam\",\n", "                    application=\"execute_parsl_task\",\n", "                    args=f\"-i {fn_package} -o {result_package}\",\n", "                    node_packing_count=16  # TODO: test this value\n", "                    )\n", "    job.data[\"task_id\"] = task_id\n", "    job.save()\n", "    return FutureTask(job, _cbk)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Step 4: Check task status\n", "\n", "We need a mechanism that waits for all launched jobs and updates the results when they become available. One mechanism available from Balsam is a poll-based system that is triggered each time `job.done` is accessed."
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[>, >]\n" ] } ], "source": [ "import threading\n", "import time\n", "from concurrent.futures import Future\n", "\n", "# Ideally we'd have a thread that sits in the background polling tasks, rather than having the main thread block\n", "def task_status_poller(job_list, kill_event, poll_freq=5):\n", "    \"\"\"Poll the jobs every `poll_freq` seconds until `kill_event` is set.\"\"\"\n", "    while not kill_event.is_set():\n", "        print([job.done for job in job_list])\n", "        # Collect finished jobs first so job_list is not mutated while we iterate over it.\n", "        # Note: the plain Future objects used in the test below expose done() as a method,\n", "        # so `job.done` is always truthy for them; Balsam's FutureTask.done evaluates to a bool.\n", "        to_remove = [job for job in job_list if job.done]\n", "        for job in to_remove:\n", "            job_list.remove(job)\n", "        time.sleep(poll_freq)\n", "    print(\"Dying\")\n", "\n", "\n", "kill_event = threading.Event()\n", "#task_status_poller([Future(), Future()], kill_event)\n", "\n", "\n", "thread = threading.Thread(target=task_status_poller, args=([Future(), Future()], kill_event))\n", "\n", "# Start the thread, wait 4s, then signal the thread to exit\n", "thread.start()\n", "time.sleep(4)\n", "kill_event.set()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Test everything so far.\n", "# balsam_submit_batch_job()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# register_balsam_executor()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "#task = os.path.abspath('balsam_workdir/c4eb5544-741b-420f-bd10-829fb85fedf0.pkl')\n", "#task_out = task + '.out'\n", "#j1 = launch_balsam_task('foo', 1, task, task+'.1.out')\n", "#j2 = launch_balsam_task('foo', 2, task, task+'.2.out')" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "scrolled": true }, "outputs": [], "source": [ "#kill_event = threading.Event()\n", "#thread = threading.Thread(target=task_status_poller,\n", "#                          args=([j1, j2], kill_event, ))\n", "\n", "# Start the thread, wait 300s, then signal the thread to exit\n", "#thread.start()\n", 
"#time.sleep(300)\n", "#kill_event.set()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Step 5: Make a new executor\n", "\n", "Here we will create a new executor with the following methods:\n", "\n", "1. `start()` -> starts the task status polling thread, registers the execute application, and submits a new batch job\n", "2. `shutdown()` -> sets the event that tells the polling thread to terminate\n", "3. `submit()` -> launches a task via Balsam\n", "4. `scaling_enabled` -> returns `False` to disable auto-scaling" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import os\n", "import threading\n", "import time\n", "import uuid\n", "from concurrent.futures import Future\n", "\n", "from parsl.utils import RepresentationMixin\n", "from parsl.executors.status_handling import NoStatusHandlingExecutor\n", "from parsl.executors.errors import SerializationError  # module path as of Parsl 1.0.0; newer releases may differ\n", "from parsl.serialize import pack_apply_message, deserialize\n", "\n", "class BalsamExecutor(NoStatusHandlingExecutor, RepresentationMixin):\n", "\n", "    def __init__(self, nodes=1, walltime=10, workdir='balsam_workdir', label='BalsamExecutor'):\n", "        self.label = label\n", "        self._tasks = {}\n", "        self.workdir = os.path.abspath(workdir)\n", "        self.task_count = 0\n", "        self.nodes = nodes\n", "        self.walltime = walltime\n", "        self._kill_event = threading.Event()\n", "\n", "    def update_task_status(self, result_path, parsl_future):\n", "        \"\"\" Load the serialized result written by the task and set it on the Parsl future\n", "        \"\"\"\n", "        print(f\"Loading result from {result_path}\")\n", "        with open(result_path, 'rb') as f:\n", "            result = deserialize(f.read())\n", "        parsl_future.set_result(result)\n", "\n", "    def task_status_poller(self, kill_event, poll_delay=20):\n", "        \"\"\" Runs in a background thread and polls Balsam for finished tasks\n", "        \"\"\"\n", "        while not kill_event.is_set():\n", "            to_remove = []\n", "            # Snapshot the keys: submit() may add tasks from another thread while we iterate\n", "            task_ids = list(self._tasks)\n", "            if task_ids:\n", "                print(\"Poll: \", {t: self._tasks[t]['balsam_future'].done for t in task_ids})\n", "                for task_id in task_ids:\n", "                    if self._tasks[task_id]['balsam_future'].done:\n", "                        print(f\"Updating task:{task_id} with result\")\n", "                        self.update_task_status(self._tasks[task_id]['output_path'],\n", "                                                self._tasks[task_id]['parsl_future'])\n", "                        to_remove.append(task_id)\n", "\n", "                # Remove completed items from internal tasks table\n", "                print(f\"Removing tasks : {to_remove}\")\n", "                for done_task in to_remove:\n", "                    self._tasks.pop(done_task)\n", "\n", "            time.sleep(poll_delay)\n", "        print(\"[POLL_THREAD]: Exiting\")\n", "\n", "    def start(self):\n", "        \"\"\" Called by the DFK to start the executor when the config is loaded\n", "        1. Make workdir\n", "        2. Start task_status_poller\n", "        3. Register balsam executor\n", "        4. Launch cobalt job from balsam\n", "        \"\"\"\n", "        os.makedirs(self.workdir, exist_ok=True)\n", "        # Start the task status poller thread\n", "        self.thread = threading.Thread(target=self.task_status_poller,\n", "                                       args=(self._kill_event, ))\n", "        self.thread.start()\n", "\n", "        # Register the parsl_execute app\n", "        register_balsam_executor()\n", "\n", "        balsam_submit_batch_job(nodes=self.nodes,\n", "                                walltime=self.walltime)\n", "        print(\"Start\")\n", "\n", "    def shutdown(self):\n", "        # Signal the polling thread to exit (the attribute is _kill_event, set in __init__)\n", "        self._kill_event.set()\n", "        print(\"Shutdown\")\n", "\n", "    def submit(self, func, resource_spec, *args, **kwargs):\n", "        print(f\"{func} submitted\")\n", "\n", "        tid = str(uuid.uuid4())\n", "        self.task_count += 1\n", "\n", "        try:\n", "            # Serialize the function and its arguments into a package the Balsam task can read\n", "            fn_buf = pack_apply_message(func, args, kwargs,\n", "                                        buffer_threshold=1024 * 1024)\n", "            task_package = f'{self.workdir}/{tid}.pkl'\n", "            task_output = f'{self.workdir}/{tid}.out.pkl'\n", "            with open(task_package, 'wb') as f:\n", "                f.write(fn_buf)\n", "\n", "            balsam_future = launch_balsam_task(func.__name__, tid, task_package, task_output)\n", "            self._tasks[tid] = {'name': func.__name__,\n", "                                'balsam_future': balsam_future,\n", "                                'parsl_future': Future(),\n", "                                'output_path': task_output,\n", "                                'task_path': task_package,\n", "                                }\n", "\n", "        except TypeError:\n", "            raise SerializationError(func.__name__)\n", "\n", "        return self._tasks[tid]['parsl_future']\n", "\n", "    def scale_in(self):\n", "        pass\n", "\n", "    def scale_out(self):\n", "        pass\n", "\n", "    @property\n", "    def scaling_enabled(self):\n", "        # Parsl's base executor declares scaling_enabled as a property; False disables auto-scaling\n", "        return False\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's load the executor we defined above and run some quick tests." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "from parsl.config import Config\n", "\n", "config = Config(executors=[BalsamExecutor()])" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Dying\n", "Submit OK: Qlaunch { 'command': '/home/yadunand/myWorkflow/qsubmit/qlaunch51.sh',\n", " 'from_balsam': True,\n", " 'id': 51,\n", " 'job_mode': 'serial',\n", " 'nodes': 1,\n", " 'prescheduled_only': False,\n", " 'project': 'CSC249ADCD01',\n", " 'queue': 'debug-cache-quad',\n", " 'scheduler_id': 471701,\n", " 'state': 'submitted',\n", " 'wall_minutes': 10,\n", " 'wf_filter': ''}\n", "Start\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "parsl.clear()\n", "parsl.load(config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll create two apps: one Python-based, the other a Bash app."
] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "from parsl import python_app, bash_app\n", "\n", "@python_app\n", "def double(x):\n", "    return x * 2\n", "\n", "@bash_app\n", "def platinfo(stdout='test.out'):\n", "    return 'uname -a'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's launch a few tasks that invoke the `double` app defined above" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " submitted\n", " submitted\n", " submitted\n", " submitted\n" ] } ], "source": [ "futures = {}\n", "for i in range(4):\n", "    f = double(i)\n", "    futures[i] = f" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ ">\n", ">\n", ">\n", ">\n" ] } ], "source": [ "for i in futures:\n", "    print(futures[i])" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Poll: 
{'f88bf5a0-5a14-4ac0-aa41-7bf29c754bb6': False, '39b5eba0-cd6e-4060-9b44-437e1fe4e75d': False, '0d5ef422-d12e-4bda-accd-bc3882bcad61': False, '95f527ea-c93e-4b91-8ef4-cf97cacfad8d': False}\n", "Removing tasks : []\n", "Poll: {'f88bf5a0-5a14-4ac0-aa41-7bf29c754bb6': True, '39b5eba0-cd6e-4060-9b44-437e1fe4e75d': True, '0d5ef422-d12e-4bda-accd-bc3882bcad61': True, '95f527ea-c93e-4b91-8ef4-cf97cacfad8d': True}\n", "Updating task:f88bf5a0-5a14-4ac0-aa41-7bf29c754bb6 with result\n", "Loading result from /gpfs/mira-home/yadunand/parsl-tutorial/ParslFest2020/Plug_your_execution_system_into_Parsl/balsam_workdir/f88bf5a0-5a14-4ac0-aa41-7bf29c754bb6.out.pkl\n", "0Updating task:39b5eba0-cd6e-4060-9b44-437e1fe4e75d with result\n", "\n", "Loading result from /gpfs/mira-home/yadunand/parsl-tutorial/ParslFest2020/Plug_your_execution_system_into_Parsl/balsam_workdir/39b5eba0-cd6e-4060-9b44-437e1fe4e75d.out.pkl\n", "2Updating task:0d5ef422-d12e-4bda-accd-bc3882bcad61 with result\n", "\n", "Loading result from /gpfs/mira-home/yadunand/parsl-tutorial/ParslFest2020/Plug_your_execution_system_into_Parsl/balsam_workdir/0d5ef422-d12e-4bda-accd-bc3882bcad61.out.pkl\n", "Updating task:95f527ea-c93e-4b91-8ef4-cf97cacfad8d with result4\n", "Loading result from /gpfs/mira-home/yadunand/parsl-tutorial/ParslFest2020/Plug_your_execution_system_into_Parsl/balsam_workdir/95f527ea-c93e-4b91-8ef4-cf97cacfad8d.out.pkl\n", "\n", "6Removing tasks : ['f88bf5a0-5a14-4ac0-aa41-7bf29c754bb6', '39b5eba0-cd6e-4060-9b44-437e1fe4e75d', '0d5ef422-d12e-4bda-accd-bc3882bcad61', '95f527ea-c93e-4b91-8ef4-cf97cacfad8d']\n", "\n" ] } ], "source": [ "for i in futures:\n", "    print(futures[i].result())" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Recap\n", "\n", "We implemented the sequence shown in this diagram:\n", "\n", "![Sequence Diagram](executor_sequence.png)\n", "\n", "* We 
walked through the Balsam interfaces that let you:\n", "  * Place batch job requests via Balsam\n", "  * Register an application that will execute Parsl tasks\n", "  * Launch a Balsam job for each Parsl task\n", "* We wrote a new `BalsamExecutor` that plugs into Parsl" ] } ], "metadata": { "kernelspec": { "display_name": "parsl_balsam", "language": "python", "name": "parsl_balsam" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 2 }