{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Core Techniques used in our ETL\n", "\n", "* Generators\n", "* Partial function application\n", "* Batching / Chunking\n", "* Caching" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import collections\n", "import functools\n", "import more_itertools\n", "import json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Generators\n", "\n", "Python [generators](https://docs.python.org/3/tutorial/classes.html#generators) allow you to concisely create iterators.\n", "\n", "They are a highlighted technique in this workshop because they provide:\n", "* Concise code\n", "* Deferred evaluation\n", "* Easy chaining for composing a transformation process" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# start with a function that produces a list of squared numbers\n", "def squares_as_list(max_n):\n", "    accum = []\n", "    x = 1\n", "    while x <= max_n:\n", "        accum.append(x * x)\n", "        x = x + 1\n", "    return accum\n", "\n", "# output the result\n", "result = squares_as_list(10)\n", "print('Type is: ' + str(type(result)))\n", "for i in result:\n", "    print(i)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# here is a similar function, but implemented as a generator\n", "def squares_as_generator(max_n):\n", "    x = 1\n", "    # use <= so that this matches squares_as_list and yields max_n squares\n", "    while x <= max_n:\n", "        yield x * x\n", "        x = x + 1\n", "\n", "\n", "result = squares_as_generator(10)\n", "print('Type is: ' + str(type(result)))\n", "\n", "# loop directly as an iterable\n", "print('All 10 using a loop')\n", "for s in result:\n", "    print(s)\n", "\n", "# nothing is computed until next() asks for a value\n", "print('Just 5 iterations to demonstrate deferred evaluation...')\n", "another_gen = squares_as_generator(10)\n", "print(next(another_gen))\n", "print(next(another_gen))\n", "print(next(another_gen))\n", "print(next(another_gen))\n", "print(next(another_gen))\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Chaining\n", "\n", "Generators are first-class objects in Python, so you can pass them as arguments (iterables) to other generators to chain operations." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#\n", "# Generator Chaining example\n", "#\n", "\n", "def f_A(n):\n", "    # yields the squares of 1 .. n-1\n", "    x = 1\n", "    while x < n:\n", "        yield x * x\n", "        x = x + 1\n", "\n", "def f_B(iter_a):\n", "    for y in iter_a:\n", "        yield y + 10000\n", "\n", "def f_C(iter_b):\n", "    for z in iter_b:\n", "        yield \"'myprefix \" + str(z) + \"'\"\n", "\n", "# chain the first two\n", "gen_a = f_A(10)\n", "gen_b = f_B(gen_a)\n", "print('First two chained')\n", "for r in gen_b:\n", "    print(r)\n", "\n", "# a consumed generator is exhausted, so build a fresh chain\n", "print('\\nAll 3 chained')\n", "gen_a = f_A(10)\n", "gen_b = f_B(gen_a)\n", "gen_c = f_C(gen_b)\n", "for r in gen_c:\n", "    print(r)\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Simplistic ETL\n", "\n", "This code sample shows a very simple *ETL* which leverages generators and chaining.\n", "\n", "This is somewhat contrived as it doesn't use a database. It uses a list as \"source data\" and a dictionary as a \"destination\" for inserting results. The main point is to show the separation of the three stages (extract, transform, load) and how they can be chained together as generators." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# source: assume this list holds the database rows\n", "SOURCE_DATA = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 13, 14]\n", "\n", "DESTINATION_DB = collections.OrderedDict()\n", "\n", "def extractor(source_data):\n", "    for item in source_data:\n", "        yield item\n", "\n", "def transformer(iter_extractor):\n", "    for item in iter_extractor:\n", "        # transform it into a tuple of (n, n^2)\n", "        transformed_item = (item, item * item)\n", "        yield transformed_item\n", "\n", "def loader(iter_transformer, db):\n", "    # the loader is the consumer: iterating here drives the whole chain\n", "    for item in iter_transformer:\n", "        # insert each tuple as an item into the storage dictionary\n", "        k = str(item[0])\n", "        v = item[1]\n", "        db[k] = v\n", "\n", "\n", "# here is a simple example of chaining generators\n", "extracted_gen = extractor(SOURCE_DATA)\n", "\n", "transformed_gen = transformer(extracted_gen)\n", "\n", "loader(transformed_gen, DESTINATION_DB)\n", "\n", "# output the loaded results\n", "print(json.dumps(DESTINATION_DB, indent=2))\n" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "## Partial functions\n", "\n", "You can create partial function objects using [functools.partial()](https://docs.python.org/3/library/functools.html#functools.partial).\n", "\n", "This allows you to \"freeze\" positional arguments (args) or keyword arguments (kwargs).\n", "\n", "This is a quick way to implement encapsulation (bundling data with the functions that use it)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def add(x, y):\n", "    return x + y\n", "\n", "# print('Simple addition')\n", "# print('1 + 2 = %d' % add(1, 2))\n", "# print('2 + 3 = %d' % add(2, 3))\n", "\n", "print('partial add_1 function')\n", "# NOTE: order of args matters! positional args are frozen from left to right\n", "add_1 = functools.partial(add, 1)\n", "\n", "print('add_1(1) = %d' % add_1(1))\n", "print('add_1(2) = %d' % add_1(2))\n", "\n", "print('partial add_2 function')\n", "add_2 = functools.partial(add, 2)\n", "\n", "print('add_2(1) = %d' % add_2(1))\n", "print('add_2(2) = %d' % add_2(2))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import functools\n", "\n", "# similarly, you can freeze kwargs to avoid ordering constraints\n", "# NOTE: this definition shadows the built-in pow() for the rest of the notebook\n", "def pow(x, n=1):\n", "    return x ** n\n", "\n", "print('regular')\n", "print(pow(2, n=3))\n", "\n", "print('partial with n=2')\n", "pow_2 = functools.partial(pow, n=2)\n", "print(type(pow_2))\n", "print(pow_2(2))\n", "\n", "print('partial with n=3')\n", "pow_3 = functools.partial(pow, n=3)\n", "print(pow_3(2))\n", "\n", "# args and kwargs can be frozen together, leaving nothing to pass at call time\n", "pow_easy = functools.partial(pow, 5, n=2)\n", "print(pow_easy())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For modules with _single operations_, you can quickly implement parameterization using partial functions." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# example: this transformer generator has multiple kwargs which serve as\n", "# parameters controlling its behavior\n", "def transform_func_with_config(\n", "    iter_extractor,\n", "    translate=0,\n", "    scale=1,\n", "    cast_func=int\n", "):\n", "    for x in iter_extractor:\n", "        t = x + translate\n", "        t = scale * t\n", "        t = cast_func(t)\n", "\n", "        yield (x, t)\n", "\n", "# now we can create multiple transformer configurations via partial functions\n", "# these configurations can be read from a JSON file\n", "config_1 = {'translate': 1, 'scale': 2}\n", "config_2 = {'scale': -1, 'cast_func': str}\n", "\n", "# create partial functions quickly by unpacking the configuration to freeze the kwargs\n", "transform_1 = functools.partial(transform_func_with_config, **config_1)\n", "transform_2 = functools.partial(transform_func_with_config, **config_2)\n", "\n", "# let's output one of them\n", "extracted_gen = extractor(SOURCE_DATA)\n", "transform_1_gen = transform_1(extracted_gen)\n", "\n", "for t in transform_1_gen:\n", "    print(t)\n", "\n", "# any questions?" 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# the real power is that the partial function _encapsulates_ the configuration so that\n", "# other functions (like this simple process function) need not be concerned with it\n", "def process(f_extractor, f_transformer, f_loader):\n", "\n", "    # run the process\n", "    extractor_gen = f_extractor(SOURCE_DATA)\n", "\n", "    transformer_gen = f_transformer(extractor_gen)\n", "\n", "    f_loader(transformer_gen, DESTINATION_DB)\n", "\n", "\n", "DESTINATION_DB.clear()\n", "print('configuration 1')\n", "process(extractor, transform_1, loader)\n", "print(json.dumps(DESTINATION_DB, indent=2))\n", "\n", "\n", "DESTINATION_DB.clear()\n", "print('\\nconfiguration 2')\n", "process(extractor, transform_2, loader)\n", "print(json.dumps(DESTINATION_DB, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Batching\n", "\n", "This is also known as \"chunking\". This is easy using [more_itertools.chunked()](https://more-itertools.readthedocs.io/en/latest/api.html#more_itertools.chunked).\n", "\n", "This consumes any iterable and yields its items in lists (batches) of a maximum size. This greatly reduces the complexity of your code because you need not worry about how many items your input iterator produces. You also don't need any edge case logic to handle 'remainder' items: the last batch is simply shorter." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# range() is a Python built-in. In Python 3 it returns a lazy range object (an iterable, not a list).\n", "source_gen = range(20)\n", "\n", "print('normal consumption')\n", "for item in source_gen:\n", "    print(item)\n", "\n", "print('\\nbatched consumption')\n", "source_gen = range(20)\n", "chunk_size = 3\n", "batched_gen = more_itertools.chunked(source_gen, chunk_size)\n", "for item in batched_gen:\n", "    print('{} of size {}: {}'.format(type(item), len(item), item))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Depth First Search\n", "\n", "We're specifically using depth-first search with pre-order traversal:\n", "\n", "* **Recursive:** The same function is called on nested subtrees\n", "\n", "* **Pre-order traversal:** Inspect a node's items first *before* considering its children\n", "\n", "* **Depth first search:** When you recurse, explore as far down a child's subtree as possible before backtracking\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# node - current node in the tree\n", "# path - list of strings representing 'path components' down the JSON tree\n", "\n", "# f_gen_items - produces 'transformed' items for a node\n", "# f_gen_children - produces child nodes to search\n", "\n", "def _recursive_map_nested(node: dict, path: list, f_gen_items, f_gen_children):\n", "    if not node:  # empty node\n", "        return\n", "\n", "    # pre-order: emit this node's own items before visiting its children\n", "    gen_items = f_gen_items(node, path)\n", "    yield from gen_items\n", "\n", "    gen_children = f_gen_children(node, path)\n", "    for child_path, child_node in gen_children:\n", "        # pass the generator *functions* down, not the generators created above\n", "        yield from _recursive_map_nested(\n", "            child_node,\n", "            child_path,\n", "            f_gen_items,\n", "            f_gen_children)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def my_gen_items(node: dict, path: list):\n", "    \"\"\" converts scalar dictionary items to response event arguments reflecting answers \"\"\"\n", "    for k, v in node.items():\n", "        if not isinstance(v, (dict, list,)):\n", "            path_str = '.'.join(path + [k])\n", "            node_info = ...\n", "\n", "            if node_info:\n", "                str_value = ...\n", "                yield path_str, {\n", "                    'answer_type': node_info.answer_type,\n", "                    'value': str_value\n", "                }\n", "\n", "\n", "def my_gen_children(node: dict, path: list):\n", "    \"\"\" locates and generates child nodes \"\"\"\n", "    node_slug = node.get('slug')\n", "    children = node.get('children')\n", "    if node_slug and children:\n", "        for child in children:\n", "            child_slug = child.get('slug')\n", "            if child_slug:\n", "                yield path + [child_slug], child\n", "\n", "\n", "\n", "# initial call would be\n", "root = { ... }\n", "transformed_items = _recursive_map_nested(root, [], my_gen_items, my_gen_children)\n", "\n", "# pass 'transformed_items' (another generator) to the loader" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Caching\n", "\n", "Some computations are time consuming. You can store pre-computed results in memory via a cache.\n", "\n", "Python's standard library comes with a caching decorator: [functools.lru_cache()](https://docs.python.org/3/library/functools.html#functools.lru_cache). You can easily wrap an \"expensive\" function so that it will cache a maximum number of results.\n", "\n", "This cache uses a [Least Recently Used](https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_Recently_Used_.28LRU.29) cache replacement policy. This means that when a new item must be added to a cache that is full, the least recently used item is evicted before the new item is inserted. This is most easily implemented with a hash table (for quick lookup) along with a doubly-linked list (for quickly locating the least recently used item to evict). Other data structures exist with some tradeoffs (e.g. data structures with age bits).\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "@functools.lru_cache(maxsize=4)\n", "def cached_pow(x, n):\n", "    print(\"-- Oh be careful... I'm expensive!\")\n", "    return x ** n\n", "\n", "# this will run the actual function but cache the results\n", "print('Populate cache with 2 different items')\n", "print(cached_pow(2, 3))\n", "print(cached_pow(2, 4))\n", "\n", "# this will use cached results (notice the absence of the warning)\n", "print('\\nRe-run same requests so that it retrieves from the cache')\n", "print(cached_pow(2, 3))\n", "print(cached_pow(2, 3))\n", "print(cached_pow(2, 4))\n", "print(cached_pow(2, 4))\n", "print(cached_pow(2, 4))\n", "\n", "# this will force an eviction (2+3 > 4 max items) of the first pow(2,3) result\n", "print('\\n3 more different items')\n", "print(cached_pow(2, 5))\n", "print(cached_pow(2, 6))\n", "print(cached_pow(2, 7))\n", "\n", "# run the very last one along with (2,3) again to re-evaluate\n", "print('\\n(2,3) should have been evicted, will require an evaluation')\n", "print(cached_pow(2, 7))\n", "print(cached_pow(2, 3))\n", "\n", "print('cache metrics')\n", "cache_info = cached_pow.cache_info()\n", "print(cache_info)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, using this cache requires a bit of care. The documentation briefly mentions that:\n", "\n", "```\n", "...the positional and keyword arguments to the function must be hashable...\n", "```\n", "\n", "This is actually quite critical when working with the SQLAlchemy ORM. This is because the [Session](http://docs.sqlalchemy.org/en/latest/orm/session_api.html#sqlalchemy.orm.session.Session) object should not be treated as hashable for caching purposes. It is a class instance that likely has a lot of internal state that is dynamically changing under the hood." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# a contrived session class which uses our contrived database\n", "class CrankySession(object):\n", "    def __init__(self, db):\n", "        self.db = db\n", "\n", "    def query(self, idx: str):\n", "        print(\"-- fine fine... I'll check the database\")\n", "        return self.db[idx]\n", "\n", "    def __hash__(self):\n", "        raise RuntimeError(\"WATCH IT BUDDY! I'm not hashable!\")\n", "\n", "# let's use the lru_cache decorator disregarding the documentation regarding hashable arguments\n", "@functools.lru_cache(maxsize=4)\n", "def broken_session_lookup(session: CrankySession, idx: str):\n", "    return session.query(idx)\n", "\n", "# now try running it (this is expected to fail when the cache hashes the session)\n", "session = CrankySession(DESTINATION_DB)\n", "broken_session_lookup(session, \"1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can easily get around this using partial functions. The example below reuses the contrived (but simple) session from above, which explodes if you try to hash it." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# start with an unwrapped function\n", "def raw_session_lookup(session: CrankySession, idx: str):\n", "    return session.query(idx)\n", "\n", "# create a new partial function to \"freeze\" the session argument\n", "partial_session_lookup = functools.partial(raw_session_lookup, session)\n", "\n", "# now you can safely wrap the partial function with lru_cache\n", "# NOTE: you need to call the wrapper directly rather than using a decorator syntax\n", "cache_wrapper = functools.lru_cache(maxsize=4)\n", "cached_session_lookup = cache_wrapper(partial_session_lookup)\n", "\n", "# now call it to your heart's content\n", "print(cached_session_lookup(\"1\"))\n", "print(cached_session_lookup(\"2\"))\n", "print(cached_session_lookup(\"2\"))\n", "print(cached_session_lookup(\"1\"))\n", "print(cached_session_lookup(\"1\"))\n", "cache_info = cached_session_lookup.cache_info()\n", "print(cache_info)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.3" } }, "nbformat": 4, "nbformat_minor": 2 }