{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Transformer\n", "\n", "`Transformer` represents the logic unit executing on logical partitions of the input dataframe. The partitioning logic is not a concern of `Transformer`, it should be specified in a previous step. But you must understand the concept of partition in Fugue, please read [this](./partition.ipynb).\n", "\n", "**It accepts these input DataFrame Types**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`\n", "\n", "**It accepts these output DataFrame types**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`\n", "\n", "Notice that `ArrayDataFrame` and other local dataframes can't be used as annotation, you must use `LocalDataFrame`.\n", "\n", "`Transformer` requires users to be explicit on the output schema. `*` can represent the input dataframe schema, so `*,b:int` means the output will have an additional column. The schema can be specified by shema hint, decorator, or in the Fugue code.\n", "\n", "## Why Explicit on Output Schema?\n", "\n", "Normally computing frameworks can infer output schema, however, it is neither reliable nor efficient. To infer the schema, it has to go through at least one partition of data and figure out the possible schema. However, what if a transformer is producing inconsistent schemas on different data partitions? What if that partition takes a long time or fail? So to avoid potential correctness and performance issues, `Transformer` and `CoTransformer` output schemas are required in Fugue.\n", "\n", "## Native Approach\n", "\n", "The simplest way, with no dependency on Fugue. You just need to have acceptable annotations on input dataframe and output. In native approach, you must specify schema in the Fugue code." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import Iterable, Dict, Any, List\n", "import pandas as pd\n", "\n", "def add(df:pd.DataFrame, n=1) -> pd.DataFrame:\n", " df[\"b\"]+=n\n", " return df\n", " \n", "def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:\n", " yield next(df)\n", " return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import FugueWorkflow\n", "\n", "with FugueWorkflow() as dag:\n", " df = dag.df([[0,1],[0,2],[1,3],[1,1]],\"a:int,b:int\")\n", " # with out schema hint you have to specify schema in Fugue code\n", " df = df.transform(add, schema=\"*\").transform(add, schema=\"*\", params=dict(n=2))\n", " # how to define partition for transformers to operate on\n", " # get smallest b of each partition\n", " df.partition(by=[\"a\"], presort=\"b\").transform(get_top, schema=\"*\").show()\n", " # get largest b of each partition\n", " df.partition(by=[\"a\"], presort=\"b DESC\").transform(get_top, schema=\"*\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## With Schema Hint\n", "\n", "When you need to reuse a transformer multiple times, it's tedious to specify the schema in Fugue code every time. You can instead, write a schema hint on top of the function, this doesn't require you to have Fugue dependency. The following code is doing the same thing as above but see how much shorter." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import Iterable, Dict, Any, List\n", "import pandas as pd\n", "\n", "# schema: *\n", "def add(df:pd.DataFrame, n=1) -> pd.DataFrame:\n", " df[\"b\"]+=n\n", " return df\n", " \n", "# schema: *\n", "def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:\n", " yield next(df)\n", " return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import FugueWorkflow\n", "\n", "with FugueWorkflow() as dag:\n", " df = dag.df([[0,1],[0,2],[1,3],[1,1]],\"a:int,b:int\")\n", " df = df.transform(add).transform(add, params=dict(n=2)) # see how parameters are set\n", " df.partition(by=[\"a\"], presort=\"b\").transform(get_top).show()\n", " df.partition(by=[\"a\"], presort=\"b DESC\").transform(get_top).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Schema Hint Syntax\n", "\n", "ONLY for `Transformer` the schema hint has special syntax that makes it very flexible. Please read [this](https://triad.readthedocs.io/en/latest/api/triad.collections.html#triad.collections.schema.Schema.transform) for detailed syntax, here we only show some examples." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from typing import Iterable, Dict, Any, List\n", "import pandas as pd\n", "\n", "# schema: *,c:int\n", "def with_c(df:pd.DataFrame) -> pd.DataFrame:\n", " df[\"c\"]=1\n", " return df\n", "\n", "# schema: *-b\n", "def drop_b(df:pd.DataFrame) -> pd.DataFrame:\n", " return df.drop(\"b\", axis=1)\n", "\n", "# schema: *~b,c\n", "def drop_b_c_if_exists(df:pd.DataFrame) -> pd.DataFrame:\n", " return df.drop([\"b\",\"c\"], axis=1, errors='ignore')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import FugueWorkflow\n", "\n", "with FugueWorkflow() as dag:\n", " df = dag.df([[0,1],[0,2],[1,3],[1,1]],\"a:int,b:int\")\n", " df = df.transform(with_c)\n", " df.show()\n", " df = df.transform(drop_b)\n", " df.show()\n", " df = df.transform(drop_b_c_if_exists)\n", " df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Decorator Approach\n", "\n", "Decorator approach can do everything the schema hint can do, plus, it can take in a function to generate the schema." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from fugue import transformer\n", "\n", "# df is the zipped DataFrames, **kwargs is the parameters passed in from fugue\n", "@transformer(lambda df, **kwargs: df.schema+\"c:int\") # == @transformer(\"*,c:int\") \n", "def with_c(df:pd.DataFrame) -> pd.DataFrame:\n", " df[\"c\"]=1\n", " return df\n", "\n", "with FugueWorkflow() as dag:\n", " df = dag.df([[0,1],[0,2],[1,3],[1,1]],\"a:int,b:int\")\n", " df = df.transform(with_c)\n", " df.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interface Approach\n", "\n", "All the previous methods are just wrappers of the interface approach. They cover most of the use cases and simplify the usage. But for certain cases, you should implement interface, for example:\n", "\n", "* Your output schema needs partition information, such as partition keys, schema, and current values of the keys\n", "* You have an expensive but common initialization step for processing each logical partition, this should happen when initializaing physical partition\n", "\n", "The biggest advantage of interface approach is that you can customize pyhisical partition level initialization, and you have all the up-to-date context variables to use.\n", "\n", "In the interface approach, type annotations are not necessary, but again, it's good practice to have them.\n", "\n", "The following case focuses on performance comparison, to see how to use context variables, read [CoTransfromer example](./cotransformer.ipynb#Interface-Approach)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import Transformer, FugueWorkflow, PandasDataFrame, DataFrame, LocalDataFrame\n", "from triad.collections import Schema\n", "from time import sleep\n", "import pandas as pd\n", "import numpy as np\n", "\n", "def expensive_init(sec=5):\n", " sleep(sec)\n", "\n", "def helper(ct=20) -> pd.DataFrame:\n", " np.random.seed(0)\n", " return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))\n", "\n", "class Median(Transformer):\n", " # this is invoked on driver side\n", " def get_output_schema(self, df):\n", " return df.schema + (self.params.get_or_throw(\"col\", str),float)\n", " \n", " # on initialization of the physical partition\n", " def on_init(self, df: DataFrame) -> None:\n", " self.col = self.params.get_or_throw(\"col\", str)\n", " expensive_init(self.params.get(\"sec\",0))\n", " \n", " def transform(self, df):\n", " pdf = df.as_pandas()\n", " pdf[self.col]=pdf[\"b\"].median()\n", " return PandasDataFrame(pdf)\n", " \n", "\n", "with FugueWorkflow() as dag:\n", " dag.create(helper).partition(by=[\"a\"]).transform(Median, params={\"col\":\"m\", \"sec\": 1}).show(rows=100) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that we set `self.col` in `on_init`, it's better to set it in `on_init` or `transform`. It's better not to set it in `get_output_schema` because that will need to be serialized and send to each workers if using a distributed engine, serialization can fail for some value types.\n", "\n", "In order to show the benefit of `on_init` we also create an interfaceless version (which is a lot simpler), but you have to call `expensive_init` in that function for each logical partition. Also, in the run function, we set `num=2` to show the effect. So for `Median` transformer, the `expensive_init` will be called at most twice, but for `median` it will be called for more times.\n", "\n", "Notice, the numbers may be off if you run this on binder." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_spark import SparkExecutionEngine\n", "from timeit import timeit\n", "\n", "# schema: *, m:double\n", "def median(df:pd.DataFrame, sec=0) -> pd.DataFrame:\n", " expensive_init(sec)\n", " df[\"m\"]=df[\"b\"].median()\n", " return df\n", "\n", "def run(engine, interfaceless, sec):\n", " with FugueWorkflow(engine) as dag:\n", " df = dag.create(helper)\n", " if interfaceless:\n", " df.partition(by=[\"a\"], num=2).transform(median, params={\"sec\": sec}).show(rows=100)\n", " else:\n", " df.partition(by=[\"a\"], num=2).transform(Median, params={\"col\":\"m\", \"sec\": sec}).show(rows=100)\n", " \n", "engine = SparkExecutionEngine()\n", "print(timeit(lambda: run(engine, True, 1), number=1))\n", "print(timeit(lambda: run(engine, False, 1), number=1))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }