Jaql Input/Output
Introduction
Jaql has been designed to flexibly read and write data
from a variety of data stores and formats.
Input and output are handled through read/write expressions;
several examples are introduced in the overview.
For example, localRead and localWrite access locally stored files,
hdfsRead and hdfsWrite access HDFS files, and
hbaseRead and hbaseWrite access HBase tables.
In this section, we show how to extend Jaql to access new data stores and formats.
We first look into data stores that are supported by Jaql: Hadoop's HDFS and HBase
as well as input/output streams.
Then we look into plugging in data stores that do not work with Hadoop or stream interfaces.
Hadoop-based Input/Output
In Hadoop's MapReduce, data is accessed using
InputFormat
and OutputFormat.
Classes that implement these interfaces provide enough information to the MapReduce framework so that
the data can be partitioned and processed in parallel.
Jaql's I/O framework allows any Input(Output)Format to be plugged in.
However, Input(Output)Formats work with Writables while Jaql expects Items.
Thus, the framework makes it easy to convert Writables to and from Items.
To make the discussion more concrete, let's look under the hood of the
hdfsRead('books.jqlb') expression.
First, hdfsRead is a macro expression that
makes use of a more generic expression called read.
The following is the actual read expression that is invoked
when using hdfsRead:
read('hdfs', 'books.jqlb');
The first argument to the read expression is the name
associated with a type of data store. Just as names are associated with
function implementations in the function registry, names are
associated with data store types in the storage registry. The second
argument to read is the path of a file stored in HDFS.
For the hdfs data store type, the registry entry specifies default Input(Output)Formats.
The defaults for Jaql are SequenceFileInputFormat and SequenceFileOutputFormat.
However, suppose that the HDFS file is a text file (i.e., newline-delimited records). The InputFormat to use
in this case is TextInputFormat. For this case, Jaql's default can be overridden as follows:
read('hdfs', 'books.jql', {format: 'org.apache.hadoop.mapred.TextInputFormat'});
The additional argument to read is a record that specifies which class
will implement the InputFormat. More generally, this record can specify any options that
are specific to a given data store type. In the example above, it is necessary to specify
a converter from Writables to Items. Why?
A TextFile's records are Text writables, not Jaql Items (i.e., binary JSON).
What is needed is a way to convert a Text Writable into an Item.
This is done by implementing the appropriate converter, then specifying it
as an option to read. An example converter is implemented as follows:
public class FromJSONTxtConverter extends HadoopRecordToItem {
  ...
  @Override
  protected WritableToItem createValConverter() {
    return new WritableToItem() {
      public void convert(Writable src, Item tgt) {
        // expect src is of type Text
        // use a JSON parser to parse it
        // set tgt
      }
      ...
    };
  }
  ...
}
The FromJSONTxtConverter takes as input a Writable value and sets the
Item to the parsed result. The following shows how to use it in read:
read('hdfs', 'books.jql', {format : 'org.apache.hadoop.mapred.TextInputFormat',
converter : 'com.acme.extensions.data.FromJSONTxtConverter'});
The read expression uses TextInputFormat to read the file
'books.jql' in HDFS. For each record retrieved from the file, it uses
FromJSONTxtConverter to convert the Text Writable to an Item.
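The conversion step can be pictured with a small, self-contained sketch. It does not use the Jaql or Hadoop classes: ItemSketch, ValueConverterSketch, LineToItemSketch and ConverterDemo are hypothetical stand-ins, plain strings play the role of Text Writables, and a trivial value parser takes the place of a JSON parser. It only shows the "fill the target from the source, once per record" pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Jaql Item: a reusable holder for one value.
class ItemSketch {
    Object value;
}

// Hypothetical converter contract: fill the target holder from the source.
interface ValueConverterSketch<S> {
    void convert(S src, ItemSketch tgt);
}

// Converts one line of text into a value. A real converter would call a
// JSON parser here; this sketch only recognizes short integers and
// true/false, and otherwise keeps the raw string.
class LineToItemSketch implements ValueConverterSketch<String> {
    @Override
    public void convert(String src, ItemSketch tgt) {
        String s = src.trim();
        if (s.equals("true") || s.equals("false")) {
            tgt.value = Boolean.valueOf(s);
        } else if (s.matches("-?\\d{1,18}")) {
            tgt.value = Long.valueOf(s);
        } else {
            tgt.value = s;
        }
    }
}

class ConverterDemo {
    // Applies the converter to each record, reusing one target holder,
    // and collects the converted values.
    static List<Object> convertAll(List<String> records) {
        ValueConverterSketch<String> converter = new LineToItemSketch();
        ItemSketch tgt = new ItemSketch();
        List<Object> out = new ArrayList<>();
        for (String record : records) {
            converter.convert(record, tgt);
            out.add(tgt.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Each string plays the role of one Text record from the file.
        System.out.println(convertAll(Arrays.asList("42", "true", "hello")));
    }
}
```

Reusing a single target holder across records is a choice made in this sketch, suggested by the convert(src, tgt) signature; whether Jaql itself reuses Items this way is an assumption.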
While extensible, the read expression is cumbersome to specify in this manner.
There are several options to hide the details. The simplest is to define a function:
$myRead = fn($path) read('hdfs', $path,
{format : 'org.apache.hadoop.mapred.TextInputFormat',
converter : 'com.acme.extensions.data.FromJSONTxtConverter'});
$myRead('books.jql');
Another option is to register a new data store type. This is done through
a storage registry that maps a name to records that specify options for input and output.
This is how Jaql keeps track of defaults for each data store. For example, the registry
entry for 'hdfs' files is the following:
{type: 'hdfs',
inoptions : {adapter : 'com.ibm.impliance.jaql.DefaultHadoopInputAdapter',
format : 'org.apache.hadoop.mapred.SequenceFileInputFormat',
configurator : 'com.ibm.impliance.jaql.FileInputConfigurator'},
outoptions: {adapter : 'com.ibm.impliance.jaql.DefaultHadoopOutputAdapter',
format : 'org.apache.hadoop.mapred.SequenceFileOutputFormat',
configurator : 'com.ibm.impliance.jaql.FileOutputConfigurator'}};
A data store is named by its type (type: 'hdfs'), which read uses to find the
associated options. There are two sets of options, one for input and one for output.
The default file format is a SequenceFile whose records are key, value
pairs whose types are WritableComparable and Writable, respectively.
By default, Jaql ignores the key. Since a converter is not specified, Jaql assumes that the
value's type is an Item.
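As a rough illustration of how a registry of defaults and per-call options might interact, the following sketch keeps the 'hdfs' input options from the entry above in a map and merges in the options passed to a single read. StorageRegistrySketch and its methods are hypothetical, and the key-by-key override rule is an assumption rather than a description of Jaql's implementation.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a storage registry: type name -> default input options.
class StorageRegistrySketch {
    private final Map<String, Map<String, String>> inOptions = new HashMap<>();

    // Registers the default input options for a data store type.
    void register(String type, Map<String, String> defaults) {
        inOptions.put(type, new LinkedHashMap<>(defaults));
    }

    // Resolves the options for one read call: start from the registered
    // defaults, then let per-call options override them key by key
    // (the override rule is an assumption of this sketch).
    Map<String, String> resolve(String type, Map<String, String> perCall) {
        Map<String, String> defaults = inOptions.get(type);
        if (defaults == null) {
            throw new IllegalArgumentException("unknown data store type: " + type);
        }
        Map<String, String> merged = new LinkedHashMap<>(defaults);
        merged.putAll(perCall);
        return merged;
    }

    // Builds a registry holding the 'hdfs' input defaults from the entry above.
    static StorageRegistrySketch withHdfsDefaults() {
        Map<String, String> in = new LinkedHashMap<>();
        in.put("adapter", "com.ibm.impliance.jaql.DefaultHadoopInputAdapter");
        in.put("format", "org.apache.hadoop.mapred.SequenceFileInputFormat");
        in.put("configurator", "com.ibm.impliance.jaql.FileInputConfigurator");
        StorageRegistrySketch registry = new StorageRegistrySketch();
        registry.register("hdfs", in);
        return registry;
    }

    public static void main(String[] args) {
        // Mirrors read('hdfs', 'books.jql', {format: ...TextInputFormat}).
        Map<String, String> perCall = new HashMap<>();
        perCall.put("format", "org.apache.hadoop.mapred.TextInputFormat");
        System.out.println(withHdfsDefaults().resolve("hdfs", perCall));
    }
}
```

With no per-call options the defaults are returned unchanged, which corresponds to the plain read('hdfs', 'books.jqlb') case.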
The 'hdfs' registry example includes additional options. The first is an Adapter. This is the glue
that brings together all other options for a data source and encapsulates how to access data and how to
convert the data to Items (if needed). Thus, it produces an Item iterator
for Jaql from any data store. For Hadoop data, the adapter to use is DefaultHadoopInput(Output)Adapter.
The Hadoop adapter allows any existing Input(Output)Format to be swapped in along with any converter
(as shown by the earlier example). Another example of an adapter in Jaql is the StreamAdapter. It allows
access directly to byte stream data. Access to other data stores is possible by implementing the Adapter interface.
If an Input(Output)Format can be specified for a given data store, Hadoop's MapReduce can use it as an input(output).
Accordingly, the Hadoop adapter informs the Jaql compiler that Hadoop's MapReduce can be used if appropriate.
However, Input(Output)Formats may require specific configuration prior to submitting a MapReduce job.
In Jaql, this is exposed through the "configurator" option. For example, an Input(Output)Format requires
that a file path be specified before the job is submitted. The com.ibm.impliance.jaql.FileInputConfigurator
does exactly this: the Adapter passes 'books.jql' and all options to the configurator, which then configures
the job appropriately. For many HDFS files, com.ibm.impliance.jaql.FileInputConfigurator is sufficient,
but if needed, it can be overridden.
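The division of labor between adapter and configurator can be sketched as follows. None of these types are Jaql or Hadoop classes: JobSettingsSketch stands in for a job configuration, InputConfiguratorSketch for the configurator contract, and the setting name "sketch.input.path" is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a job configuration: plain key/value settings.
class JobSettingsSketch {
    final Map<String, String> settings = new HashMap<>();
}

// Hypothetical configurator contract: receives the location and all options
// and writes whatever the format needs into the job settings.
interface InputConfiguratorSketch {
    void configure(String location, Map<String, String> options, JobSettingsSketch job);
}

// Sets the input path before the job would be submitted, in the spirit of
// the file configurator described above. The setting name is made up.
class FilePathConfiguratorSketch implements InputConfiguratorSketch {
    static final String INPUT_PATH_KEY = "sketch.input.path";

    @Override
    public void configure(String location, Map<String, String> options, JobSettingsSketch job) {
        if (location == null || location.isEmpty()) {
            throw new IllegalArgumentException("a file path is required");
        }
        job.settings.put(INPUT_PATH_KEY, location);
    }
}

class ConfiguratorDemo {
    // Plays the adapter's role: hand the location and options to the
    // configurator before the (imaginary) job is submitted.
    static JobSettingsSketch prepareJob(String location, Map<String, String> options) {
        JobSettingsSketch job = new JobSettingsSketch();
        new FilePathConfiguratorSketch().configure(location, options, job);
        return job;
    }

    public static void main(String[] args) {
        JobSettingsSketch job = prepareJob("books.jql", new HashMap<>());
        System.out.println(job.settings);
    }
}
```

A configurator for a different store would implement the same contract but write different settings, for example a table name instead of a file path.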
Returning to the example, the new data store type is registered as follows:
registerAdapter({type : 'myHDFSFile',
inoptions: {adapter : 'com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter',
format : 'org.apache.hadoop.mapred.TextInputFormat',
converter : 'com.acme.extensions.data.FromJSONTxtConverter',
configurator : 'com.ibm.jaql.io.hadoop.FileInputConfigurator'}});
The Hadoop adapter and configurator are specified, along with the TextInputFormat and custom converter.
The newly registered data store is used as follows:
read('myHDFSFile', 'books.jql');
If a new data store type is very common, it may be convenient to
define a Jaql function that hides some of the details:
$readMyFile = fn($name) read('myHDFSFile', $name);
$readMyFile('books.jql');
In addition to HDFS files, Jaql supports HBase as a data source. This is supported by
the same Hadoop adapter, but parameterized by Input(Output)Formats and configurators that are specific to HBase.
For HBase, the 'path' represents a table name. Columns and column families can be specified as additional options.
Finally, in order to do something more interesting with these examples,
consider a query that will be rewritten to
Map/Reduce. The simplest example is a for-loop over a read expression. Jaql translates such
a query into the following Map/Reduce expression:
$q = for( $i in read('myHDFSFile', 'example.jql') )
[ {key: $i.publisher, ($i.title): $i.year} ];
hbaseWrite('mytable', $q);
// translates to:
mapReduce({
input: {type: 'myHDFSFile', location: 'example.jql'},
output: {type: 'hbase' , location: 'mytable'},
map: fn($i) [ [null, {$i.key, $i.abc, $i.xyz}] ]
});
In this example, an HBase table is loaded in parallel with a projection from a JSON text formatted file.
The mapReduce function specifies its input and output using records. Each record specifies
the data store type, a location, and possibly additional options.
Stream-based Input/Output
The Hadoop-based Input/Output is useful when processing large data sets.
However, we expect that reading from an InputStream
or writing to an OutputStream
will also be needed when manipulating small data sets.
For this purpose, we provide an additional type of adapter: StreamAdapters.
StreamAdapters open an input or output stream given a URI.
For example, localRead, localWrite and httpGet expressions
are based on StreamAdapters. Just as Hadoop adapters allow for conversions between Writables
and Items, Stream adapters also provide for converting between bytes and Items.
For example, consider accessing a local file that is formatted as JSON text. The only
class to implement is a converter that can borrow from the previous example:
public class FromJSONTxtConverter implements StreamToItem {
  ...
  public void setInputStream(InputStream in) {
    // set the input stream
  }
  public boolean read(Item v) throws IOException {
    // parse the input stream to get the next v
  }
  ...
}
The converter is then used as follows:
localRead('books.jql', {format: 'com.acme.extensions.data.FromJSONTxtConverter'});
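The setInputStream/read pair can be illustrated with a self-contained sketch that treats each line of a stream as one record. StreamItemSketch, LineStreamReaderSketch and StreamDemo are hypothetical names, the holder stores the raw line instead of a parsed JSON value, and returning false at end of input is an assumption about what the boolean result means.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder standing in for a Jaql Item.
class StreamItemSketch {
    String value;
}

// Reads one record per line; read() returns false once the input is
// exhausted (an assumption about the meaning of the boolean result).
class LineStreamReaderSketch {
    private BufferedReader reader;

    public void setInputStream(InputStream in) {
        reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    public boolean read(StreamItemSketch v) throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return false;
        }
        v.value = line; // a real converter would parse JSON here
        return true;
    }
}

class StreamDemo {
    // Drains the stream through the converter and collects every record.
    static List<String> readAll(InputStream in) {
        LineStreamReaderSketch converter = new LineStreamReaderSketch();
        converter.setInputStream(in);
        StreamItemSketch item = new StreamItemSketch();
        List<String> out = new ArrayList<>();
        try {
            while (converter.read(item)) {
                out.add(item.value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "{\"a\": 1}\n{\"a\": 2}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAll(new ByteArrayInputStream(data)));
    }
}
```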
Other Input/Output
Adapters can be extended in order to access data that are not suitable for
Hadoop and Stream adapters. An example is access to relational databases, e.g., through a JDBC driver.
The following lists the Adapter interface:
public interface StorableInputAdapter {
  // interface methods are implicitly public and abstract
  void initializeFrom(Item args);
  void open() throws IOException;
  ItemReader getItemReader() throws IOException;
  void close() throws IOException;
}
The initializeFrom method is used to bind-in arguments that are passed in from the expression
(e.g., 'path' from $myRead('path')).
The open sets up access to a data store whereas close releases resources.
Finally, the ItemReader returned by getItemReader consumes data from the data store and produces Items as input to Jaql.
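The lifecycle described above (bind arguments, open, read, close) might be exercised as in the following sketch. It is not the Jaql Adapter API: InputAdapterSketch, InMemoryAdapterSketch and AdapterDemo are hypothetical, an in-memory map stands in for an external store such as a database, and a java.util.Iterator over strings stands in for an ItemReader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical adapter contract mirroring the lifecycle described above.
interface InputAdapterSketch {
    void initializeFrom(Map<String, String> args);
    void open();
    Iterator<String> getItemReader();
    void close();
}

// Toy adapter backed by an in-memory "store" instead of a real data source.
class InMemoryAdapterSketch implements InputAdapterSketch {
    private final Map<String, List<String>> store;
    private String location;
    private List<String> rows; // non-null only while the adapter is open

    InMemoryAdapterSketch(Map<String, List<String>> store) {
        this.store = store;
    }

    @Override
    public void initializeFrom(Map<String, String> args) {
        // Bind the arguments passed in from the expression.
        location = args.get("location");
    }

    @Override
    public void open() {
        rows = store.get(location);
        if (rows == null) {
            throw new IllegalStateException("no such table: " + location);
        }
    }

    @Override
    public Iterator<String> getItemReader() {
        if (rows == null) {
            throw new IllegalStateException("adapter is not open");
        }
        return rows.iterator();
    }

    @Override
    public void close() {
        rows = null; // release the "resources"
    }
}

class AdapterDemo {
    // Drives the full lifecycle and always closes the adapter.
    static List<String> readAll(InputAdapterSketch adapter, Map<String, String> args) {
        List<String> out = new ArrayList<>();
        adapter.initializeFrom(args);
        adapter.open();
        try {
            Iterator<String> it = adapter.getItemReader();
            while (it.hasNext()) {
                out.add(it.next());
            }
        } finally {
            adapter.close();
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> store = new HashMap<>();
        store.put("books", Arrays.asList("row 1", "row 2"));
        Map<String, String> readArgs = new HashMap<>();
        readArgs.put("location", "books");
        System.out.println(readAll(new InMemoryAdapterSketch(store), readArgs));
    }
}
```

Closing in a finally block keeps resources from leaking when reading fails partway, which matters more for a real store such as a JDBC connection than for this toy map.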