package com.packtpub.reactive.chapter01;

import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;

import com.packtpub.reactive.common.CreateObservable;
import com.packtpub.reactive.common.Program;

/**
 * A demonstration of how to implement a sum that updates automatically when any of its collectors changes.
 *
 * @author meddle
 */
public class ReactiveSumV1 implements Program {

    /**
     * The sum is just an Observer, which subscribes to a stream created by combining 'a' and 'b', via summing.
     * <p>
     * It also owns a {@link CountDownLatch} that is released when the stream terminates
     * (normally or with an error) or when {@link #unsubscribe()} is called, so that another
     * thread can wait for the sum to finish.
     *
     * @author meddle
     */
    public static final class ReactiveSum implements Observer<Double> {

        private final CountDownLatch latch = new CountDownLatch(1);
        private double sum;
        private Subscription subscription = null;

        /**
         * Creates the sum and immediately subscribes it to the combination of the two sources.
         *
         * @param a the stream of values for the first collector
         * @param b the stream of values for the second collector
         */
        public ReactiveSum(Observable<Double> a, Observable<Double> b) {
            this.sum = 0;
            subscribe(a, b);
        }

        private void subscribe(Observable<Double> a, Observable<Double> b) {
            // combineLatest creates an Observable, sending notifications on changes of either of its sources.
            // These notifications are formed using a Func2.
            this.subscription = Observable.combineLatest(a, b, new Func2<Double, Double, Double>() {
                @Override
                public Double call(Double a, Double b) {
                    return a + b;
                }
            }).subscribeOn(Schedulers.io()).subscribe(this);
        }

        /**
         * Stops listening for updates and releases anyone waiting on {@link #getLatch()}.
         */
        public void unsubscribe() {
            this.subscription.unsubscribe();
            this.latch.countDown();
        }

        @Override
        public void onCompleted() {
            System.out.println("Exiting last sum was : " + this.sum);
            this.latch.countDown();
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Got an error!");
            e.printStackTrace();
            // An error is a terminal notification: no onCompleted will follow, so the latch
            // has to be released here or a thread blocked in await() would never wake up.
            this.latch.countDown();
        }

        @Override
        public void onNext(Double sum) {
            this.sum = sum;
            System.out.println("update : a + b = " + sum);
        }

        /**
         * @return the latch that reaches zero once this sum has terminated or been unsubscribed
         */
        public CountDownLatch getLatch() {
            return latch;
        }
    }

    /**
     * The Observable returned by this method only reacts to values in the form
     * {@code <varName> = <value>} or {@code <varName> : <value>}.
     * It emits the {@code <value>}, parsed as a double.
     * <p>
     * Lines that do not match this form are silently dropped.
     *
     * @param varName the variable name to look for; it is matched literally
     * @param input the stream of raw text lines
     * @return a stream of the numeric values assigned to {@code varName}
     */
    public static Observable<Double> varStream(final String varName, Observable<String> input) {
        // Pattern.quote keeps regex metacharacters in the name from changing the pattern, and
        // "[:=]" accepts exactly the two documented separators (the former "[:|=]" also let a
        // literal '|' through, because '|' is not an alternation inside a character class).
        final Pattern pattern = Pattern.compile(
                "^\\s*" + Pattern.quote(varName) + "\\s*[:=]\\s*(-?\\d+\\.?\\d*)$");

        return input.map(new Func1<String, Matcher>() {
            @Override
            public Matcher call(String str) {
                return pattern.matcher(str);
            }
        }).filter(new Func1<Matcher, Boolean>() {
            @Override
            public Boolean call(Matcher matcher) {
                // Group 1 is mandatory in the pattern, so a successful match always captures it.
                return matcher.matches();
            }
        }).map(new Func1<Matcher, Double>() {
            @Override
            public Double call(Matcher matcher) {
                return Double.parseDouble(matcher.group(1));
            }
        });
    }

    public String name() {
        return "Reactive Sum, version 1";
    }

    /**
     * Wires the standard input to two variable streams, sums them reactively and blocks the
     * calling thread until the sum's latch is released.
     */
    public void run() {
        // NOTE(review): assumes CreateObservable.from(InputStream) yields a
        // ConnectableObservable<String> of input lines - confirm against CreateObservable.
        ConnectableObservable<String> input = CreateObservable.from(System.in);

        Observable<Double> a = varStream("a", input);
        Observable<Double> b = varStream("b", input);

        ReactiveSum sum = new ReactiveSum(a, b);

        // Both variable streams share the same connectable source; connect() is called only
        // after the sum has been wired to them.
        input.connect();

        try {
            sum.getLatch().await();
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public int chapter() {
        return 1;
    }

    /**
     * Here the input is executed on a separate thread, so we block the current one until it sends
     * a `completed` notification.
     */
    public static void main(String[] args) {
        System.out.println();
        System.out.println("Reactive Sum. Type 'a: <number>' and 'b: <number>' to try it.");

        new ReactiveSumV1().run();
    }
}