{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# 3.1 Aggregation Tutorial\n", "\n", "## About\n", "\n", "This notebook contains a minimal example for running workflows on aggregates of jobs using **signac-flow**. \n", "\n", "## Author\n", "\n", "Hardik Ojha\n", "\n", "## Prerequisites\n", "\n", "This notebook requires the following packages:\n", "\n", "1. **signac-flow** >= 0.15\n", "2. numpy\n", "3. matplotlib\n", "\n", "Execute the command below to install the required packages:\n", "```bash\n", "pip install \"signac-flow>=0.15\" matplotlib numpy\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Definition\n", "\n", "Aggregation allows a **signac-flow** operation to act on multiple jobs, rather than one job at a time.\n", "\n", "An aggregate is defined as a subset of the jobs in a **signac** project. Aggregates are generated when a `flow.aggregator` object is provided to the `FlowProject.operation` decorator.\n", "\n", "Please refer to the [documentation](https://docs.signac.io/en/latest/aggregation.html) for detailed instructions on how to use aggregation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Objective\n", "\n", "The goal of this project is to plot the temperature values stored in a **signac** data space along with their average." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Project Setup\n", "\n", "Before we initialize a **signac** project inside the `projects/tutorial-aggregation` directory, we need to be sure that no such directory exists. If the directory already exists, uncomment the command in the cell below and execute it to remove the directory."
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# !rm -rf projects/tutorial-aggregation" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import signac\n", "from flow import FlowProject, aggregator\n", "\n", "# Setting default figure size\n", "plt.rcParams[\"figure.figsize\"] = (10, 4)\n", "\n", "\n", "# Initializing a signac project\n", "project = signac.init_project(\"projects/tutorial-aggregation\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Initializing the data space\n", "\n", "For the purpose of this notebook, we will generate a synthetic dataset of daily temperatures, built from a random yearly average, a sinusoidal annual variation, and random daily noise.\n", "\n", "Each **signac** job will have two state point parameters and one document value:\n", "\n", "- `job.statepoint[\"city\"]`: City for which data is being collected.\n", "- `job.statepoint[\"day\"]`: Day of the year.\n", "- `job.document[\"temperature\"]`: Average temperature for that day."
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "days = np.arange(365)\n", "\n", "\n", "def generate_temperatures(days, seed=None):\n", "    rng = np.random.default_rng(seed)\n", "    avg_temperature = 10 + rng.random() * 10\n", "    annual_variation = -10 * np.cos(days / 365 * 2 * np.pi)\n", "    random_variation = 5 * rng.random(len(days))\n", "    temperatures = avg_temperature + annual_variation + random_variation\n", "    return temperatures\n", "\n", "\n", "temperatures = generate_temperatures(days, seed=123)\n", "\n", "for day, temperature in zip(days.tolist(), temperatures.tolist()):\n", "    # Create a signac job with the state point parameters 'city' and 'day',\n", "    # and store the temperature in the job document\n", "    statepoint = dict(city=\"Anytown\", day=day)\n", "    job = project.open_job(statepoint)\n", "    job.document[\"temperature\"] = temperature" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at the project schema to see the jobs that were created." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "ProjectSchema(<len=2>)
{\n",
       " 'city': 'str([Anytown], 1)',\n",
       " 'day': 'int([0, 1, 2, ..., 363, 364], 365)',\n",
       "}
" ], "text/plain": [ "ProjectSchema()" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "project.detect_schema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a FlowProject with aggregate operations\n", "\n", "In order to achieve our goal using **signac-flow**, we need to create a `FlowProject` and add operations to it.\n", "Our workflow consists of the following operations:\n", "\n", "1. `compute_average_temperature`: This operation computes the average temperature of the year and stores it in the project document. For this operation, all the jobs present in the **signac** project are aggregated together. It has no preconditions, so it is eligible to run immediately.\n", "2. `plot_daily_temperature`: This operation plots the daily temperature (as a scatter plot) together with the average temperature of the year. For this operation, all the jobs present in the **signac** project are aggregated together and sorted by the state point parameter `day`. It becomes eligible only after `compute_average_temperature` has completed.\n", "3. `plot_weekly_temperature`: This operation plots the temperature for one day of each week. For this operation, only the jobs whose state point parameter `day` is a multiple of 7 are aggregated together, sorted by `day`. It has no preconditions."
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "class AggregationProject(FlowProject):\n", " pass\n", "\n", "\n", "# With no arguments, aggregator() places all jobs in a single aggregate.\n", "@AggregationProject.post(lambda *jobs: project.doc.get(\"average_temperature\", False))\n", "@AggregationProject.operation(aggregator=aggregator())\n", "def compute_average_temperature(*jobs):\n", " \"\"\"Compute the average temperature using the job document value,\n", " \"temperature\", of all jobs present in the signac project and\n", " store the computed value in the project document.\n", " \"\"\"\n", " average_temp = np.mean([job.document[\"temperature\"] for job in jobs])\n", " project.document[\"average_temperature\"] = float(average_temp)\n", "\n", "\n", "# Aggregate all jobs, sorted by the state point parameter \"day\".\n", "@AggregationProject.pre.after(compute_average_temperature)\n", "@AggregationProject.operation(aggregator=aggregator(sort_by=\"day\"))\n", "def plot_daily_temperature(*jobs):\n", " \"\"\"Graph of daily temperature for the year.\"\"\"\n", " print(\"Generating plot of daily temperature.\")\n", " average_temp = project.document[\"average_temperature\"]\n", " days = [job.sp[\"day\"] for job in jobs]\n", " fig, ax = plt.subplots()\n", " ax.plot(\n", " days,\n", " [job.document[\"temperature\"] for job in jobs],\n", " \"rx\",\n", " label=\"Daily Temperature (°C)\",\n", " )\n", " # Plot the average as a line\n", " ax.axhline(average_temp, c=\"green\", label=\"Average Annual Temperature (°C)\")\n", " ax.legend()\n", " ax.set_xlabel(\"Day\")\n", " ax.set_ylabel(\"Temperature (°C)\")\n", " plt.show()\n", "\n", "\n", "# Select only every seventh day (days 0, 7, 14, ...), sorted by \"day\".\n", "every_sunday = aggregator(sort_by=\"day\", select=lambda job: job.sp[\"day\"] % 7 == 0)\n", "\n", "\n", "@AggregationProject.operation(aggregator=every_sunday)\n", "def plot_weekly_temperature(*jobs):\n", " \"\"\"Graph the temperature for only one day of each week.\"\"\"\n", " print(\"Generating plot of weekly temperature.\")\n", " days = [job.sp.day for job in jobs]\n", " fig, ax = plt.subplots()\n", " ax.plot(\n", " days,\n", "
[job.document[\"temperature\"] for job in jobs],\n", " \"rx\",\n", " label=\"Daily Temperature (°C)\",\n", " )\n", " ax.set_xlabel(\"Day\")\n", " ax.set_ylabel(\"Temperature (°C)\")\n", " plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Executing the workflow" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Initializing the FlowProject\n", "To register the operations, conditions, and aggregators associated with the project we created, we need to initialize a `FlowProject`.\n", "Since the **signac** project is not located in the current directory, we pass its path to `FlowProject.get_project`." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "flow_project = AggregationProject.get_project(project.path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running the workflow\n", "The `FlowProject.run` method executes all eligible operations in the `FlowProject`." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Generating plot of weekly temperature.\n" ] }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "Operation 'plot_weekly_temperature' has no postconditions!\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Generating plot of daily temperature.\n" ] }, { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "Operation 'plot_daily_temperature' has no postconditions!\n", "Operation 'plot_weekly_temperature' has no postconditions!\n" ] } ], "source": [ "flow_project.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Summary" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have plotted the temperature values stored in a **signac** data space, along with their annual average, using the aggregation feature of **signac-flow**.\n", "\n", "To learn more about how to use aggregation, see the [documentation on aggregation](https://docs.signac.io/en/latest/aggregation.html)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.0" } }, "nbformat": 4, "nbformat_minor": 4 }