{ "cells": [ { "cell_type": "markdown", "metadata": { "toc": true }, "source": [ "
\n",
"EventPolicy eventPolicy = EventPolicy();\n",
"final CommandsPerEventLoop = 50;\n",
"eventPolicy.maxCommandsInProcess = commandsPerEventLoop;\n",
"\n",
"- Select delay queue buffer size in front of the event loop. \n",
"\n",
"maxCommandsInQueue = 50;\n",
"eventPolicy.maxCommandsInQueue = maxCommandsInQueue;\n",
"\n",
"- Create event loops object.\n",
"\n",
"// here we use direct nio and 2 events loops\n",
"numLoops = 2;\n",
"EventLoops eventLoops = new NioEventLoops(eventPolicy, numLoops);\n",
"\n",
"\n",
"In the following cell, the function InitializeEventLoops allows initialization of different types of event loops. The function will be called multiple times later in the notebook to experiment with different settings."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Event loops initialized with num-loops: 2, commands-per-event-loop: 50, delay-queue-size: 50.\n"
]
}
],
"source": [
"import com.aerospike.client.async.EventPolicy;\n",
"import io.netty.channel.nio.NioEventLoopGroup;\n",
"import io.netty.channel.epoll.EpollEventLoopGroup;\n",
"import com.aerospike.client.async.NettyEventLoops;\n",
"import com.aerospike.client.async.EventLoops;\n",
"import com.aerospike.client.async.NioEventLoops;\n",
"\n",
"enum EventLoopType{DIRECT_NIO, NETTY_NIO, NETTY_EPOLL};\n",
"\n",
"// a function to create event loops with specified parameters\n",
"EventLoops InitializeEventLoops(EventLoopType eventLoopType, int numLoops, int commandsPerEventLoop, \n",
" int maxCommandsInQueue) {\n",
" EventPolicy eventPolicy = new EventPolicy();\n",
" eventPolicy.maxCommandsInProcess = commandsPerEventLoop;\n",
" eventPolicy.maxCommandsInQueue = maxCommandsInQueue;\n",
" EventLoops eventLoops = null;\n",
" switch(eventLoopType) {\n",
" case DIRECT_NIO:\n",
" eventLoops = new NioEventLoops(eventPolicy, numLoops);\n",
" break;\n",
" case NETTY_NIO:\n",
" NioEventLoopGroup nioGroup = new NioEventLoopGroup(numLoops);\n",
" eventLoops = new NettyEventLoops(eventPolicy, nioGroup);\n",
" break;\n",
" case NETTY_EPOLL:\n",
" EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(numLoops);\n",
" eventLoops = new NettyEventLoops(eventPolicy, epollGroup);\n",
" break;\n",
" default:\n",
" System.out.println(\"Error: Invalid event loop type\");\n",
" }\n",
" return eventLoops;\n",
"}\n",
"\n",
"// initialize event loops \n",
"final int NumLoops = 2;\n",
"final int CommandsPerEventLoop = 50;\n",
"final int DelayQueueSize = 50;\n",
"\n",
"EventLoops eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, NumLoops, CommandsPerEventLoop, DelayQueueSize);\n",
"\n",
"System.out.format(\"Event loops initialized with num-loops: %s, commands-per-event-loop: %s, delay-queue-size: %s.\\n\",\n",
" NumLoops, CommandsPerEventLoop, DelayQueueSize);;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize Client\n",
"Examine the code snippets below.\n",
"\n",
"- Initialize client policy with event loops.\n",
"\n",
"ClientPolicy policy = new ClientPolicy();\n",
"clientPolicy.eventLoops = eventLoops;\n",
"\n",
" - Set total concurrent connections per node by multiplying concurrency level at event loop (maxCommandsInProcess) by the number of event loops. \n",
"\n",
"concurrentMax = commandsPerEventLoop * numLoops;\n",
"\n",
" - This is the max number of commands or requests per node if all requests go to one node. Adjust the default connection pool size of 300 if concurrentMax is larger.\n",
"\n",
"if (clientPolicy.maxConnsPerNode < concurrentMax) {\n",
"    clientPolicy.maxConnsPerNode = concurrentMax;\n",
"}\n",
"\n",
"- Initialize the client with the client policy and seed hosts in cluster.\n",
"\n",
"Host[] hosts = Host.parseHosts(\"localhost\", 3000);\n",
"AerospikeClient client = new AerospikeClient(clientPolicy, hosts);\n",
"\n",
"\n",
"In the following cell, the function InitializeClient allows initialization of the client with specified parameters. "
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Client initialized.\n"
]
}
],
"source": [
"import com.aerospike.client.policy.ClientPolicy;\n",
"import com.aerospike.client.Host;\n",
"import com.aerospike.client.AerospikeClient;\n",
"\n",
"// a function to initialize the client with specified parameters\n",
"AerospikeClient InitializeClient(EventLoops eventLoops, int numLoops, int commandsPerEventLoop, Host[] hosts) {\n",
" ClientPolicy clientPolicy = new ClientPolicy();\n",
" clientPolicy.eventLoops = eventLoops;\n",
" int concurrentMax = commandsPerEventLoop * numLoops;\n",
" if (clientPolicy.maxConnsPerNode < concurrentMax) {\n",
" clientPolicy.maxConnsPerNode = concurrentMax; \n",
" }\n",
" AerospikeClient client = new AerospikeClient(clientPolicy, hosts);\n",
" return client;\n",
"}\n",
"\n",
"// initialize the client \n",
"Host[] hosts = Host.parseHosts(\"localhost\", 3000); \n",
"\n",
"AerospikeClient client = InitializeClient(eventLoops, NumLoops, CommandsPerEventLoop, hosts);\n",
"\n",
"System.out.print(\"Client initialized.\\n\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize event loop throttles and atomic operation count.\n",
"The event loop throttles object is initialized with the number of event loops and commands per event loop. It provides two methods \"waitForSlot\" and \"addSlot\" to manage concurrency for an event loop, both take an index parameter that identifies the event loop. \n",
"\n",
"throttles = new Throttles(numLoops, commandsPerEventLoop);\n",
"\n",
"\n",
"The operation count is used to track the number of finished operations. Because multiple callback threads access and increment it concurrently, it is defined as an AtomicInteger, which has support for atomic operation get/increment operations.\n",
"\n",
"AtomicInteger asyncOpCount = new AtomicInteger();\n",
"\n",
"In the following cell, the function InitializeThrottles creates throttles for event loops with specified parameters. "
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Throttles initialized for 2 loops with 50 concurrent operations per loop.\n",
"Atomic operation count initialized."
]
}
],
"source": [
"import com.aerospike.client.async.Throttles;\n",
"\n",
"// creates event loop throttles with specified parameters\n",
"Throttles InitializeThrottles(int numLoops, int commandsPerEventLoop) {\n",
" Throttles throttles = new Throttles(numLoops, commandsPerEventLoop);\n",
" return throttles;\n",
"}\n",
"\n",
"// initialize event loop throttles\n",
"Throttles throttles = InitializeThrottles(NumLoops, CommandsPerEventLoop);\n",
"System.out.format(\"Throttles initialized for %s loops with %s concurrent operations per loop.\\n\", \n",
" NumLoops, CommandsPerEventLoop);\n",
"\n",
"// initialize the atomic integer to keep track of async operations count\n",
"import java.util.concurrent.atomic.AtomicInteger;\n",
"AtomicInteger asyncOpCount = new AtomicInteger();\n",
"System.out.format(\"Atomic operation count initialized.\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define Listener and Handlers\n",
"Define the listener with success and failure handlers to process results. Below, we have MyWriteListener derived from WriteListener to process insertion of records that: \n",
"- implements success and failure handlers\n",
"- releases a slot in the event loop on success or failure for another insert to proceed\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
"- signals completion through monitor on failure or when the write count reaches the expected final count\n",
" monitor.notifyComplete();\n",
"- prints progress every \"progressFreq\" records"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Write listener defined."
]
}
],
"source": [
"import com.aerospike.client.Key;\n",
"import com.aerospike.client.listener.WriteListener;\n",
"import com.aerospike.client.async.Monitor;\n",
"import com.aerospike.client.AerospikeException;\n",
"\n",
"// write listener \n",
"// - implements success and failure handlers\n",
"// - releases a slot on success or failure for another insert to proceed\n",
"// - signals completion through monitor on failure or when the write count reaches the expected final count\n",
"// - prints progress every \"progressFreq\" records*/\n",
"\n",
"class MyWriteListener implements WriteListener {\n",
" private final Key key;\n",
" private final int eventLoopIndex;\n",
" private final int finalCount;\n",
" private Monitor monitor; \n",
" private final int progressFreq;\n",
" \n",
" public MyWriteListener(Key key, int eventLoopIndex, int finalCount, Monitor monitor, int progressFreq) {\n",
" this.key = key;\n",
" this.eventLoopIndex = eventLoopIndex;\n",
" this.finalCount = finalCount;\n",
" this.monitor = monitor;\n",
" this.progressFreq = progressFreq;\n",
" }\n",
"\n",
" // Write success callback.\n",
" public void onSuccess(Key key) {\n",
" // Write succeeded.\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" int currentCount = asyncOpCount.incrementAndGet();\n",
" if ( progressFreq > 0 && currentCount % progressFreq == 0) {\n",
" System.out.format(\"Inserted %s records.\\n\", currentCount); \n",
" }\n",
" if (currentCount == finalCount) {\n",
" monitor.notifyComplete();\n",
" }\n",
" }\n",
"\n",
" // Error callback.\n",
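"    public void onFailure(AerospikeException e) {\n",
"        // Release the slot on failure as well, so another insert can proceed.\n",
"        throttles.addSlot(eventLoopIndex, 1);\n",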
" System.out.format(\"Put failed: namespace=%s set=%s key=%s exception=%s\\n\", \n",
" key.namespace, key.setName, key.userKey, e.getMessage());\n",
" monitor.notifyComplete();\n",
" }\n",
"}\n",
"\n",
"System.out.print(\"Write listener defined.\"); "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit Async Requests Using Throttling\n",
"While submitting async requests it is important to keep below the planned concurrent capacity using throttling.\n",
"\n",
"The function InsertRecords below inserts the specified number of records asynchronously with id-\\\n",
"if (throttles.waitForSlot(eventLoopIndex, 1)) {\n",
" // submit async request\n",
"}\n",
"\n",
"\n",
"After submitting all requests, the main thread must wait for outstanding requests to complete before closing.\n",
"\n",
" monitor.waitTillComplete();\n",
""
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Inserted 25000 records.\n",
"Inserted 50000 records.\n",
"Inserted 75000 records.\n",
"Inserted 100000 records.\n",
"Inserted 100000 records with 2 event-loops and 50 commands-per-loop in 4212 milliseconds.\n"
]
}
],
"source": [
"import java.util.concurrent.TimeUnit;\n",
"import com.aerospike.client.Bin;\n",
"import com.aerospike.client.policy.WritePolicy;\n",
"import com.aerospike.client.async.EventLoop;\n",
"\n",
"long InsertRecords(int numRecords, EventLoops eventLoops, Throttles throttles, int progressFreq) {\n",
" long startTime = System.nanoTime();\n",
" Monitor monitor = new Monitor();\n",
" asyncOpCount.set(0);\n",
" WritePolicy policy = new WritePolicy();\n",
"\n",
" for (int i = 0; i < numRecords; i++) {\n",
" Key key = new Key(Namespace, Set, \"id-\"+i);\n",
"        Bin bin1 = new Bin(\"bin1\", i);\n",
"        Bin bin2 = new Bin(\"bin2\", numRecords*10+i);\n",
" EventLoop eventLoop = eventLoops.next();\n",
" int eventLoopIndex = eventLoop.getIndex();\n",
"\n",
" if (throttles.waitForSlot(eventLoopIndex, 1)) {\n",
" try {\n",
" client.put(eventLoop, new MyWriteListener(key, eventLoopIndex, numRecords, monitor, progressFreq), \n",
" policy, key, bin1, bin2);\n",
" }\n",
" catch (Exception e) {\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" }\n",
" }\n",
" }\n",
" monitor.waitTillComplete(); \n",
" long endTime = System.nanoTime();\n",
" return (endTime - startTime);\n",
"}\n",
"\n",
"final int NumRecords = 100000;\n",
"\n",
"long elapsedTime = InsertRecords(NumRecords, eventLoops, throttles, NumRecords/4);\n",
"\n",
"System.out.format(\"Inserted %s records with %s event-loops and %s commands-per-loop in %s milliseconds.\\n\", \n",
" NumRecords, NumLoops, CommandsPerEventLoop, elapsedTime/1000000);;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Closing\n",
"Both AerospikeClient and EventLoops should be closed before program shutdown. The latest client waits for pending async commands to finish before performing the actual close, so there is no need to externally track pending async commands. Earlier versions provide a waitToComplete() call on Monitor object to ensure async operations are completed. The Cleanup function implemented above truncates the database and closes client and event-loops."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removed data and closed client and event loops.\n"
]
}
],
"source": [
"// truncates database and closes client and event-loops\n",
"Cleanup();\n",
"System.out.println(\"Removed data and closed client and event loops.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Nested and Inline Async Operations\n",
"It is possible to nest a series of async calls, one in the processing logic of another. Some simple examples of such cascaded calls are:\n",
"- Retry the same operation in the failure handler\n",
"- Issue an async read to validate an async write operation\n",
"- Issue an async write to update a record retrieved from an async read operation.\n",
"\n",
"The following code illustrates a simplistic example of how each record retrieved from an async filtered scan is updated asynchronously by incrementing the value of bin2. Note the inline implementation of WriteListener. The scan filter selects records between bin1 values of 1 and 1000. Throttling and progress report are also present as described above."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import com.aerospike.client.policy.ScanPolicy;\n",
"import com.aerospike.client.listener.RecordSequenceListener;\n",
"import com.aerospike.client.Record;\n",
"import com.aerospike.client.exp.Exp;\n",
"\n",
"// Scan callback\n",
"class ScanRecordSequenceListener implements RecordSequenceListener {\n",
" private EventLoops eventLoops;\n",
" private Throttles throttles;\n",
" private Monitor scanMonitor;\n",
" private AtomicInteger writeCount = new AtomicInteger();\n",
" private int scanCount = 0; \n",
" private final int progressFreq;\n",
" \n",
" public ScanRecordSequenceListener(EventLoops eventLoops, Throttles throttles, Monitor scanMonitor, \n",
" int progressFreq) {\n",
" this.eventLoops = eventLoops;\n",
" this.throttles = throttles;\n",
" this.scanMonitor = scanMonitor;\n",
" this.progressFreq = progressFreq;\n",
" }\n",
" \n",
" public void onRecord(Key key, Record record) throws AerospikeException {\n",
" ++scanCount;\n",
" if ( progressFreq > 0 && scanCount % progressFreq == 0) {\n",
" System.out.format(\"Scan returned %s records.\\n\", scanCount); \n",
" }\n",
" // submit async update operation with throttle\n",
" EventLoop eventLoop = eventLoops.next();\n",
" int eventLoopIndex = eventLoop.getIndex();\n",
" if (throttles.waitForSlot(eventLoopIndex, 1)) { // throttle by waiting for an available slot\n",
" try {\n",
" WritePolicy policy = new WritePolicy();\n",
"                Bin bin2 = new Bin(\"bin2\", 1);\n",
" \n",
" client.add(eventLoop, new WriteListener() { // inline write listener\n",
" \n",
" public void onSuccess(final Key key) {\n",
" // Write succeeded.\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" int currentCount = writeCount.incrementAndGet();\n",
" if ( progressFreq > 0 && currentCount % progressFreq == 0) {\n",
" System.out.format(\"Processed %s records.\\n\", currentCount); \n",
" }\n",
" }\n",
"\n",
" public void onFailure(AerospikeException e) {\n",
"                        System.out.format(\"Add failed: namespace=%s set=%s key=%s exception=%s\\n\", \n",
" key.namespace, key.setName, key.userKey, e.getMessage());\n",
" throttles.addSlot(eventLoopIndex, 1);\n",
" int currentCount = writeCount.incrementAndGet();\n",
" if ( progressFreq > 0 && currentCount % progressFreq == 0) {\n",
" System.out.format(\"Processed %s records.\\n\", currentCount); \n",
" }\n",
" }\n",
" }, \n",
" policy, key, bin2);\n",
" }\n",
" catch (Exception e) {\n",
" System.out.format(\"Error: exception in write listener - %s\", e.getMessage());\n",
" }\n",
" }\n",
" }\n",
"\n",
" public void onSuccess() {\n",
" if (scanCount != writeCount.get()) { // give the last write some time to finish\n",
" try {\n",
" Thread.sleep(100);\n",
" } \n",
" catch(InterruptedException e) {\n",
" System.out.format(\"Error: exception - %s\", e);\n",
" }\n",
" }\n",
" scanMonitor.notifyComplete();\n",
" }\n",
"\n",
" public void onFailure(AerospikeException e) {\n",
" System.out.format(\"Error: scan failed with exception - %s\", e);\n",
" scanMonitor.notifyComplete();\n",
" }\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Inserted 100000 records.\n",
"Scan returned 100 records.\n",
"Processed 100 records.\n",
"Scan returned 200 records.\n",
"Processed 200 records.\n",
"Scan returned 300 records.\n",
"Processed 300 records.\n",
"Scan returned 400 records.\n",
"Processed 400 records.\n",
"Scan returned 500 records.\n",
"Processed 500 records.\n",
"Scan returned 600 records.\n",
"Processed 600 records.\n",
"Scan returned 700 records.\n",
"Processed 700 records.\n",
"Scan returned 800 records.\n",
"Processed 800 records.\n",
"Scan returned 900 records.\n",
"Processed 900 records.\n",
"Scan returned 1000 records.\n",
"Processed 1000 records.\n",
"Done: nested async scan and update"
]
}
],
"source": [
"// cleanup prior state\n",
"Cleanup();\n",
"\n",
"// initialize data, event loops and client\n",
"int numRecords = 100000;\n",
"int numLoops = 2;\n",
"int commandsPerLoop = 25;\n",
"int delayQueueSize = 0;\n",
"\n",
"eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize);\n",
"client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);\n",
"throttles = InitializeThrottles(numLoops, commandsPerLoop);\n",
"\n",
"InsertRecords(numRecords, eventLoops, throttles, 0);\n",
"System.out.format(\"Inserted %s records.\\n\", numRecords);\n",
"\n",
"EventLoop eventLoop = eventLoops.next();\n",
"Monitor scanMonitor = new Monitor();\n",
"int progressFreq = 100;\n",
"\n",
"// issue async scan that in turn issues async update on each returned record\n",
"ScanPolicy policy = new ScanPolicy();\n",
"policy.filterExp = Exp.build(\n",
" Exp.and(\n",
" Exp.le(Exp.intBin(\"bin1\"), Exp.val(1000)),\n",
" Exp.ge(Exp.intBin(\"bin1\"), Exp.val(1))));\n",
" \n",
"client.scanAll(eventLoop, new ScanRecordSequenceListener(eventLoops, throttles, scanMonitor, progressFreq), \n",
" policy, Namespace, Set);\n",
"scanMonitor.waitTillComplete();\n",
"\n",
"System.out.format(\"Done: nested async scan and update\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Misc Examples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Delay Queue Full Error\n",
"If the delay queue fills up, a request will not be accepted and the client will return “delay queue full” error. Below we simulate this condition by having 25 slots and a delay queue of 20 in 2 event loops each (can handle total 90 outstanding requests) and issuing a hundred concurrent requests. The throttle is effectively turned off by a large setting for the number of requests to go through."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Put failed: namespace=test set=async-ops key=id-90 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-92 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-91 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-93 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-95 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-97 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-99 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-94 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-96 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"Put failed: namespace=test set=async-ops key=id-98 exception=Error -9,0,30000,0,0: Async delay queue is full\n",
"16 ops/ms with event-loops: 2 and commands-per-loop: 25.\n"
]
}
],
"source": [
"// clean up the current state\n",
"Cleanup();\n",
"\n",
"// initialize data, event loops and client\n",
"int numRecords = 100;\n",
"int numLoops = 2;\n",
"int commandsPerLoop = 25;\n",
"int delayQueueSize = 20;\n",
"int noThrottle = 10000; //effectively no throttle\n",
"\n",
"eventLoops = InitializeEventLoops(EventLoopType.DIRECT_NIO, numLoops, commandsPerLoop, delayQueueSize);\n",
"client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);\n",
"throttles = InitializeThrottles(numLoops, noThrottle);\n",
"\n",
"// attempt to insert records above the available slots and delay queue capacity\n",
"long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0);\n",
"\n",
"System.out.format(\"%s ops/ms with event-loops: %s and commands-per-loop: %s.\\n\", \n",
" numRecords/(elapsedTime/1000000), numLoops, commandsPerLoop);;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Comparing Different Settings\n",
"The code below allows comparison of insert throughput with different parameters: event loops type, number of event loops, and concurrency level in each loop. It doesn't produce meaningful results in the default notebook container setting where the client and server are running in the same container. A meaningful comparison can be drawn by pointing to the desired server cluster and also adjusting the client environment."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"27 ops/ms with 2 DIRECT_NIO event-loops and 50 commands-per-loop.\n",
"23 ops/ms with 2 DIRECT_NIO event-loops and 100 commands-per-loop.\n",
"29 ops/ms with 2 DIRECT_NIO event-loops and 200 commands-per-loop.\n",
"34 ops/ms with 4 DIRECT_NIO event-loops and 50 commands-per-loop.\n",
"35 ops/ms with 4 DIRECT_NIO event-loops and 100 commands-per-loop.\n",
"33 ops/ms with 4 DIRECT_NIO event-loops and 200 commands-per-loop.\n",
"23 ops/ms with 8 DIRECT_NIO event-loops and 50 commands-per-loop.\n",
"22 ops/ms with 8 DIRECT_NIO event-loops and 100 commands-per-loop.\n",
"19 ops/ms with 8 DIRECT_NIO event-loops and 200 commands-per-loop.\n",
"21 ops/ms with 2 NETTY_NIO event-loops and 50 commands-per-loop.\n",
"25 ops/ms with 2 NETTY_NIO event-loops and 100 commands-per-loop.\n",
"27 ops/ms with 2 NETTY_NIO event-loops and 200 commands-per-loop.\n",
"32 ops/ms with 4 NETTY_NIO event-loops and 50 commands-per-loop.\n",
"31 ops/ms with 4 NETTY_NIO event-loops and 100 commands-per-loop.\n",
"33 ops/ms with 4 NETTY_NIO event-loops and 200 commands-per-loop.\n",
"22 ops/ms with 8 NETTY_NIO event-loops and 50 commands-per-loop.\n",
"22 ops/ms with 8 NETTY_NIO event-loops and 100 commands-per-loop.\n",
"20 ops/ms with 8 NETTY_NIO event-loops and 200 commands-per-loop.\n",
"20 ops/ms with 2 NETTY_EPOLL event-loops and 50 commands-per-loop.\n",
"23 ops/ms with 2 NETTY_EPOLL event-loops and 100 commands-per-loop.\n",
"24 ops/ms with 2 NETTY_EPOLL event-loops and 200 commands-per-loop.\n",
"25 ops/ms with 4 NETTY_EPOLL event-loops and 50 commands-per-loop.\n",
"25 ops/ms with 4 NETTY_EPOLL event-loops and 100 commands-per-loop.\n",
"26 ops/ms with 4 NETTY_EPOLL event-loops and 200 commands-per-loop.\n",
"23 ops/ms with 8 NETTY_EPOLL event-loops and 50 commands-per-loop.\n",
"23 ops/ms with 8 NETTY_EPOLL event-loops and 100 commands-per-loop.\n",
"21 ops/ms with 8 NETTY_EPOLL event-loops and 200 commands-per-loop.\n",
"Done.\n"
]
}
],
"source": [
"// Throughput with parameterized async insertion\n",
"\n",
"int numRecords = 100000;\n",
"EventLoopType[] eventLoopOptions = {EventLoopType.DIRECT_NIO, EventLoopType.NETTY_NIO, EventLoopType.NETTY_EPOLL};\n",
"int[] numLoopsOptions = {2, 4, 8};\n",
"int[] commandsPerLoopOptions = {50, 100, 200};\n",
"\n",
"for (EventLoopType eventLoopType: eventLoopOptions) {\n",
" for (int numLoops: numLoopsOptions) {\n",
" for (int commandsPerLoop: commandsPerLoopOptions) {\n",
" Cleanup();\n",
" eventLoops = InitializeEventLoops(eventLoopType, numLoops, commandsPerLoop, 0);\n",
" client = InitializeClient(eventLoops, numLoops, commandsPerLoop, hosts);\n",
" throttles = InitializeThrottles(numLoops, commandsPerLoop);\n",
"\n",
" long elapsedTime = InsertRecords(numRecords, eventLoops, throttles, 0);\n",
" System.out.format(\"%s ops/ms with %s %s event-loops and %s commands-per-loop.\\n\", \n",
" numRecords/(elapsedTime/1000000), numLoops, eventLoopType, commandsPerLoop);\n",
" }\n",
" }\n",
"}\n",
"System.out.println(\"Done.\");;"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Takeaways and Conclusion\n",
"The tutorial described the architecture of and key concepts in asynchronous operations in Aerospike client. It presented the programming framework in which async requests can be submitted and handled. It illustrated with code how event loops, throttling, inline async calls are implemented. The trade-offs that a developer needs to make for which execution modes to employ - synchronous, asynchronous, or background - involve multiple factors including the nature of operations, client and server setup, throughput needs, and programming complexity."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up\n",
"Remove tutorial data and close connection."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"ExecuteTime": {
"end_time": "2020-12-29T20:49:19.972650Z",
"start_time": "2020-12-29T20:49:19.967344Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Removed tutorial data and closed server connection.\n"
]
}
],
"source": [
"Cleanup();\n",
"System.out.println(\"Removed tutorial data and closed server connection.\");"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further Exploration and Resources\n",
"Here are some links for further exploration\n",
"\n",
"Resources\n",
"- Related notebooks\n",
" - [Implementing SQL Operations: SELECT](sql_select.ipynb), \n",
" - [Implementing SQL Operations: Aggregates - Part 1](sql_aggregates_1.ipynb) and [Part 2](sql_aggregates_2.ipynb).\n",
" - [Implementing SQL Operations: CREATE, UPDATE, DELETE](sql_updates.ipynb)\n",
" - [Working with Lists](java-working_with_lists.ipynb)\n",
" - [Working with Maps](java-working_with_maps.ipynb)\n",
"- Aerospike Developer Hub\n",
" - [Java Developers Resources](https://developer.aerospike.com/java-developers)\n",
"- Github repos\n",
" - [Java code examples](https://github.com/aerospike/aerospike-client-java/tree/master/examples/src/com/aerospike/examples)\n",
" - [Reactive programming examples for the Java client](https://github.com/aerospike/aerospike-client-java-reactive)\n",
"- Documentation\n",
" - [Java Client](https://www.aerospike.com/docs/client/java/index.html)\n",
" - [Java API Reference](https://www.aerospike.com/apidocs/java/)\n",
" - [Aerospike Documentation](https://docs.aerospike.com/docs/)\n",
"- Blog\n",
" - [Simple Web Application Using Java, Spring Boot, Aerospike and Docker](https://medium.com/aerospike-developer-blog/simple-web-application-using-java-spring-boot-aerospike-database-and-docker-ad13795e0089)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Next steps\n",
"\n",
"Visit [Aerospike notebooks repo](https://github.com/aerospike-examples/interactive-notebooks) to run additional Aerospike notebooks. To run a different notebook, download the notebook from the repo to your local machine, and then click on File->Open in the notebook menu, and select Upload."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Java",
"language": "java",
"name": "java"
},
"language_info": {
"codemirror_mode": "java",
"file_extension": ".jshell",
"mimetype": "text/x-java-source",
"name": "Java",
"pygments_lexer": "java",
"version": "11.0.8+10-LTS"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": true,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 4
}