{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# funcX Tutorial\n", "\n", "funcX is a Function-as-a-Service (FaaS) platform for science that enables you to register functions in a cloud-hosted service and then reliably execute those functions on a remote funcX endpoint. This tutorial is configured to use a tutorial endpoint hosted by the funcX team. You can set up and use your own endpoint by following the [funcX documentation](https://funcx.readthedocs.io/en/latest/endpoints.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## funcX Python SDK\n", "\n", "The funcX Python SDK provides programming abstractions for interacting with the funcX service. Before running this tutorial you should first install the funcX SDK as follows:\n", "\n", "    $ pip install funcx\n", "\n", "The funcX SDK exposes a `FuncXClient` object for all interactions with the funcX service. In order to use the funcX service, you must first authenticate using one of hundreds of supported identity providers (e.g., your institution, ORCID, Google). As part of the authentication process you must grant permission for funcX to access your identity information (to retrieve your email address), Globus Groups management access (to share functions and endpoints), and Globus Search (to discover functions and endpoints). " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from funcx.sdk.client import FuncXClient\n", "\n", "fxc = FuncXClient()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic usage\n", "\n", "The following example demonstrates how you can register and execute a function. \n", "\n", "## Registering a function\n", "\n", "funcX works like any other FaaS platform: you must first register a function with funcX before being able to execute it on a remote endpoint. The registration process will serialize the function body and store it securely in the funcX service. 
As we will see below, you may share functions with others and discover functions that are shared with you.\n", "\n", "Upon registration of a function, funcX will return a UUID for that function. This UUID can then be used to manage and invoke the function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def hello_world():\n", "    return \"Hello World!\"\n", "\n", "func_uuid = fxc.register_function(hello_world)\n", "print(func_uuid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running a function \n", "\n", "To invoke a function, you must provide a) the function's UUID, and b) the `endpoint_id` of the endpoint on which you wish to execute that function. Note: here we use the public funcX tutorial endpoint; you may change the `endpoint_id` to the UUID of any endpoint for which you have permission to execute functions. \n", "\n", "funcX functions are designed to be executed remotely and asynchronously. Rather than blocking until the function completes, an invocation (called a `task`) immediately returns a UUID that you can use to monitor the task's execution status and retrieve its results.\n", "\n", "The funcX service will manage the reliable execution of a task, for example, by queueing tasks when the endpoint is busy or offline and retrying tasks in case of node failures. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tutorial_endpoint = '4b116d3c-1703-4f8f-9f6f-39921e5864df' # Public tutorial endpoint\n", "res = fxc.run(endpoint_id=tutorial_endpoint, function_id=func_uuid)\n", "print(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Retrieving results\n", "\n", "When the task has completed executing you can access the results via the funcX client as follows. Note: if the task has not yet completed, `get_result` raises an exception that reflects the task's current stage in the lifecycle (e.g., waiting for the endpoint). 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    print(fxc.get_result(res))\n", "except Exception as e:\n", "    print(\"Exception: {}\".format(e))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Functions with arguments\n", "\n", "funcX supports registration and invocation of functions with arbitrary arguments and return values. funcX will serialize any \\*args and \\*\\*kwargs when invoking a function and it will serialize any return values or exceptions. Note: funcX uses standard Python serialization libraries (e.g., Pickle, Dill). It also limits the size of input arguments and return values to 5 MB. \n", "\n", "The following example shows a function that computes the sum of a list of input arguments. First, we register the function as above:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def funcx_sum(items):\n", "    return sum(items)\n", "\n", "sum_function = fxc.register_function(funcx_sum)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When invoking the function you can pass in arguments like any other function, either by position or with keyword arguments. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "items = [1, 2, 3, 4, 5]\n", "\n", "res = fxc.run(items, endpoint_id=tutorial_endpoint, function_id=sum_function)\n", "\n", "print(fxc.get_result(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Functions with dependencies\n", "\n", "funcX requires that functions explicitly state all dependencies within the function body. It also assumes that the dependent libraries are available on the endpoint on which the function will execute. For example, in the following function, we explicitly import `date` from the `datetime` module. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def funcx_date():\n", "    from datetime import date\n", "    return date.today()\n", "\n", "date_function = fxc.register_function(funcx_date)\n", "\n", "res = fxc.run(endpoint_id=tutorial_endpoint, function_id=date_function)\n", "\n", "print(fxc.get_result(res))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Calling external applications\n", "\n", "Depending on the configuration of the funcX endpoint, you can often invoke external applications that are available in the endpoint environment. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def funcx_echo(name):\n", "    import os\n", "    return os.popen(\"echo Hello {}\".format(name)).read()\n", "\n", "echo_function = fxc.register_function(funcx_echo)\n", "\n", "res = fxc.run(\"World\", endpoint_id=tutorial_endpoint, function_id=echo_function)\n", "\n", "print(fxc.get_result(res))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Catching exceptions\n", "\n", "When functions fail, the exception is captured and serialized by the funcX endpoint, and re-raised when you try to get the result. In the following example, the 'deterministic failure' exception is raised when `fxc.get_result` is called on the failing function." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def failing():\n", "    raise Exception(\"deterministic failure\")\n", "\n", "failing_function = fxc.register_function(failing)\n", "\n", "res = fxc.run(endpoint_id=tutorial_endpoint, function_id=failing_function)\n", "\n", "fxc.get_result(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running functions many times\n", "\n", "After registering a function you can invoke it repeatedly. The following example shows how the Monte Carlo method can be used to estimate pi. 
\n", "\n", "Specifically, if a circle with radius $r$ is inscribed inside a square with side length $2r$, the area of the circle is $\\pi r^2$ and the area of the square is $(2r)^2$. Thus, if $N$ uniformly-distributed points are dropped at random locations within the square, approximately $N\\pi/4$ will be inside the circle. The code below samples points in the unit square and tests them against the quarter circle of radius 1, which gives the same ratio of $\\pi/4$.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "# function that estimates pi by placing points in a box\n", "def pi(num_points):\n", "    from random import random\n", "    inside = 0\n", "    for i in range(num_points):\n", "        x, y = random(), random()  # Drop a point randomly within the box.\n", "        if x**2 + y**2 < 1:  # Count points within the circle.\n", "            inside += 1\n", "    return (inside*4 / num_points)\n", "\n", "# register the function\n", "pi_function = fxc.register_function(pi)\n", "\n", "# execute the function 3 times\n", "estimates = []\n", "for i in range(3):\n", "    estimates.append(fxc.run(10**5, endpoint_id=tutorial_endpoint, function_id=pi_function))\n", "\n", "# wait for all tasks to complete\n", "for e in estimates:\n", "    while fxc.get_task(e)['pending']:\n", "        time.sleep(3)\n", "\n", "# get the results and calculate the total\n", "results = [fxc.get_result(i) for i in estimates]\n", "total = 0\n", "for r in results:\n", "    total += r\n", "\n", "# print the results\n", "print(\"Estimates: {}\".format(results))\n", "print(\"Average: {:.5f}\".format(total/len(results)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Asynchronous execution\n", "\n", "Periodically polling funcX for task completion can be inefficient. funcX implements an asynchronous execution mode in which you can register for asynchronous callbacks when functions complete (either successfully or by raising an exception). \n", "\n", "To use this mode, you need to set `asynchronous=True` when creating the funcX client. 
When you subsequently call `run` via the funcX client, you will receive an asyncio task. You can then `await` the task to wait until it completes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from funcx.sdk.client import FuncXClient\n", "import asyncio\n", "\n", "fxc = FuncXClient(asynchronous=True)\n", "\n", "def slow_sum(nums):\n", "    import time\n", "    time.sleep(10)\n", "    return sum(nums)\n", "\n", "func_id = fxc.register_function(slow_sum)\n", "\n", "task = fxc.run([1, 2, 3, 4, 5], function_id=func_id, endpoint_id=tutorial_endpoint)\n", "\n", "await task # wait for task completion\n", "print(task.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python Executor interface\n", "\n", "The funcX SDK implements Python's [concurrent.futures Executor](https://docs.python.org/3/library/concurrent.futures.html#executor-objects) interface. The executor interface provides a standard way to run one or more functions asynchronously. We suggest using this interface for executing many functions.\n", "\n", "The Executor interface specifies a slightly different model for execution. Here, you do not need to first register functions for invocation (that is done transparently by funcX); instead, you can simply pass a function and its input arguments to the `submit` method. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from funcx.sdk.client import FuncXClient\n", "from funcx.sdk.executor import FuncXExecutor\n", "\n", "fx = FuncXExecutor(FuncXClient())\n", "\n", "def double(x):\n", "    return x * 2\n", "\n", "count = 10\n", "\n", "futures = []\n", "for i in range(count):\n", "    future = fx.submit(double, i, endpoint_id=tutorial_endpoint)\n", "    futures.append(future)\n", "\n", "for fu in futures:\n", "    print(fu.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Running batches\n", "\n", "After registering a function, you might want to invoke that function many times without making individual calls to the funcX service. Such cases arise when running Monte Carlo simulations, ensembles, and parameter sweep applications. \n", "\n", "funcX provides a batch interface that enables specification of a range of function invocations. To use this interface, you must create a funcX batch object and then add each invocation to that object. You can then pass the constructed object to the `batch_run` interface. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fxc = FuncXClient()\n", "\n", "def squared(x):\n", "    return x**2\n", "\n", "squared_function = fxc.register_function(squared)\n", "\n", "inputs = list(range(10))\n", "batch = fxc.create_batch()\n", "\n", "for x in inputs:\n", "    batch.add(x, endpoint_id=tutorial_endpoint, function_id=squared_function)\n", "\n", "batch_res = fxc.batch_run(batch)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, funcX provides an interface to retrieve the status of the entire batch of invocations. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fxc.get_batch_result(batch_res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Describing and discovering functions \n", "\n", "funcX manages a registry of functions that can be shared, discovered and reused. \n", "\n", "When registering a function, you may choose to set a description to support discovery, and to make the function `public` (so that others can run it) and/or `searchable` (so that others can discover it). " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def hello_world():\n", "    return \"Hello World!\"\n", "\n", "func_uuid = fxc.register_function(hello_world, description=\"hello world function\", public=True, searchable=True)\n", "print(func_uuid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can search previously registered functions to which you have access using `search_function`. The first parameter, `q`, is searched against all the fields, such as author, description, function name, and function source. You can navigate through pages of results with the `offset` and `limit` keyword args. \n", "\n", "The returned object is a simple wrapper around a list, so you can index into it; printing it displays a pretty-printed table. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "search_results = fxc.search_function(\"hello\", offset=0, limit=5)\n", "print(search_results)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Endpoint operations\n", "\n", "You can retrieve information about endpoints including status and information about how the endpoint is configured. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fxc.get_endpoint_status(tutorial_endpoint)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.13" } }, "nbformat": 4, "nbformat_minor": 4 }