{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyarrow as pa\n", "import pyarrow.parquet as pq\n", "import pyarrow.flight as flight\n", "import numpy as np\n", "import pandas as pd\n", "import time\n", "import threading\n", "\n", "from pyarrow.util import find_free_port" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Implement a Flight server in Python\n", "\n", "This server has a few goals\n", "\n", "* Clients can send (\"put\") datasets, to be kept in memory by the server\n", "* Clients can request a list of cached datasets (\"list-tables\")\n", "* Clients can request (\"get\") a cached table\n", "\n", "Note that this server is very simple and does not show some of the more sophisticated \"query planning\" capabilities of Arrow Flight, nor does it show parallel or multi-part access. My goal is to show you that\n", "\n", "* It's easy to write a Flight service in Python\n", "* The performance of Flight is **very, very good**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class DemoServer(flight.FlightServerBase):\n", " \n", " def __init__(self, location):\n", " self._cache = {}\n", " super().__init__(location)\n", " \n", " def list_actions(self, context):\n", " return [flight.ActionType('list-tables', 'List stored tables'),\n", " flight.ActionType('drop-table', 'Drop a stored table')]\n", "\n", " # -----------------------------------------------------------------\n", " # Implement actions\n", " \n", " def do_action(self, context, action):\n", " handlers = {\n", " 'list-tables': self._list_tables,\n", " 'drop-table': self._drop_table\n", " } \n", " handler = handlers.get(action.type)\n", " if not handler:\n", " raise NotImplementedError \n", " return handlers[action.type](action)\n", " \n", " def _drop_table(self, action):\n", " del self._cache[action.body]\n", " \n", " def _list_tables(self, action):\n", " return iter([flight.Result(cache_key) \n", " for cache_key in sorted(self._cache.keys())])\n", "\n", " # -----------------------------------------------------------------\n", " # Implement puts\n", " \n", " def do_put(self, context, descriptor, reader, writer):\n", " self._cache[descriptor.command] = reader.read_all()\n", " \n", " # -----------------------------------------------------------------\n", " # Implement gets\n", "\n", " def do_get(self, context, ticket):\n", " table = self._cache[ticket.ticket]\n", " return flight.RecordBatchStream(table)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some helper utilities, you can ignore this part" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start server in background, connect client" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pa.ipc.IpcWriteOptions?" ] }, { "cell_type": "code", "execution_count": 117, "metadata": {}, "outputs": [], "source": [ "port = 1337\n", "location = flight.Location.for_grpc_tcp(\"localhost\", find_free_port())\n", "location\n", "\n", "server = DemoServer(location)\n", "\n", "thread = threading.Thread(target=lambda: server.serve(), daemon=True)\n", "thread.start()\n", "\n", "class DemoClient:\n", " \n", " def __init__(self, location, options=None):\n", " self.con = flight.connect(location)\n", " self.con.wait_for_available()\n", " self.options = options\n", " \n", " # Call \"list-tables\" RPC and return results as Python list\n", " def list_tables(self):\n", " action = flight.Action('list-tables', b'')\n", " return [x.body.to_pybytes().decode('utf8') for x in self.con.do_action(action)] \n", "\n", " # Send a pyarrow.Table to the server to be cached\n", " def cache_table_in_server(self, name, table):\n", " desc = flight.FlightDescriptor.for_command(name.encode('utf8'))\n", " put_writer, put_meta_reader = self.con.do_put(desc, table.schema,\n", " options=self.options)\n", " put_writer.write(table)\n", " put_writer.close()\n", "\n", " # Request a pyarrow.Table by name\n", " def get_table(self, name):\n", " reader = self.con.do_get(flight.Ticket(name.encode('utf8')),\n", " options=self.options)\n", " return reader.read_all()\n", "\n", " def list_actions(self):\n", " return self.con.list_actions()\n", "\n", "ipc_options = pa.ipc.IpcWriteOptions(compression='zstd')\n", "options = flight.FlightCallOptions(write_options=ipc_options)\n", "client = DemoClient(location, options=options)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Ask server for supported actions" ] }, { "cell_type": "code", "execution_count": 118, "metadata": {}, "outputs": [], "source": [ "table = pa.table([pa.array([1,2,3,4,5])], names=['f0'])\n", "client.cache_table_in_server('table1', table)" ] }, { "cell_type": "code", "execution_count": 119, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['table1']" ] }, "execution_count": 119, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.list_tables()" ] }, { "cell_type": "code", "execution_count": 120, "metadata": {}, "outputs": [], "source": [ "client.cache_table_in_server('table2', table)\n", "client.cache_table_in_server('table3', table)\n", "client.cache_table_in_server('table4', table)" ] }, { "cell_type": "code", "execution_count": 121, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['table1', 'table2', 'table3', 'table4']" ] }, "execution_count": 121, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.list_tables()" ] }, { "cell_type": "code", "execution_count": 122, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "pyarrow.Table\n", "f0: int64" ] }, "execution_count": 122, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.get_table('table1')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now let's make a much bigger table and test performance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# fec = pd.read_csv('/home/wesm/code/pydata-book/datasets/fec/P00000001-ALL.csv',\n", "# low_memory=False)\n", "# table = pa.table(fec)\n", "# pq.write_table(table, 'fec-2012.parquet')" ] }, { "cell_type": "code", "execution_count": 123, "metadata": {}, "outputs": [], "source": [ "fec_table = pq.read_table('fec-2012.parquet')" ] }, { "cell_type": "code", "execution_count": 124, "metadata": {}, "outputs": [], "source": [ "fec_table = pa.concat_tables([fec_table] * 10)" ] }, { "cell_type": "code", "execution_count": 125, "metadata": {}, "outputs": [], "source": [ "# How big is it?\n", "out = pa.BufferOutputStream()\n", "with pa.ipc.RecordBatchStreamWriter(out, fec_table.schema,\n", " options=ipc_options) as writer:\n", " writer.write(fec_table)\n", "num_bytes = len(out.getvalue())" ] }, { "cell_type": "code", "execution_count": 126, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Table is 0.4677470475435257 gigabytes\n" ] } ], "source": [ "print(f'Table is {num_bytes / (1 << 30)} gigabytes')" ] }, { "cell_type": "code", "execution_count": 127, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 8.6 s, sys: 878 ms, total: 9.48 s\n", "Wall time: 1.09 s\n" ] } ], "source": [ "%%time\n", "client.cache_table_in_server('fec_table', fec_table)" ] }, { "cell_type": "code", "execution_count": 128, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['fec_table', 'table1', 'table2', 'table3', 'table4']" ] }, "execution_count": 128, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.list_tables()" ] }, { "cell_type": "code", "execution_count": 129, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 358 ms, sys: 718 ms, total: 1.08 s\n", "Wall time: 630 ms\n" ] } ], "source": [ "%%time \n", "fec_table_received = client.get_table('fec_table')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 2 }