{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Yoochoose: Aggregate Sessions\n", "\n", "Here we download the data, upload it to HDFS and create session objects using Spark." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload data to HDFS" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--2015-07-07 15:18:40-- http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z\n", "Resolving s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)... 54.231.136.80\n", "Connecting to s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)|54.231.136.80|:80... connected.\n", "HTTP request sent, awaiting response... 200 OK\n", "Length: 287211932 (274M) [application/octet-stream]\n", "Saving to: `yoochoose-data.7z'\n", "\n", "100%[======================================>] 287,211,932 31.0M/s in 14s \n", "\n", "2015-07-07 15:18:54 (19.9 MB/s) - `yoochoose-data.7z' saved [287211932/287211932]\n", "\n" ] } ], "source": [ "!wget http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "7-Zip [64] 9.20 Copyright (c) 1999-2010 Igor Pavlov 2010-11-18\n", "p7zip Version 9.20 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,8 CPUs)\n", "\n", "Processing archive: yoochoose-data.7z\n", "\n", "Extracting yoochoose-buys.dat\n", "Extracting yoochoose-clicks.dat\n", "Extracting yoochoose-test.dat\n", "Extracting dataset-README.txt\n", "\n", "Everything is Ok\n", "\n", "Files: 4\n", "Size: 1914111754\n", "Compressed: 287211932\n" ] } ], "source": [ "!7z x yoochoose-data.7z" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Put data to HDFS" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false }, "outputs": [], "source": [ "!hdfs dfs -mkdir -p yoochoose/\n", "!hdfs dfs -put yoochoose-*.dat yoochoose/" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false }, "outputs": [], "source": [ "!rm yoochoose-* dataset-README.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Aggregate sessions" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import datetime\n", "import operator\n", "\n", "def parse_datetime(dt_str):\n", " return datetime.datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S.%fZ')\n", "\n", "def parse_clicks(line):\n", " parts = line.split(',')\n", " session_id = int(parts[0])\n", " timestamp, item_id, category = parse_datetime(parts[1]), parts[2], parts[3]\n", " return session_id, (timestamp, item_id, category)\n", "\n", "def parse_buys(line):\n", " parts = line.split(',')\n", " session_id = int(parts[0])\n", " timestamp, item_id, price, quantity = parse_datetime(parts[1]), parts[2], float(parts[3]), int(parts[4])\n", " return session_id, (timestamp, item_id, price, quantity)\n", "\n", "def sort_sessions((session_id, (clicks, buys))):\n", " clicks = sorted(clicks, key=operator.itemgetter(0)) if clicks is not None else []\n", " buys = sorted(buys, key=operator.itemgetter(0)) if buys is not None else []\n", " return session_id, (clicks, buys)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run Spark job" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ ] } ], "source": [ "# Start Spark context\n", "%spark YoochooseSessions\n", "\n", "# read input\n", "clicks = sc.textFile('yoochoose/yoochoose-clicks.dat', 40).map(parse_clicks).groupByKey()\n", "clicksTest = sc.textFile('yoochoose/yoochoose-test.dat', 40).map(parse_clicks).groupByKey()\n", "buys = sc.textFile('yoochoose/yoochoose-buys.dat', 40).map(parse_buys).groupByKey()\n", "\n", "train_sessions = clicks.fullOuterJoin(buys).map(sort_sessions)\n", "test_sessions = clicksTest.map(lambda (session_id, clicks): (session_id, (clicks, None))).map(sort_sessions)\n", "\n", "# screate session files\n", "train_sessions.saveAsPickleFile('yoochoose/train_sessions.pickle')\n", "test_sessions.saveAsPickleFile('yoochoose/test_sessions.pickle')\n", "\n", "sc.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.3" } }, "nbformat": 4, "nbformat_minor": 0 }