Jaql Input/Output

Introduction

Jaql has been designed to flexibly read and write data from a variety of data stores and formats. Input and output are handled through read/write expressions; several examples are introduced in the overview. For example, localRead, localWrite access locally stored files, hdfsRead, hdfsWrite access HDFS files, and hbaseRead, hbaseWrite access HBase tables. In this section, we show how to extend Jaql to access new data stores and formats. We first look into data stores that are supported by Jaql: Hadoop's HDFS and HBase as well as input/output streams. Then we look into plugging-in data stores that do not work with Hadoop or stream interfaces.

Hadoop-based Input/Output

In Hadoop's MapReduce, data is accessed using InputFormat and OutputFormat. Classes that implement these interfaces provide enough information to the MapReduce framework so that the data can be partitioned and processed in parallel.

Jaql's I/O framework supports any Input(Output)Format to be plugged-in. However, Input(Output)Formats work with Writables while Jaql expects Items. Thus, the framework makes it easy to convert Writables to and from Items. In order to make the discussion more concrete, lets look under the hood of the hdfsRead('books.jqlb') expression. First, hdfsRead is a macro expression that makes use of a more generic expression called read. The following is the actual read expression that is invoked when using hdfsRead:

  read('hdfs', 'books.jqlb');
The first argument to the read expression is the name associated with a type of data store. Just as names are associated to function implementations in the function registry, names are associated to data store types in the storage registry. The second argument to read is the path of a file stored in HDFS.

For the hdfs data store type, the registry entry specifies default Input(Output)Formats. The defaults for Jaql are SequenceFileInputFormat and SequenceFileOutputFormat. However, suppose that the HDFS file is a text file (i.e, new-line delimitted records). The InputFormat to use in this case is TextInputFormat. For this case, Jaql's default can be overriden as follows:

  read('hdfs', 'books.jql', {format: 'org.apache.hadoop.mapred.TextInputFormat'});
The additional argument to read is a record that specifies which class will implement the InputFormat. More generally, this record can specify any options that are specific to a give data store type. In the example above, it is necessary to specify a converter from Writables to Items. Why? A TextFile's records are Text writables, not Jaql Items (i.e., binary JSON). What is needed is a way to convert a Text Writable into an Item. This is done by implementing the appropriate converter, then specifying it as an option to read. An example converter is implemented as follows:
  public class FromJSONTxtConverter extends HadoopRecordToItem { 
  ...
  @Override
  protected WritableToItem createValConverter() {
    return new WritableToItem() {
      public void convert(Writable src, Item tgt) {
        // expect src is of type Text
        // use a JSON parser to parse it
        // set tgt
      }
      ...
    };
    ...
  }
The FromJSONTxtConverter takes as input a Writable value and sets the Item to the parsed result. The following shows how to use it in read:
  read('hdfs', 'books.jql', {format    : 'org.apache.hadoop.mapred.TextInputFormat',
                             converter : 'com.acme.extensions.data.FromJSONTxtConverter'});
The read expression uses TextInputFormat to read the file specified at 'path' in HDFS. For each record retrieved from the file, it will use FromJSONTxtConverter to convert each Text Writable to an Item. While extensible, the read expression is cumbersome to specify in this manner. There are several options to hide the details. The simplest is to define a function:
  $myRead = fn($path) read('hdfs', $path, 
                            {format    : 'org.apache.hadoop.mapred.TextInputFormat',
                             converter : 'com.acme.extensions.data.FromJSONTxtConverter'});
  
  $myRead('books.jql');
Another option is to register a new data store type. This is done through a storage registry that maps a name to records that specify options for input and output. This is how Jaql keeps track of defaults for each data store. For example, the registry entry for 'hdfs' files is the following:
  {type:       'hdfs',
   inoptions :	{adapter      : 'com.ibm.impliance.jaql.DefaultHadoopInputAdapter', 
                 format       : 'org.apache.hadoop.mapred.SequenceFileInputFormat', 
                 configurator : 'com.ibm.impliance.jaql.FileInputConfigurator'},
   outoptions:	{adapter      : 'com.ibm.impliance.jaql.DefaultHadoopOutputAdapter', 
                 format       : 'org.apache.hadoop.mapred.SequenceFileOutputFormat', 
                 configurator : 'com.ibm.impliance.jaql.FileOutputConfigurator'}};
A data store is named by (type:'hdfs') which is used by read to find associated options. There are two sets of options, one for input and one for output. The default file format is a SequenceFile whose records are key, value pairs whose types are WritableComparable and Writable, respectively. By default, Jaql ignores the key. Since a converter is not specified, Jaql assumes that the value's type is an Item.

The 'hdfs' registry example includes additional options. The first is an Adapter. This is the glue that brings together all other options for a data source and encapsulates how to access data and how to convert the data to Items (if needed). Thus, it produces an Item iterator for Jaql from any data store. For Hadoop data, the adapter to use is DefaultHadoopInput(Output)Adapter. The Hadoop adapter allows any existing Input(Output)Format to be swapped in along with any converter (as shown by the earlier example). Another example of an adapter in Jaql is the StreamAdapter. It allows access directly to byte stream data. Access to other data stores is possible by implementing the Adapter interface.

If an Input(Output)Format can be specified for a given data store, Hadoop's MapReduce can use it as an input(output). Accordingly, the Hadoop adapter informs the Jaql compiler that Hadoop's MapReduce can be used if appropriate. However, Input(Output)Formats may require specific configuration prior to submitting a MapReduce job. In Jaql, this is exposed through the "configurator" option. For example, an Input(Output)Format requires that a file path be specified before the job is submitted. The com.ibm.impliance.jaql.FileInputConfigurator does exactly this: the Adapter passes 'books.jql' and all options to the configurator, which then configures the job appropriately. For many HDFS files, com.ibm.impliance.jaql.FileInputConfigurator is sufficient, but if needed, it can be overriden.

Returning to the example, the new data store type is registered as follows:

  registerAdapter({type     :	'myHDFSFile',
                   inoptions:	{adapter      : 'com.ibm.jaql.io.hadoop.DefaultHadoopInputAdapter', 
                                 format       : 'org.apache.hadoop.mapred.TextInputFormat', 
                                 converter    : 'com.acme.extensions.data.FromJSONTxtConverter',
                                 configurator : 'com.ibm.jaql.io.hadoop.FileInputConfigurator'}});
The Hadoop adapter and configurator are specified, along with the TextInputFormat and custom converter. The newly registered data store is used as follows:
  read('myHDFSFile', 'books.jql');
If a new data store type is very common, it may be convenient to define a Jaql function that hides some of the details:
  $readMyFile = fn($name) read('myHDFSFile', $name);

  $readMyFile('books.jql');

In addition to HDFS files, Jaql supports HBase as a data source. This is supported by the same Hadoop adapter, but parameterized by Input(Output)Formats and configurators that are specific to HBase. For HBase, the 'path' represents a table name. Columns and column families can be specified as additional options.

Finally, in order to do something more interesting with these examples, consider a query that will be rewritten to Map/Reduce. The simplest example is a for-loop over a read expression. Jaql translates such a query into the following Map/Reduce expression:

  $q = for( $i in read('myHDFSFile', 'example.jql') )
         [ {key: $i.publisher, ($i.title): $i.year} ];
  
  hbaseWrite('mytable', $q);
  
  // translates to:
  mapReduce({
    input:  {type: 'myHDFSFile', location: 'example.jql'},
    output: {type: 'hbase'     , location: 'mytable'},
    map:    fn($i) [ [null, {$i.key, $i.abc, $i.xyz}] ] 
  });
  
In this example, an HBase table is loaded in parallel with a projection from a JSON text formatted file. The mapReduce function specifies its input and output using records. Each record specifies the data store type, a location, and possibly additional options.

Stream-based Input/Output

The Hadoop-based Input/Output is useful when processing large data sets. However, we expect that reading from an InputStream or writing to an OutputStream will also be needed when manipulating small data sets. For this purpose, we provide an additional type of adapter: StreamAdapters. StreamAdapeters open an input or output stream given a URI. For example, localRead, localWrite and httpGet expressions are based on StreamAdapters. Just as Hadoop adapters allow for conversions between Writables and Items, Stream adapters also provide for converting between bytes and Items.

For example, consider accessing a local file that is formatted as JSON text. The only class to implement is a converter that can borrow from the previous example:

  public class FromJSONTxtConverter implements StreamToItem {
    ...
    public void setInputStream(InputStream in) {
      // set the input stream
    }
    
    public boolean read(Item v) throws IOException {
      // parse the input stream to get the next v
    }
    ...
  }
The new data source is registered and tested as follows:
  localRead('books.jql', {format: 'com.acme.extensions.data.FromJSONTxtConverter'});

Other Input/Output

Adapters can be extended in order to access data that are not suitable for Hadoop and Stream adapters. An example is access to relational databases, e.g., through a JDBC driver. The following lists the Adapter interface:

  public interface StorableInputAdapter {
    
    protected void initializeFrom(Item args);
    
    public void open() throws IOException;
    
    public abstract ItemReader getItemReader() throws IOException;
    
    public void close() throws IOException;
  }

The initializeFrom method is used to bind-in arguments that are passed in from the expression (e.g., 'path' from myRead('path')). The open sets up access to a data store whereas close releases resources. Finally, the Iter consumes data from the data source and produces Items as input to Jaql.

Overview | Java Functions | Extending Data-sources | Running Jaql | Roadmap