{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Datapackage Pipelines Tutorial\n", "\n", "This tutorial is built as a Jupyter notebook which allows you to run and modify the code inline and can be used as a starting point for new Datapackage Pipelines projects." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Installation\n", "\n", "Follow the [DataFlows Tutorial](https://github.com/datahq/dataflows/blob/master/TUTORIAL.ipynb) installation instructions.\n", "\n", "Save this tutorial in curreny working directory (right-click and save on following link): https://raw.githubusercontent.com/frictionlessdata/datapackage-pipelines/master/TUTORIAL.ipynb\n", "\n", "Start Jupyter Lab in the dataflows environment and open the datapackage pipelines tutorial notebook you downloaded." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install datapackage-pipelines" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%%sh\n", "python3 -m pip install -qU datapackage-pipelines[seedup]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This installs datapackage-pipelines with speed optimizations, if you encounter problems installing it, remove the `[speedup]` suffix.\n", "\n", "Verify you have the latest datapackage-pipelines version" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Installed version: 2.0.0\n", "Latest version: 2.0.0\n", "\n" ] } ], "source": [ "%%sh\n", "dpp version" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a flow\n", "\n", "Datapackage-pipelines uses the DataFlows library's Flow objects as the basic building blocks for larger pipeline systems.\n", "\n", "It's recommended to follow the [DataFlows Tutorial](https://github.com/datahq/dataflows/blob/master/TUTORIAL.ipynb) to get a better understanding of the DataFlows concepts which will be used here.\n", "\n", "Run the following cell to create a file called `countries_population_flow.py` which scrapes a list of countries populations from wikipedia.\n", "\n", "This flow is copied from the DataFlows tutorial, the processing function `country_population` is exactly the same, the flow and how we run it is changed to integrate with Datapackage Pipelines:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting countries_population_flow.py\n" ] } ], "source": [ "%%writefile countries_population_flow.py\n", "\n", "# notice that we don't import any datapackage-pipelines modules\n", "# all the flow code is written purely with the DataFlows library\n", "from dataflows import Flow, dump_to_path, load, add_metadata, printer, update_resource\n", "from lxml import etree\n", "from urllib.request import urlopen\n", "\n", "\n", "# Generator flow step, copied from the DataFlows tutorial\n", "# it just spews rows of data - in this case, countries populations scraped from Wikipedia\n", "def country_population():\n", " # Read the Wikipedia page and parse it using etree\n", " page = urlopen('https://en.wikipedia.org/w/index.php?title=List_of_countries_and_dependencies_by_population&oldid=987469839').read()\n", " parser = etree.XMLParser(recover=True)\n", " tree = etree.fromstring(page, parser)\n", " # Iterate on all tables, rows and cells\n", " for table in tree.findall('.//table'):\n", " if 'wikitable' in table.attrib.get('class', ''):\n", " for row in 
table.find('tbody').findall('tr'):\n", " cells = row.findall('td')\n", " if len(cells) > 3:\n", " # If a matching row is found...\n", " name = cells[0].find('.//a').attrib.get('title').replace(\"Demographics of\",\"\")\n", " population = cells[1].text\n", " # ... yield a row with the information\n", " yield dict(\n", " name=name,\n", " population=population\n", " )\n", "\n", "\n", "# The main entrypoint for Datapackage Pipelines: each flow file should have a single flow function\n", "def flow(*args):\n", " return Flow(\n", " country_population(),\n", " update_resource('res_1', **{\n", " # Set a proper name for the resource\n", " 'name': 'countries_population',\n", " # Always set a path as well, even if you don't intend to save it to the filesystem\n", " 'path': 'countries_population.csv',\n", " # the dpp:streaming property is required to let Datapackage Pipelines know it should handle this resource\n", " 'dpp:streaming': True,\n", " })\n", " )\n", "\n", "\n", "# Entrypoint for running the flow directly, without Datapackage Pipelines\n", "if __name__ == '__main__':\n", " # Add a printer step and run the flow\n", " Flow(flow(), printer(num_rows=1, tablefmt='html')).process()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the flow:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "

countries_population

" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
# name\n", "(string) population\n", "(string)
1 China 1,394,640,000
2 India 1,338,310,000
...
240Pitcairn Islands50
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%run countries_population_flow.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is standard DataFlows library usage, now let's see what datapackage-pipelines provides" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a pipeline spec\n", "\n", "Datapackage-pipelines uses yaml files to to define pipelines of flow steps.\n", "\n", "Create a spec to run the countries population flow and save to a path:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting pipeline-spec.yaml\n" ] } ], "source": [ "%%writefile pipeline-spec.yaml\n", "countries-population:\n", " pipeline:\n", " - flow: countries_population_flow\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/countries_population" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Using dpp\n", "\n", "`dpp` is the CLI interface to the datapackage-pipelines library. It is used to list and run available pipelines.\n", "\n", "Let's list the available pipelines to see if our countries-population pipeline is available:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Available Pipelines:\n", "- ./countries-population \n" ] } ], "source": [ "%%sh\n", "dpp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the pipeline:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1A\n", "\u001b[2K./countries-population: \u001b[31mWAITING FOR OUTPUT\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 100 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 200 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 240 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO :RESULTS:\n", "INFO :SUCCESS: ./countries-population {'bytes': 6425, 'count_of_rows': 240, 'dataset_name': '_', 'hash': '1b1585349acef8e155d112fe0cb4b3fc'}\n" ] } ], "source": [ "%%sh\n", "dpp run ./countries-population" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline Dependencies\n", "\n", "Let's add another pipeline which depends on the countries-population pipeline.\n", "\n", "This time we will use just the pipeline spec yaml to write the pipeline, without any DataFlows code (although DataFlows library is used to implement the processors we are using here):" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting pipeline-spec.yaml\n" ] } ], "source": [ "%%writefile pipeline-spec.yaml\n", "\n", "countries-population:\n", " pipeline:\n", " - flow: countries_population_flow\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/countries_population\n", "\n", "sorted_countries_by_name:\n", " dependencies:\n", " - pipeline: ./countries-population\n", " - datapackage: data/countries_population/datapackage.json\n", " pipeline:\n", " - run: load\n", " parameters:\n", " from: data/countries_population/datapackage.json\n", " resources: ['countries_population']\n", " - run: sort\n", " 
parameters:\n", " resources: ['countries_population']\n", " sort-by: '{name}'\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/sorted_countries_by_name\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Clear the pipelines state using `dpp init` and list the available pipelines:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Available Pipelines:\n", "- ./countries-population (*)\n", "- ./sorted_countries_by_name (*)(E)\n", "\tDirty dependency: Cannot run until dependency is executed: ./countries-population\n" ] } ], "source": [ "%%sh\n", "dpp init\n", "dpp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see that the new pipeline can't run until it's dependency is executed.\n", "\n", "Let's run all the \"Dirty\" dependencies:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1A\n", "\u001b[2K./countries-population: \u001b[31mWAITING FOR OUTPUT\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 100 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 200 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[33mRUNNING, processed 240 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2K./sorted_countries_by_name: \u001b[31mWAITING FOR OUTPUT\u001b[0m\n", "\u001b[3A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2K./sorted_countries_by_name: \u001b[33mRUNNING, processed 100 rows\u001b[0m\n", "\u001b[3A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2K./sorted_countries_by_name: \u001b[33mRUNNING, processed 200 rows\u001b[0m\n", "\u001b[3A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2K./sorted_countries_by_name: \u001b[33mRUNNING, processed 240 rows\u001b[0m\n", "\u001b[3A\n", "\u001b[2K./countries-population: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n", "\u001b[2K./sorted_countries_by_name: \u001b[32mSUCCESS, processed 240 rows\u001b[0m\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO :RESULTS:\n", "INFO :SUCCESS: ./countries-population {'bytes': 6425, 'count_of_rows': 240, 'dataset_name': '_', 'hash': '1b1585349acef8e155d112fe0cb4b3fc'}\n", "INFO :SUCCESS: ./sorted_countries_by_name {'bytes': 6492, 'count_of_rows': 240, 'dataset_name': '_', 'hash': 'a63e74300bbe619d4a8efba26bc43688'}\n" ] } ], "source": [ "%%sh\n", "dpp run --dirty all" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Inspect the created datapackage" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "

countries_population

" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
# name\n", "(string) population\n", "(string)
1 Abkhazia 240,705
2 Afghanistan31,575,018
...
240Zimbabwe 14,848,905
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "(, {})" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dataflows import Flow, load, printer\n", "\n", "Flow(\n", " load('data/sorted_countries_by_name/datapackage.json'),\n", " printer(num_rows=1, tablefmt='html')\n", ").process()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline processors\n", "\n", "Datapackage Pipelines has a standard library of processors, like the `sort` processor used previously. These processors correspond to DataFlows standard library processors.\n", "\n", "See the [Datapackage Pipelines README](https://github.com/frictionlessdata/datapackage-pipelines/blob/master/README.md) for reference and usage examples.\n", "\n", "An example showing usage of common processors:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting pipeline-spec.yaml\n" ] } ], "source": [ "%%writefile pipeline-spec.yaml\n", "\n", "double-winners:\n", " pipeline:\n", " - run: load\n", " parameters:\n", " name: emmies\n", " from: https://raw.githubusercontent.com/datahq/dataflows/master/data/emmy.csv\n", " - run: load\n", " parameters:\n", " name: oscars\n", " from: https://raw.githubusercontent.com/datahq/dataflows/master/data/academy.csv\n", " - run: filter\n", " parameters:\n", " resources: ['emmies']\n", " in:\n", " - winner: 1\n", " - run: concatenate\n", " parameters:\n", " target: {'name': 'emmies_filtered'}\n", " resources: ['emmies']\n", " fields:\n", " emmy_nominee: ['nominee']\n", " - run: join\n", " parameters:\n", " source:\n", " name: 'emmies_filtered'\n", " key: ['emmy_nominee']\n", " delete: true\n", " target:\n", " name: 'oscars'\n", " key: ['Name']\n", " fields: {}\n", " full: false\n", " - run: filter\n", " parameters:\n", " in:\n", " - Winner: \"1\"\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/double_winners\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the pipeline:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[1A\n", "\u001b[2K./double-winners: \u001b[31mWAITING FOR OUTPUT\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./double-winners: \u001b[33mRUNNING, processed 98 rows\u001b[0m\n", "\u001b[2A\n", "\u001b[2K./double-winners: \u001b[32mSUCCESS, processed 98 rows\u001b[0m\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "INFO :RESULTS:\n", "INFO :SUCCESS: ./double-winners {'bytes': 6766, 'count_of_rows': 98, 'dataset_name': '_', 'hash': 'bc61b69dc87b0da0348049802c616d95'}\n" ] } ], "source": [ "%%sh\n", "dpp run ./double-winners" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print the datapackage:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "

oscars

" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
# Year\n", "(string) Ceremony\n", "(integer)Award\n", "(string) Winner\n", "(string)Name\n", "(string) Film\n", "(string)
1 1931/1932 5Actress 1Helen Hayes The Sin of Madelon Claudet
2 1932/1933 6Actress 1Katharine HepburnMorning Glory
...
98 2015 88Honorary Award1Gena Rowlands
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "(, {})" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dataflows import Flow, printer, load\n", "Flow(load('data/double_winners/datapackage.json'), printer(tablefmt='html', num_rows=1)).process()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipelines Server\n", "\n", "Running pipelines on your laptop is fine for many use-cases but sometimes you want to run pipelines in a more reproducible, scalable and automatic fashion.\n", "\n", "The Datapackage Pipelines Server is a Docker image which provides the core functionality to achieve this.\n", "\n", "To start a local pipelines server for development, you will need to install Docker for [Windows](https://store.docker.com/editions/community/docker-ce-desktop-windows),\n", "[Mac](https://store.docker.com/editions/community/docker-ce-desktop-mac) or [Linux](https://docs.docker.com/install/)\n", "\n", "Pull the datapackage-pipelines image:" ] }, { "cell_type": "code", "execution_count": 120, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using default tag: latest\n", "latest: Pulling from frictionlessdata/datapackage-pipelines\n", "Digest: sha256:50fd5b40523146af0e46275f836357bf27097c1d9c83726b03da884e56d385bb\n", "Status: Image is up to date for frictionlessdata/datapackage-pipelines:latest\n" ] } ], "source": [ "%%sh\n", "docker pull frictionlessdata/datapackage-pipelines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a local pipelines server, mounting the current working directory into the container:" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "33254bf2b410b28d20cb7d4989144d49488b9b9eea1af7dfe3861d97a58216d6\n" ] } ], "source": [ "%%sh\n", "docker run -d --name dpp -v `pwd`:/pipelines:rw -p 5000:5000 frictionlessdata/datapackage-pipelines server" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After a few seconds, the pipelines dashboad should be available at http://localhost:5000\n", "\n", "New / modified pipelines and dirty dependencies are executed by the pipelines server automatically.\n", "\n", "The server also supports scheduled pipelines for periodical execution.\n", "\n", "Let's see this in action:" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting pipeline-spec.yaml\n" ] } ], "source": [ "%%writefile pipeline-spec.yaml\n", "\n", "countries-population:\n", " schedule:\n", " # minute hour day_of_week day_of_month month_of_year\n", " crontab: '* * * * *'\n", " pipeline:\n", " - flow: countries_population_flow\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/countries_population\n", "\n", "sorted_countries_by_name:\n", " dependencies:\n", " - pipeline: ./countries-population\n", " - datapackage: data/countries_population/datapackage.json\n", " pipeline:\n", " - run: load\n", " parameters:\n", " from: data/countries_population/datapackage.json\n", " resources: ['countries_population']\n", " - run: sort\n", " parameters:\n", " resources: ['countries_population']\n", " sort-by: '{name}'\n", " - run: dump.to_path\n", " parameters:\n", " out-path: data/sorted_countries_by_name\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Inspect the Pipelines server logs and wait for `Update 
, { "cell_type": "markdown", "metadata": {}, "source": [ "Inspect the Pipelines server logs and wait for the `Update Pipelines` task to complete and the pipelines to start running:" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[2018-10-16 13:32:00,346: INFO/ForkPoolWorker-1(86)] Update Pipelines (scheduled)\n", "[2018-10-16 13:32:00,353: INFO/ForkPoolWorker-1(86)] 4017b8d9 QUEUEING SCHEDULED task ./countries-population\n", "[2018-10-16 13:32:00,356: INFO/MainProcess(38)] Received task: datapackage_pipelines.celery_tasks.celery_tasks.execute_pipeline_task[9c70f3a8-f598-4232-9c59-5ced466a3ae2] \n", "[2018-10-16 13:32:00,357: INFO/ForkPoolWorker-2(87)] 4017b8d9 RUNNING ./countries-population\n", "[2018-10-16 13:32:00,588: INFO/ForkPoolWorker-1(86)] Task datapackage_pipelines.celery_tasks.celery_tasks.update_pipelines[064cd648-04c0-43ed-9b11-2ec476b8d693] succeeded in 0.24168486997950822s: None\n" ] } ], "source": [ "%%sh\n", "docker logs dpp --tail 5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Refresh the dashboard to see the new pipelines and execution logs: http://localhost:5000" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Next Steps\n", "\n", "* [Datapackage Pipelines Documentation](https://github.com/frictionlessdata/datapackage-pipelines/blob/master/README.md) - contains detailed documentation, reference and usage examples\n", "* [DataFlows Processors reference](https://github.com/datahq/dataflows/blob/master/PROCESSORS.md) - detailed reference for all the available DataFlows standard library processors." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.0" } }, "nbformat": 4, "nbformat_minor": 2 }