package com.rxjava2.android.samples.ui.operators; import android.os.Bundle; import android.util.Log; import android.view.View; import android.widget.Button; import android.widget.TextView; import com.rxjava2.android.samples.R; import com.rxjava2.android.samples.utils.AppConstant; import androidx.appcompat.app.AppCompatActivity; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.observables.ConnectableObservable; import io.reactivex.subjects.PublishSubject; /** * Created by amitshekhar on 27/08/16. */ public class ReplayExampleActivity extends AppCompatActivity { private static final String TAG = ReplayExampleActivity.class.getSimpleName(); Button btn; TextView textView; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_example); btn = findViewById(R.id.btn); textView = findViewById(R.id.textView); btn.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View view) { doSomeWork(); } }); } /* Using replay operator, replay ensure that all observers see the same sequence * of emitted items, even if they subscribe after the Observable has begun emitting items */ private void doSomeWork() { PublishSubject source = PublishSubject.create(); ConnectableObservable connectableObservable = source.replay(3); // bufferSize = 3 to retain 3 values to replay connectableObservable.connect(); // connecting the connectableObservable connectableObservable.subscribe(getFirstObserver()); source.onNext(1); source.onNext(2); source.onNext(3); source.onNext(4); source.onComplete(); /* * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay */ connectableObservable.subscribe(getSecondObserver()); } private Observer getFirstObserver() { return new Observer() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, " First onSubscribe : " + d.isDisposed()); } @Override public void onNext(Integer value) { textView.append(" First onNext : value : " + value); textView.append(AppConstant.LINE_SEPARATOR); Log.d(TAG, " First onNext value : " + value); } @Override public void onError(Throwable e) { textView.append(" First onError : " + e.getMessage()); textView.append(AppConstant.LINE_SEPARATOR); Log.d(TAG, " First onError : " + e.getMessage()); } @Override public void onComplete() { textView.append(" First onComplete"); textView.append(AppConstant.LINE_SEPARATOR); Log.d(TAG, " First onComplete"); } }; } private Observer getSecondObserver() { return new Observer() { @Override public void onSubscribe(Disposable d) { textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed()); Log.d(TAG, " Second onSubscribe : " + d.isDisposed()); textView.append(AppConstant.LINE_SEPARATOR); } @Override public void onNext(Integer value) { textView.append(" Second onNext : value : " + value); textView.append(AppConstant.LINE_SEPARATOR); Log.d(TAG, " Second onNext value : " + value); } @Override public void onError(Throwable e) { textView.append(" Second onError : " + e.getMessage()); Log.d(TAG, " Second onError : " + e.getMessage()); } @Override public void onComplete() { textView.append(" Second onComplete"); Log.d(TAG, " Second onComplete"); } }; } }