Extending Jaql with Java Functions

Calling Java from Jaql simply requires writing a class with an eval() method that accepts and returns Jaql's representation of JSON types. The following examples illustrate how to implement the Java functions, register them with the Jaql system, and invoke them. Note that Jaql also provides extensible read/write data sources, but they use different facilities that are described separately.

Split Example

Split returning an entire array

Suppose your data consists of many file system paths. A useful operation is to split a path according to a delimiter (e.g., "/"). Such functionality is readily available using Java's String[] String.split(String d) method. In Jaql, the same functionality can be exposed through a new function: split("/home/mystuff/stuff", "/"). The following shows one way to define split():
   package com.acme.extensions.fn;

   import com.ibm.impliance.jaql.JArray;
   import com.ibm.impliance.jaql.SpillJArray;
   import com.ibm.impliance.jaql.JString;

1  public class Split1
   {
2    private SpillJArray result = new SpillJArray();
     private JString resultStr = new JString();
     
3    public JArray eval(JString jstr, JString jdelim) throws Exception
     {
4      if( jstr == null || jdelim == null )
       {
         return null;
       }
5      String str = jstr.toString();
       String delim = jdelim.toString();
       
6      String[] splits = str.split(delim);

7      result.clear();
       for( String s: splits )
       {
8        resultStr.set(s);
         result.add(resultStr);
       }

9      return result;
     }
   }
A Jaql function is implemented by creating a class (1). The class can store any local state (2) for the function; however, the Jaql compiler assumes that the function can be called repeatedly with the same arguments and get the same result (i.e., the function has no side effects). The class has an eval() method (3) that takes JaqlType parameters and returns a JaqlType result. The function should assume that the parameters might be null (4). In this case, a null is simply returned; alternatively, the function could throw an exception if a non-null value is required. In many cases, the JaqlType values need to be converted to another form, e.g., converted from JString to a regular Java String (5). With the inputs processed, the function performs its task (6). This function collects all of the substrings into a JArray (7) of JString values (8), and returns the entire array (9).
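One detail of step (6) is worth keeping in mind: Java's String.split() interprets its argument as a regular expression and drops trailing empty strings. The following standalone sketch uses only the JDK, not the Jaql classes; the class name SplitDelimiterDemo and its two helper methods are hypothetical. It shows what happens with a delimiter such as "." and how Pattern.quote() makes the delimiter literal.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical demo class; it uses only the JDK, not the Jaql API.
final class SplitDelimiterDemo
{
  // Same call that Split1 makes: the delimiter is treated as a regular expression.
  static String[] regexSplit(String str, String delim)
  {
    return str.split(delim);
  }

  // Quote the delimiter so that it is matched literally.
  static String[] literalSplit(String str, String delim)
  {
    return str.split(Pattern.quote(delim));
  }

  public static void main(String[] args)
  {
    // The leading empty string is kept, as in the Jaql output for split1.
    System.out.println(Arrays.toString(regexSplit("/home/mystuff/stuff", "/")));
    // "." matches every character, so only empty strings remain and all are dropped.
    System.out.println(Arrays.toString(regexSplit("a.b.c", ".")));
    // Quoting the delimiter gives the three expected parts.
    System.out.println(Arrays.toString(literalSplit("a.b.c", ".")));
  }
}
```

Whether split() in Jaql should take a regular expression or a literal delimiter is a design choice; if a literal is intended, quoting the delimiter inside eval() is one way to get it.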

Registering and calling split in Jaql

The function name and implementing class are registered with Jaql using registerFunction(). The function can then be invoked like any other function in Jaql. How to invoke Jaql so that it can find the Java class files is described separately.
    registerFunction("split1", "com.acme.extensions.fn.Split1");
    $path = '/home/mystuff/stuff';

    split1($path, "/");
    // [ "", "home", "mystuff", "stuff" ]

    count(split1($path, "/"));
    // 4

    split1($path, "/")[1]; 
    // "home"

Split returning an array via an iterator

Functions that return an array can either materialize and return the entire array during eval() as above, or eval() may return a JIterator that returns one element at a time. The advantage of using an iterator is that the entire array need not be stored in memory, or even computed in many cases. The following example is the same string split function, rewritten to return an iterator:
    package com.acme.extensions.fn;

    import com.ibm.impliance.jaql.JIterator;
    import com.ibm.impliance.jaql.JString;

    public class Split2
    {
1     public JIterator eval(JString jstr, JString jdelim) throws Exception
      {
        if( jstr == null || jdelim == null )
        {
          return null;
        }
        String str = jstr.toString();
        String delim = jdelim.toString();
        
        final String[] splits = str.split(delim);

2       return new JIterator() 
        {
3         int i = 0;
          private JString resultStr = new JString();
          
4         public boolean moveNext()
          {
            if( i >= splits.length )
            {
              return false;
            }
            current = resultStr;
            resultStr.set(splits[i]);
            i++;
            return true;
          }
        };
      }
    }
The return type changed from JArray to JIterator (1), and the return value is an instance of an anonymous JIterator subclass (2). When returning an iterator, it is important to be aware that multiple invocations of the function may be active at the same time. Therefore, a new iterator (2) is returned and most of the state is stored inside the iterator (3). JIterator is an abstract class that requires a moveNext() method (4) that sets the current value and returns true, or returns false if there is no next value. For the query writer, this implementation of split behaves nearly identically to the previous one. The function registration, invocation, and result are similar to the above:
    registerFunction("split2", "com.acme.extensions.fn.Split2");
    $path = '/home/mystuff/stuff';

    split2($path, "/");
    // [ "", "home", "mystuff", "stuff"]

    count(split2($path, "/"));
    // 4

    split2($path, "/")[1];
    // "home"
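The iterator in Split2 reuses a single JString for every element, so a consumer that wants to keep a value must copy it before calling moveNext() again. The sketch below illustrates the same protocol in plain Java. MiniIterator is a hypothetical stand-in for the moveNext()/current() contract, with a StringBuilder playing the role of the reused JString; it is not the Jaql JIterator class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the moveNext()/current() protocol; not the Jaql JIterator.
abstract class MiniIterator
{
  protected StringBuilder current;

  public abstract boolean moveNext();

  public StringBuilder current()
  {
    return current;
  }
}

final class LazySplitDemo
{
  // Returns a fresh iterator per call, so concurrent invocations do not share state.
  static MiniIterator split(String str, String delim)
  {
    final String[] splits = str.split(delim);
    return new MiniIterator()
    {
      private int i = 0;
      private final StringBuilder result = new StringBuilder(); // reused for every element

      public boolean moveNext()
      {
        if( i >= splits.length )
        {
          return false;
        }
        result.setLength(0);
        result.append(splits[i]);
        current = result;
        i++;
        return true;
      }
    };
  }

  // Copies each element because the iterator overwrites its buffer on the next moveNext().
  static List<String> toList(MiniIterator iter)
  {
    List<String> out = new ArrayList<String>();
    while( iter.moveNext() )
    {
      out.add(iter.current().toString());
    }
    return out;
  }
}
```

Because each call to split() builds its own iterator and buffer, two iterators can be advanced in an interleaved fashion without disturbing each other.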

Grep Example

The next example is similar to the "grep" command in Unix; it returns the substrings that match a regular expression over a list of input strings:
    package com.acme.extensions.fn;

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import com.ibm.impliance.jaql.JIterator;
    import com.ibm.impliance.jaql.JString;


    public class Grep
    {
1     public JIterator eval(JString regex, JIterator jstrs) throws Exception
      {
2       return eval(regex, null, jstrs);
      }

3     public JIterator eval(JString regex, JString flags, final JIterator jstrs) throws Exception
      {
        if( regex == null || jstrs == null )
        {
          return null;
        }

        int f = 0;
        boolean global1 = false;
        if( flags != null )
        {
          for( int i = 0 ; i < flags.getLength() ; i++ )
          {
            switch( flags.charAt(i) )
            {
              case 'g': global1 = true; break;
              case 'm': f |= Pattern.MULTILINE; break;
              case 'i': f |= Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE; break;
              default: throw new IllegalArgumentException("unknown regex flag: "+(char)flags.charAt(i));
            }
          }
        }
        Pattern pattern = Pattern.compile(regex.toString(), f);

        final Matcher matcher = pattern.matcher("");    
        final boolean global = global1;

        final JString resultStr = new JString();

        return new JIterator(resultStr)
        {
          private boolean needInput = true;

          public boolean moveNext() throws Exception
          {
            while( true )
            {
              if( needInput )
              {
                if( ! jstrs.moveNextNonNull() )
                {
                  return false;
                }
                JString jstr = (JString)jstrs.current(); // could raise a cast error
                matcher.reset(jstr.toString());
              }
              if( matcher.find() )
              {
                resultStr.set(matcher.group());
                needInput = ! global;
                return true;
              }
              needInput = true;
            }
          }
        };
      }
    }
This example shows that a class may have multiple eval() methods (1,3). The current implementation supports overloading only on the number of arguments, not on the types of the arguments. In this case, the two-argument function (1) supplies default flags for the three-argument function (3). The example also illustrates that a function can take array values using a JIterator. This allows a function to process a large array efficiently. The function is not required to process the entire array, which might allow the system to avoid computing the entire array.
    registerFunction("grep", "com.acme.extensions.fn.Grep");
    $data = [ "a1bxa2b", "a3bxa4b", "a5bxa6b", null, "a7bxa8b" ];

    grep("a\\d*b", $data);
    // [ "a1b", "a3b", "a5b", "a7b" ]

    grep("a\\d*b", null, $data );
    // [ "a1b", "a3b", "a5b", "a7b" ]

    grep("a\\d*b", "g", $data );
    // [ "a1b", "a2b", "a3b", "a4b", "a5b", "a6b", "a7b", "a8b" ]
The first call is to the two-argument eval() method, which supplies a default null value for the flags; therefore, the second call is identical to the first.
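The effect of the flags can be reproduced outside Jaql with java.util.regex alone. The following sketch is a simplified, eager version of the same logic that works on Java strings and lists rather than on JString and JIterator; the class name GrepFlagsDemo is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class; an eager, JDK-only version of the flag handling in Grep.
final class GrepFlagsDemo
{
  static List<String> grep(String regex, String flags, List<String> inputs)
  {
    int f = 0;
    boolean global = false;
    if( flags != null )
    {
      for( char c : flags.toCharArray() )
      {
        switch( c )
        {
          case 'g': global = true; break;
          case 'm': f |= Pattern.MULTILINE; break;
          case 'i': f |= Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE; break;
          default: throw new IllegalArgumentException("unknown regex flag: " + c);
        }
      }
    }
    Pattern pattern = Pattern.compile(regex, f);
    List<String> out = new ArrayList<String>();
    for( String s : inputs )
    {
      if( s == null )
      {
        continue; // mirrors moveNextNonNull(): null inputs are skipped
      }
      Matcher m = pattern.matcher(s);
      while( m.find() )
      {
        out.add(m.group());
        if( !global )
        {
          break; // without 'g', only the first match per input is returned
        }
      }
    }
    return out;
  }
}
```

Unlike the Jaql version, this sketch materializes the whole result list; the iterator-based Grep produces matches one at a time.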

Greatest Common Divisor Example

The greatest common divisor (gcd) of a set of integers is the largest positive integer that divides all the numbers without remainder. Therefore, the gcd is a type of "aggregate" function because, like sum, it reduces a set of numbers down to a single number.

Holistic aggregate function

The following example implements gcd for Jaql:
    package com.acme.extensions.fn;

    import com.ibm.impliance.jaql.JIterator;
    import com.ibm.impliance.jaql.JLong;
    import com.ibm.impliance.jaql.JNumber;

    public class GCD1
    {
      private long gcd(long a, long b)
      {
        while( b != 0 )
        {
          long c = b;
          b = a % b;
          a = c;
        }
        return a;  
      }

1     public JLong eval(JIterator nums) throws Exception
      {
2       if( nums == null )
        {
          return null;
        }
3       if( ! nums.moveNextNonNull() )
        {
          return null;
        }
        JNumber n = (JNumber)nums.current();
4       long g = n.longValueExact();
        while( nums.moveNextNonNull() )
        {
          n = (JNumber)nums.current();
          long x = n.longValueExact();
          g = gcd(g,x);
        }
        return new JLong(g);
      }
    }
This function is much like the previous examples; a holistic aggregate function is no different than any other function. The function deals with null values (2) and empty arrays (3) by returning null. This example does show one new point: JaqlType values represent JSON values, but a particular type may have multiple encodings. The JSON number type is represented internally by JNumber, but it is abstract. The two subtypes JLong and JDecimal implement the encodings. The conversion to long (4) uses longValueExact() to convert any JNumber to a long, without loss of precision, or it raises an exception. The return type (1) can be either an abstract class or a concrete class, but parameters should always be the general types.
    registerFunction("gcd1", "com.acme.extensions.fn.GCD1");

    gcd1(null); // null
    gcd1([]); // null
    gcd1(3); // correctly produces cast error: array expected
    gcd1([3]); // 3
    gcd1([0,0]); // 0
    gcd1([3,0]); // 3
    gcd1([0,3]); // 3
    gcd1([17,13]); // 1
    gcd1([12,18]); // 6
    gcd1([36,18]); // 18
    gcd1([36,18,12]); // 6
    gcd1( for($i in 1000 to 2000) if( mod($i,3) == 0 ) [$i * 31] ); // 31*3 = 93
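The "exact or exception" contract described for longValueExact() is the same one that java.math.BigDecimal.longValueExact() offers in the JDK. The sketch below uses BigDecimal to show which values convert and which do not. Whether JDecimal actually delegates to BigDecimal is an assumption, and the class and method names here are hypothetical.

```java
import java.math.BigDecimal;

// Hypothetical demo class; uses the JDK's BigDecimal, not the Jaql JNumber classes.
final class ExactLongDemo
{
  // Returns the exact long value, or null when the conversion would lose information.
  static Long toLongOrNull(BigDecimal d)
  {
    try
    {
      return d.longValueExact();
    }
    catch( ArithmeticException e )
    {
      // Raised for a nonzero fractional part or a value outside the long range.
      return null;
    }
  }
}
```

A value such as 3.0 converts cleanly because its fractional part is zero, while 3.5 or 1e30 causes the exception. A function like gcd1 lets that exception propagate rather than silently truncating.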

Aggregation using combine

Holistic aggregate functions suffer from a performance problem: Jaql can parallelize a holistic aggregate when there are multiple reducers, but Jaql does not know how to perform partial-aggregation in parallel using a "combiner" in a map-reduce job. The next example implements gcd as a pair-wise function that computes the gcd of two numbers:
    package com.acme.extensions.fn;

    import com.ibm.impliance.jaql.JLong;
    import com.ibm.impliance.jaql.JNumber;


    public class GCD2
    {
      private long gcd(long a, long b)
      {
        while( b != 0 )
        {
          long c = b;
          b = a % b;
          a = c;
        }
        return a;  
      }

      public JLong eval(JNumber x, JNumber y)
      {
        long a = x.longValueExact();
        long b = y.longValueExact();
        long g = gcd(a,b);
        return new JLong(g);
      }
    }
The function is registered and invoked as usual:
    registerFunction("gcd2", "com.acme.extensions.fn.GCD2");

    gcd2("x","y"); // correctly produces error: numbers expected
    gcd2(17,13); // 1
    gcd2(12,18); // 6
We can use the combine expression in Jaql to define an aggregate function that behaves like gcd1:
    $gcd = fn($nums) combine( $a,$b in $nums ) gcd2($a,$b);

    $gcd( for($i in 1000 to 2000) if( mod($i,3) == 0 ) [$i * 31] ); // 31*3 = 93
The combine expression implements the iteration that was inside of gcd1. Conceptually, combine will take any two numbers from its input array, evaluate the pairwise combining expression with those two numbers, place the result back into the array, and repeat until the array has one item in it. The promise made is that the combining expression is commutative (gcd2($a,$b) == gcd2($b,$a)) and associative (gcd2($a,gcd2($b,$c)) == gcd2(gcd2($a,$b),$c)). In other words, combine may call gcd2 with arbitrary pairs of numbers from the input, or with results from earlier invocations.
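The conceptual model above can be sketched in plain Java. The class CombineDemo below is hypothetical and only mimics the description: it repeatedly removes two arbitrary items, combines them with gcd, and puts the result back. Returning null for an empty input is an assumption made for this sketch, not a statement about Jaql's combine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical model of the combine expression; not how Jaql implements it.
final class CombineDemo
{
  static long gcd(long a, long b)
  {
    while( b != 0 )
    {
      long c = b;
      b = a % b;
      a = c;
    }
    return a;
  }

  // Repeatedly replaces two arbitrarily chosen items with their gcd until one item is left.
  static Long combine(List<Long> nums, Random rnd)
  {
    List<Long> work = new ArrayList<Long>(nums);
    if( work.isEmpty() )
    {
      return null; // assumption for this sketch only
    }
    while( work.size() > 1 )
    {
      long a = work.remove(rnd.nextInt(work.size()));
      long b = work.remove(rnd.nextInt(work.size()));
      work.add(gcd(a, b));
    }
    return work.get(0);
  }
}
```

Because gcd is commutative and associative, the result does not depend on which pairs the random generator picks. That property is what allows the system to choose its own evaluation order.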

We could have used gcd1 in a combine expression by making a list out of the two items:

    $gcd = fn($nums) combine( $a,$b in $nums ) gcd1( [$a,$b] );

    $gcd( for($i in 1000 to 2000) if( mod($i,3) == 0 ) [$i * 31]); // 31*3 = 93
When the data lives in Hadoop's HDFS, Jaql considers using map-reduce to evaluate queries. The following writes a bunch of records into HDFS:
    hdfsWrite('/temp/nums', 
      for( $i in 1 to 100, 
           $j in 1 to 100 )
        [{ a: $i, b: $i * $j }]
    );
The following grouping query uses Hadoop's map-reduce to evaluate the gcd. Because gcd1 is a holistic aggregate function (it requires all of the data before it will produce its result), it is run in parallel by each of the reducer tasks:
    registerFunction("gcd1", "com.acme.extensions.fn.GCD1");
    $gcd = fn($nums) gcd1( $nums );

    group( $i in hdfsRead('/temp/nums') by $a = $i.a into $is )
        [{ a: $a, g: $gcd($is[*].b) }];
    // [ {a:1, g:1}, {a:2, g:2}, ..., {a:100, g: 100} ]
The next version also uses Hadoop's map-reduce to evaluate the gcd. Because we are now using a combine expression, gcd is run in parallel by each of the map tasks (using a combiner) to produce partial aggregates and again by the reduce tasks to produce the final aggregation:
    registerFunction("gcd2", "com.acme.extensions.fn.GCD2");
    $gcd = fn($nums) combine( $a,$b in $nums ) gcd2( $a,$b );

    group( $i in hdfsRead('/temp/nums') by $a = $i.a into $is )
        [{ a: $a, g: $gcd($is[*].b) }];
    // [ {a:1, g:1}, {a:2, g:2}, ..., {a:100, g: 100} ]
The explain statement can be used to see how Jaql will evaluate a query. The result is a transformed query that is equivalent to the original query. The transformed query typically uses low-level functions of Jaql and contains many generated variables. (A pretty printer is not yet in place, so the result below has been cleaned up.)
    explain 
    group( $i in hdfsRead('/temp/nums') by $a = $i.a into $is )
        [{ a: $a, g: $gcd($is[*].b) }];

    // Cleaned up result:
    stRead(
      mrAggregate( {
         input: { type: "hdfs", location: "/temp/nums" }, 
         output: HadoopTemp(),
         init: fn ($x) [[ $x.a, [$x.b] ]],
         combine: fn ($key, $aggs1, $aggs2) [
               ( $x = $aggs1[0], 
                 $y = $aggs2[0],

                 if( isnull($x) ) $y
                 else if ( isnull($y) ) $x
                 else gcd2($x, $y) 
               )],
         final: fn ($key, $aggs) [{ a:$key, g: $aggs[0] }]
     } ));
The mrAggregate function is a Jaql function that runs map/reduce under the covers, but in a particular way. It is designed to run several algebraic aggregates without making multiple passes over the group. Instead of using map, combine and reduce functions like the mapReduce function, mrAggregate has init, combine and final functions.

The init function, much like map, filters and transforms input records. It produces a list of pairs of the grouping key ($x.a) and a vector (of length one in this case) of initial partial aggregates for one input item ([$x.b]). To be clear, mrAggregate does not require the partial aggregate to be an array (it can be any value), but rather the compilation of the group expression generates arrays to handle multiple aggregates in a single pass. (We can view the vector itself as a partial aggregate for a single "super-aggregate" which runs all the individual aggregates inside of it.)

The combine function takes the grouping key ($key), two vectors of partial aggregates ($aggs1, $aggs2), and produces a new vector of partial aggregates. In this case, it produces a vector of length one using the gcd2 function on the first element of each input vector.

The final function takes the grouping key ($key), one vector of partial aggregates ($aggs), and produces the final result. The final function can also filter and transform the partial aggregates, so in general it produces zero or more results. In this case, it produces a list of one record that contains the grouping key and the gcd for that group.

When mrAggregate is run using map/reduce, init is evaluated in the map call, combine is called repeatedly during both the combine call and the reduce call, and final is called during the reduce call.
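To make the division of work concrete, the following plain-Java simulation mimics the three phases on the same data as the hdfsWrite example (a = i, b = i * j). It does not use Hadoop or Jaql; the class MrAggregateDemo, its methods, and the way records are dealt out to splits are all hypothetical. Each map task applies init and the map-side combine to its own split, and the reduce step combines the partial aggregates from all map tasks. The final step here is trivial because there is only one aggregate per key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of init/combine/final; not the Jaql or Hadoop implementation.
final class MrAggregateDemo
{
  static long gcd(long a, long b)
  {
    while( b != 0 )
    {
      long c = b;
      b = a % b;
      a = c;
    }
    return a;
  }

  // init plus map-side combine: fold one split's records into per-key partial aggregates.
  static Map<Long, Long> mapTask(List<long[]> split)
  {
    Map<Long, Long> partials = new TreeMap<Long, Long>();
    for( long[] rec : split ) // rec = { a, b }
    {
      Long key = rec[0];
      Long prev = partials.get(key);
      partials.put(key, prev == null ? rec[1] : gcd(prev, rec[1]));
    }
    return partials;
  }

  // reduce-side combine followed by final: merge the partials from all map tasks.
  static Map<Long, Long> reduce(List<Map<Long, Long>> mapOutputs)
  {
    Map<Long, Long> result = new TreeMap<Long, Long>();
    for( Map<Long, Long> partials : mapOutputs )
    {
      for( Map.Entry<Long, Long> e : partials.entrySet() )
      {
        Long prev = result.get(e.getKey());
        result.put(e.getKey(), prev == null ? e.getValue() : gcd(prev, e.getValue()));
      }
    }
    return result;
  }

  // Deals the 10,000 records round-robin into numSplits splits and runs both phases.
  static Map<Long, Long> run(int numSplits)
  {
    List<List<long[]>> splits = new ArrayList<List<long[]>>();
    for( int s = 0 ; s < numSplits ; s++ )
    {
      splits.add(new ArrayList<long[]>());
    }
    int n = 0;
    for( long i = 1 ; i <= 100 ; i++ )
    {
      for( long j = 1 ; j <= 100 ; j++ )
      {
        splits.get(n++ % numSplits).add(new long[] { i, i * j });
      }
    }
    List<Map<Long, Long>> mapOutputs = new ArrayList<Map<Long, Long>>();
    for( List<long[]> split : splits )
    {
      mapOutputs.add(mapTask(split));
    }
    return reduce(mapOutputs);
  }
}
```

The result is the same regardless of how many splits are used, which is the property that makes the combiner safe to apply.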

JaqlType Hierarchy

The following JaqlType classes implement the extended JSON types in Jaql:
    JRecord
    JArray
    JBool
    JString
    JNumber

    JBinary
    JDate
    JFunction
The last three are not standard JSON. JSON null values are represented by a Java null. In the current implementation, the only type that is abstract is JNumber, with the following encodings:
    JDecimal
    JLong
Remember that any function of a number should handle every encoding. Ideally, the methods on JNumber will hide the details of encodings.

Expect changes in this part of the implementation for the next few releases. In particular, JArray will become abstract very soon with two encodings, one for small fixed-sized arrays and one for large arrays that might need to spill to disk. It is also quite likely that JString will be changed to eliminate the dependency on Hadoop's Text class.
