{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# X-like Objects\n", "\n", "In Fugue, it's flexibile to initialize many built-in objects. This is a tutorial for all of them.\n", "\n", "## Schema\n", "\n", "Fugue creates a special syntax to represent schema: Separated by `,`, each column type pair is `:`\n", "\n", "For example: `a:int,b:str` or `a:int,b_array:[int],c_dict:{x:int,y:str}`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import Schema\n", "\n", "print(Schema(\"a:int,b:str\"))\n", "print(Schema(\"a:int32,b_array:[int64],c_dict:{x:int,y:string}\"))\n", "\n", "# get pyarrow schema\n", "schema = Schema(\" a : int , b : str\") # space is ok\n", "print(\"pa schema\", schema.pa_schema)\n", "\n", "# more ways to initialized fugue Schema\n", "print(Schema(schema.pa_schema)) # by pyarrow schema\n", "print(Schema(c=str,d=int)) # pythonic way\n", "print(Schema(dict(c=str,d=int))) # pythonic way\n", "print(Schema(\"e:str\",\"f:str\")) # you can separate\n", "print(Schema([\"e:str\",\"f:str\"], (\"g\",int))) # you can separate, notice int in python means long in schema\n", "print(Schema(Schema(\"a:int\",\"b:str\"))) # you can separate" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parameters\n", "\n", "`ParamDict` is not that flexible, it can only accept dict or list of tuples just like python dict. `ParamDict` itself is a python dict." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from triad.collections import ParamDict\n", "\n", "print(ParamDict())\n", "print(ParamDict(dict(a=1,b=\"d\")))\n", "print(ParamDict([(\"a\",1),(\"b\",\"d\")]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DataFrame\n", "\n", "Normally, you should create a dataframe from [ExecutionEngine](execution_engine.ipynb) or [FugueWorkflow](dag.ipynb). In general, all execution engines and workflows support list/iterable of python arrays and pandas or Fugue dataframes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import ExecutionEngine, FugueWorkflow, NativeExecutionEngine, PandasDataFrame\n", "from fugue_dask import DaskExecutionEngine\n", "import pandas as pd\n", "\n", "def construct_df_by_execution_engine(eng:ExecutionEngine):\n", " eng.to_df([[0]], \"a:int\", {\"x\":1}).show(title=\"from array\")\n", " df = PandasDataFrame([[0]], \"a:int\")\n", " eng.to_df(df).show(title=\"from fugue dataframe\")\n", " eng.to_df(df.as_pandas()).show(title=\"from pandas dataframe\")\n", " \n", "construct_df_by_execution_engine(NativeExecutionEngine())\n", "construct_df_by_execution_engine(DaskExecutionEngine()) # notice the dataframe types change\n", "\n", "print(\"-----------------------------------\")\n", "\n", "def construct_df_by_workflow(eng:ExecutionEngine):\n", " with FugueWorkflow(eng) as dag:\n", " dag.df([[0]], \"a:int\", {\"x\":1}).show(title=\"from array\")\n", " df = PandasDataFrame([[0]], \"a:int\")\n", " dag.df(df).show(title=\"from fugue dataframe\")\n", " dag.df(df.as_pandas()).show(title=\"from pandas dataframe\")\n", " \n", "construct_df_by_workflow(NativeExecutionEngine())\n", "construct_df_by_workflow(DaskExecutionEngine()) # notice the dataframe types change " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DataFrames\n", "\n", "`DataFrames` is a type, it represents a collection of Fugue DataFrames. It can be dict-like where each dataframe has a name, or list-like. It is also an extensively used data structure in the framework" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame\n", "\n", "df1 = ArrayDataFrame([[0]],\"a:int\")\n", "df2 = ArrowDataFrame([[1]],\"b:int\")\n", "\n", "dfs = DataFrames(df1, df2) # list-like\n", "assert not dfs.has_key\n", "assert df1 is dfs[0]\n", "assert df2 is dfs[1]\n", "# how to get values as an array in list-like DataFrames\n", "print(list(dfs.values()))\n", "\n", "dfs = DataFrames(x=df1, y=df2) # dict-like\n", "assert dfs.has_key\n", "assert df1 is dfs[\"x\"]\n", "assert df2 is dfs[\"y\"]\n", "assert isinstance(dfs, dict) # dfs itself is dict, so you know how to iterate\n", "\n", "dfs = DataFrames(dict(x=df1,y=df2)) # another equal way to init dict-like\n", "\n", "df3 = ArrowDataFrame([[1]],\"a:int\")\n", "dfs1 = DataFrames(dict(x=df1,y=df2))\n", "dfs2 = DataFrames(dfs1, z=df3) # DataFrames are immutable, but you can update in this way\n", "dfs2 = DataFrames(dict(x=df1,y=df2), z=df3)\n", "\n", "dfs3 = DataFrames(df1,df2)\n", "dfs4 = DataFrames(dfs3, df3) # DataFrames are immutable, but you can update in this way\n", "dfs4 = DataFrames([df1,df2], df3) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### DataFrames in Programming Interface\n", "\n", "In Fugue programming interface, it's common to use DataFrames, it's also very flexible" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import FugueWorkflow, ArrayDataFrame, DataFrames\n", "from fugue_dask import DaskExecutionEngine\n", "\n", "def show(dfs:DataFrames, title=\"\"):\n", " if title!=\"\":\n", " print(title)\n", " for k,v in dfs.items():\n", " v.show(title=k)\n", "\n", "df1 = ArrayDataFrame([[0]],\"a:int\")\n", "df2 = ArrayDataFrame([[1]],\"b:int\")\n", "\n", "with FugueWorkflow(DaskExecutionEngine) as dag:\n", " # inside output, it constructs DataFrames(df1, df2) and then convert both to WorkflowDataFrames\n", " # you can see it's a list like dataframes, but due to the engine, they become DaskDataFrame\n", " # all these conversions are automatic\n", " dag.output(df1,df2,using=show, params={\"title\":\"*args from raw dataframes\"})\n", " dag.output([df1,df2],using=show, params={\"title\":\"list from raw dataframes\"})\n", " # dict-like input must be passed in as dicts\n", " dag.output(dict(x=df1,y=df2),dict(z=df1),using=show, params={\"title\":\"dict from raw dataframes\"})\n", " \n", " cdf1=dag.df(df1) # you can also convert the dataframes to WorkflowDataFrame explicitly (recommended)\n", " dag.output(cdf1,df2,using=show, params={\"title\":\"mixed\"}) # and you can mix them, although not recommended " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Partition" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import PartitionSpec\n", "\n", "assert PartitionSpec().empty # empty partition spec means no operation needed, it can be the default value\n", "PartitionSpec(num=4)\n", "PartitionSpec(algo=\"even\",num=4,by=[\"a\",\"b\"],presort=\"c,d desc\") # c,d desc == c ASC, d DESC\n", "\n", "# you can use expression in num, ROWCOUNT can be used to indicate using the row count of the dataframe to operate on\n", "# if a df has 1000 rows, this means I want to even partition it to 10 rows per partition\n", "PartitionSpec(algo=\"even\",num=\"ROWCOUNT/10\")\n", "\n", "PartitionSpec({\"num\":4, \"by\":[\"a\",\"b\"]}) # from dict, using dict on `partition-like` parameters is common\n", "PartitionSpec('{\"num\":4}') # from json\n", "\n", "a = PartitionSpec(num=4)\n", "b = PartitionSpec(by=[\"a\"])\n", "c = PartitionSpec(a,b) # combine\n", "\n", "p = PartitionSpec(num=4, by=[\"a\"])\n", "PartitionSpec(p, by=[\"a\",\"b\"], algo=\"even\") # override" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RPC\n", "\n", "For callbacks you defined for transformers, you can provide a lambda function, a native python function, or an instance implementing [RPCHandler](https://fugue.readthedocs.io/en/latest/api/fugue.rpc.html#fugue.rpc.base.RPCHandler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from fugue import FugueWorkflow\n", "from fugue.rpc import RPCHandler\n", "\n", "# schema: *\n", "def print_describe_and_return(df:pd.DataFrame, cb:callable) -> pd.DataFrame:\n", " cb(str(df.describe()))\n", " return df\n", "\n", "dag = FugueWorkflow()\n", "df = dag.df([[0,0],[1,1],[0,1],[2,2]],\"a:int,b:int\")\n", "\n", "# lambda\n", "df.partition(by=[\"a\"]).transform(print_describe_and_return, callback = lambda x:print(x)).show()\n", "\n", "# function\n", "def pt(x):\n", " print(x)\n", "\n", "df.partition(by=[\"a\"]).transform(print_describe_and_return, callback = pt).show()\n", "\n", "# RPCHandler\n", "class Handler(RPCHandler):\n", " def __init__(self):\n", " super().__init__()\n", " \n", " def __call__(self, x):\n", " print(x)\n", " \n", "df.partition(by=[\"a\"]).transform(print_describe_and_return, callback = Handler()).show()\n", "\n", "dag.run()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.9" } }, "nbformat": 4, "nbformat_minor": 4 }