TPC-C Benchmark using HBase
Introduction
For this project I used tables from the TPC-C benchmark. The TPC-C benchmark simulates the activity of a company that must manage, sell, and distribute products or services. A full description of the database can be found here:
http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-c_v5.11.0.pdf. The goal of this project is to develop a Java program that uses HBase to implement four queries:
- Query 1: List the customers with an order from a given warehouse and district during the time interval specified by a START_DATE (inclusive) and an END_DATE (exclusive).
- Query 2: Insert/update (up to 6 times) the discount for a given customer, warehouse and district.
- Query 3: Show the latest 4 discounts for a given customer, warehouse and district.
- Query 4: List all the customers from a given list of districts in a specified warehouse.
The application was developed with Oracle Java 8 and HBase, and deployed on Ubuntu.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
First, I defined the table names and converted them to byte arrays. Then I created the Configuration object, set the ZooKeeper server and client port, and finally created the HBaseAdmin. The HBaseAdmin is used to create the tables; any table that already exists is disabled, dropped and recreated.
// HBase client configuration; built in the constructor and passed to every connection created below.
private Configuration config;
// Admin handle used by createTPCCTables() to drop and create tables.
private HBaseAdmin hBaseAdmin;
// Name of the single column family shared by every table.
byte[] CF = Bytes.toBytes("CF");
// Table names A..I, in the order the CSV files are loaded
// (loadTables uses colName(0) for TABLE_A through colName(8) for TABLE_I).
byte[] TABLE_A = Bytes.toBytes("Warehouse");
byte[] TABLE_B = Bytes.toBytes("District");
byte[] TABLE_C = Bytes.toBytes("Item");
byte[] TABLE_D = Bytes.toBytes("New_order");
byte[] TABLE_E = Bytes.toBytes("Orders");
byte[] TABLE_F = Bytes.toBytes("History");
byte[] TABLE_G = Bytes.toBytes("Customer");
byte[] TABLE_H = Bytes.toBytes("Stock");
byte[] TABLE_I = Bytes.toBytes("Order_line");
/**
* The Constructor. Establishes the connection with HBase.
* @param zkHost
* @throws IOException
*/
public HBaseTPCC(String zkHost) throws IOException {
config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort", "2181");
HBaseConfiguration.addHbaseResources(config);
this.hBaseAdmin = new HBaseAdmin(config);
}
Create tables
The first function creates the tables on the server, following the TPC-C schema. The column family is configured to keep up to 6 versions of each cell; Query 3 relies on this feature.
/**
 * (Re)creates the nine TPC-C tables.
 *
 * <p>Every table that already exists is disabled and dropped first, so any
 * previously loaded data is discarded. Each table gets the single column
 * family {@code CF}, which retains up to 6 cell versions (Query 3 reads the
 * discount history from these versions).
 *
 * @throws IOException if an HBase admin operation fails
 */
public void createTPCCTables() throws IOException {
    byte[][] names = {TABLE_A, TABLE_B, TABLE_C, TABLE_D, TABLE_E,
                      TABLE_F, TABLE_G, TABLE_H, TABLE_I};
    HTableDescriptor[] descriptors = new HTableDescriptor[names.length];
    for (int k = 0; k < names.length; k++) {
        descriptors[k] = new HTableDescriptor(TableName.valueOf(names[k]));
    }

    HColumnDescriptor family = new HColumnDescriptor(CF);
    family.setMaxVersions(6);

    // Drop all pre-existing tables before creating any new one.
    for (HTableDescriptor descriptor : descriptors) {
        if (hBaseAdmin.tableExists(descriptor.getName())) {
            hBaseAdmin.disableTable(descriptor.getName());
            hBaseAdmin.deleteTable(descriptor.getName());
        }
    }

    for (HTableDescriptor descriptor : descriptors) {
        descriptor.addFamily(family);
        this.hBaseAdmin.createTable(descriptor);
    }
}
Load tables
Once the tables are created, I open a connection with the HConnectionManager and load the content of each .csv file into its table. Each line is split on commas, and the row key is built by getKey from that table's key columns. The function colName(i) takes a number from 0 to 8 (one per table) and returns an array of Strings with the column names of that table.
public void loadTables(String folder)throws IOException {
HConnection conn = HConnectionManager.createConnection(config);
//Assign names of Column Families
byte[] CF = Bytes.toBytes("CF");
//Assign Table Names
byte[] TABLE_A = Bytes.toBytes("Warehouse");
byte[] TABLE_B = Bytes.toBytes("District");
byte[] TABLE_C = Bytes.toBytes("Item");
byte[] TABLE_D = Bytes.toBytes("New_order");
byte[] TABLE_E = Bytes.toBytes("Orders");
byte[] TABLE_F = Bytes.toBytes("History");
byte[] TABLE_G = Bytes.toBytes("Customer");
byte[] TABLE_H = Bytes.toBytes("Stock");
byte[] TABLE_I = Bytes.toBytes("Order_line");
HTable table_A = new HTable(TableName.valueOf(TABLE_A), conn);
HTable table_B = new HTable(TableName.valueOf(TABLE_B), conn);
HTable table_C = new HTable(TableName.valueOf(TABLE_C), conn);
HTable table_D = new HTable(TableName.valueOf(TABLE_D), conn);
HTable table_E = new HTable(TableName.valueOf(TABLE_E), conn);
HTable table_F = new HTable(TableName.valueOf(TABLE_F), conn);
HTable table_G = new HTable(TableName.valueOf(TABLE_G), conn);
HTable table_H = new HTable(TableName.valueOf(TABLE_H), conn);
HTable table_I = new HTable(TableName.valueOf(TABLE_I), conn);
System.out.println("Insert 1");
String currentLine = null;
BufferedReader br = new BufferedReader(
new FileReader(folder + "/warehouse.csv"));
int cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(0)[i]), Bytes.toBytes(line[i]));
}
table_A.put(p);
}
System.out.println("Insert 2");
currentLine = null;
br = new BufferedReader(
new FileReader(folder + "/district.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{1,0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(1)[i]), Bytes.toBytes(line[i]));
}
table_B.put(p);
}
System.out.println("Insert 3");
currentLine = null;
br = new BufferedReader(
new FileReader(folder + "/item.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(2)[i]), Bytes.toBytes(line[i]));
}
table_C.put(p);
}
System.out.println("Insert 4");
br = new BufferedReader(
new FileReader(folder + "/new_order.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{2,1,0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(3)[i]), Bytes.toBytes(line[i]));
}
table_D.put(p);
}
System.out.println("Insert 5");
br = new BufferedReader(
new FileReader(folder + "/orders.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{2,1,4};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(4)[i]), Bytes.toBytes(line[i]));
}
table_E.put(p);
}
System.out.println("Insert 6");
br = new BufferedReader(
new FileReader(folder + "/history.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{4,3,5};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(5)[i]), Bytes.toBytes(line[i]));
}
table_F.put(p);
}
System.out.println("Insert 7");
br = new BufferedReader(
new FileReader(folder + "/customer.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{2,1,0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(6)[i]), Bytes.toBytes(line[i]));
}
table_G.put(p);
}
System.out.println("Insert 8");
br = new BufferedReader(
new FileReader(folder + "/stock.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{1,0};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(7)[i]), Bytes.toBytes(line[i]));
}
table_H.put(p);
}
System.out.println("Insert 9");
br = new BufferedReader(
new FileReader(folder + "/order_line.csv"));
cont = 0;
while ((currentLine = br.readLine()) != null) {
String[] line = currentLine.split(",");
int[] poskey1 = new int[]{2,1,0,3};
Put p = new Put(getKey(line, poskey1));
cont = cont+1;
for (int i = 0; i < (line.length); i++) {
p.add(CF, Bytes.toBytes(colName(8)[i]), Bytes.toBytes(line[i]));
}
table_I.put(p);
}
Query 1
/**
 * Query 1: lists the customer ids (column O_C_ID) of the Orders rows whose key
 * lies between {@code getKey(warehouseId, districtId, startDate)} (inclusive) and
 * {@code getKey(warehouseId, districtId, endDate)} (exclusive).
 *
 * <p>A customer appears once per matching row, so duplicates are possible.
 * The connection, table and scanner are always closed.
 *
 * @param warehouseId warehouse id as stored in the row key
 * @param districtId  district id as stored in the row key
 * @param startDate   lower bound of the interval (inclusive)
 * @param endDate     upper bound of the interval (exclusive)
 * @return customer ids in row-key order (possibly empty, never null)
 * @throws IOException if the HBase scan fails
 */
public List<String> query1(String warehouseId, String districtId, String startDate, String endDate) throws IOException {
    byte[] startKey = getKey(new String[] {warehouseId, districtId, startDate}, new int[] {0, 1, 2});
    byte[] endKey = getKey(new String[] {warehouseId, districtId, endDate}, new int[] {0, 1, 2});
    List<String> customerIds = new ArrayList<>();
    try (HConnection conn = HConnectionManager.createConnection(config);
         HTable ordersTable = new HTable(TableName.valueOf(TABLE_E), conn);
         ResultScanner scanner = ordersTable.getScanner(new Scan(startKey, endKey))) {
        Result res = scanner.next();
        while (res != null && !res.isEmpty()) {
            customerIds.add(Bytes.toString(res.getValue(CF, Bytes.toBytes("O_C_ID"))));
            res = scanner.next();
        }
    }
    return customerIds;
}
Query 2
/**
 * Query 2: writes each value of {@code discounts}, in order, as a new version
 * of the customer's C_DISCOUNT cell.
 *
 * <p>Each Put gets an explicit timestamp that is strictly greater than the
 * previous one in this call. With server-assigned millisecond timestamps,
 * back-to-back Puts could share a timestamp. The later write would then replace
 * the earlier version instead of adding a new one, losing discount history.
 * The column family keeps at most 6 versions, so only the 6 most recent values survive.
 *
 * @param warehouseId warehouse id of the customer
 * @param districtId  district id of the customer
 * @param customerId  customer id
 * @param discounts   discount values, oldest first
 * @throws IOException if a Put fails
 */
public void query2(String warehouseId, String districtId, String customerId, String[] discounts) throws IOException {
    byte[] rowKey = getKey(new String[] {warehouseId, districtId, customerId}, new int[] {0, 1, 2});
    byte[] qualifier = Bytes.toBytes("C_DISCOUNT");
    try (HConnection conn = HConnectionManager.createConnection(config);
         HTable customerTable = new HTable(TableName.valueOf(TABLE_G), conn)) {
        long lastTimestamp = Long.MIN_VALUE;
        for (String discount : discounts) {
            // Wall-clock time, bumped by 1 ms when needed so versions never collide.
            long timestamp = Math.max(System.currentTimeMillis(), lastTimestamp + 1);
            Put put = new Put(rowKey);
            put.add(CF, qualifier, timestamp, Bytes.toBytes(discount));
            customerTable.put(put);
            lastTimestamp = timestamp;
        }
    }
}
Query 3
/**
 * Query 3: returns up to the 4 most recent versions of the customer's
 * C_DISCOUNT cell, in the order HBase returns the cells.
 *
 * <p>The connection and table are always closed.
 *
 * @param warehouseId warehouse id of the customer
 * @param districtId  district id of the customer
 * @param customerId  customer id
 * @return the discount values (empty array if the row/column does not exist)
 * @throws IOException if the HBase Get fails
 */
public String[] query3(String warehouseId, String districtId, String customerId) throws IOException {
    Get get = new Get(getKey(new String[] {warehouseId, districtId, customerId}, new int[] {0, 1, 2}));
    get.addColumn(CF, Bytes.toBytes("C_DISCOUNT"));
    get.setMaxVersions(4);
    List<String> discounts = new ArrayList<>();
    try (HConnection conn = HConnectionManager.createConnection(config);
         HTable customerTable = new HTable(TableName.valueOf(TABLE_G), conn)) {
        Result result = customerTable.get(get);
        if (result != null && !result.isEmpty()) {
            CellScanner scanner = result.cellScanner();
            while (scanner.advance()) {
                Cell cell = scanner.current();
                discounts.add(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
    return discounts.toArray(new String[discounts.size()]);
}
Query 4
/**
 * Query 4: lists the ids (C_ID) of all customers of {@code warehouseId} whose
 * C_D_ID equals one of {@code districtIds}.
 *
 * <p>The scan is first narrowed to row keys in [warehouseId, warehouseId + 1), compared as
 * bytes. That comparison is lexicographic, so for warehouse "1" the range can also cover
 * keys beginning "10".."19" if {@code getKey} does not pad. An explicit C_W_ID
 * equality filter therefore guards against other warehouses. Like the district filters, it
 * lets through rows that lack the column, so it can only remove rows, never add them.
 *
 * @param warehouseId numeric warehouse id, as a string
 * @param districtIds district ids to include; null or empty yields an empty list
 * @return matching customer ids (never null)
 * @throws IOException if the HBase scan fails
 * @throws NumberFormatException if warehouseId or a stored C_ID is not an integer
 */
public List<Integer> query4(String warehouseId, String[] districtIds) throws IOException {
    List<Integer> customerIds = new ArrayList<>();
    if (districtIds == null || districtIds.length == 0) {
        // No districts requested: nothing can match, skip the scan entirely.
        return customerIds;
    }

    String endKey = Integer.toString(Integer.parseInt(warehouseId) + 1);
    Scan scan = new Scan(Bytes.toBytes(warehouseId), Bytes.toBytes(endKey));

    FilterList districtFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
    for (String districtId : districtIds) {
        districtFilters.addFilter(new SingleColumnValueFilter(
                CF, Bytes.toBytes("C_D_ID"),
                CompareFilter.CompareOp.EQUAL,
                Bytes.toBytes(districtId)));
    }
    SingleColumnValueFilter warehouseFilter = new SingleColumnValueFilter(
            CF, Bytes.toBytes("C_W_ID"),
            CompareFilter.CompareOp.EQUAL,
            Bytes.toBytes(warehouseId));

    FilterList allFilters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    allFilters.addFilter(warehouseFilter);
    allFilters.addFilter(districtFilters);
    scan.setFilter(allFilters);

    try (HConnection conn = HConnectionManager.createConnection(config);
         HTable customerTable = new HTable(TableName.valueOf(TABLE_G), conn);
         ResultScanner scanner = customerTable.getScanner(scan)) {
        Result res = scanner.next();
        while (res != null && !res.isEmpty()) {
            customerIds.add(Integer.parseInt(Bytes.toString(res.getValue(CF, Bytes.toBytes("C_ID")))));
            res = scanner.next();
        }
    }
    return customerIds;
}