# Yoochoose: Aggregate Sessions

Here we download the Yoochoose dataset, upload it to HDFS, and use Spark to aggregate the click and buy events into one object per session.

### Download data

In [1]:
!wget http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z

--2015-07-07 15:18:40-- http://s3-eu-west-1.amazonaws.com/yc-rdata/yoochoose-data.7z
Resolving s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)... 54.231.136.80
Connecting to s3-eu-west-1.amazonaws.com (s3-eu-west-1.amazonaws.com)|54.231.136.80|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 287211932 (274M) [application/octet-stream]
Saving to: `yoochoose-data.7z'


2015-07-07 15:18:54 (19.9 MB/s) - `yoochoose-data.7z' saved [287211932/287211932]



In [2]:
!7z x yoochoose-data.7z


7-Zip [64] 9.20 Copyright (c) 1999-2010 Igor Pavlov 2010-11-18
p7zip Version 9.20 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,8 CPUs)

Processing archive: yoochoose-data.7z

Extracting yoochoose-buys.dat
Extracting yoochoose-clicks.dat
Extracting yoochoose-test.dat
Extracting dataset-README.txt

Everything is Ok

Files: 4
Size: 1914111754
Compressed: 287211932


### Put data to HDFS

In [3]:
!hdfs dfs -mkdir -p yoochoose/
!hdfs dfs -put yoochoose-*.dat yoochoose/

In [4]:
!rm yoochoose-* dataset-README.txt

### Aggregate sessions

In [5]:
import datetime
import operator

def parse_datetime(dt_str):
    # UTC timestamp with fractional seconds and a trailing 'Z'
    return datetime.datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S.%fZ')

def parse_clicks(line):
    # Click record: session_id,timestamp,item_id,category
    parts = line.split(',')
    session_id = int(parts[0])
    timestamp, item_id, category = parse_datetime(parts[1]), parts[2], parts[3]
    return session_id, (timestamp, item_id, category)

def parse_buys(line):
    # Buy record: session_id,timestamp,item_id,price,quantity
    parts = line.split(',')
    session_id = int(parts[0])
    timestamp, item_id, price, quantity = parse_datetime(parts[1]), parts[2], float(parts[3]), int(parts[4])
    return session_id, (timestamp, item_id, price, quantity)

def sort_sessions(record):
    # Unpack inside the body: tuple parameters in the signature are Python 2 only
    session_id, (clicks, buys) = record
    # Sort events by timestamp; a side missing from the join becomes an empty list
    clicks = sorted(clicks, key=operator.itemgetter(0)) if clicks is not None else []
    buys = sorted(buys, key=operator.itemgetter(0)) if buys is not None else []
    return session_id, (clicks, buys)
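To make the record layout concrete, here is a minimal sketch that runs the two parsers on one line each, outside Spark. The sample lines are made up for illustration and only assume the comma-separated field order that the parsers expect; the functions are repeated in compact form so that the snippet runs on its own.

```python
import datetime

def parse_datetime(dt_str):
    return datetime.datetime.strptime(dt_str, '%Y-%m-%dT%H:%M:%S.%fZ')

def parse_clicks(line):
    parts = line.split(',')
    return int(parts[0]), (parse_datetime(parts[1]), parts[2], parts[3])

def parse_buys(line):
    parts = line.split(',')
    return int(parts[0]), (parse_datetime(parts[1]), parts[2], float(parts[3]), int(parts[4]))

# Hypothetical sample records (illustrative values, not quoted from the dataset)
click_line = '1,2014-04-07T10:51:09.277Z,214536502,0'
buy_line = '420374,2014-04-06T18:44:58.314Z,214537888,12462,1'

click_key, click_value = parse_clicks(click_line)
buy_key, buy_value = parse_buys(buy_line)

print(click_key, click_value)  # session id and (timestamp, item_id, category)
print(buy_key, buy_value)      # session id and (timestamp, item_id, price, quantity)
```

Each parser returns a `(session_id, event)` pair, which is the shape `groupByKey()` needs in order to collect all events of a session under one key. Item ids and categories stay strings; only the session id, price and quantity are converted to numbers.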

#### Run Spark job

In [6]:
# Start Spark context
%spark YoochooseSessions

# read input
clicks = sc.textFile('yoochoose/yoochoose-clicks.dat', 40).map(parse_clicks).groupByKey()
clicksTest = sc.textFile('yoochoose/yoochoose-test.dat', 40).map(parse_clicks).groupByKey()
buys = sc.textFile('yoochoose/yoochoose-buys.dat', 40).map(parse_buys).groupByKey()

train_sessions = clicks.fullOuterJoin(buys).map(sort_sessions)
test_sessions = clicksTest.map(lambda kv: (kv[0], (kv[1], None))).map(sort_sessions)

# create session files
train_sessions.saveAsPickleFile('yoochoose/train_sessions.pickle')
test_sessions.saveAsPickleFile('yoochoose/test_sessions.pickle')

sc.stop()
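The shape of the saved records may be easier to see on a toy example. The sketch below imitates the group, full outer join and sort steps with plain Python dictionaries; it is not Spark code, and the helper names `group_by_key` and `full_outer_join` as well as the sample events are hypothetical stand-ins. Integer timestamps replace `datetime` objects to keep it short.

```python
import operator
from collections import defaultdict

def group_by_key(pairs):
    # Local stand-in for RDD.groupByKey(): collect values per key
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

def full_outer_join(left, right):
    # Local stand-in for RDD.fullOuterJoin(): keep keys from either side,
    # using None where a side has no entry for the key
    keys = set(left) | set(right)
    return {k: (left.get(k), right.get(k)) for k in keys}

def sort_session(record):
    session_id, (clicks, buys) = record
    clicks = sorted(clicks, key=operator.itemgetter(0)) if clicks is not None else []
    buys = sorted(buys, key=operator.itemgetter(0)) if buys is not None else []
    return session_id, (clicks, buys)

# Made-up events: (session_id, (timestamp, item_id, ...))
clicks = [(1, (3, 'itemB', '0')), (1, (1, 'itemA', '0')), (2, (5, 'itemC', '0'))]
buys = [(1, (4, 'itemB', 10.0, 1))]

joined = full_outer_join(group_by_key(clicks), group_by_key(buys))
sessions = dict(sort_session(item) for item in joined.items())

print(sessions[1])  # clicks sorted by timestamp, plus one buy
print(sessions[2])  # clicks only, with an empty buy list
```

Session 2 has no buys, so the join yields `None` for that side and the sort step turns it into an empty list. The test sessions are handled the same way in the job above: they are paired with `None` explicitly, which gives train and test records the same `(session_id, (clicks, buys))` layout.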

