{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Fugue (Spark, Ray, Dask) Integration\n", "\n", "[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Fugue_Profiling.ipynb)\n", "\n", "\n", "Hi! Perhaps you're already feeling confident with our library, but you really wish there was an easy way to plug our profiling into your existing **Spark, Dask or Ray** clusters or existing **Databricks, Coiled or Anyscale** platforms. Well, glad you've made it here, because this is what we are going to cover in this example notebook 😃\n", "\n", "If you wish to have other insights on how to use whylogs, feel free to check our [other existing examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples), as they might be extremely useful!\n", "\n", "For detailed questions regarding [Fugue](https://github.com/fugue-project/fugue), please join Fugue's Slack channel: [![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](http://slack.fugue.ai)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Installing the extra dependency\n", "\n", "As we want to enable users to have exactly what they need to use from whylogs, the `pyspark` integration comes as an extra dependency. In order to have it available, install according to the following table:\n", "\n", "| Run Whylogs on ... | Installation Command |\n", "|:---|:---|\n", "| Any Spark cluster (including Databricks Notebooks) | `pip install 'whylogs[fugue]' 'fugue[spark]'` |\n", "| Databricks (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]'` |\n", "| Any Ray cluster (including Anyscale Notebooks) | `pip install 'whylogs[fugue]' 'fugue[ray]'` |\n", "| Anyscale (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]'` |\n", "| Any Dask cluster | `pip install 'whylogs[fugue]' 'fugue[dask]'` |\n", "| Coiled | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]'` |\n", "\n", "For example, in this notebook we are using a local Spark cluster, so we should:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Note: you may need to restart the kernel to use updated packages.\n", "%pip install 'whylogs[fugue]' 'fugue[spark]'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following environment variable should NOT need to be set in your own environment." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "os.environ[\"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION\"] = \"python\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Constructing a dataset" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abcd
01a0.533206xy
12b0.230533z
21a0.394869z
32b0.618809z
42b0.474868xy
...............
951b0.904425xy
963a0.645785z
971a0.324683xy
982b0.519711z
993a0.000055z
\n", "

100 rows × 4 columns

\n", "
" ], "text/plain": [ " a b c d\n", "0 1 a 0.533206 xy\n", "1 2 b 0.230533 z\n", "2 1 a 0.394869 z\n", "3 2 b 0.618809 z\n", "4 2 b 0.474868 xy\n", ".. .. .. ... ..\n", "95 1 b 0.904425 xy\n", "96 3 a 0.645785 z\n", "97 1 a 0.324683 xy\n", "98 2 b 0.519711 z\n", "99 3 a 0.000055 z\n", "\n", "[100 rows x 4 columns]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "import numpy as np\n", "\n", "n = 100\n", "np.random.seed(0)\n", "tdf = pd.DataFrame(\n", " dict(\n", " a=np.random.choice([1, 2, 3], n),\n", " b=np.random.choice([\"a\", \"b\"], n),\n", " c=np.random.random(n),\n", " d=np.random.choice([\"xy\", \"z\"], n),\n", " )\n", ")\n", "tdf.to_parquet(\"/tmp/test.parquet\")\n", "tdf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Profiling using Whylogs + Fugue\n", "\n", "The simplest way to use `profile` is equivalent to use `why.log(df).view()`" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
cardinality/estcardinality/lower_1cardinality/upper_1counts/infcounts/ncounts/nancounts/nulldistribution/maxdistribution/meandistribution/median...distribution/stddevfrequent_items/frequent_stringsints/maxints/mintypetypes/booleantypes/fractionaltypes/integraltypes/objecttypes/string
column
a3.0000003.03.0001500100003.0000001.8800002.000000...0.807540[FrequentItem(value='1', est=39, upper=39, low...3.01.0SummaryType.COLUMN0010000
b2.0000002.02.000100010000NaN0.000000NaN...0.000000[FrequentItem(value='a', est=57, upper=57, low...NaNNaNSummaryType.COLUMN0000100
c100.000025100.0100.0050180100000.9923960.4999290.487838...0.294085NaNNaNNaNSummaryType.COLUMN0100000
d2.0000002.02.000100010000NaN0.000000NaN...0.000000[FrequentItem(value='xy', est=53, upper=53, lo...NaNNaNSummaryType.COLUMN0000100
\n", "

4 rows × 30 columns

\n", "
" ], "text/plain": [ " cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf \\\n", "column \n", "a 3.000000 3.0 3.000150 0 \n", "b 2.000000 2.0 2.000100 0 \n", "c 100.000025 100.0 100.005018 0 \n", "d 2.000000 2.0 2.000100 0 \n", "\n", " counts/n counts/nan counts/null distribution/max \\\n", "column \n", "a 100 0 0 3.000000 \n", "b 100 0 0 NaN \n", "c 100 0 0 0.992396 \n", "d 100 0 0 NaN \n", "\n", " distribution/mean distribution/median ... distribution/stddev \\\n", "column ... \n", "a 1.880000 2.000000 ... 0.807540 \n", "b 0.000000 NaN ... 0.000000 \n", "c 0.499929 0.487838 ... 0.294085 \n", "d 0.000000 NaN ... 0.000000 \n", "\n", " frequent_items/frequent_strings ints/max ints/min \\\n", "column \n", "a [FrequentItem(value='1', est=39, upper=39, low... 3.0 1.0 \n", "b [FrequentItem(value='a', est=57, upper=57, low... NaN NaN \n", "c NaN NaN NaN \n", "d [FrequentItem(value='xy', est=53, upper=53, lo... NaN NaN \n", "\n", " type types/boolean types/fractional types/integral \\\n", "column \n", "a SummaryType.COLUMN 0 0 100 \n", "b SummaryType.COLUMN 0 0 0 \n", "c SummaryType.COLUMN 0 100 0 \n", "d SummaryType.COLUMN 0 0 0 \n", "\n", " types/object types/string \n", "column \n", "a 0 0 \n", "b 0 100 \n", "c 0 0 \n", "d 0 100 \n", "\n", "[4 rows x 30 columns]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from whylogs.api.fugue import fugue_profile\n", "\n", "fugue_profile(tdf).to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can select the columns for profiling" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
cardinality/estcardinality/lower_1cardinality/upper_1counts/infcounts/ncounts/nancounts/nulldistribution/maxdistribution/meandistribution/median...distribution/q_95distribution/q_99distribution/stddevtypetypes/booleantypes/fractionaltypes/integraltypes/objecttypes/stringfrequent_items/frequent_strings
column
c100.000025100.0100.0050180100000.9923960.4999290.487838...0.9702370.9923960.294085SummaryType.COLUMN0100000NaN
d2.0000002.02.000100010000NaN0.000000NaN...NaNNaN0.000000SummaryType.COLUMN0000100[FrequentItem(value='xy', est=53, upper=53, lo...
\n", "

2 rows × 28 columns

\n", "
" ], "text/plain": [ " cardinality/est cardinality/lower_1 cardinality/upper_1 counts/inf \\\n", "column \n", "c 100.000025 100.0 100.005018 0 \n", "d 2.000000 2.0 2.000100 0 \n", "\n", " counts/n counts/nan counts/null distribution/max \\\n", "column \n", "c 100 0 0 0.992396 \n", "d 100 0 0 NaN \n", "\n", " distribution/mean distribution/median ... distribution/q_95 \\\n", "column ... \n", "c 0.499929 0.487838 ... 0.970237 \n", "d 0.000000 NaN ... NaN \n", "\n", " distribution/q_99 distribution/stddev type \\\n", "column \n", "c 0.992396 0.294085 SummaryType.COLUMN \n", "d NaN 0.000000 SummaryType.COLUMN \n", "\n", " types/boolean types/fractional types/integral types/object \\\n", "column \n", "c 0 100 0 0 \n", "d 0 0 0 0 \n", "\n", " types/string frequent_items/frequent_strings \n", "column \n", "c 0 NaN \n", "d 100 [FrequentItem(value='xy', est=53, upper=53, lo... \n", "\n", "[2 rows x 28 columns]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fugue_profile(tdf, profile_cols=[\"c\",\"d\"]).to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now assuming we want to use Spark to profile the dataset distributedly and assuming this is how we get a SparkSession:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we want to profile the pandas df on Spark:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fugue_profile(tdf, engine=spark)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If we want to profile a SparkDataFrame:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark_df = spark.createDataFrame(tdf)\n", "fugue_profile(spark_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also directly profile a parquet file or a folder of parquet files locally or on the cloud (the file will be loaded distributedly):" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fugue_profile(\"/tmp/test.parquet\", engine=spark)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is very similar to profile datasets or files using other backends, there will be detailed guides in the later sections.\n", "\n", "## Profiling on logical partitions\n", "\n", "If we want to profile `tdf` grouped by columns `a` and `b`" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
ab__whylogs_df_profile_view
01ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb3\\x93\\x93\\x80\\xda0...
11bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc6\\x93\\x93\\x80\\xda0...
22ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\xd6\\x93\\x93\\x80\\xda0...
32bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\xe5\\x93\\x93\\x80\\xda0...
43ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf3\\x93\\x93\\x80\\xda0...
53bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\x82\\x94\\x93\\x80\\xda0...
\n", "
" ], "text/plain": [ " a b __whylogs_df_profile_view\n", "0 1 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb3\\x93\\x93\\x80\\xda0...\n", "1 1 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc6\\x93\\x93\\x80\\xda0...\n", "2 2 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xd6\\x93\\x93\\x80\\xda0...\n", "3 2 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xe5\\x93\\x93\\x80\\xda0...\n", "4 3 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf3\\x93\\x93\\x80\\xda0...\n", "5 3 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x82\\x94\\x93\\x80\\xda0..." ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "fugue_profile(tdf, partition={\"by\":[\"a\",\"b\"]})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also control the output profile field:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
abx
01ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf7\\xb1\\x93\\x80\\xda0...
11bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\x89\\xb2\\x93\\x80\\xda0...
22ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\x99\\xb2\\x93\\x80\\xda0...
32bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\xa8\\xb2\\x93\\x80\\xda0...
43ab'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb7\\xb2\\x93\\x80\\xda0...
53bb'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc5\\xb2\\x93\\x80\\xda0...
\n", "
" ], "text/plain": [ " a b x\n", "0 1 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xf7\\xb1\\x93\\x80\\xda0...\n", "1 1 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x89\\xb2\\x93\\x80\\xda0...\n", "2 2 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\x99\\xb2\\x93\\x80\\xda0...\n", "3 2 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xa8\\xb2\\x93\\x80\\xda0...\n", "4 3 a b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xb7\\xb2\\x93\\x80\\xda0...\n", "5 3 b b'WHY1\\x00\\xf6\\x02\\n\\x0e \\xc5\\xb2\\x93\\x80\\xda0..." ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "res = fugue_profile(tdf, partition={\"by\":[\"a\",\"b\"]}, profile_field=\"x\")\n", "res" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is how to retrieve the views:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0 " ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import whylogs.api.fugue.registry # you don't really need to import this explicitly, the registration is automatic\n", "import fugue.api as fa\n", "\n", "fa.fugue_sql_flow(\"\"\"\n", "-- visualize a single dataframe's profile\n", "OUTPUT df USING why:viz\n", "-- compare profiles, must set reference and target\n", "OUTPUT target=df, reference=df USING why:viz\n", "\"\"\", df = tdf).run();" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "If running using a distributed backend, the profling will be done by `fugue_profile`" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "fa.fugue_sql_flow(\"\"\"\n", "df = LOAD \"/tmp/test.parquet\"\n", "\n", "OUTPUT USING why:viz\n", "\n", "OUTPUT target=df, reference=df USING why:viz\n", "\"\"\").run(spark);" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "\n", "\n", "## Performance Tips\n", "\n", "### Spark\n", "\n", "Please use `Fugue >= 0.8.0`, which enables Pandas UDF by default\n", "\n", "It is also beneficial to enabled pandas UDF on Spark to get better performance. We need to follow [this instruction](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) to enable `spark.sql.execution.arrow.pyspark.enabled`.\n", "\n", "When we profile a dataframe without logical partition, we may control the number of partitions in order to control the parallelism:\n", "\n", "```python\n", "fugue_profile(..., partition={\"num\": 200}, engine=spark)\n", "```\n", "\n", "If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be the number of CPUs of the Spark cluster.\n", "\n", "When we profile a dataframe with logical partitions, we can also be explicit on how many physical partitions to use:\n", "\n", "```python\n", "fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=spark)\n", "```\n", "\n", "But the convention in Spark is to set `spark.shuffle.partitions` when starting the Spark cluster. And an ideal number should be 2-4 times of the total CPUs.\n", "\n", "### Ray\n", "\n", "When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:\n", "\n", "```python\n", "fugue_profile(..., partition={\"num\": 200}, engine=\"ray\")\n", "```\n", "\n", "If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be 1. **So in Ray, it is always a good idea to be explicit about `num`**\n", "\n", "When we profile a dataframe with logical partitions, we should also be explicit on how many physical partitions to use:\n", "\n", "```python\n", "fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=\"ray\")\n", "```\n", "\n", "### Dask\n", "\n", "When we profile a dataframe without logical partition, we should control the number of partitions in order to control the parallelism:\n", "\n", "```python\n", "fugue_profile(..., partition={\"num\": 200}, engine=dask_client)\n", "```\n", "\n", "If we don't specify `num` then the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as pandas dataframe, the default partitioning will be a small number representing the local CPUs. 
"If we don't specify `num`, the default partitioning of the input dataframe will be used. If the input dataframe is a local dataframe such as a pandas dataframe, the default partitioning will be a small number representing the local CPUs. **So in Dask, it is always a good idea to be explicit about `num`.**\n", "\n", "When we profile a dataframe with logical partitions, we should also be explicit about how many physical partitions to use:\n", "\n", "```python\n", "fugue_profile(..., partition={\"by\":[\"a\",\"b\"], \"num\": 200}, engine=dask_client)\n", "```\n", "\n", "**Whether on Spark, Ray or Dask, and however you set `num`, setting it to 2 times the total number of cluster CPUs will in general work very well.**\n", "\n", "## Accessing distributed platforms\n", "\n", "In Fugue, accessing distributed platforms can be very simple. For example, with the proper setup, profiling a large S3 folder on Databricks, Anyscale or Coiled is as simple as (the angle-bracketed values are placeholders for your own cluster details):\n", "\n", "```python\n", "fugue_profile(\"s3://<path>\", engine=\"db:<databricks cluster id>\")\n", "fugue_profile(\"s3://<path>\", engine=\"<anyscale address>\")\n", "fugue_profile(\"s3://<path>\", engine=\"coiled:<coiled cluster name>\")\n", "```\n", "\n", "For details of each platform, please read the instructions for [Databricks](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/databricks.html), [Anyscale](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/anyscale.html) and [Coiled](https://fugue-tutorials.readthedocs.io/tutorials/integrations/cloudproviders/coiled.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "vscode": { "interpreter": { "hash": "5dd5901cadfd4b29c2aaf95ecd29c0c3b10829ad94dcfe59437dbee391154aea" } } }, "nbformat": 4, "nbformat_minor": 2 }