{"metadata": {"kernelspec": {"language": "python", "display_name": "Python 2 with Spark 2.0", "name": "python2-spark20"}, "language_info": {"name": "python", "codemirror_mode": {"version": 2, "name": "ipython"}, "version": "2.7.11", "mimetype": "text/x-python", "file_extension": ".py", "nbconvert_exporter": "python", "pygments_lexer": "ipython2"}}, "nbformat_minor": 0, "cells": [{"metadata": {}, "cell_type": "markdown", "source": "# Options to Access Data from Spark Enterprise Cluster\n\n1. Use data stored in local file system. If this gets slow due to too many users on the system, can try\n2. Pull data from IBM Object Storage. The network connection to IBM Object Storage is quite fast"}, {"metadata": {}, "cell_type": "markdown", "source": "
\n## 1. Using Local Files on Spark Enterprise SETI cluster\n "}, {"execution_count": 123, "outputs": [], "cell_type": "code", "source": "import os\nimport ibmseti\nimport pandas as pd\n\n\n### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results\nmydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name\nif os.path.exists(mydatafolder) is False:\n os.makedirs(mydatafolder)\n ", "metadata": {"collapsed": true}}, {"execution_count": 78, "outputs": [], "cell_type": "code", "source": "# Will will use the Index files to retrieve our data\n# Each line in training set index file contains UUID, SIGNAL_CLASSIFICATION \n\nprimarySmallIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv' )\nprimaryMediumIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv')\nbasicIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv' )\ntestSetIndex = os.path.join( os.environ['PWD'], 'data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )\n", "metadata": {"collapsed": true}}, {"execution_count": null, "outputs": [], "cell_type": "code", "source": "#define some variables for the different local directories where the data are stored\n# DO NOT WRITE TO THE 'data' DIRECTORY. MAKE SURE YOU ALWAYS USE YOUR TEAM DIRECTORY YOU CREATED ABOVE TO STORE ANY INTERMEDIATE RESULTS\n\nprimarySetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_v2')\nbasicSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_basic_v2')\ntestSetiDataDir = os.path.join( os.environ['PWD'],'data/seti/simsignals_test_mini')", "metadata": {"collapsed": true}}, {"execution_count": 80, "outputs": [], "cell_type": "code", "source": "# define a function that will take a row from the index file, \n# create a path to the local data file\n# retreive that data file\n# take some action\n\ndef get_data_and_process(row):\n \n \n try:\n uuid, classification = row.split(',')\n except:\n uuid = row #this handles the test data since it doesn't have \"SIGNAL_CLASSIFICATION\" in index file\n classification = 'unknown: test data'\n \n \n #create path to local data file\n filename = uuid + '.dat'\n filepath = os.path.join(workingDataDir, filename)\n \n #retrieve that data file\n rawdata = open(filepath).read()\n \n # take some action\n aca = ibmseti.compamp.SimCompamp(rawdata)\n #spectrogram = aca.get_spectrogram() # or calculate spectrogram with other methods + signal processing steps\n \n #do other work here.\n features = [] ## ?? Or other work you want to do on the file\n ## You can also save results at this point to your local 'my_team_folder'! 
\n #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:\n # fout.write('stuff')\n \n \n try: #catch exception if using testData because it won't have classification information\n header = aca.header()\n classfromfile = header['signal_classification']\n assert classfromfile == classification #this better match!\n except:\n pass\n \n #return something useful\n return (classification, features)", "metadata": {"collapsed": true}}, {"execution_count": 81, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 70.3 ms, sys: 14.3 ms, total: 84.6 ms\nWall time: 6.23 s\n"}], "cell_type": "code", "source": "# Choose your own Adventure!\nworkingDataDir = primarySetiDataDir\n\n# we parallelize the index file across our worker executors\nrdd = sc.textFile(primarySmallIndex, 120).filter(lambda x: x.startswith('UUID') is False) #the filter removes the header\n\n# then we have each worker executor perform the actions in our function defined above.\n%time myresults = rdd.map(get_data_and_process).collect()", "metadata": {"collapsed": false}}, {"execution_count": 82, "outputs": [], "cell_type": "code", "source": "# massage the data into a Pandas DF (or we could have directly done this from the Spark RDD :P) and make sure the output makes sense.\nmyresult_classes = map(lambda x: x[0], myresults)", "metadata": {"collapsed": false}}, {"execution_count": 83, "outputs": [], "cell_type": "code", "source": "res = pd.DataFrame(myresult_classes, columns=['class'])", "metadata": {"collapsed": true}}, {"execution_count": 84, "outputs": [{"output_type": "execute_result", "execution_count": 84, "data": {"text/plain": "class\nbrightpixel 1000\nnarrowband 1000\nnarrowbanddrd 1000\nnoise 1000\nsquarepulsednarrowband 1000\nsquiggle 1000\nsquigglesquarepulsednarrowband 1000\ndtype: int64"}, "metadata": {}}], "cell_type": "code", "source": "res.groupby(['class']).size()", "metadata": {"collapsed": false}}, {"execution_count": 85, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 57.8 ms, sys: 12.5 ms, total: 70.3 ms\nWall time: 1.65 s\n"}], "cell_type": "code", "source": "# We do the same thing, but now with the test data set\n\n# set the workingDataDir appropriately\nworkingDataDir = testSetiDataDir\n\n# parallelize the testSetIndex\nrdd = sc.textFile(testSetIndex, 120).filter(lambda x: x.startswith('UUID') is False) #this removes the header\n\n# perform the same process, in this case. 
{"execution_count": 85, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 57.8 ms, sys: 12.5 ms, total: 70.3 ms\nWall time: 1.65 s\n"}], "cell_type": "code", "source": "# We do the same thing, but now with the test data set\n\n# set the workingDataDir appropriately\nworkingDataDir = testSetiDataDir\n\n# parallelize the testSetIndex\nrdd = sc.textFile(testSetIndex, 120).filter(lambda x: not x.startswith('UUID')) #this removes the header\n\n# Perform the same process. In practice, you'll probably write a different function that passes the test data to your classifier.\n%time myresults = rdd.map(get_data_and_process).collect()", "metadata": {"collapsed": false}}, {"execution_count": 86, "outputs": [], "cell_type": "code", "source": "myresult_classes = map(lambda x: x[0], myresults)", "metadata": {"collapsed": true}}, {"execution_count": 87, "outputs": [], "cell_type": "code", "source": "res = pd.DataFrame(myresult_classes, columns=['class'])", "metadata": {"collapsed": true}}, {"execution_count": 88, "outputs": [{"output_type": "execute_result", "execution_count": 88, "data": {"text/plain": "class\nunknown: test data 1405\ndtype: int64"}, "metadata": {}}], "cell_type": "code", "source": "res.groupby(['class']).size()", "metadata": {"collapsed": false}}, {"execution_count": 89, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 71.7 ms, sys: 13.2 ms, total: 84.9 ms\nWall time: 7.17 s\n"}], "cell_type": "code", "source": "# Same thing for the basic4 data set\n\nworkingDataDir = basicSetiDataDir\nrdd = sc.textFile(basicIndex, 120).filter(lambda x: not x.startswith('UUID')) #this removes the header\n%time myresults = rdd.map(get_data_and_process).collect()", "metadata": {"collapsed": false}}, {"execution_count": 90, "outputs": [{"output_type": "execute_result", "execution_count": 90, "data": {"text/plain": "class\nnarrowband 1000\nnarrowbanddrd 1000\nnoise 1000\nsquiggle 1000\ndtype: int64"}, "metadata": {}}], "cell_type": "code", "source": "myresult_classes = map(lambda x: x[0], myresults)\nres = pd.DataFrame(myresult_classes, columns=['class'])\nres.groupby(['class']).size()", "metadata": {"collapsed": false}},
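{"execution_count": null, "outputs": [], "cell_type": "code", "source": "# A minimal sketch of persisting the collected results to your team folder so\n# they do not need to be recomputed later. Illustrative only: pickle is just\n# one option, and the file name here is hypothetical.\n#import pickle\n#with open(os.path.join(mydatafolder, 'my_results.pkl'), 'wb') as fout:\n#    pickle.dump(myresults, fout)", "metadata": {"collapsed": true}},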
\n\n## 2. Retrieve data from IBM Object Storage\n\nUse this if you feel there are too many groups pulling from the local file system and things are slow. Data processing should take longer than reading the files from the network to Object Storage, so this should not create a bottleneck in your overall workflow on Spark Enterprise.\n\n#### Basic Same Process As Above\n\nExcept in this case we use data entirely on IBM Object Storage. "}, {"execution_count": 130, "outputs": [], "cell_type": "code", "source": "import os\nimport ibmseti\nimport requests\nimport pandas as pd\n\n### SET YOUR TEAM NAME HERE! Use this folder to save intermediate results\nmydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name\nif os.path.exists(mydatafolder) is False:\n os.makedirs(mydatafolder)\n \n \nbaseswiftURL = 'swift2d://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'\nbasehttpURL = 'https://dal05.objectstorage.service.networklayer.com/v1/AUTH_cdbef52bdf7a449c96936e1071f0a46b'\n\n\nprimarySmallIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_small_1june_2017.csv' )\nprimaryMediumIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_medium_1june_2017.csv')\nprimaryFullIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_v2_full_1june_2017.csv' )\nbasicIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_basic_v2_26may_2017.csv' )\ntestSetIndex = os.path.join( baseswiftURL, 'simsignals_files/public_list_primary_testset_mini_1june_2017.csv' )\n", "metadata": {"collapsed": true}}, {"execution_count": 131, "outputs": [], "cell_type": "code", "source": "primarySetiDataDir = os.path.join( basehttpURL,'simsignals_v2')\nbasicSetiDataDir = os.path.join( basehttpURL,'simsignals_basic_v2')\ntestSetiDataDir = os.path.join( basehttpURL,'simsignals_test_mini')\n\nworkingDataDir = primarySetiDataDir", "metadata": {"collapsed": false}}, {"execution_count": 132, "outputs": [], "cell_type": "code", "source": "import requests", "metadata": {"collapsed": true}}, {"execution_count": 133, "outputs": [], "cell_type": "code", "source": "def get_data_fromOS_and_process(row):\n \n \n try:\n uuid, classification = row.split(',')\n except:\n uuid = row #this handles the test data since it doesn't have \"SIGNAL_CLASSIFICATION\" in index file\n classification = 'unknown: test data'\n \n \n filename = uuid + '.dat'\n \n filepath = os.path.join(workingDataDir, filename)\n \n # We use python requests package to get our data\n \n r = requests.get(filepath, timeout=(3.0, 9.0)) #add a timeout just in case\n \n try:\n r.raise_for_status()\n except:\n return (r.status_code, [])\n \n aca = ibmseti.compamp.SimCompamp(r.content)\n \n #spectrogram = aca.get_spectrogram() # or calculate spectrogram with other methods + signal processing steps\n \n #do other work here.\n features = [] ## ?? Or other work you want to do on the file\n ## You can also save results at this point to your local 'my_team_folder'! 
\n #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:\n # fout.write('stuff')\n \n \n try: #catch exception if using testData because it won't have classification information\n header = aca.header()\n classfromfile = header['signal_classification']\n assert classfromfile == classification #this better match!\n except:\n pass\n \n #return something to your map function\n return (classification, features)", "metadata": {"collapsed": true}}, {"execution_count": 121, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 72.1 ms, sys: 18.4 ms, total: 90.5 ms\nWall time: 15.2 s\n"}], "cell_type": "code", "source": "# Grab the Basic Data Set\n\nworkingDataDir = basicSetiDataDir\nrdd = sc.textFile(basicIndex, 120).filter(lambda x: x.startswith('UUID') is False) #this removes the header\n#%time myresults = rdd.map(build_file_urls).map(pull_data).filter(filterFailed).map(loadibmseti).collect()\n%time myresults = rdd.map(get_data_fromOS_and_process).collect()", "metadata": {"collapsed": false}}, {"execution_count": 122, "outputs": [{"output_type": "execute_result", "execution_count": 122, "data": {"text/plain": "class\nnarrowband 1000\nnarrowbanddrd 1000\nnoise 1000\nsquiggle 1000\ndtype: int64"}, "metadata": {}}], "cell_type": "code", "source": "myresult_classes = map(lambda x: x[0], myresults)\nres = pd.DataFrame(myresult_classes, columns=['class'])\nres.groupby(['class']).size()", "metadata": {"collapsed": false}}, {"execution_count": 134, "outputs": [{"output_type": "stream", "name": "stdout", "text": "CPU times: user 286 ms, sys: 43.8 ms, total: 329 ms\nWall time: 3min 19s\n"}], "cell_type": "code", "source": "# Reading the entire 70k (20 GB) Primary Medium Data Set over Object Storage only takes 3 minutes!!!\n\nworkingDataDir = primarySetiDataDir\nrdd = sc.textFile(primaryMediumIndex, 120).filter(lambda x: x.startswith('UUID') is False) #this removes the header\n#%time myresults = rdd.map(build_file_urls).map(pull_data).filter(filterFailed).map(loadibmseti).collect()\n%time myresults = rdd.map(get_data_fromOS_and_process).collect()", "metadata": {"collapsed": false}}, {"execution_count": 135, "outputs": [{"output_type": "execute_result", "execution_count": 135, "data": {"text/plain": "class\nbrightpixel 10000\nnarrowband 10000\nnarrowbanddrd 10000\nnoise 10000\nsquarepulsednarrowband 10000\nsquiggle 10000\nsquigglesquarepulsednarrowband 10000\ndtype: int64"}, "metadata": {}}], "cell_type": "code", "source": "myresult_classes = map(lambda x: x[0], myresults)\nres = pd.DataFrame(myresult_classes, columns=['class'])\nres.groupby(['class']).size()", "metadata": {"collapsed": false}}, {"metadata": {}, "cell_type": "markdown", "source": "# Access Data from IBM PowerAI Deep Learning Platform on Nimbix Cloud\n\n\nWe have only one real option in this case because network connection from Nimbix Cloud to IBM Object Storage is not as fast. \n\n## Use data stored in shared Nimbix Cloud Vault: `/data`"}, {"execution_count": 126, "outputs": [], "cell_type": "code", "source": "import os\nimport ibmseti\nimport pandas as pd\n\n# Make sure to initialize Spark if you're going to use it on PowerAI systems\nimport findspark\nfindspark.init()\nimport pyspark\n\nsc = pyspark.SparkContext(appName='seti')\n\n### SET YOUR TEAM NAME HERE! 
Use this folder to save intermediate results\nmydatafolder = os.path.join( os.environ['PWD'], 'my_data_folder' ) #Change my_data_folder to your team name\nif os.path.exists(mydatafolder) is False:\n os.makedirs(mydatafolder)\n \n## REMEMBER, on Nimbix, your local file space is destroyed when your cloud machine is shut down. So be sure to commit/save your work!\n \n# Index Files are in different location \nprimarySmallIndex = '/data/seti/simsignals_files/public_list_primary_v2_small_1june_2017.csv' \nprimaryMediumIndex = '/data/seti/simsignals_files/public_list_primary_v2_medium_1june_2017.csv'\nbasicIndex = '/data/seti/simsignals_files/public_list_basic_v2_26may_2017.csv' \ntestSetIndex = '/data/seti/simsignals_files/public_list_primary_testset_mini_1june_2017.csv' ", "metadata": {"collapsed": true}}, {"execution_count": 129, "outputs": [], "cell_type": "code", "source": "primarySetiDataDir = '/data/seti/simsignals_v2' #THIS ONLY CONTAINS THE SMALL AND MEDIUM DATA FILES! \n# Ask Adam, Patrick or Joseph on Saturday evening if you want the full data set. Hint: It's in simsignals_v2_full_N, for N=1,2,3,4 \n\nbasicSetiDataDir = '/data/seti/simsignals_basic_v2'\ntestSetiDataDir = '/data/seti/simsignals_test_mini'\n\nworkingDataDir = basicSetiDataDir", "metadata": {"collapsed": true}}, {"execution_count": 128, "outputs": [], "cell_type": "code", "source": "# define a function that will take a row from the index file, \n# create a path to the local data file\n# retreive that data file\n# take some action\n\ndef get_data_and_process(row):\n \n \n try:\n uuid, classification = row.split(',')\n except:\n uuid = row #this handles the test data since it doesn't have \"SIGNAL_CLASSIFICATION\" in index file\n classification = 'unknown: test data'\n \n \n #create path to local data file\n filename = uuid + '.dat'\n filepath = os.path.join(workingDataDir, filename)\n \n #retrieve that data file\n rawdata = open(filepath).read()\n \n # take some action\n aca = ibmseti.compamp.SimCompamp(rawdata)\n #spectrogram = aca.get_spectrogram() # or calculate spectrogram with other methods + signal processing steps\n \n #do work here.\n features = [] ## ?? Or other work you want to do on the file\n ## You could also save results at this point to your local mydatafolder\n #with open( os.path.join(mydatafolder, 'some_results_file'), 'w') as fout:\n # fout.write('stuff')\n \n \n try: #catch exception if using testData because it won't have classification information\n header = aca.header()\n classfromfile = header['signal_classification']\n assert classfromfile == classification #this better match!\n except:\n pass\n \n #return something useful\n return (classification, features)", "metadata": {"collapsed": true}}, {"execution_count": null, "outputs": [], "cell_type": "code", "source": "# we parallelize the index file across our worker executors\nrdd = sc.textFile(primarySmallIndex).filter(lambda x: x.startswith('UUID') is False) #the filter removes the header\n\n# then we have each worker executor perform the actions in our function defined above.\n%time myresults = rdd.map(get_data_and_process).collect()", "metadata": {"collapsed": true}}, {"execution_count": null, "outputs": [], "cell_type": "code", "source": "myresult_classes = map(lambda x: x[0], myresults)\nres = pd.DataFrame(myresult_classes, columns=['class'])\nres.groupby(['class']).size()", "metadata": {"collapsed": true}}], "nbformat": 4}