{ "cells": [ { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Environment Setup" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [], "source": [ "from os import path as p, chdir\n", "\n", "if 'examples' in p.abspath('.'):\n", " chdir('..')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch a webpage" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "In this example, we fetch the title of a webpage." ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'content': b'Hacker News'}" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.fetchpage import pipe\n", "\n", "url = 'https://news.ycombinator.com/'\n", "next(pipe(conf={'url': url, 'start': '', 'end': ''}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch a webpage using an xpath" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "Here, we fetch the the first hackernews story link using an xpath." ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'class': 'storylink',\n", " 'content': 'Quintuple: a Python 5-qubit quantum computer simulator',\n", " 'href': 'https://arxiv.org/abs/1606.09225'}" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.xpathfetchpage import pipe\n", "\n", "xpath = '/html/body/center/table/tr[3]/td/table/tr[1]/td[3]/a'\n", "next(pipe(conf={'url': 'https://news.ycombinator.com/', 'xpath': xpath}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Word Count" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here, we use several pipes to count the number of words on a webpage." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{\"'sad\": 1}" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "### Create a SyncPipe flow ###\n", "#\n", "# `SyncPipe` is a convenience class that creates chainable flows\n", "# and allows for parallel processing.\n", "from riko.collections import SyncPipe\n", "\n", "### Set the pipe configurations ###\n", "#\n", "# Notes:\n", "# 1. the `detag` option will strip all html tags from the result\n", "# 2. fetch the text contained inside the 'body' tag of the hackernews homepage\n", "# 3. replace newlines with spaces and assign the result to 'content'\n", "# 4. tokenize the resulting text using whitespace as the delimeter\n", "# 5. count the number of times each token appears\n", "# 6. obtain the raw stream\n", "# 7. extract the first word and its count\n", "url = 'https://news.ycombinator.com/'\n", "fetch_conf = {'url': url, 'start': '', 'end': '', 'detag': True} # 1\n", "replace_conf = {'rule': [{'find': '\\r\\n', 'replace': ' '}, {'find': '\\n', 'replace': ' '}]}\n", "\n", "flow = (\n", " SyncPipe('fetchpage', conf=fetch_conf) # 2\n", " .strreplace(conf=replace_conf, assign='content') # 3\n", " .tokenizer(conf={'delimiter': ' '}, emit=True) # 4\n", " .count(conf={'count_key': 'content'})) # 5\n", "\n", "stream = flow.output # 6 \n", "next(stream) # 7" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetching feeds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`riko` can fetch rss feeds from both local and remote filepaths via \"source\"\n", "`pipes`. Each \"source\" `pipe` returns a `stream`, i.e., an iterator of\n", "dictionaries, aka `items`." ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "('Quintuple: a Python 5-qubit quantum computer simulator',\n", " 'https://arxiv.org/abs/1606.09225',\n", " 'https://news.ycombinator.com/item?id=12106163')" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.fetch import pipe\n", "\n", "### Fetch an RSS feed ###\n", "stream = pipe(conf={'url': 'https://news.ycombinator.com/rss'})\n", "item = next(stream)\n", "item['title'], item['link'], item['comments']" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "dict_keys(['title_detail', 'author.uri', 'tags', 'summary_detail', 'author_detail', 'author.name', 'y:published', 'y:title', 'content', 'title', 'pubDate', 'guidislink', 'id', 'summary', 'dc:creator', 'authors', 'published_parsed', 'links', 'y:id', 'author', 'link', 'published'])" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.fetchsitefeed import pipe \n", "\n", "### Fetch the first RSS feed found ###\n", "#\n", "# Note: regardless of how you fetch an RSS feed, it will have the same\n", "# structure\n", "stream = pipe(conf={'url': 'http://arstechnica.com/rss-feeds/'})\n", "item = next(stream)\n", "item.keys()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "('Gravity doesn’t care about quantum spin',\n", " 'Chris Lee',\n", " 'http://arstechnica.com/?p=924009')" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "item['title'], item['author'], item['id']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please see the [FAQ](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst) for a complete list of supported [file types](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-file-types-are-supported) and\n", "[protocols](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-protocols-are-supported). Please see [Fetching data and feeds](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#fetching-data-and-feeds) for more examples." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Synchronous processing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`riko` can modify `streams` via the 40 built-in `pipes`" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "'Open Artificial Pancreas home:'" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.collections import SyncPipe\n", "\n", "### Set the pipe configurations ###\n", "fetch_conf = {'url': 'https://news.ycombinator.com/rss'}\n", "filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}\n", "xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'\n", "xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}\n", "\n", "### Create a SyncPipe flow ###\n", "#\n", "# `SyncPipe` is a convenience class that creates chainable flows\n", "# and allows for parallel processing.\n", "#\n", "# The following flow will:\n", "# 1. fetch the hackernews RSS feed\n", "# 2. filter for items with '.com' in the link\n", "# 3. sort the items ascending by title\n", "# 4. fetch the first comment from each item\n", "# 5. flatten the result into one raw stream\n", "# 6. extract the first item's content\n", "#\n", "# Note: sorting is not lazy so take caution when using this pipe\n", "\n", "flow = (\n", " SyncPipe('fetch', conf=fetch_conf) # 1\n", " .filter(conf={'rule': filter_rule}) # 2\n", " .sort(conf={'rule': {'sort_key': 'title'}}) # 3\n", " .xpathfetchpage(conf=xpath_conf)) # 4\n", "\n", "stream = flow.output # 5\n", "next(stream)['content'] # 6" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please see [alternate workflow creation](https://github.com/nerevu/riko/blob/master/COOKBOOK.rst#synchronous-processing) for an alternative (function based) method for\n", "creating a `stream`. Please see [pipes](https://github.com/nerevu/riko/blob/master/docs/FAQ.rst#what-pipes-are-available) for a complete list of available `pipes`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallel processing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An example using `riko`'s parallel API to spawn a `ThreadPool`. You can instead enable a `ProcessPool` by additionally passing `threads=False` to `SyncPipe`, i.e., `SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False)`." ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "'He uses the following example for when to throw your own errors:'" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.collections import SyncPipe\n", "\n", "### Set the pipe configurations ###\n", "fetch_conf = {'url': 'https://news.ycombinator.com/rss'}\n", "filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}\n", "xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'\n", "xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}\n", "\n", "### Create a parallel SyncPipe flow ###\n", "#\n", "# The following flow will:\n", "# 1. fetch the hackernews RSS feed \n", "# 2. filter for items with '.com' in the article link\n", "# 3. fetch the first comment from all items in parallel (using 4 workers)\n", "# 4. flatten the result into one raw stream\n", "# 5. extract the first item's content\n", "#\n", "# Note: no point in sorting after the filter since parallel fetching doesn't guarantee \n", "# order\n", "flow = (\n", " SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4) # 1\n", " .filter(conf={'rule': filter_rule}) # 2\n", " .xpathfetchpage(conf=xpath_conf)) # 3\n", "\n", "stream = flow.output # 4\n", "next(stream)['content'] # 5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Asynchronous processing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To enable asynchronous processing, you must install the `async` module." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`pip install riko[async]`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An example using `riko`'s asynchronous API." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Here's how iteration works ():\n" ] } ], "source": [ "from riko.bado import coroutine, react\n", "from riko.collections import AsyncPipe\n", "\n", "### Set the pipe configurations ###\n", "fetch_conf = {'url': 'https://news.ycombinator.com/rss'}\n", "filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}\n", "xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'\n", "xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}\n", "\n", "### Create an AsyncPipe flow ###\n", "#\n", "# The following flow will:\n", "# 1. fetch the hackernews RSS feed\n", "# 2. filter for items with '.com' in the article link\n", "# 3. asynchronously fetch the first comment from each item (using 4 connections)\n", "# 4. flatten the result into one raw stream\n", "# 5. extract the first item's content\n", "#\n", "# Note: no point in sorting after the filter since async fetching doesn't guarantee \n", "# order\n", "@coroutine\n", "def run(reactor):\n", " stream = yield (\n", " AsyncPipe('fetch', conf=fetch_conf, connections=4) # 1\n", " .filter(conf={'rule': filter_rule}) # 2\n", " .xpathfetchpage(conf=xpath_conf) # 3\n", " .output) # 4\n", " \n", " print(next(stream)['content']) # 5\n", " \n", "try:\n", " react(run)\n", "except SystemExit:\n", " pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Design Principles" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The primary data structures in `riko` are the `item` and `stream`. An `item`\n", "is just a python dictionary, and a `stream` is an iterator of `items`. You can\n", "create a `stream` manually with something as simple as\n", "`[{'content': 'hello world'}]`. You manipulate `streams` in\n", "`riko` via `pipes`. A `pipe` is simply a function that accepts either a\n", "`stream` or `item`, and returns a `stream`. `pipes` are composable: you\n", "can use the output of one `pipe` as the input to another `pipe`.\n", "\n", "`riko` `pipes` come in two flavors; `operators` and `processors`.\n", "`operators` operate on an entire `stream` at once and are unable to handle\n", "individual items. Example `operators` include `pipecount`, `pipefilter`,\n", "and `pipereverse`." ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'title': 'riko pt. 2'}" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.reverse import pipe\n", "\n", "stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]\n", "next(pipe(stream))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`processors` process individual `items` and can be parallelized across\n", "threads or processes. Example `processors` include `pipefetchsitefeed`,\n", "`pipehash`, `pipeitembuilder`, and `piperegex`." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'hash': 1323840151, 'title': 'riko pt. 1'}" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.hash import pipe\n", "\n", "item = {'title': 'riko pt. 1'}\n", "stream = pipe(item, field='title')\n", "next(stream)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some `processors`, e.g., `pipetokenizer`, return multiple results." ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'tokenizer': [{'content': 'riko'},\n", " {'content': 'pt.'},\n", " {'content': '1'}],\n", " 'title': 'riko pt. 1'}" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.tokenizer import pipe\n", "\n", "item = {'title': 'riko pt. 1'}\n", "tokenizer_conf = {'delimiter': ' '}\n", "stream = pipe(item, conf=tokenizer_conf, field='title')\n", "next(stream)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'content': 'riko'}" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# In this case, if we just want the result, we can `emit` it instead\n", "stream = pipe(item, conf=tokenizer_conf, field='title', emit=True)\n", "next(stream)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`operators` are split into sub-types of `aggregators`\n", "and `composers`. `aggregators`, e.g., `count`, combine\n", "all `items` of an input `stream` into a new `stream` with a single `item`;\n", "while `composers`, e.g., `filter`, create a new `stream` containing\n", "some or all `items` of an input `stream`." ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'count': 2}" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.count import pipe\n", "\n", "stream = [{'title': 'riko pt. 1'}, {'title': 'riko pt. 2'}]\n", "next(pipe(stream))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`processors` are split into sub-types of `source` and `transformer`.\n", "`sources`, e.g., `itembuilder`, can create a `stream` while\n", "`transformers`, e.g. `hash` can only transform items in a `stream`." ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'title': 'riko pt. 1'}" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules.itembuilder import pipe\n", "\n", "attrs = {'key': 'title', 'value': 'riko pt. 1'}\n", "next(pipe(conf={'attrs': attrs}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following table summaries these observations:\n", "\n", "\n", "type | sub-type | input | output | parallelizable? | creates streams? \n", "-----|----------|-------|--------|-----------------|-----------------\n", "operator | aggregator | stream | stream * |\n", "operator | composer | stream | stream |\n", "processor | source | item | stream | √ | √ \n", "processor | transformer | item | stream | √ " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you are unsure of the type of `pipe` you have, check its metadata.\n", "\n", "`*` the output `stream` of an `aggregator` is an iterator of only 1 `item`." ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'__wrapped__': ,\n", " 'name': 'fetchpage',\n", " 'sub_type': 'source',\n", " 'type': 'processor'}" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.modules import fetchpage, count\n", "\n", "fetchpage.async_pipe.__dict__" ] }, { "cell_type": "code", "execution_count": 29, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'__wrapped__': ,\n", " 'name': 'count',\n", " 'sub_type': 'aggregator',\n", " 'type': 'operator'}" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "count.pipe.__dict__" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `SyncPipe` and `AsyncPipe` classes (among other things) perform this\n", "check for you to allow for convenient method chaining and transparent\n", "parallelization." ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": false }, "outputs": [ { "data": { "text/plain": [ "{'content': \"Let's talk about riko!\",\n", " 'hash': 2092196356,\n", " 'title': 'riko pt. 1'}" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from riko.collections import SyncPipe\n", "\n", "attrs = [\n", " {'key': 'title', 'value': 'riko pt. 1'},\n", " {'key': 'content', 'value': \"Let's talk about riko!\"}]\n", "flow = SyncPipe('itembuilder', conf={'attrs': attrs}).hash()\n", "flow.list[0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please see the [cookbook](https://github.com/nerevu/riko/blob/master/docs/COOKBOOK.rst) for advanced examples including how to wire in\n", "vales from other pipes or accept user input." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Command-line Interface" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`riko` provides a command, `runpipe`, to execute `workflows`. A\n", "`workflow` is simply a file containing a function named `pipe` that creates\n", "a `flow` and processes the resulting `stream`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CLI Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`flow.py`" ] }, { "cell_type": "code", "execution_count": 31, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from __future__ import print_function\n", "from riko.collections import SyncPipe\n", "\n", "conf1 = {'attrs': [{'value': 'https://google.com', 'key': 'content'}]}\n", "conf2 = {'rule': [{'find': 'com', 'replace': 'co.uk'}]}\n", "\n", "def pipe(test=False):\n", " kwargs = {'conf': conf1, 'test': test}\n", " flow = SyncPipe('itembuilder', **kwargs).strreplace(conf=conf2)\n", " stream = flow.output\n", " \n", " for i in stream:\n", " print(i)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CLI Usage" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now to execute `flow.py`, type the command `runpipe flow`. You should\n", "then see the following output in your terminal:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`https://google.co.uk`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`runpipe` will also search the `examples` directory for `workflows`. Type\n", "`runpipe demo` and you should see the following output:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Deadline to clear up health law eligibility near 682`" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.1" } }, "nbformat": 4, "nbformat_minor": 0 }