{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tweetaspike: A Simple Application\n",
"\n",
"Tweetaspike is a simple application that illustrates some key aspects of an Aerospike application design.\n",
"\n",
"This notebook requires an Aerospike Database server running locally, a Java kernel, and the Aerospike Java Client. To create a Docker container that satisfies these requirements and holds a copy of the Aerospike notebooks, visit the [Aerospike Notebooks Repo](https://github.com/aerospike-examples/interactive-notebooks)."
]
},
{
"cell_type": "markdown",
"metadata": {
"hide_input": false
},
"source": [
"# Use magics to load Aerospike Client from POM"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%loadFromPOM\n",
"<dependencies>\n",
"  <dependency>\n",
"    <groupId>com.aerospike</groupId>\n",
"    <artifactId>aerospike-client</artifactId>\n",
"    <version>5.0.1</version>\n",
"  </dependency>\n",
"  <dependency>\n",
"    <groupId>commons-cli</groupId>\n",
"    <artifactId>commons-cli</artifactId>\n",
"    <version>1.2</version>\n",
"  </dependency>\n",
"  <dependency>\n",
"    <groupId>log4j</groupId>\n",
"    <artifactId>log4j</artifactId>\n",
"    <version>1.2.17</version>\n",
"  </dependency>\n",
"  <dependency>\n",
"    <groupId>com.googlecode.json-simple</groupId>\n",
"    <artifactId>json-simple</artifactId>\n",
"    <version>1.1.1</version>\n",
"  </dependency>\n",
"  <dependency>\n",
"    <groupId>junit</groupId>\n",
"    <artifactId>junit</artifactId>\n",
"    <version>4.4</version>\n",
"  </dependency>\n",
"</dependencies>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import java.io.BufferedReader;\n",
"import java.io.Console;\n",
"import java.io.IOException;\n",
"import java.io.InputStreamReader;\n",
"\n",
"\n",
"/**\n",
" * Console helper that prefers the JVM's system console and falls back to\n",
" * System.out / System.in when System.console() returns null.\n",
" */\n",
"public class EclipseConsole {\n",
"    Console systemConsole = System.console();\n",
"    boolean useSystemConsole = false;\n",
"\n",
"    /** Records whether a system console is available for this JVM. */\n",
"    public EclipseConsole() {\n",
"        this.useSystemConsole = (this.systemConsole != null);\n",
"    }\n",
"\n",
"    /**\n",
"     * Writes the message verbatim, without interpreting it as a format string.\n",
"     *\n",
"     * @param message text to print; may safely contain '%' characters\n",
"     */\n",
"    public void printf(String message) {\n",
"        // The text is passed as an argument rather than as the format itself:\n",
"        // callers concatenate user data (usernames, tweets) into the message,\n",
"        // and a stray '%' used as a format would throw IllegalFormatException.\n",
"        if (useSystemConsole) {\n",
"            systemConsole.printf(\"%s\", message);\n",
"        } else {\n",
"            System.out.printf(\"%s\", message);\n",
"        }\n",
"    }\n",
"\n",
"    /**\n",
"     * Reads one line of input.\n",
"     *\n",
"     * @return the line read; null at end of input; an empty string when\n",
"     *         reading from System.in fails with an IOException\n",
"     */\n",
"    public String readLine() {\n",
"        if (useSystemConsole) {\n",
"            return systemConsole.readLine();\n",
"        }\n",
"        // NOTE(review): a new BufferedReader wraps System.in on every call, so\n",
"        // input buffered by one call is not visible to the next one.\n",
"        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));\n",
"        String line = \"\";\n",
"        try {\n",
"            line = bufferedReader.readLine();\n",
"        } catch (IOException e) {\n",
"            e.printStackTrace();\n",
"        }\n",
"        return line;\n",
"    }\n",
"\n",
"}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"/*******************************************************************************\n",
" * Copyright 2012-2014 by Aerospike.\n",
" *\n",
" * Permission is hereby granted, free of charge, to any person obtaining a copy\n",
" * of this software and associated documentation files (the \"Software\"), to\n",
" * deal in the Software without restriction, including without limitation the\n",
" * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or\n",
" * sell copies of the Software, and to permit persons to whom the Software is\n",
" * furnished to do so, subject to the following conditions:\n",
" *\n",
" * The above copyright notice and this permission notice shall be included in\n",
" * all copies or substantial portions of the Software.\n",
" *\n",
" * THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n",
" * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n",
" * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n",
" * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n",
" * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING\n",
" * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS\n",
" * IN THE SOFTWARE.\n",
" ******************************************************************************/\n",
"\n",
"\n",
"import java.io.PrintWriter;\n",
"import java.io.StringWriter;\n",
"\n",
"import com.aerospike.client.AerospikeClient;\n",
"import com.aerospike.client.AerospikeException;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.query.IndexType;\n",
"import com.aerospike.client.task.IndexTask;\n",
"\n",
"/**\n",
" * Helper routines for the Tweetaspike demo: secondary index creation, stack\n",
" * trace formatting and a few example snippets that are not called anywhere.\n",
" */\n",
"public class UtilityService {\n",
"    private AerospikeClient client;\n",
"    private EclipseConsole console = new EclipseConsole();\n",
"\n",
"    public UtilityService(AerospikeClient c) {\n",
"        this.client = c;\n",
"    }\n",
"\n",
"    /**\n",
"     * Creates the three secondary indexes used by the demo queries in the\n",
"     * \"test\" namespace: tweets.username, tweets.ts and users.tweetcount.\n",
"     * Indexes that already exist are left untouched, so the method can be\n",
"     * re-run safely.\n",
"     *\n",
"     * @throws AerospikeException if index creation fails for any reason other\n",
"     *         than the index already existing\n",
"     */\n",
"    public void createSecondaryIndexes() throws AerospikeException,\n",
"            InterruptedException {\n",
"\n",
"        // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax. The recommended way of creating indexes in production env is via AQL\n",
"\n",
"        createIndexIfAbsent(\"tweets\", \"username_index\", \"username\", IndexType.STRING);\n",
"        createIndexIfAbsent(\"tweets\", \"ts_index\", \"ts\", IndexType.NUMERIC);\n",
"        createIndexIfAbsent(\"users\", \"tweetcount_index\", \"tweetcount\", IndexType.NUMERIC);\n",
"    }\n",
"\n",
"    /** Creates one index in the \"test\" namespace and waits for it, tolerating an existing index. */\n",
"    private void createIndexIfAbsent(String setName, String indexName,\n",
"            String binName, IndexType indexType) throws AerospikeException {\n",
"        console.printf(\"\\nCreating secondary index on: set=\" + setName + \", bin=\" + binName + \"...\\n\");\n",
"        try {\n",
"            IndexTask task = client.createIndex(null, \"test\", setName,\n",
"                    indexName, binName, indexType);\n",
"            task.waitTillComplete(100);\n",
"        } catch (AerospikeException e) {\n",
"            // Only \"index already exists\" is benign; everything else is rethrown.\n",
"            if (e.getResultCode() != com.aerospike.client.ResultCode.INDEX_ALREADY_EXISTS) {\n",
"                throw e;\n",
"            }\n",
"            console.printf(\"INFO: Secondary index already exists on: set=\" + setName + \", bin=\" + binName + \"\\n\");\n",
"            return;\n",
"        }\n",
"        console.printf(\"Done creating secondary index on: set=\" + setName + \", bin=\" + binName + \"\\n\");\n",
"    }\n",
"\n",
"    /**\n",
"     * Renders the stack trace of an exception as a string.\n",
"     *\n",
"     * @param ex exception whose stack trace is wanted\n",
"     * @return the text that ex.printStackTrace() would print\n",
"     */\n",
"    public static String printStackTrace(Exception ex) {\n",
"        StringWriter errors = new StringWriter();\n",
"        ex.printStackTrace(new PrintWriter(errors));\n",
"        return errors.toString();\n",
"    }\n",
"\n",
"    /*\n",
"     * Example functions not in use\n",
"     */\n",
"    @SuppressWarnings(\"unused\")\n",
"    private void add() throws AerospikeException {\n",
"        // Java Add: adds 3 to the integer bin \"count\"\n",
"        Key userKey = new Key(\"test\", \"users\", \"user1234\");\n",
"        Bin bin2 = new Bin(\"count\", 3);\n",
"        client.add(null, userKey, bin2);\n",
"    }\n",
"\n",
"    @SuppressWarnings(\"unused\")\n",
"    private void append() throws AerospikeException {\n",
"        // Java Append: appends \" world\" to the string bin \"greet\"\n",
"        Key userKey = new Key(\"test\", \"users\", \"user1234\");\n",
"        Bin bin2 = new Bin(\"greet\", \" world\");\n",
"        client.append(null, userKey, bin2);\n",
"    }\n",
"\n",
"    @SuppressWarnings(\"unused\")\n",
"    private void exists() throws AerospikeException {\n",
"        // Java Exists: the result is kept in a local only to show the return type\n",
"        Key userKey = new Key(\"test\", \"users\", \"user1234\");\n",
"        boolean recordKeyExists = client.exists(null, userKey);\n",
"    }\n",
"\n",
"    @SuppressWarnings(\"unused\")\n",
"    private void touch() throws AerospikeException {\n",
"        // Java Touch\n",
"        Key userKey = new Key(\"test\", \"users\", \"user1234\");\n",
"        client.touch(null, userKey);\n",
"    }\n",
"\n",
"}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"/*******************************************************************************\n",
" * Copyright 2012-2014 by Aerospike.\n",
" *\n",
" * Permission is hereby granted, free of charge, to any person obtaining a copy\n",
" * of this software and associated documentation files (the \"Software\"), to\n",
" * deal in the Software without restriction, including without limitation the\n",
" * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or\n",
" * sell copies of the Software, and to permit persons to whom the Software is\n",
" * furnished to do so, subject to the following conditions:\n",
" *\n",
" * The above copyright notice and this permission notice shall be included in\n",
" * all copies or substantial portions of the Software.\n",
" *\n",
" * THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n",
" * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n",
" * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n",
" * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n",
" * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING\n",
" * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS\n",
" * IN THE SOFTWARE.\n",
" ******************************************************************************/\n",
"\n",
"\n",
"import java.io.File;\n",
"import java.util.Random;\n",
"\n",
"import com.aerospike.client.AerospikeClient;\n",
"import com.aerospike.client.AerospikeException;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.Language;\n",
"import com.aerospike.client.Operation;\n",
"import com.aerospike.client.Record;\n",
"import com.aerospike.client.ScanCallback;\n",
"import com.aerospike.client.Value;\n",
"import com.aerospike.client.lua.LuaConfig;\n",
"import com.aerospike.client.policy.Priority;\n",
"import com.aerospike.client.policy.RecordExistsAction;\n",
"import com.aerospike.client.policy.ScanPolicy;\n",
"import com.aerospike.client.policy.WritePolicy;\n",
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.query.RecordSet;\n",
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.task.RegisterTask;\n",
"import com.aerospike.client.query.IndexType;\n",
"import com.aerospike.client.task.IndexTask;\n",
"\n",
"public class TweetService {\n",
" // Client through which this service issues its database calls.\n",
" private AerospikeClient client;\n",
" // Console wrapper used for prompts and printed output.\n",
" private EclipseConsole console = new EclipseConsole();\n",
"\n",
" /**\n",
"  * Creates a tweet service that uses the given client.\n",
"  *\n",
"  * @param client Aerospike client stored for later database calls\n",
"  */\n",
" public TweetService(AerospikeClient client) {\n",
" this.client = client;\n",
" }\n",
"\n",
"    /**\n",
"     * Prompts for a username and a tweet, writes the tweet to the \"tweets\"\n",
"     * set and then records the new tweet count and timestamp on the user.\n",
"     *\n",
"     * Returns silently when no username is entered and prints an error when\n",
"     * no matching user record is found.\n",
"     *\n",
"     * @throws AerospikeException if a database call fails\n",
"     */\n",
"    public void createTweet() throws AerospikeException, InterruptedException {\n",
"\n",
"        console.printf(\"\\n********** Create Tweet **********\\n\");\n",
"\n",
"        ///*********************///\n",
"        ///*****Data Model*****///\n",
"        //Namespace: test\n",
"        //Set: tweets\n",
"        //Key: <username>:<counter>\n",
"        //Bins:\n",
"        //tweet - string\n",
"        //ts - int (Stores epoch timestamp of the tweet)\n",
"        //username - string\n",
"\n",
"        //Sample Key: dash:1\n",
"        //Sample Record:\n",
"        //{ tweet: 'Put. A. Bird. On. It.',\n",
"        //  ts: 1408574221,\n",
"        //  username: 'dash'\n",
"        //}\n",
"        ///*********************///\n",
"\n",
"        // Get username\n",
"        console.printf(\"\\nEnter username:\");\n",
"        String username = console.readLine();\n",
"        if (username == null || username.length() == 0) {\n",
"            return;\n",
"        }\n",
"\n",
"        // Check if username exists\n",
"        Key userKey = new Key(\"test\", \"users\", username);\n",
"        Record userRecord = client.get(null, userKey);\n",
"        if (userRecord == null) {\n",
"            console.printf(\"ERROR: User record not found!\\n\");\n",
"            return;\n",
"        }\n",
"\n",
"        // A user record without a tweetcount bin is treated as having zero\n",
"        // tweets instead of failing with a NullPointerException.\n",
"        Object storedCount = userRecord.getValue(\"tweetcount\");\n",
"        int nextTweetCount = (storedCount == null)\n",
"                ? 1\n",
"                : Integer.parseInt(storedCount.toString()) + 1;\n",
"\n",
"        // Get tweet\n",
"        console.printf(\"Enter tweet for \" + username + \":\");\n",
"        String tweet = console.readLine();\n",
"\n",
"        // Write record\n",
"        WritePolicy wPolicy = new WritePolicy();\n",
"        wPolicy.sendKey = true;\n",
"        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;\n",
"\n",
"        // Create timestamp to store along with the tweet so we can\n",
"        // query, index and report on it\n",
"        long ts = getTimeStamp();\n",
"\n",
"        Key tweetKey = new Key(\"test\", \"tweets\", username + \":\" + nextTweetCount);\n",
"        Bin bin1 = new Bin(\"tweet\", tweet);\n",
"        Bin bin2 = new Bin(\"ts\", ts);\n",
"        Bin bin3 = new Bin(\"username\", username);\n",
"\n",
"        client.put(wPolicy, tweetKey, bin1, bin2, bin3);\n",
"        console.printf(\"\\nINFO: Tweet record created!\\n\");\n",
"\n",
"        // Update tweet count and last tweet'd timestamp in the user record\n",
"        updateUser(client, userKey, wPolicy, ts, nextTweetCount);\n",
"    } //createTweet\n",
" \n",
"    /**\n",
"     * Runs both demo queries in sequence: tweets by username first, then\n",
"     * users by tweet count range.\n",
"     *\n",
"     * @throws AerospikeException if either query fails\n",
"     */\n",
"    public void queryTweets() throws AerospikeException {\n",
"        this.queryTweetsByUsername();\n",
"        this.queryUsersByTweetCount();\n",
"    } //queryTweets\n",
"\n",
"    /**\n",
"     * Prompts for a username and prints every tweet whose \"username\" bin\n",
"     * equals it, using the secondary index \"username_index\" on test.tweets.\n",
"     * The index is created first if it does not exist yet.\n",
"     *\n",
"     * @throws AerospikeException if index creation (other than \"already\n",
"     *         exists\") or the query fails\n",
"     */\n",
"    public void queryTweetsByUsername() throws AerospikeException {\n",
"\n",
"        console.printf(\"\\n********** Query Tweets By Username **********\\n\");\n",
"\n",
"        RecordSet rs = null;\n",
"        try {\n",
"\n",
"            // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.\n",
"            // NOTE: The recommended way of creating indexes in production env is via AQL.\n",
"            try {\n",
"                IndexTask task = client.createIndex(null, \"test\", \"tweets\",\n",
"                        \"username_index\", \"username\", IndexType.STRING);\n",
"                task.waitTillComplete(100);\n",
"            } catch (AerospikeException e) {\n",
"                // Calling this method a second time must not fail just because\n",
"                // the index was created by an earlier call.\n",
"                if (e.getResultCode() != com.aerospike.client.ResultCode.INDEX_ALREADY_EXISTS) {\n",
"                    throw e;\n",
"                }\n",
"            }\n",
"\n",
"            // Get username\n",
"            console.printf(\"\\nEnter username:\");\n",
"            String username = console.readLine();\n",
"\n",
"            if (username != null && username.length() > 0) {\n",
"                String[] bins = { \"tweet\" };\n",
"                Statement stmt = new Statement();\n",
"                stmt.setNamespace(\"test\");\n",
"                stmt.setSetName(\"tweets\");\n",
"                stmt.setIndexName(\"username_index\");\n",
"                stmt.setBinNames(bins);\n",
"                stmt.setFilter(Filter.equal(\"username\", username));\n",
"\n",
"                console.printf(\"\\nHere's \" + username + \"'s tweet(s):\\n\");\n",
"\n",
"                rs = client.query(null, stmt);\n",
"                while (rs.next()) {\n",
"                    Record r = rs.getRecord();\n",
"                    // String concatenation tolerates a record without a tweet bin.\n",
"                    console.printf(r.getValue(\"tweet\") + \"\\n\");\n",
"                }\n",
"            } else {\n",
"                console.printf(\"ERROR: User record not found!\\n\");\n",
"            }\n",
"        } finally {\n",
"            if (rs != null) {\n",
"                // Close record set\n",
"                rs.close();\n",
"            }\n",
"        }\n",
"    } //queryTweetsByUsername\n",
"\n",
"    /**\n",
"     * Prompts for a minimum and maximum tweet count and prints every user in\n",
"     * test.users whose \"tweetcount\" bin lies in that range. The secondary\n",
"     * index \"tweetcount_index\" is created first if it does not exist yet.\n",
"     *\n",
"     * @throws AerospikeException if index creation (other than \"already\n",
"     *         exists\") or the query fails\n",
"     * @throws NumberFormatException if either bound is not a valid integer\n",
"     */\n",
"    public void queryUsersByTweetCount() throws AerospikeException {\n",
"\n",
"        console.printf(\"\\n********** Query Users By Tweet Count Range **********\\n\");\n",
"\n",
"        RecordSet rs = null;\n",
"        try {\n",
"\n",
"            // NOTE: Index creation has been included in here for convenience and to demonstrate the syntax.\n",
"            // NOTE: The recommended way of creating indexes in production env is via AQL.\n",
"            try {\n",
"                IndexTask task = client.createIndex(null, \"test\", \"users\",\n",
"                        \"tweetcount_index\", \"tweetcount\", IndexType.NUMERIC);\n",
"                task.waitTillComplete(100);\n",
"            } catch (AerospikeException e) {\n",
"                // Calling this method a second time must not fail just because\n",
"                // the index was created by an earlier call.\n",
"                if (e.getResultCode() != com.aerospike.client.ResultCode.INDEX_ALREADY_EXISTS) {\n",
"                    throw e;\n",
"                }\n",
"            }\n",
"\n",
"            // Get min and max tweet counts\n",
"            console.printf(\"\\nEnter Min Tweet Count:\");\n",
"            int min = Integer.parseInt(console.readLine());\n",
"            console.printf(\"Enter Max Tweet Count:\");\n",
"            int max = Integer.parseInt(console.readLine());\n",
"\n",
"            console.printf(\"\\nList of users with \" + min + \"-\" + max\n",
"                    + \" tweets:\\n\");\n",
"\n",
"            String[] bins = { \"username\", \"tweetcount\", \"gender\" };\n",
"            Statement stmt = new Statement();\n",
"            stmt.setNamespace(\"test\");\n",
"            stmt.setSetName(\"users\");\n",
"            stmt.setBinNames(bins);\n",
"            stmt.setFilter(Filter.range(\"tweetcount\", min, max));\n",
"\n",
"            rs = client.query(null, stmt);\n",
"            while (rs.next()) {\n",
"                Record r = rs.getRecord();\n",
"                console.printf(r.getValue(\"username\") + \" has \"\n",
"                        + r.getValue(\"tweetcount\") + \" tweets\\n\");\n",
"            }\n",
"        } finally {\n",
"            if (rs != null) {\n",
"                // Close record set\n",
"                rs.close();\n",
"            }\n",
"        }\n",
"    } //queryUsersByTweetCount\n",
" \n",
"    /**\n",
"     * Scans the test.tweets set and prints key, username and tweet text for\n",
"     * each record received. Aerospike errors are caught and reported on\n",
"     * System.out instead of being propagated.\n",
"     */\n",
"    public void scanSomeTweetsForSomeUsers() {\n",
"        try {\n",
"            // Java Scan\n",
"            ScanPolicy policy = new ScanPolicy();\n",
"            policy.concurrentNodes = true;\n",
"            policy.priority = Priority.LOW;\n",
"            policy.includeBinData = true;\n",
"            policy.maxRecords = 100;\n",
"            policy.sendKey = true;\n",
"            client.scanAll(policy, \"test\", \"tweets\", new ScanCallback() {\n",
"\n",
"                @Override\n",
"                public void scanCallback(Key key, Record record)\n",
"                        throws AerospikeException {\n",
"                    // One printf per record keeps each output line intact.\n",
"                    // NOTE(review): with concurrentNodes enabled the callback is\n",
"                    // presumably invoked from several threads - confirm.\n",
"                    console.printf(key.toString() + \" => \"\n",
"                            + record.getValue(\"username\") + \" \"\n",
"                            + record.getValue(\"tweet\") + \"\\n\");\n",
"                }\n",
"            // Both bins the callback prints must be requested; asking only for\n",
"            // \"tweet\" made the username always print as null.\n",
"            }, \"username\", \"tweet\");\n",
"        } catch (AerospikeException e) {\n",
"            System.out.println(\"EXCEPTION - Message: \" + e.getMessage());\n",
"            System.out.println(\"EXCEPTION - StackTrace: \"\n",
"                    + UtilityService.printStackTrace(e));\n",
"        }\n",
"    } //scanSomeTweetsForSomeUsers\n",
"\n",
"    /**\n",
"     * Writes the given tweet count and last-tweeted timestamp to the user\n",
"     * record and prints the new count.\n",
"     *\n",
"     * @param client     client used for the write\n",
"     * @param userKey    key of the user record to update\n",
"     * @param policy     write policy applied to the put\n",
"     * @param ts         value stored in the \"lasttweeted\" bin\n",
"     * @param tweetCount value stored in the \"tweetcount\" bin\n",
"     * @throws AerospikeException if the write fails\n",
"     */\n",
"    private void updateUser(AerospikeClient client, Key userKey,\n",
"            WritePolicy policy, long ts, int tweetCount) throws AerospikeException, InterruptedException {\n",
"\n",
"        Bin countBin = new Bin(\"tweetcount\", tweetCount);\n",
"        Bin lastTweetedBin = new Bin(\"lasttweeted\", ts);\n",
"        client.put(policy, userKey, countBin, lastTweetedBin);\n",
"\n",
"        String report = \"\\nINFO: The tweet count now is: \" + tweetCount;\n",
"        console.printf(report);\n",
"    } //updateUser\n",
"\n",
"    /**\n",
"     * Alternative to updateUser: increments \"tweetcount\" by one, sets\n",
"     * \"lasttweeted\" and reads the record back in a single operate() call,\n",
"     * then prints the resulting count.\n",
"     *\n",
"     * @param client  client used for the operate call\n",
"     * @param userKey key of the user record to update\n",
"     * @param policy  write policy applied to the call\n",
"     * @param ts      value stored in the \"lasttweeted\" bin\n",
"     * @throws AerospikeException if the operation fails\n",
"     */\n",
"    @SuppressWarnings(\"unused\")\n",
"    private void updateUserUsingOperate(AerospikeClient client, Key userKey,\n",
"            WritePolicy policy, long ts) throws AerospikeException {\n",
"\n",
"        Operation incrementCount = Operation.add(new Bin(\"tweetcount\", 1));\n",
"        Operation stampLastTweet = Operation.put(new Bin(\"lasttweeted\", ts));\n",
"        Operation readBack = Operation.get();\n",
"\n",
"        Record updated = client.operate(policy, userKey,\n",
"                incrementCount, stampLastTweet, readBack);\n",
"\n",
"        Object newCount = updated.getValue(\"tweetcount\");\n",
"        console.printf(\"\\nINFO: The tweet count now is: \" + newCount);\n",
"    } //updateUserUsingOperate\n",
"\n",
"    /**\n",
"     * Generates sample data: picks totalUsers random usernames of the form\n",
"     * \"user<N>\" and, for each one that has a user record, writes between 0\n",
"     * and maxTweets (inclusive) random tweets and updates the user's tweet\n",
"     * count and last-tweeted timestamp.\n",
"     *\n",
"     * @throws AerospikeException if a database call fails\n",
"     */\n",
"    public void createTweets() throws AerospikeException {\n",
"        String[] randomTweets = {\n",
"                \"For just $1 you get a half price download of half of the song and listen to it just once.\",\n",
"                \"People tell me my body looks like a melted candle\",\n",
"                \"Come on movie! Make it start!\", \"Byaaaayy\",\n",
"                \"Please, please, win! Meow, meow, meow!\",\n",
"                \"Put. A. Bird. On. It.\",\n",
"                \"A weekend wasted is a weekend well spent\",\n",
"                \"Would you like to super spike your meal?\",\n",
"                \"We have a mean no-no-bring-bag up here on aisle two.\",\n",
"                \"SEEK: See, Every, EVERY, Kind... of spot\",\n",
"                \"We can order that for you. It will take a year to get there.\",\n",
"                \"If you are pregnant, have a soda.\",\n",
"                \"Hear that snap? Hear that clap?\",\n",
"                \"Follow me and I may follow you\",\n",
"                \"Which is the best cafe in Portland? Discuss...\",\n",
"                \"Portland Coffee is for closers!\",\n",
"                \"Lets get this party started!\",\n",
"                \"How about them portland blazers!\", \"You got school'd, yo\",\n",
"                \"I love animals\", \"I love my dog\", \"What's up Portland\",\n",
"                \"Which is the best cafe in Portland? Discuss...\",\n",
"                \"I dont always tweet, but when I do it is on Tweetaspike\" };\n",
"        // One generator is enough; the three draws are independent uses.\n",
"        Random rnd = new Random();\n",
"        Key userKey;\n",
"        Record userRecord;\n",
"        int totalUsers = 10000;\n",
"        int maxTweets = 20;\n",
"        String username;\n",
"        long ts = 0;\n",
"\n",
"        WritePolicy wPolicy = new WritePolicy();\n",
"        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;\n",
"\n",
"        console.printf(\"\\nCreate up to \" + maxTweets + \" tweets each for \"\n",
"                + totalUsers + \" users.\\n\");\n",
"\n",
"        for (int j = 0; j < totalUsers; j++) {\n",
"            // Check if user record exists\n",
"            username = \"user\" + rnd.nextInt(100000);\n",
"            userKey = new Key(\"test\", \"users\", username);\n",
"            userRecord = client.get(null, userKey);\n",
"            if (userRecord != null) {\n",
"                // nextInt's bound is exclusive, so add one to make \"up to\n",
"                // maxTweets\" actually reachable.\n",
"                int totalTweets = rnd.nextInt(maxTweets + 1);\n",
"                for (int k = 1; k <= totalTweets; k++) {\n",
"                    // Create timestamp to store along with the tweet so we can\n",
"                    // query, index and report on it\n",
"                    ts = getTimeStamp();\n",
"                    Key tweetKey = new Key(\"test\", \"tweets\", username + \":\" + k);\n",
"                    Bin bin1 = new Bin(\"tweet\",\n",
"                            randomTweets[rnd.nextInt(randomTweets.length)]);\n",
"                    Bin bin2 = new Bin(\"ts\", ts);\n",
"                    Bin bin3 = new Bin(\"username\", username);\n",
"\n",
"                    client.put(wPolicy, tweetKey, bin1, bin2, bin3);\n",
"                }\n",
"                if (totalTweets > 0) {\n",
"                    client.put(wPolicy, userKey, new Bin(\"tweetcount\", totalTweets), new Bin(\"lasttweeted\", ts));\n",
"                }\n",
"            }\n",
"        }\n",
"        console.printf(\"\\n\\nDone creating up to \" + maxTweets\n",
"                + \" tweets each for \" + totalUsers + \" users!\\n\");\n",
"    } //createTweets\n",
"\n",
"    /**\n",
"     * @return the current time from System.currentTimeMillis(), i.e. in\n",
"     *         milliseconds\n",
"     */\n",
"    private long getTimeStamp() {\n",
"        final long nowMillis = System.currentTimeMillis();\n",
"        return nowMillis;\n",
"    } //getTimeStamp\n",
"\n",
"}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"/*******************************************************************************\n",
" * Copyright 2012-2014 by Aerospike.\n",
" *\n",
" * Permission is hereby granted, free of charge, to any person obtaining a copy\n",
" * of this software and associated documentation files (the \"Software\"), to\n",
" * deal in the Software without restriction, including without limitation the\n",
" * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or\n",
" * sell copies of the Software, and to permit persons to whom the Software is\n",
" * furnished to do so, subject to the following conditions:\n",
" *\n",
" * The above copyright notice and this permission notice shall be included in\n",
" * all copies or substantial portions of the Software.\n",
" *\n",
" * THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n",
" * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n",
" * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n",
" * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n",
" * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING\n",
" * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS\n",
" * IN THE SOFTWARE.\n",
" ******************************************************************************/\n",
"\n",
"\n",
"import java.io.File;\n",
"import java.util.ArrayList;\n",
"import java.util.Arrays;\n",
"import java.util.Map;\n",
"import java.util.Random;\n",
"\n",
"import com.aerospike.client.AerospikeClient;\n",
"import com.aerospike.client.AerospikeException;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.Language;\n",
"import com.aerospike.client.Record;\n",
"import com.aerospike.client.Value;\n",
"import com.aerospike.client.lua.LuaConfig;\n",
"import com.aerospike.client.policy.GenerationPolicy;\n",
"import com.aerospike.client.policy.BatchPolicy;\n",
"import com.aerospike.client.policy.RecordExistsAction;\n",
"import com.aerospike.client.policy.WritePolicy;\n",
"import com.aerospike.client.query.Filter;\n",
"import com.aerospike.client.query.ResultSet;\n",
"import com.aerospike.client.query.Statement;\n",
"import com.aerospike.client.task.RegisterTask;\n",
"\n",
"public class UserService {\n",
" // Client through which this service issues its database calls.\n",
" private AerospikeClient client;\n",
" // Console wrapper used for prompts and printed output.\n",
" private EclipseConsole console = new EclipseConsole();\n",
"\n",
" /**\n",
"  * Creates a user service that uses the given client.\n",
"  *\n",
"  * @param client Aerospike client stored for later database calls\n",
"  */\n",
" public UserService(AerospikeClient client) {\n",
" this.client = client;\n",
" }\n",
"\n",
"    /**\n",
"     * Prompts for username, password, gender, region and interests and\n",
"     * writes them as a record in test.users keyed by the username, with\n",
"     * \"lasttweeted\" and \"tweetcount\" initialised to 0. Nothing is written\n",
"     * when no username is entered.\n",
"     *\n",
"     * @throws AerospikeException if the write fails\n",
"     */\n",
"    public void createUser() throws AerospikeException {\n",
"        console.printf(\"\\n********** Create User **********\\n\");\n",
"\n",
"        ///*********************///\n",
"        ///*****Data Model*****///\n",
"        //Namespace: test\n",
"        //Set: users\n",
"        //Key: <username>\n",
"        //Bins:\n",
"        //username - String\n",
"        //password - String (For simplicity password is stored in plain-text)\n",
"        //gender - String (Valid values are 'm' or 'f')\n",
"        //region - String (Valid values are: 'n' (North), 's' (South), 'e' (East), 'w' (West) -- to keep data entry to minimal we just store the first letter)\n",
"        //lasttweeted - int (Stores epoch timestamp of the last/most recent tweet) -- Default to 0\n",
"        //tweetcount - int (Stores total number of tweets for the user) -- Default to 0\n",
"        //interests - Array of interests\n",
"\n",
"        //Sample Key: dash\n",
"        //Sample Record:\n",
"        //{ username: 'dash',\n",
"        //  password: 'dash',\n",
"        //  gender: 'm',\n",
"        //  region: 'w',\n",
"        //  lasttweeted: 1408574221,\n",
"        //  tweetcount: 20,\n",
"        //  interests: ['photography', 'technology', 'dancing', 'house music']\n",
"        //}\n",
"        ///*********************///\n",
"\n",
"        // Get username\n",
"        console.printf(\"Enter username: \");\n",
"        String username = console.readLine();\n",
"        if (username == null || username.length() == 0) {\n",
"            return;\n",
"        }\n",
"\n",
"        // Get password\n",
"        console.printf(\"Enter password for \" + username + \":\");\n",
"        String password = console.readLine();\n",
"\n",
"        // Get gender (only the first letter is stored)\n",
"        console.printf(\"Select gender (f or m) for \" + username + \":\");\n",
"        String gender = firstLetterOrEmpty(console.readLine());\n",
"\n",
"        // Get region (only the first letter is stored)\n",
"        console.printf(\"Select region (north, south, east or west) for \"\n",
"                + username + \":\");\n",
"        String region = firstLetterOrEmpty(console.readLine());\n",
"\n",
"        // Get interests; blank entries are dropped and surrounding\n",
"        // whitespace is removed so \"a, b\" is stored as [a, b].\n",
"        console.printf(\"Enter comma-separated interests for \" + username + \":\");\n",
"        String interests = console.readLine();\n",
"        ArrayList<String> interestList = new ArrayList<String>();\n",
"        if (interests != null) {\n",
"            for (String interest : interests.split(\",\")) {\n",
"                String trimmed = interest.trim();\n",
"                if (!trimmed.isEmpty()) {\n",
"                    interestList.add(trimmed);\n",
"                }\n",
"            }\n",
"        }\n",
"\n",
"        // Write record\n",
"        WritePolicy wPolicy = new WritePolicy();\n",
"        wPolicy.recordExistsAction = RecordExistsAction.UPDATE;\n",
"\n",
"        Key key = new Key(\"test\", \"users\", username);\n",
"        Bin bin1 = new Bin(\"username\", username);\n",
"        Bin bin2 = new Bin(\"password\", password);\n",
"        Bin bin3 = new Bin(\"gender\", gender);\n",
"        Bin bin4 = new Bin(\"region\", region);\n",
"        Bin bin5 = new Bin(\"lasttweeted\", 0);\n",
"        Bin bin6 = new Bin(\"tweetcount\", 0);\n",
"        Bin bin7 = new Bin(\"interests\", interestList);\n",
"\n",
"        client.put(wPolicy, key, bin1, bin2, bin3, bin4, bin5, bin6, bin7);\n",
"\n",
"        console.printf(\"\\nINFO: User record created!\");\n",
"    } //createUser\n",
"\n",
"    /** Returns the first non-blank character of the input, or \"\" for null or blank input. */\n",
"    private static String firstLetterOrEmpty(String input) {\n",
"        if (input == null) {\n",
"            return \"\";\n",
"        }\n",
"        String trimmed = input.trim();\n",
"        return trimmed.isEmpty() ? \"\" : trimmed.substring(0, 1);\n",
"    }\n",
"\n",
"    /**\n",
"     * Prompts for a username and prints the stored user details, or an\n",
"     * error when the username is empty or no record exists for it.\n",
"     *\n",
"     * @throws AerospikeException if the read fails\n",
"     */\n",
"    public void getUser() throws AerospikeException {\n",
"        // Get username\n",
"        console.printf(\"\\nEnter username:\");\n",
"        String username = console.readLine();\n",
"\n",
"        // Look the user up only when a username was actually entered\n",
"        Record userRecord = null;\n",
"        if (username != null && username.length() > 0) {\n",
"            userRecord = client.get(null, new Key(\"test\", \"users\", username));\n",
"        }\n",
"\n",
"        if (userRecord == null) {\n",
"            console.printf(\"ERROR: User record not found!\\n\");\n",
"            return;\n",
"        }\n",
"\n",
"        console.printf(\"\\nINFO: User record read successfully! Here are the details:\\n\");\n",
"        // Each line is printed as \"<bin name>: <bin value>\"\n",
"        String[] binNames = { \"username\", \"password\", \"gender\", \"region\", \"tweetcount\", \"interests\" };\n",
"        for (String binName : binNames) {\n",
"            console.printf(binName + \": \" + userRecord.getValue(binName) + \"\\n\");\n",
"        }\n",
"    } //getUser\n",
"\n",
"    /**\n",
"     * Prompts for a username and a new password, registers the Lua module\n",
"     * udf/updateUserPwd.lua and calls its updatePassword function on the\n",
"     * user record, then prints the value the function returned.\n",
"     *\n",
"     * @throws AerospikeException if a database call fails\n",
"     */\n",
"    public void updatePasswordUsingUDF() throws AerospikeException\n",
"    {\n",
"        // Get username\n",
"        console.printf(\"\\nEnter username:\");\n",
"        String username = console.readLine();\n",
"\n",
"        // Check if username exists (skipped when nothing was entered)\n",
"        boolean usernameGiven = username != null && username.length() > 0;\n",
"        Key userKey = usernameGiven ? new Key(\"test\", \"users\", username) : null;\n",
"        Record userRecord = usernameGiven ? client.get(null, userKey) : null;\n",
"        if (userRecord == null)\n",
"        {\n",
"            console.printf(\"ERROR: User record not found!\");\n",
"            return;\n",
"        }\n",
"\n",
"        // Get new password\n",
"        console.printf(\"Enter new password for \" + username + \":\");\n",
"        String password = console.readLine();\n",
"\n",
"        // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL\n",
"        LuaConfig.SourceDirectory = \"udf\";\n",
"        File udfFile = new File(\"udf/updateUserPwd.lua\");\n",
"\n",
"        RegisterTask registration = client.register(null, udfFile.getPath(),\n",
"                udfFile.getName(), Language.LUA);\n",
"        registration.waitTillComplete(100);\n",
"\n",
"        Object udfResult = client.execute(null, userKey, \"updateUserPwd\", \"updatePassword\", Value.get(password));\n",
"        String updatedPassword = udfResult.toString();\n",
"        console.printf(\"\\nINFO: The password has been set to: \" + updatedPassword);\n",
"    } //updatePasswordUsingUDF\n",
"\n",
"    /**\n",
"     * Prompts for a username and a new password and writes the password with\n",
"     * a generation check: the write policy carries the generation of the\n",
"     * record that was just read together with EXPECT_GEN_EQUAL.\n",
"     *\n",
"     * @throws AerospikeException if the read or the write fails\n",
"     */\n",
"    public void updatePasswordUsingCAS() throws AerospikeException\n",
"    {\n",
"        // Get username\n",
"        console.printf(\"\\nEnter username:\");\n",
"        String username = console.readLine();\n",
"\n",
"        // Check if username exists (skipped when nothing was entered)\n",
"        boolean usernameGiven = username != null && username.length() > 0;\n",
"        Key userKey = usernameGiven ? new Key(\"test\", \"users\", username) : null;\n",
"        Record userRecord = usernameGiven ? client.get(null, userKey) : null;\n",
"        if (userRecord == null)\n",
"        {\n",
"            console.printf(\"ERROR: User record not found!\");\n",
"            return;\n",
"        }\n",
"\n",
"        // Get new password\n",
"        console.printf(\"Enter new password for \" + username + \":\");\n",
"        String password = console.readLine();\n",
"\n",
"        // Tie the write to the generation of the record read above\n",
"        WritePolicy casPolicy = new WritePolicy();\n",
"        casPolicy.generation = userRecord.generation;\n",
"        casPolicy.generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;\n",
"\n",
"        // password Bin\n",
"        Bin passwordBin = new Bin(\"password\", Value.get(password));\n",
"        client.put(casPolicy, userKey, passwordBin);\n",
"\n",
"        console.printf(\"\\nINFO: The password has been set to: \" + password);\n",
"    } //updatePasswordUsingCAS\n",
"\n",
"    /**\n",
"     * Prompts for a username and prints that user's tweets, fetched with one\n",
"     * batch read over the keys \"<username>:1\" .. \"<username>:<tweetcount>\".\n",
"     * Prints an error when the username is empty or the user does not exist.\n",
"     *\n",
"     * @throws AerospikeException if a database call fails\n",
"     */\n",
"    public void batchGetUserTweets() throws AerospikeException {\n",
"\n",
"        // Get username\n",
"        console.printf(\"\\nEnter username:\");\n",
"        String username = console.readLine();\n",
"\n",
"        if (username == null || username.length() == 0) {\n",
"            console.printf(\"ERROR: User record not found!\\n\");\n",
"            return;\n",
"        }\n",
"\n",
"        // Check if username exists\n",
"        Key userKey = new Key(\"test\", \"users\", username);\n",
"        Record userRecord = client.get(null, userKey);\n",
"        if (userRecord == null) {\n",
"            // Report the missing user like the other lookups in this class do.\n",
"            console.printf(\"ERROR: User record not found!\\n\");\n",
"            return;\n",
"        }\n",
"\n",
"        // Get how many tweets the user has; a negative stored value must not\n",
"        // be used as an array size.\n",
"        int tweetCount = Math.max(userRecord.getInt(\"tweetcount\"), 0);\n",
"\n",
"        // Create an array of keys so we can initiate batch read operation\n",
"        Key[] keys = new Key[tweetCount];\n",
"        for (int i = 0; i < keys.length; i++) {\n",
"            keys[i] = new Key(\"test\", \"tweets\", (username + \":\" + (i + 1)));\n",
"        }\n",
"\n",
"        console.printf(\"\\nHere's \" + username + \"'s tweet(s):\\n\");\n",
"\n",
"        // Initiate batch read operation\n",
"        if (keys.length > 0) {\n",
"            Record[] records = client.get(new BatchPolicy(), keys);\n",
"            for (Record tweetRecord : records) {\n",
"                // A key without a stored record yields a null entry; skip it\n",
"                // rather than failing with a NullPointerException.\n",
"                if (tweetRecord == null) {\n",
"                    continue;\n",
"                }\n",
"                Object tweet = tweetRecord.getValue(\"tweet\");\n",
"                if (tweet != null) {\n",
"                    console.printf(tweet.toString() + \"\\n\");\n",
"                }\n",
"            }\n",
"        }\n",
"    } //batchGetUserTweets\n",
"\n",
" @SuppressWarnings(\"unchecked\")\n",
" public void aggregateUsersByTweetCountByRegion() throws AerospikeException,\n",
" InterruptedException {\n",
" ResultSet rs = null;\n",
" try {\n",
" int min;\n",
" int max;\n",
" console.printf(\"\\nEnter Min Tweet Count:\");\n",
" min = Integer.parseInt(console.readLine());\n",
" console.printf(\"Enter Max Tweet Count:\");\n",
" max = Integer.parseInt(console.readLine());\n",
"\n",
" console.printf(\"\\nAggregating users with \" + min + \"-\"\n",
" + max + \" tweets by region. Hang on...\\n\");\n",
"\n",
" // NOTE: UDF registration has been included here for convenience and to demonstrate the syntax. The recommended way of registering UDFs in production env is via AQL\n",
" LuaConfig.SourceDirectory = \"udf\";\n",
" File udfFile = new File(\"udf/aggregationByRegion.lua\");\n",
"\n",
" RegisterTask rt = client.register(null, udfFile.getPath(),\n",
" udfFile.getName(), Language.LUA);\n",
" rt.waitTillComplete(100);\n",
"\n",
" String[] bins = { \"tweetcount\", \"region\" };\n",
" Statement stmt = new Statement();\n",
" stmt.setNamespace(\"test\");\n",
" stmt.setSetName(\"users\");\n",
" stmt.setIndexName(\"tweetcount_index\");\n",
" stmt.setBinNames(bins);\n",
" stmt.setFilter(Filter.range(\"tweetcount\", min, max));\n",
"\n",
" rs = client.queryAggregate(null, stmt, \"aggregationByRegion\", \"sum\");\n",
"\n",
" if (rs.next()) {\n",
" Map