{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fugue (Spark, Ray, Dask) Integration\n",
"\n",
"[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Fugue_Profiling.ipynb)\n",
"\n",
"\n",
"Hi! Perhaps you're already feeling confident with our library, but you really wish there was an easy way to plug our profiling into your existing **Spark, Dask or Ray** clusters or existing **Databricks, Coiled or Anyscale** platforms. Well, glad you've made it here, because this is what we are going to cover in this example notebook 😃\n",
"\n",
"If you wish to have other insights on how to use whylogs, feel free to check our [other existing examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples), as they might be extremely useful!\n",
"\n",
"For detailed questions regarding [Fugue](https://github.com/fugue-project/fugue), please join Fugue's Slack channel: [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Installing the extra dependency\n",
"\n",
"As we want to enable users to have exactly what they need to use from whylogs, the `pyspark` integration comes as an extra dependency. In order to have it available, install according to the following table:\n",
"\n",
"| Run Whylogs on ... | Installation Command |\n",
"|:---|:---|\n",
"| Any Spark cluster (including Databricks Notebooks) | `pip install 'whylogs[fugue]' 'fugue[spark]'` |\n",
"| Databricks (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]'` |\n",
"| Any Ray cluster (including Anyscale Notebooks) | `pip install 'whylogs[fugue]' 'fugue[ray]'` |\n",
"| Anyscale (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]'` |\n",
"| Any Dask cluster | `pip install 'whylogs[fugue]' 'fugue[dask]'` |\n",
"| Coiled | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]'` |\n",
"\n",
"For example, in this notebook we are using a local Spark cluster, so we should:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Note: you may need to restart the kernel to use updated packages.\n",
"%pip install 'whylogs[fugue]' 'fugue[spark]'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following environment variable should NOT need to be set in your own environment."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"os.environ[\"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION\"] = \"python\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Constructing a dataset"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" a | \n",
" b | \n",
" c | \n",
" d | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" a | \n",
" 0.533206 | \n",
" xy | \n",
"
\n",
" \n",
" 1 | \n",
" 2 | \n",
" b | \n",
" 0.230533 | \n",
" z | \n",
"
\n",
" \n",
" 2 | \n",
" 1 | \n",
" a | \n",
" 0.394869 | \n",
" z | \n",
"
\n",
" \n",
" 3 | \n",
" 2 | \n",
" b | \n",
" 0.618809 | \n",
" z | \n",
"
\n",
" \n",
" 4 | \n",
" 2 | \n",
" b | \n",
" 0.474868 | \n",
" xy | \n",
"
\n",
" \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
" ... | \n",
"
\n",
" \n",
" 95 | \n",
" 1 | \n",
" b | \n",
" 0.904425 | \n",
" xy | \n",
"
\n",
" \n",
" 96 | \n",
" 3 | \n",
" a | \n",
" 0.645785 | \n",
" z | \n",
"
\n",
" \n",
" 97 | \n",
" 1 | \n",
" a | \n",
" 0.324683 | \n",
" xy | \n",
"
\n",
" \n",
" 98 | \n",
" 2 | \n",
" b | \n",
" 0.519711 | \n",
" z | \n",
"
\n",
" \n",
" 99 | \n",
" 3 | \n",
" a | \n",
" 0.000055 | \n",
" z | \n",
"
\n",
" \n",
"
\n",
"
100 rows × 4 columns
\n",
"
"
],
"text/plain": [
" a b c d\n",
"0 1 a 0.533206 xy\n",
"1 2 b 0.230533 z\n",
"2 1 a 0.394869 z\n",
"3 2 b 0.618809 z\n",
"4 2 b 0.474868 xy\n",
".. .. .. ... ..\n",
"95 1 b 0.904425 xy\n",
"96 3 a 0.645785 z\n",
"97 1 a 0.324683 xy\n",
"98 2 b 0.519711 z\n",
"99 3 a 0.000055 z\n",
"\n",
"[100 rows x 4 columns]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"n = 100\n",
"np.random.seed(0)\n",
"tdf = pd.DataFrame(\n",
" dict(\n",
" a=np.random.choice([1, 2, 3], n),\n",
" b=np.random.choice([\"a\", \"b\"], n),\n",
" c=np.random.random(n),\n",
" d=np.random.choice([\"xy\", \"z\"], n),\n",
" )\n",
")\n",
"tdf.to_parquet(\"/tmp/test.parquet\")\n",
"tdf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Profiling using Whylogs + Fugue\n",
"\n",
"The simplest way to use `profile` is equivalent to use `why.log(df).view()`"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" cardinality/est | \n",
" cardinality/lower_1 | \n",
" cardinality/upper_1 | \n",
" counts/inf | \n",
" counts/n | \n",
" counts/nan | \n",
" counts/null | \n",
" distribution/max | \n",
" distribution/mean | \n",
" distribution/median | \n",
" ... | \n",
" distribution/stddev | \n",
" frequent_items/frequent_strings | \n",
" ints/max | \n",
" ints/min | \n",
" type | \n",
" types/boolean | \n",
" types/fractional | \n",
" types/integral | \n",
" types/object | \n",
" types/string | \n",
"
\n",
" \n",
" column | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" a | \n",
" 3.000000 | \n",
" 3.0 | \n",
" 3.000150 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" 3.000000 | \n",
" 1.880000 | \n",
" 2.000000 | \n",
" ... | \n",
" 0.807540 | \n",
" [FrequentItem(value='1', est=39, upper=39, low... | \n",
" 3.0 | \n",
" 1.0 | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" b | \n",
" 2.000000 | \n",
" 2.0 | \n",
" 2.000100 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" NaN | \n",
" 0.000000 | \n",
" NaN | \n",
" ... | \n",
" 0.000000 | \n",
" [FrequentItem(value='a', est=57, upper=57, low... | \n",
" NaN | \n",
" NaN | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 100 | \n",
"
\n",
" \n",
" c | \n",
" 100.000025 | \n",
" 100.0 | \n",
" 100.005018 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" 0.992396 | \n",
" 0.499929 | \n",
" 0.487838 | \n",
" ... | \n",
" 0.294085 | \n",
" NaN | \n",
" NaN | \n",
" NaN | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
"
\n",
" \n",
" d | \n",
" 2.000000 | \n",
" 2.0 | \n",
" 2.000100 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" NaN | \n",
" 0.000000 | \n",
" NaN | \n",
" ... | \n",
" 0.000000 | \n",
" [FrequentItem(value='xy', est=53, upper=53, lo... | \n",
" NaN | \n",
" NaN | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 100 | \n",
"
\n",
" \n",
"
\n",
"
4 rows × 30 columns
\n",
"
"
],
"text/plain": [
" cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf \\\n",
"column \n",
"a 3.000000 3.0 3.000150 0 \n",
"b 2.000000 2.0 2.000100 0 \n",
"c 100.000025 100.0 100.005018 0 \n",
"d 2.000000 2.0 2.000100 0 \n",
"\n",
" counts/n counts/nan counts/null distribution/max \\\n",
"column \n",
"a 100 0 0 3.000000 \n",
"b 100 0 0 NaN \n",
"c 100 0 0 0.992396 \n",
"d 100 0 0 NaN \n",
"\n",
" distribution/mean distribution/median ... distribution/stddev \\\n",
"column ... \n",
"a 1.880000 2.000000 ... 0.807540 \n",
"b 0.000000 NaN ... 0.000000 \n",
"c 0.499929 0.487838 ... 0.294085 \n",
"d 0.000000 NaN ... 0.000000 \n",
"\n",
" frequent_items/frequent_strings ints/max ints/min \\\n",
"column \n",
"a [FrequentItem(value='1', est=39, upper=39, low... 3.0 1.0 \n",
"b [FrequentItem(value='a', est=57, upper=57, low... NaN NaN \n",
"c NaN NaN NaN \n",
"d [FrequentItem(value='xy', est=53, upper=53, lo... NaN NaN \n",
"\n",
" type types/boolean types/fractional types/integral \\\n",
"column \n",
"a SummaryType.COLUMN 0 0 100 \n",
"b SummaryType.COLUMN 0 0 0 \n",
"c SummaryType.COLUMN 0 100 0 \n",
"d SummaryType.COLUMN 0 0 0 \n",
"\n",
" types/object types/string \n",
"column \n",
"a 0 0 \n",
"b 0 100 \n",
"c 0 0 \n",
"d 0 100 \n",
"\n",
"[4 rows x 30 columns]"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from whylogs.api.fugue import fugue_profile\n",
"\n",
"fugue_profile(tdf).to_pandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can select the columns for profiling"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" cardinality/est | \n",
" cardinality/lower_1 | \n",
" cardinality/upper_1 | \n",
" counts/inf | \n",
" counts/n | \n",
" counts/nan | \n",
" counts/null | \n",
" distribution/max | \n",
" distribution/mean | \n",
" distribution/median | \n",
" ... | \n",
" distribution/q_95 | \n",
" distribution/q_99 | \n",
" distribution/stddev | \n",
" type | \n",
" types/boolean | \n",
" types/fractional | \n",
" types/integral | \n",
" types/object | \n",
" types/string | \n",
" frequent_items/frequent_strings | \n",
"
\n",
" \n",
" column | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" c | \n",
" 100.000025 | \n",
" 100.0 | \n",
" 100.005018 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" 0.992396 | \n",
" 0.499929 | \n",
" 0.487838 | \n",
" ... | \n",
" 0.970237 | \n",
" 0.992396 | \n",
" 0.294085 | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" NaN | \n",
"
\n",
" \n",
" d | \n",
" 2.000000 | \n",
" 2.0 | \n",
" 2.000100 | \n",
" 0 | \n",
" 100 | \n",
" 0 | \n",
" 0 | \n",
" NaN | \n",
" 0.000000 | \n",
" NaN | \n",
" ... | \n",
" NaN | \n",
" NaN | \n",
" 0.000000 | \n",
" SummaryType.COLUMN | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 0 | \n",
" 100 | \n",
" [FrequentItem(value='xy', est=53, upper=53, lo... | \n",
"
\n",
" \n",
"
\n",
"
2 rows × 28 columns
\n",
"
"
],
"text/plain": [
" cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf \\\n",
"column \n",
"c 100.000025 100.0 100.005018 0 \n",
"d 2.000000 2.0 2.000100 0 \n",
"\n",
" counts/n counts/nan counts/null distribution/max \\\n",
"column \n",
"c 100 0 0 0.992396 \n",
"d 100 0 0 NaN \n",
"\n",
" distribution/mean distribution/median ... distribution/q_95 \\\n",
"column ... \n",
"c 0.499929 0.487838 ... 0.970237 \n",
"d 0.000000 NaN ... NaN \n",
"\n",
" distribution/q_99 distribution/stddev type \\\n",
"column \n",
"c 0.992396 0.294085 SummaryType.COLUMN \n",
"d NaN 0.000000 SummaryType.COLUMN \n",
"\n",
" types/boolean types/fractional types/integral types/object \\\n",
"column \n",
"c 0 100 0 0 \n",
"d 0 0 0 0 \n",
"\n",
" types/string frequent_items/frequent_strings \n",
"column \n",
"c 0 NaN \n",
"d 100 [FrequentItem(value='xy', est=53, upper=53, lo... \n",
"\n",
"[2 rows x 28 columns]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fugue_profile(tdf, profile_cols=[\"c\",\"d\"]).to_pandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now assuming we want to use Spark to profile the dataset distributedly and assuming this is how we get a SparkSession:"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If we want to profile the pandas df on Spark:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"data": {
"text/plain": [
""
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fugue_profile(tdf, engine=spark)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If we want to profile a SparkDataFrame:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark_df = spark.createDataFrame(tdf)\n",
"fugue_profile(spark_df)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also directly profile a parquet file or a folder of parquet files locally or on the cloud (the file will be loaded distributedly):"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fugue_profile(\"/tmp/test.parquet\", engine=spark)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It is very similar to profile datasets or files using other backends, there will be detailed guides in the later sections.\n",
"\n",
"## Profiling on logical partitions\n",
"\n",
"If we want to profile `tdf` grouped by columns `a` and `b`"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" a | \n",
" b | \n",
" __whylogs_df_profile_view | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb3\\x93\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc6\\x93\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 2 | \n",
" 2 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xd6\\x93\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 3 | \n",
" 2 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xe5\\x93\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 4 | \n",
" 3 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf3\\x93\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 5 | \n",
" 3 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x82\\x94\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" a b __whylogs_df_profile_view\n",
"0 1 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb3\\x93\\x93\\x80\\xda0...\n",
"1 1 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc6\\x93\\x93\\x80\\xda0...\n",
"2 2 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xd6\\x93\\x93\\x80\\xda0...\n",
"3 2 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xe5\\x93\\x93\\x80\\xda0...\n",
"4 3 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf3\\x93\\x93\\x80\\xda0...\n",
"5 3 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x82\\x94\\x93\\x80\\xda0..."
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fugue_profile(tdf, partition={\"by\":[\"a\",\"b\"]})"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also control the output profile field:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" a | \n",
" b | \n",
" x | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 1 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf7\\xb1\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 1 | \n",
" 1 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x89\\xb2\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 2 | \n",
" 2 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x99\\xb2\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 3 | \n",
" 2 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xa8\\xb2\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 4 | \n",
" 3 | \n",
" a | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb7\\xb2\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
" 5 | \n",
" 3 | \n",
" b | \n",
" b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc5\\xb2\\x93\\x80\\xda0... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" a b x\n",
"0 1 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf7\\xb1\\x93\\x80\\xda0...\n",
"1 1 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x89\\xb2\\x93\\x80\\xda0...\n",
"2 2 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x99\\xb2\\x93\\x80\\xda0...\n",
"3 2 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xa8\\xb2\\x93\\x80\\xda0...\n",
"4 3 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb7\\xb2\\x93\\x80\\xda0...\n",
"5 3 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc5\\xb2\\x93\\x80\\xda0..."
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res = fugue_profile(tdf, partition={\"by\":[\"a\",\"b\"]}, profile_field=\"x\")\n",
"res"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is how to retrieve the views:"
]
},
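{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch (assuming the `res` dataframe and its profile field `x` from the previous cell, and that your whylogs version provides `DatasetProfileView.deserialize`): each value in the profile field is a serialized profile that can be turned back into a `DatasetProfileView`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from whylogs.core import DatasetProfileView\n",
"\n",
"# deserialize each partition's profile bytes back into a DatasetProfileView\n",
"views = res[\"x\"].apply(DatasetProfileView.deserialize)\n",
"views"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"whylogs also registers a `why:viz` extension with Fugue SQL, which can visualize a single dataframe's profile or compare a target profile against a reference:"
]
},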
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
],
"source": [
"import whylogs.api.fugue.registry # you don't really need to import this explicitly, the registration is automatic\n",
"import fugue.api as fa\n",
"\n",
"fa.fugue_sql_flow(\"\"\"\n",
"-- visualize a single dataframe's profile\n",
"OUTPUT df USING why:viz\n",
"-- compare profiles, must set reference and target\n",
"OUTPUT target=df, reference=df USING why:viz\n",
"\"\"\", df = tdf).run();"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"If running using a distributed backend, the profling will be done by `fugue_profile`"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
],
"source": [
"fa.fugue_sql_flow(\"\"\"\n",
"df = LOAD \"/tmp/test.parquet\"\n",
"\n",
"OUTPUT USING why:viz\n",
"\n",
"OUTPUT target=df, reference=df USING why:viz\n",
"\"\"\").run(spark);"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"## Performance Tips\n",
"\n",
"### Spark\n",
"\n",
"Please use `Fugue >= 0.8.0`, which enables Pandas UDF by default\n",
"\n",
"It is also beneficial to enabled pandas UDF on Spark to get better performance. We need to follow [this instruction](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) to enable `spark.sql.execution.arrow.pyspark.enabled`.\n",
"\n",
"When we profile a dataframe without logical partition, we may control the number of partitions in order to control the parallelism:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"num\": 200}, engine=spark)\n",
"```\n",
"\n",
"If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be the number of CPUs of the Spark cluster.\n",
"\n",
"When we profile a dataframe with logical partitions, we can also be explicit on how many physical partitions to use:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=spark)\n",
"```\n",
"\n",
"But the convention in Spark is to set `spark.shuffle.partitions` when starting the Spark cluster. And an ideal number should be 2-4 times of the total CPUs.\n",
"\n",
"### Ray\n",
"\n",
"When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"num\": 200}, engine=\"ray\")\n",
"```\n",
"\n",
"If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be 1. **So in Ray, it is always a good idea to be explicit about `num`**\n",
"\n",
"When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=\"ray\")\n",
"```\n",
"\n",
"### Dask\n",
"\n",
"When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"num\": 200}, engine=dask_client)\n",
"```\n",
"\n",
"If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be a small number representing the local CPUs. **So in Dask, it is always a good idea to be explicit about `num`**\n",
"\n",
"When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:\n",
"\n",
"```python\n",
"fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=dask_client)\n",
"```\n",
"\n",
"**No matter in Spark, Ray or Dask, no matter which way to set `num`, setting it to 2 times of the total cluster CPUs will in general work very well.**\n",
"\n",
"## Accessing distributed platforms\n",
"\n",
"In Fugue, accessing distributed platforms can be very simple. For example with proper setups, to profile a large S3 folder using Databricks, Anyscale or Coiled will be as simple as:\n",
"\n",
"```python\n",
"fugue_profile(\"s3://\", engine=\"db:\")\n",
"fugue_profile(\"s3://\", engine=\"\")\n",
"fugue_profile(\"s3://\", engine=\"coiled:\")\n",
"```\n",
"\n",
"For details of each platform, please read the instructions for [Databricks](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/databricks.html), [Anyscale](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/anyscale.html) and [Coiled](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/coiled.html)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
},
"vscode": {
"interpreter": {
"hash": "5dd5901cadfd4b29c2aaf95ecd29c0c3b10829ad94dcfe59437dbee391154aea"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}