"""Example showing how to merge multiple remote data streams. """ # Slightly modified version of: # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509 from __future__ import print_function import heapq from IPython.parallel.error import RemoteError def mergesort(list_of_lists, key=None): """ Perform an N-way merge operation on sorted lists. @param list_of_lists: (really iterable of iterable) of sorted elements (either by naturally or by C{key}) @param key: specify sort key function (like C{sort()}, C{sorted()}) Yields tuples of the form C{(item, iterator)}, where the iterator is the built-in list iterator or something you pass in, if you pre-generate the iterators. This is a stable merge; complexity O(N lg N) Examples:: >>> print list(mergesort([[1,2,3,4], ... [2,3.25,3.75,4.5,6,7], ... [2.625,3.625,6.625,9]])) [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9] # note stability >>> print list(mergesort([[1,2,3,4], ... [2,3.25,3.75,4.5,6,7], ... [2.625,3.625,6.625,9]], ... key=int)) [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9] >>> print list(mergesort([[4, 3, 2, 1], ... [7, 6, 4.5, 3.75, 3.25, 2], ... [9, 6.625, 3.625, 2.625]], ... key=lambda x: -x)) [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1] """ heap = [] for i, itr in enumerate(iter(pl) for pl in list_of_lists): try: item = itr.next() if key: toadd = (key(item), i, item, itr) else: toadd = (item, i, itr) heap.append(toadd) except StopIteration: pass heapq.heapify(heap) if key: while heap: _, idx, item, itr = heap[0] yield item try: item = itr.next() heapq.heapreplace(heap, (key(item), idx, item, itr) ) except StopIteration: heapq.heappop(heap) else: while heap: item, idx, itr = heap[0] yield item try: heapq.heapreplace(heap, (itr.next(), idx, itr)) except StopIteration: heapq.heappop(heap) def remote_iterator(view,name): """Return an iterator on an object living on a remote engine. """ view.execute('it%s=iter(%s)'%(name,name), block=True) while True: try: result = view.apply_sync(lambda x: x.next(), Reference('it'+name)) # This causes the StopIteration exception to be raised. except RemoteError as e: if e.ename == 'StopIteration': raise StopIteration else: raise e else: yield result # Main, interactive testing if __name__ == '__main__': from IPython.parallel import Client, Reference rc = Client() view = rc[:] print('Engine IDs:', rc.ids) # Make a set of 'sorted datasets' a0 = range(5,20) a1 = range(10) a2 = range(15,25) # Now, imagine these had been created in the remote engines by some long # computation. In this simple example, we just send them over into the # remote engines. They will all be called 'a' in each engine. rc[0]['a'] = a0 rc[1]['a'] = a1 rc[2]['a'] = a2 # And we now make a local object which represents the remote iterator aa0 = remote_iterator(rc[0],'a') aa1 = remote_iterator(rc[1],'a') aa2 = remote_iterator(rc[2],'a') # Let's merge them, both locally and remotely: print('Merge the local datasets:') print(list(mergesort([a0,a1,a2]))) print('Locally merge the remote sets:') print(list(mergesort([aa0,aa1,aa2])))