{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Data Type, Schema & DataFrames\n", "\n", "## Data Types\n", "\n", "Fugue does not have its own data types. Instead, we use a subset of data types from [Apache Arrow](https://arrow.apache.org/docs/index.html). Most pyarrow data types are supported. To see the complete supported list, you can read [this](https://triad.readthedocs.io/en/latest/api/triad.utils.html#triad.utils.pyarrow.is_supported) and its source code.\n", "\n", "### Non-nested Types\n", "\n", "Here is the list of non-nested types. A `*` in the `Primary` column means that when converting from a pyarrow type to a Fugue expression, the primary expression is used. If you are interested, the table is generated by [this Fugue code](#Generate-Types).\n", "\n", "Primary |Fugue Expression |PyArrow\n", "--------------|-------------------|-----------\n", "* |bool |bool \n", " |boolean |bool \n", "* |date |date32[day] \n", "* |double |double \n", " |float64 |double \n", "* |float |float \n", " |float32 |float \n", "* |float16 |halffloat \n", "* |short |int16 \n", " |int16 |int16 \n", "* |int |int32 \n", " |int32 |int32 \n", "* |long |int64 \n", " |int64 |int64 \n", "* |byte |int8 \n", " |int8 |int8 \n", "* |null |null \n", "* |str |string \n", " |string |string \n", "* |datetime |timestamp[us] \n", "* |ushort |uint16 \n", " |uint16 |uint16 \n", "* |uint |uint32 \n", " |uint32 |uint32 \n", "* |ulong |uint64 \n", " |uint64 |uint64 \n", "* |ubyte |uint8 \n", " |uint8 |uint8 \n", "\n", "### Nested Types\n", "\n", "`pa.ListType` and `pa.StructType` are supported. 
For list types, the expression is the element type wrapped in `[]`, for example `[str]`; for struct types, it is a JSON-like expression, for example `{a:int,b:[str]}`, meaning the data is a dict with key `a` as an int and key `b` as a list of strings.\n", "\n", "Note that this is just a way to express pyarrow data types; it does not invent new types.\n", "\n", "## Schema\n", "Again, Fugue does not invent schema; it uses [pyarrow schema](https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html). But Fugue creates a special syntax to represent schema: columns are separated by `,`, and each column name and type are joined by `:`.\n", "\n", "For example: `a:int,b:str` or `a:int,b_array:[int],c_dict:{x:int,y:str}`\n", "\n", "Now let's see some examples using the API:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from triad.collections import Schema\n", "\n", "print(Schema(\"a:int,b:str\"))\n", "print(Schema(\"a:int32,b_array:[int64],c_dict:{x:int,y:string}\"))\n", "\n", "# get the pyarrow schema\n", "schema = Schema(\" a : int , b : str\") # spaces are ok\n", "print(\"pa schema\", schema.pa_schema)\n", "\n", "# more ways to initialize a Fugue Schema\n", "print(Schema(schema.pa_schema)) # by pyarrow schema\n", "print(Schema(c=str,d=int)) # pythonic way\n", "print(Schema(\"e:str\",\"f:str\")) # you can pass multiple expressions\n", "\n", "# compare a schema with a string\n", "assert Schema(\"a: int, b: int64\") == \"a:int,b:long\"\n", "\n", "# operations\n", "print(Schema(\"a:int\")+Schema(\"b:str\"))\n", "print(Schema(\"a:int\")+\"b:str\")\n", "print(Schema(\"a:int,c:int,d:int\") - [\"c\"]) # for '-' all columns must exist\n", "print(Schema(\"a:int,c:int,d:int\").exclude([\"c\",\"x\"])) # exclude drops columns only if they exist\n", "print(Schema(\"a:int,c:int,d:int\").extract([\"d\",\"a\"]))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Schema` is very flexible; for the full API reference, please read [this](https://triad.readthedocs.io/en/latest/api/triad.collections.html#module-triad.collections.schema)\n", "\n", "\n", 
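"To make the mapping concrete, here is a pyarrow-only sketch (an illustration, not Fugue API) of the plain pyarrow schema that the Fugue expression `a:int,b_array:[long]` corresponds to:\n",
"\n",
"```python\n",
"import pyarrow as pa\n",
"\n",
"# the pyarrow schema equivalent of the Fugue expression 'a:int,b_array:[long]'\n",
"schema = pa.schema([\n",
"    pa.field('a', pa.int32()),\n",
"    pa.field('b_array', pa.list_(pa.int64())),\n",
"])\n",
"print(schema)\n",
"```\n",
"\n",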
"## DataFrame\n", "\n", "**All Fugue operations are on DataFrames**; there is no concept such as `RDD` or arbitrary objects (they are not the core concepts Fugue wants to unify, but you can still use them easily in this framework, see other tutorials). `DataFrame` is an abstract concept: it is a dataset with a schema, and schema has been defined above.\n", "\n", "The motivation of the Fugue DataFrame is significantly different from projects such as Dask, Modin or Koalas. The Fugue DataFrame is not meant to become another pandas-like DataFrame, and **Fugue is NOT going to use the Pandas language to unify data processing**. That being said, Pandas and Pandas-like dataframes are still widely used and well supported in this framework, because they are important components for data science.\n", "\n", "### LocalDataFrame\n", "\n", "These are the built-in local dataframes of Fugue:\n", "* **ArrayDataFrame**: the underlying data is an array\n", "* **IterableDataFrame**: the underlying data is an iterable, so this dataframe is unbounded; it is extremely useful for processing datasets of unknown size, since it minimizes memory usage and can exit the iteration at any time\n", "* **PandasDataFrame**: adapter to a pandas DataFrame\n", "* **ArrowDataFrame**: adapter to an arrow Table\n", "\n", "You can convert between them. Moreover, all DataFrames can be converted to a local dataframe, or a local bounded dataframe.\n", "\n", "\n", "### Other DataFrames\n", "\n", "These are the built-in non-local dataframes. To use them, you need to pip install the extras:\n", "* **SparkDataFrame**: adapter to a Spark DataFrame\n", "* **DaskDataFrame**: adapter to a Dask DataFrame\n", "\n", "\n", "### Initialization & Conversion\n", "\n", "It's important to learn how to initialize local dataframes, because with Fugue, in most cases you only deal with local dataframes on a single machine, in a single thread."
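, "\n",
"To try the non-local dataframes mentioned above, the extras must be installed first. A hypothetical sketch (the extras names here are assumptions; check the Fugue installation docs for your version):\n",
"\n",
"```shell\n",
"pip install 'fugue[spark]'\n",
"pip install 'fugue[dask]'\n",
"```\n"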
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import ArrayDataFrame, ArrowDataFrame, IterableDataFrame, PandasDataFrame\n", "\n", "data = [[0,\"a\"],[1,\"b\"]]\n", "schema = \"x:int,y:str\"\n", "\n", "# The most basic initialization is the same for all\n", "ArrayDataFrame(data,schema).show()\n", "ArrowDataFrame(data,schema).show()\n", "IterableDataFrame(data,schema).show()\n", "PandasDataFrame(data,schema).show()\n", "\n", "# common methods: as_array, as_array_iterable, as_dict_iterable\n", "print(ArrowDataFrame(data,schema).as_array())\n", "print(ArrowDataFrame(data,schema).as_array_iterable()) # iterator object\n", "# type safety is very useful\n", "df = ArrayDataFrame(data, \"x:str,y:str\")\n", "assert isinstance(df.as_array()[0][0], int) # as_array and as_array_iterable by default return raw data\n", "assert isinstance(df.as_array(type_safe=True)[0][0], str) # turn on type_safe to return the data according to the schema\n", "\n", "\n", "# as_pandas is a common interface for all DataFrames\n", "pdf = ArrayDataFrame(data,schema).as_pandas()\n", "print(pdf)\n", "PandasDataFrame(pdf).show() # convert pd.DataFrame to PandasDataFrame\n", "\n", "# as_arrow is a common interface for all DataFrames\n", "adf = ArrayDataFrame(data,schema).as_arrow()\n", "print(adf)\n", "ArrowDataFrame(adf).show() # convert an arrow Table to ArrowDataFrame\n", "\n", "# access the native data structure using .native, this applies to all built-in DataFrames\n", "print(ArrayDataFrame(data,schema).native)\n", "print(ArrowDataFrame(data,schema).native)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`IterableDataFrame` is special because it is not bounded. One important feature of the Fugue `IterableDataFrame` is that it is empty-aware: at any point you can check whether the dataframe is empty and peek at the head row without affecting the iteration. 
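A tiny pure-Python sketch of this idea (an illustration only, not Fugue's actual implementation) is to peek one element and then re-attach it:\n", "\n",
"```python\n",
"from itertools import chain\n",
"\n",
"def peek(it):\n",
"    # pull one element, then re-attach it so iteration is unaffected\n",
"    it = iter(it)\n",
"    try:\n",
"        head = next(it)\n",
"    except StopIteration:\n",
"        return None, iter([])  # the iterable is empty\n",
"    return head, chain([head], it)  # head plus the restored stream\n",
"\n",
"head, restored = peek(iter([[0, 'a'], [1, 'b']]))\n",
"print(head)            # [0, 'a']\n",
"print(list(restored))  # [[0, 'a'], [1, 'b']] - nothing was lost\n",
"```\n",
"\n",
"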
If you are interested in the details, read [this](https://triad.readthedocs.io/en/latest/api/triad.utils.html#triad.utils.iter.EmptyAwareIterable)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import IterableDataFrame\n", "\n", "data = [[0,\"a\"],[1,\"b\"]]\n", "schema = \"x:int,y:str\"\n", "\n", "# The most basic initialization is the same\n", "df = IterableDataFrame(data,schema)\n", "assert not df.empty\n", "df.show()\n", "assert df.empty # because show consumes all elements of the iterable\n", "\n", "# it is ok to convert to other types, but each instance can be consumed only once\n", "print(IterableDataFrame(data,schema).as_array())\n", "print(IterableDataFrame(data,schema).as_pandas())\n", "print(IterableDataFrame(data,schema).as_arrow())\n", "\n", "# a common way to use it\n", "df = IterableDataFrame(data,schema)\n", "for row in df.as_dict_iterable():\n", "    print(row)\n", "\n", "\n", "from fugue.dataframe.utils import to_local_df, to_local_bounded_df\n", "\n", "df = IterableDataFrame(data,schema)\n", "assert to_local_df(df) is df # because it is already a local dataframe\n", "assert to_local_bounded_df(df) is not df # because it is not bounded" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Non-local dataframes can also be converted to local dataframes, but the initialization depends on the specific execution engine. 
Here we use DaskDataFrame as an example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_dask import DaskDataFrame, DaskExecutionEngine\n", "\n", "data = [[0,\"a\"],[1,\"b\"]]\n", "schema = \"x:int,y:str\"\n", "\n", "engine = DaskExecutionEngine()\n", "df = engine.to_df(data, schema) # engine.to_df is the best way to generate engine-dependent dataframes\n", "assert isinstance(df, DaskDataFrame)\n", "\n", "df.as_local().show() # converts to a local DataFrame; for Dask, it becomes a PandasDataFrame\n", "print(df.native) # access the underlying dask dataframe\n", "\n", "\n", "from fugue.dataframe.utils import to_local_df, to_local_bounded_df\n", "\n", "df = engine.to_df(data, schema)\n", "to_local_df(df).show()\n", "to_local_bounded_df(df).show() # this is stronger, it prevents returning an IterableDataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Commonly, you only tell Fugue that you want to create a dataframe from a given raw data source, and Fugue, with a certain execution engine, will do the job for you. In your own logic, you mostly care about two abstract types in your functions: `DataFrame` and `LocalDataFrame`. They can be seen in the extensions tutorials." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## DataFrames\n", "\n", "`DataFrames` is a type representing a collection of Fugue DataFrames. It can be dict-like, where each dataframe has a name, or list-like. 
It is also an extensively used data structure in the framework." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import DataFrames, ArrayDataFrame, ArrowDataFrame\n", "\n", "df1 = ArrayDataFrame([[0]],\"a:int\")\n", "df2 = ArrowDataFrame([[1]],\"b:int\")\n", "\n", "dfs = DataFrames(df1, df2) # list-like\n", "assert not dfs.has_key\n", "assert df1 is dfs[0]\n", "assert df2 is dfs[1]\n", "# how to get the values as an array in a list-like DataFrames\n", "print(list(dfs.values()))\n", "\n", "dfs = DataFrames(x=df1, y=df2) # dict-like\n", "assert dfs.has_key\n", "assert df1 is dfs[\"x\"]\n", "assert df2 is dfs[\"y\"]\n", "assert isinstance(dfs, dict) # dfs itself is a dict, so you know how to iterate it\n", "\n", "dfs = DataFrames(dict(x=df1,y=df2)) # another equivalent way to init a dict-like DataFrames\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Appendix\n", "\n", "## Generate Types\n", "We use Fugue SQL to generate the Fugue -> PyArrow type table. We will learn how to use Fugue SQL later. 
You don't have to understand everything in this code" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import triad\n", "from fugue_sql import FugueSQLWorkflow\n", "from typing import List, Any\n", "\n", "#schema: fugue_type_expr:str, pa_type:str\n", "def type_to_expr(primary:bool=False) -> List[List[Any]]:\n", " if not primary:\n", " return [[k,str(v)] for k,v in triad.utils.pyarrow._TYPE_EXPRESSION_MAPPING.items()]\n", " else:\n", " return [[v,str(k)] for k,v in triad.utils.pyarrow._TYPE_EXPRESSION_R_MAPPING.items()]\n", " \n", "with FugueSQLWorkflow() as dag:\n", " dag(\"\"\"\n", " f2p = CREATE USING type_to_expr\n", " f2p_primary = CREATE USING type_to_expr(primary=true)\n", " SELECT CASE WHEN f2p_primary.pa_type IS NOT NULL THEN \"*\" ELSE \"\" END AS is_primary,f2p.*\n", " FROM f2p LEFT OUTER JOIN f2p_primary \n", " ON f2p.fugue_type_expr=f2p_primary.fugue_type_expr\n", " ORDER BY pa_type, is_primary DESC\n", " \n", " PRINT ROWS 100\n", " \"\"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }