{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Execution Engine\n", "\n", "The ExecutionEngine is the heart of Fugue. It is the layer that unifies the core concepts of distributed computing and separates the underlying computing frameworks from the user's high-level logic. Normally you don't operate on execution engines directly, but it's good to understand the basics.\n", "\n", "In Fugue, **the only dataset is the schemaed dataframe**, so although there are other important concepts such as `RDD`, we don't cover them. More options may result in more flexibility or more confusion. You can read these [1](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html) [2](https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/) to get more insights.\n", "\n", "However, it is important to understand that **you have full access to the underlying computing frameworks** and to any of their specific features, including `RDD`. We unify certain things to make them easy and consistent, but we don't block anything else.\n", "\n", "## Initialization\n", "\n", "Although there are no hard rules for initializing an ExecutionEngine, the general way is to initialize it from configs. The configs should describe each component, such as which logger to use, which SQLEngine to use, and other properties.\n", "\n", "Here are the best practices for initializing each built-in ExecutionEngine.\n", "\n", "### Native Python" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import NativeExecutionEngine\n", "\n", "engine = NativeExecutionEngine({\"myconfig\":\"abc\"})\n", "assert engine.conf.get_or_throw(\"myconfig\", str) == \"abc\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Dask" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from distributed import Client\n", "client = Client() # without this, Dask is not in distributed mode\n", "from fugue_dask import DaskExecutionEngine\n", "\n", "# fugue.dask.dataframe.default.partitions determines the default number of partitions for a new DaskDataFrame\n", "engine = DaskExecutionEngine({\"fugue.dask.dataframe.default.partitions\":4})\n", "assert engine.conf.get_or_throw(\"fugue.dask.dataframe.default.partitions\", int) == 4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Spark" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from fugue_spark import SparkExecutionEngine\n", "\n", "# this is where you get a SparkSession, it's the same as if there were no Fugue\n", "# notice that you can configure almost everything this way, even the running mode, such as local mode\n", "# in practice, the best approach is to use only this way plus spark-defaults.conf to initialize SparkSessions\n", "spark_session = (SparkSession\n", "    .builder\n", "    .config(\"spark.executor.cores\",4)\n", "    .config(\"fugue.dummy\",\"dummy\")\n", "    .getOrCreate())\n", "\n", "engine = SparkExecutionEngine(spark_session, {\"additional_conf\":\"abc\"})\n", "assert engine.conf.get_or_throw(\"spark.executor.cores\", int) == 4\n", "assert engine.conf.get_or_throw(\"fugue.dummy\", str) == \"dummy\"\n", "assert engine.conf.get_or_throw(\"additional_conf\", str) == \"abc\"\n" ] },
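{ "cell_type": "markdown", "metadata": {}, "source": [ "The engine itself is rarely used directly; it is normally handed to the higher-level [programming interface](dag.ipynb). Below is a minimal sketch, assuming the `FugueWorkflow` API covered in that tutorial, driven here by the NativeExecutionEngine." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import FugueWorkflow, NativeExecutionEngine\n", "\n", "# a minimal sketch: the engine drives the whole workflow, you rarely call its methods directly\n", "with FugueWorkflow(NativeExecutionEngine()) as dag:\n", "    dag.df([[0]],\"a:int\").show()" ] },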
{ "cell_type": "markdown", "metadata": {}, "source": [ "A special feature of Fugue is that the `engine.conf` is also accessible on workers (different machines) within all types of Fugue extensions. The engine itself is only accessible on the driver or in driver-side extensions.\n", "\n", "\n", "## Create DataFrame\n", "\n", "With an ExecutionEngine, you only need to tell the system that you want to create a DataFrame from raw data or from other dataframes, and different ExecutionEngines will create different types of DataFrames. `to_df` is the common interface of ExecutionEngines; use it to create DataFrames." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import NativeExecutionEngine, ArrayDataFrame\n", "from fugue_spark import SparkExecutionEngine\n", "\n", "engine1 = NativeExecutionEngine()\n", "engine2 = SparkExecutionEngine() # if spark_session is not provided, it will get the current active session\n", "\n", "df1 = engine1.to_df([[0]],\"a:int\")\n", "df2 = engine2.to_df([[0]],\"a:int\")\n", "df3 = engine1.to_df(df2) # NativeExecutionEngine can also take a SparkDataFrame and convert it to a local dataframe\n", "\n", "print(type(df1))\n", "print(type(df2))\n", "assert df1.as_array() == df2.as_array() # both are materialized on the driver and compared\n", "assert df1.as_array() == df3.as_array()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Every general method that an ExecutionEngine supports takes arbitrary `DataFrame` objects and calls `to_df` to make sure they become engine-compatible dataframes, so you may not need to call `to_df` directly.\n", "\n", "## Repartition\n", "\n", "This partitions a DataFrame according to a PartitionSpec (read [this](./partition.ipynb)). Normally it prepares the dataframe for your next operation; calling it directly may or may not have an effect." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_spark import SparkExecutionEngine\n", "from fugue import ArrayDataFrame, PartitionSpec\n", "\n", "engine = SparkExecutionEngine()\n", "df = engine.repartition(ArrayDataFrame([[0],[1]],\"a:int\"), PartitionSpec(by=[\"a\"]))\n", "df.show()\n", "# as you can see, df becomes a SparkDataFrame because of the engine, but the repartition itself does not have any visible effect" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map\n", "\n", "Before reading this, read the [Partition tutorial](./partition.ipynb) first. Map in Fugue works on the [**logical** partition level](./partition.ipynb#Physical-vs-Logical-Partitions). You can pass in an `on_init` function which will be called when starting a **physical** partition (the following example does not cover `on_init`).\n", "\n", "It's not important to learn how to use this method directly, because there is a better [programming interface](dag.ipynb#Transformer) on top of it. It is important to understand the meaning of this method."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_spark import SparkExecutionEngine\n", "from fugue import ArrayDataFrame, PandasDataFrame, PartitionSpec\n", "import pandas as pd\n", "import numpy as np\n", "\n", "def map_func(cursor, df):\n", "    key = repr(cursor.key_value_dict)\n", "    sub = repr(df.as_array())\n", "    return ArrayDataFrame([[key,sub]],\"key:str,data:str\")\n", "\n", "df = PandasDataFrame(pd.DataFrame(np.random.randint(0,4,size=(20, 3)), columns=list('abc')))\n", "engine = SparkExecutionEngine()\n", "\n", "engine.map(\n", "    df,\n", "    map_func,\n", "    output_schema=\"key:str,data:str\",\n", "    partition_spec=PartitionSpec(by=[\"a\"])).show()\n", "\n", "engine.map(\n", "    df,\n", "    map_func,\n", "    output_schema=\"key:str,data:str\",\n", "    partition_spec=PartitionSpec(by=[\"a\",\"b\"],presort=\"c DESC\")).show(rows=100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persist & Broadcast\n", "\n", "Similar to Spark, Fugue is lazy, so `persist` is a very important operation for controlling the execution plan. Notice that, different from Spark, calling persist in Fugue materializes the dataframe immediately. Broadcast is also similar to Spark, read [this](https://forums.databricks.com/questions/18849/broadcast-dataframe-once-and-use-it-many-a-times-s.html).\n", "\n", "Both `persist` and `broadcast` operate on Fugue dataframes; if you call persist or broadcast on the same dataframe multiple times, only the first call takes effect.\n", "\n", "If you want auto broadcast, it is supported by Spark (read [this](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options)), so you can configure the Fugue SparkExecutionEngine with these parameters.\n", "\n", "If you want auto persist, it is supported by Fugue, read [this](useful_config.ipynb#Auto-Persist)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_spark import SparkExecutionEngine\n", "from fugue import ArrayDataFrame, PandasDataFrame, PartitionSpec\n", "from time import sleep\n", "import timeit\n", "\n", "def map_func(cursor, df):\n", "    sleep(5)\n", "    return df\n", "\n", "engine = SparkExecutionEngine()\n", "\n", "def run(persist):\n", "    df = engine.map(\n", "        ArrayDataFrame([[0]],\"a:int\"),\n", "        map_func,\n", "        output_schema=\"a:int\",\n", "        partition_spec=PartitionSpec())\n", "    if persist:\n", "        df = engine.persist(df)\n", "    df.as_array()\n", "    df.as_array() # without persist, this will trigger map to run again\n", "\n", "print(timeit.timeit(lambda: run(False), number=3)) # 30 sec + overhead\n", "print(timeit.timeit(lambda: run(True), number=3)) # 15 sec + overhead\n" ] },
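{ "cell_type": "markdown", "metadata": {}, "source": [ "Below is a minimal sketch of `broadcast`, reusing the `engine` from the cell above. It only shows the call itself and that repeated calls are no-ops; broadcasting pays off when the (small) dataframe is used later, typically in joins." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# a minimal sketch of broadcast, reusing `engine` (SparkExecutionEngine) from the cell above\n", "small = engine.to_df([[0],[1]],\"a:int\")\n", "small = engine.broadcast(small) # mark the small dataframe as broadcast before using it, typically in joins\n", "small = engine.broadcast(small) # calling it again is a no-op, only the first call takes effect\n", "small.show()" ] },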
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Join\n", "\n", "Join is a very common operation in big data problems, and it is highly optimized by all computing frameworks. So if you can convert a task into join operations, it will normally be faster and also scale agnostic.\n", "\n", "Currently, these join types are supported: `CROSS`, `LEFT SEMI`, `LEFT ANTI`, `INNER`, `LEFT OUTER`, `RIGHT OUTER`, `FULL OUTER`.\n", "\n", "If you directly use SQL to interact with the underlying frameworks, you may be able to use more join types." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import NativeExecutionEngine, ArrayDataFrame\n", "\n", "df1 = ArrayDataFrame([[\"k1\",\"a\"],[\"k2\",\"b\"]],\"a:str,b:str\")\n", "df2 = ArrayDataFrame([[\"k2\",\"c\"],[\"k3\",\"d\"]],\"a:str,c:str\")\n", "df3 = ArrayDataFrame([[\"k4\",\"c\"],[\"k5\",\"d\"]],\"d:str,e:str\")\n", "\n", "engine = NativeExecutionEngine()\n", "\n", "# in joins, except for the `on` columns, all other column names must be different between df1 and df2\n", "engine.join(df1,df2,how=\"semi\",on=[\"a\"]).show(title=\"LEFT SEMI\")\n", "engine.join(df1,df2,how=\"anti\",on=[\"a\"]).show(title=\"LEFT ANTI\")\n", "engine.join(df1,df2,how=\"inner\",on=[\"a\"]).show(title=\"INNER\")\n", "engine.join(df1,df2,how=\"left_outer\",on=[\"a\"]).show(title=\"LEFT OUTER\")\n", "engine.join(df1,df2,how=\"right_outer\",on=[\"a\"]).show(title=\"RIGHT OUTER\")\n", "engine.join(df1,df2,how=\"full_outer\",on=[\"a\"]).show(title=\"FULL OUTER\")\n", "\n", "# the `on` parameter can be inferred, so you don't really need to specify it\n", "engine.join(df1,df2,how=\"semi\").show(title=\"LEFT SEMI\")\n", "engine.join(df1,df2,how=\"anti\").show(title=\"LEFT ANTI\")\n", "engine.join(df1,df2,how=\"inner\").show(title=\"INNER\")\n", "engine.join(df1,df2,how=\"left_outer\").show(title=\"LEFT OUTER\")\n", "engine.join(df1,df2,how=\"right_outer\").show(title=\"RIGHT OUTER\")\n", "engine.join(df1,df2,how=\"full_outer\").show(title=\"FULL OUTER\")\n", "engine.join(df2,df3,how=\"cross\").show(title=\"CROSS\")\n", "\n", "# but what if we want to join two dataframes with common columns that are not join keys?\n", "df1_ = df1.rename({\"b\":\"c\"}) # rename such columns before the join\n", "engine.join(df1, df1_, how=\"inner\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Zip & Comap\n", "\n", "Sometimes you need to partition multiple dataframes in the same way (on the same keys), and then process them together as a collection on each logical partition. This requires you to `zip` multiple dataframes partitioned in the same way, and then process them using `comap`.\n", "\n", "Read through the following code and run it to see the results. It's not important to learn how to use these methods directly, because there is a [programming interface](dag.ipynb#CoTransformer) on top of them. It is important to see the results and to understand the basic ideas of zip and comap."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import NativeExecutionEngine, ArrayDataFrame, PartitionSpec\n", "\n", "df1 = ArrayDataFrame([[\"k1\",\"a\"],[\"k1\",\"b\"],[\"k2\",\"b\"],[\"k2\",\"c\"]],\"a:str,b:str\")\n", "df2 = ArrayDataFrame([[\"k2\",\"c\"],[\"k2\",\"d\"],[\"k3\",\"d\"],[\"k3\",\"e\"]],\"a:str,b:str\")\n", "df3 = ArrayDataFrame([[\"k4\",\"c\"]],\"d:str,e:str\")\n", "\n", "engine = NativeExecutionEngine()\n", "# different from engine.join, engine.zip can accept two dataframes with common columns that are not partition keys\n", "# so it's good practice to be explicit about the partition spec\n", "print(engine.zip(df1,df2, how=\"inner\", partition_spec = PartitionSpec(by=[\"a\"])).schema)\n", "# if you let zip infer the keys, it will use all common columns as the keys\n", "print(engine.zip(df1,df2, how=\"inner\").schema) # notice the difference?\n", "print(engine.zip(df1,df2, how=\"left_outer\", partition_spec = PartitionSpec(by=[\"a\"])).schema)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# dfs is of type DataFrames, it always contains all zipped dataframes in order,\n", "# in an outer join some of them can be empty dataframes instead of None\n", "def map_func(cursor, dfs):\n", "    key = repr(cursor.key_value_dict)\n", "    value = [repr(x.as_array()) for x in dfs.values()]\n", "    return ArrayDataFrame([[key,value]],\"key:str,value:[str]\")\n", "\n", "\n", "# comap must operate on a zipped dataframe, or an error will be thrown\n", "\n", "df = engine.zip(df1,df2, how=\"left_outer\", partition_spec = PartitionSpec(by=[\"a\"]))\n", "# in this left outer join case, df2 has no key k1, so for the key k1 map_func will receive an empty dataframe as df2\n", "engine.comap(df, map_func, output_schema = \"key:str,value:[str]\", partition_spec = PartitionSpec()).show()\n", "\n", "df = engine.zip(df1,df2, how=\"inner\")\n", "engine.comap(df, map_func, output_schema = \"key:str,value:[str]\", partition_spec = PartitionSpec()).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load & Save\n", "\n", "Load and save are also part of the common interface of ExecutionEngines. Across all ExecutionEngines, `parquet`, `csv` and `json` are supported.\n", "\n", "Fugue depends on [PyFilesystem2](https://docs.pyfilesystem.org/en/latest/index.html). For details, read [this](https://triad.readthedocs.io/en/latest/api/triad.collections.html#triad.collections.fs.FileSystem)." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue_spark import SparkExecutionEngine\n", "from fugue import ArrayDataFrame\n", "from triad.collections.fs import FileSystem\n", "\n", "df1 = ArrayDataFrame([[\"1\",\"a\"],[\"1\",\"b\"],[\"2\",\"b\"],[\"2\",\"c\"]],\"a:str,b:str\")\n", "\n", "fs = FileSystem()\n", "\n", "# the simplest example\n", "engine = SparkExecutionEngine()\n", "engine.save_df(df1,\"/tmp/t1.parquet\")\n", "print(fs.listdir(\"/tmp/t1.parquet\"))\n", "engine.load_df(\"/tmp/t1.parquet\").show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# you can use a format hint to specify the file type\n", "engine.save_df(df1,\"/tmp/t1\", format_hint=\"parquet\")\n", "print(fs.listdir(\"/tmp/t1\"))\n", "engine.load_df(\"/tmp/t1\", format_hint=\"parquet\").show()\n", "# you can load just certain columns, which can be faster for certain execution engines and file types\n", "engine.load_df(\"/tmp/t1\", format_hint=\"parquet\", columns=[\"a\"]).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# by default, the save mode is overwrite; you can also use append and error (throw if the file exists)\n", "engine.load_df(\"/tmp/t1.parquet\").show()\n", "engine.save_df(df1,\"/tmp/t1.parquet\", mode=\"append\")\n", "engine.load_df(\"/tmp/t1.parquet\").show()\n", "# engine.save_df(df1,\"/tmp/t1.parquet\", mode=\"error\") # this will throw an exception" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# distributed engines save a dataframe as a folder of files, but sometimes you need a single file, here is how\n", "fs.removetree(\"/tmp/t1.parquet\")\n", "engine.save_df(df1,\"/tmp/t1.parquet\", force_single = True)\n", "print(fs.isfile(\"/tmp/t1.parquet\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# here is how you work with csv\n", "engine.save_df(df1,\"/tmp/t1.csv\", header=True)\n", "engine.load_df(\"/tmp/t1.csv\", header=True).show()\n", "# you can use columns to define the column types; the column names have to match the header\n", "engine.load_df(\"/tmp/t1.csv\", header=True, columns=\"a:int,b:str\").show()\n", "\n", "engine.save_df(df1, \"/tmp/t1.csv\") # by default the header will not be saved\n", "engine.load_df(\"/tmp/t1.csv\", columns=\"a:int,b:str\").show() # if the header is not saved, you must specify the schema" ] },
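{ "cell_type": "markdown", "metadata": {}, "source": [ "`json` works the same way. Below is a minimal sketch, reusing `engine` and `df1` from the cells above; it relies only on the file extension or format hint, the same mechanism shown for parquet and csv." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# a minimal sketch for json, the format is inferred from the file extension\n", "engine.save_df(df1,\"/tmp/t1.json\")\n", "engine.load_df(\"/tmp/t1.json\").show()\n", "# or be explicit with a format hint\n", "engine.save_df(df1,\"/tmp/t2\", format_hint=\"json\")\n", "engine.load_df(\"/tmp/t2\", format_hint=\"json\").show()" ] },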
{ "cell_type": "markdown", "metadata": {}, "source": [ "## SQLEngine\n", "\n", "A `SQLEngine` is built on top of an `ExecutionEngine`, but it is a separate concept. You can write your own `SQLEngine` to work with different `ExecutionEngines`.\n", "\n", "For most users, you only need to know that you can switch the `SQLEngine` after you have chosen a certain `ExecutionEngine`. A real use case: if you have a `PrestoSQLEngine` that sends queries to Presto and gets dataframes back, you can use it alongside the built-in `SparkSQLEngine` to get data from multiple sources and process them in the `SparkExecutionEngine`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fugue import SqliteEngine # default engine for NativeExecutionEngine and DaskExecutionEngine\n", "from fugue_spark.execution_engine import SparkSQLEngine # default engine for SparkExecutionEngine\n", "from fugue import ArrayDataFrame, DataFrames, NativeExecutionEngine\n", "from fugue_spark import SparkExecutionEngine\n", "\n", "df1 = ArrayDataFrame([[0,1],[1,2]],\"a:long,b:long\")\n", "df2 = ArrayDataFrame([[1,1],[2,2]],\"a:long,c:long\")\n", "dfs = DataFrames(x=df1,y=df2)\n", "\n", "engine = NativeExecutionEngine()\n", "sql_engine = SqliteEngine(engine)\n", "sql_engine.select(dfs,\"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a\").show()\n", "\n", "engine = SparkExecutionEngine()\n", "sql_engine = SparkSQLEngine(engine)\n", "sql_engine.select(dfs,\"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a\").show()\n", "# SparkExecutionEngine can also use SqliteEngine, this is just to show you can switch SQLEngines\n", "sql_engine = SqliteEngine(engine)\n", "sql_engine.select(dfs,\"SELECT x.*,c FROM x INNER JOIN y ON x.a=y.a\").show()\n" ] } ], "metadata": { "interpreter": { "hash": "4cd7ab41f5fca4b9b44701077e38c5ffd31fe66a6cab21e0214b68d958d0e462" }, "kernelspec": { "display_name": "Python 3.7.9 64-bit", "name": "python3" }, "language_info": { "name": "python", "version": "" } }, "nbformat": 4, "nbformat_minor": 4 }