{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Writing data/solar_events.csv\n" ] } ], "source": [ "%%writefile data/solar_events.csv\n", "timestamp,event\n", "2021-03-20 09:37:00,March Equinox\n", "2021-06-21 03:32:00,June Solstice\n", "2021-09-22 19:21:00,September Equinox\n", "2021-12-21 15:59:00,December Solstice" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Interactive Beam" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Pandas has the pandas.read_csv function to easily read CSV files into DataFrames. Beam has the beam.dataframe.io.read_csv function that emulates pandas.read_csv, but returns a deferred Beam DataFrame.\n", "\n", "If you’re using Interactive Beam, you can use collect to bring a Beam DataFrame into local memory as a Pandas DataFrame." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", " \n", "
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "data": { "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_2067c8707aef665e25e10b48a3d89199\").remove();\n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n $(\"#progress_indicator_2067c8707aef665e25e10b48a3d89199\").remove();\n });\n }" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"  <thead>\n",
"    <tr style=\"text-align: right;\">\n",
"      <th></th>\n",
"      <th>timestamp</th>\n",
"      <th>event</th>\n",
"    </tr>\n",
"  </thead>\n",
"  <tbody>\n",
"    <tr>\n",
"      <th>0</th>\n",
"      <td>2021-03-20 09:37:00</td>\n",
"      <td>March Equinox</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>1</th>\n",
"      <td>2021-06-21 03:32:00</td>\n",
"      <td>June Solstice</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>2</th>\n",
"      <td>2021-09-22 19:21:00</td>\n",
"      <td>September Equinox</td>\n",
"    </tr>\n",
"    <tr>\n",
"      <th>3</th>\n",
"      <td>2021-12-21 15:59:00</td>\n",
"      <td>December Solstice</td>\n",
"    </tr>\n",
"  </tbody>\n",
"</table>\n",
"</div>
" ], "text/plain": [ " timestamp event\n", "0 2021-03-20 09:37:00 March Equinox\n", "1 2021-06-21 03:32:00 June Solstice\n", "2 2021-09-22 19:21:00 September Equinox\n", "3 2021-12-21 15:59:00 December Solstice" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import apache_beam as beam\n", "import apache_beam.runners.interactive.interactive_beam as ib\n", "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n", "\n", "pipeline = beam.Pipeline(InteractiveRunner())\n", "\n", "# Create a deferred Beam DataFrame with the contents of our csv file.\n", "beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('data/solar_events.csv')\n", "\n", "# We can use `ib.collect` to view the contents of a Beam DataFrame.\n", "ib.collect(beam_df)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Beam DataFrames to PCollections\n", "\n", "If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with to_pcollection.\n", "\n", "Converting a Beam DataFrame in this way yields a PCollection with a schema. This allows us to easily access each property by attribute, for example element.event and element.timestamp.\n", "\n", "Sometimes it's more convenient to convert the named tuples to Python dictionaries. We can do that with the _asdict method." 
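, "\n", "As a quick sketch (not executed above, and assuming the same `data/solar_events.csv` file), attribute access on the schema'd elements looks like this:\n", "\n", "```python\n", "import apache_beam as beam\n", "from apache_beam.dataframe import convert\n", "\n", "with beam.Pipeline() as pipeline:\n", "    beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('data/solar_events.csv')\n", "    (\n", "        convert.to_pcollection(beam_df)\n", "        # Each element is a named tuple whose attributes are the CSV columns.\n", "        | 'Format' >> beam.Map(lambda e: f'{e.timestamp}: {e.event}')\n", "        | 'Print' >> beam.Map(print)\n", "    )\n", "```"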
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "application/javascript": "\n if (typeof window.interactive_beam_jquery == 'undefined') {\n var jqueryScript = document.createElement('script');\n jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n jqueryScript.type = 'text/javascript';\n jqueryScript.onload = function() {\n var datatableScript = document.createElement('script');\n datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n datatableScript.type = 'text/javascript';\n datatableScript.onload = function() {\n window.interactive_beam_jquery = jQuery.noConflict(true);\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }\n document.head.appendChild(datatableScript);\n };\n document.head.appendChild(jqueryScript);\n } else {\n window.interactive_beam_jquery(document).ready(function($){\n \n });\n }" }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" ] } ], "source": [ "import apache_beam as beam\n", "from apache_beam.dataframe import convert\n", "\n", "with beam.Pipeline() as pipeline:\n", " beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('data/solar_events.csv')\n", "\n", " (\n", " # Convert the Beam DataFrame to a PCollection.\n", " convert.to_pcollection(beam_df)\n", "\n", " # We get named tuples, we can convert them to dictionaries like this.\n", " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", "\n", " # Print the elements 
in the PCollection.\n", " | 'Print' >> beam.Map(print)\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Pandas DataFrames to PCollections\n", "\n", "If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with to_pcollection.\n", "\n", "Since Pandas DataFrames are not part of any Beam pipeline, we must provide the pipeline explicitly." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" ] } ], "source": [ "import pandas as pd\n", "import apache_beam as beam\n", "from apache_beam.dataframe import convert\n", "\n", "with beam.Pipeline() as pipeline:\n", " df = pd.read_csv('data/solar_events.csv')\n", "\n", " (\n", " # Convert the Pandas DataFrame to a PCollection.\n", " convert.to_pcollection(df, pipeline=pipeline)\n", "\n", " # We get named tuples, we can convert them to dictionaries like this.\n", " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", "\n", " # Print the elements in the PCollection.\n", " | 'Print' >> beam.Map(print)\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "If you have your data as a PCollection of Pandas DataFrames, you can convert them into a PCollection with FlatMap.\n", "\n", "ℹ️ If the number of elements in each DataFrame can be very different (that is, some DataFrames might contain thousands of elements while others contain only a handful of elements), it might be a good idea to Reshuffle. 
This basically rebalances the elements in the PCollection, which helps make sure all the workers have a balanced number of elements." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" ] } ], "source": [ "import pandas as pd\n", "import apache_beam as beam\n", "\n", "with beam.Pipeline() as pipeline:\n", " (\n", " pipeline\n", " | 'Filename' >> beam.Create(['data/solar_events.csv'])\n", "\n", " # Each element is a Pandas DataFrame, so we can do any Pandas operation.\n", " | 'Read CSV' >> beam.Map(pd.read_csv)\n", "\n", " # We yield each element of all the DataFrames into a PCollection of dictionaries.\n", " | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))\n", "\n", " # Reshuffle to make sure parallelization is balanced.\n", " | 'Reshuffle' >> beam.Reshuffle()\n", "\n", " # Print the elements in the PCollection.\n", " | 'Print' >> beam.Map(print)\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## PCollections to Beam DataFrames\n", "\n", "If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with to_dataframe.\n", "\n", "ℹ️ To convert a PCollection to a Beam DataFrame, each element must have a schema." 
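, "\n", "For illustration (a sketch, not from the original notebook), one way to give elements a schema is to declare them as a `typing.NamedTuple` with typed fields:\n", "\n", "```python\n", "import typing\n", "\n", "class SolarEvent(typing.NamedTuple):\n", "    timestamp: str\n", "    event: str\n", "\n", "# Beam can infer a schema from the typed fields when the element\n", "# type is declared, e.g. beam.Create([...]).with_output_types(SolarEvent)\n", "```"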
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}\n", "{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}\n", "{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}\n", "{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}\n" ] } ], "source": [ "import csv\n", "import apache_beam as beam\n", "from apache_beam.dataframe import convert\n", "\n", "with open('data/solar_events.csv') as f:\n", " solar_events = [dict(row) for row in csv.DictReader(f)]\n", "\n", "with beam.Pipeline() as pipeline:\n", " pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n", "\n", " # Convert the PCollection into a Beam DataFrame.\n", " beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(\n", " lambda x: beam.Row(\n", " timestamp=x['timestamp'],\n", " event=x['event'],\n", " )\n", " ))\n", "\n", " # Print the elements in the Beam DataFrame.\n", " (\n", " convert.to_pcollection(beam_df)\n", " | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))\n", " | 'Print' >> beam.Map(print)\n", " )" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## PCollections to Pandas DataFrames\n", "\n", "If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a side input.\n", "\n", "ℹ️ It's recommended to only do this if you need to use a Pandas operation that is not supported in Beam DataFrames. 
Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.\n", "\n", "⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory. If they don't fit into memory, consider yielding multiple DataFrame elements via FlatMap." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " timestamp event\n", "0 2021-03-20 09:37:00 March Equinox\n", "1 2021-06-21 03:32:00 June Solstice\n", "2 2021-09-22 19:21:00 September Equinox\n", "3 2021-12-21 15:59:00 December Solstice\n" ] } ], "source": [ "import csv\n", "import pandas as pd\n", "import apache_beam as beam\n", "\n", "with open('data/solar_events.csv') as f:\n", " solar_events = [dict(row) for row in csv.DictReader(f)]\n", "\n", "with beam.Pipeline() as pipeline:\n", " pcoll = pipeline | 'Create data' >> beam.Create(solar_events)\n", "\n", " (\n", " pipeline\n", "\n", " # Create a single element containing the entire PCollection. 
\n", " | 'Singleton' >> beam.Create([None])\n", " | 'As Pandas' >> beam.Map(\n", " lambda _, dict_iter: pd.DataFrame(dict_iter),\n", " dict_iter=beam.pvalue.AsIter(pcoll),\n", " )\n", "\n", " # Print the Pandas DataFrame.\n", " | 'Print' >> beam.Map(print)\n", " )" ] } ], "metadata": { "kernelspec": { "display_name": "env-spacy", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "343191058819caea96d5cde1bd3b1a75b4807623ce2cda0e1c8499e39ac847e3" } } }, "nbformat": 4, "nbformat_minor": 2 }