66import rx .Subscriber ;
77import rx .Subscription ;
88import rx .functions .Action1 ;
9+ import rx .functions .Action2 ;
910import rx .functions .Actions ;
1011import rx .functions .Func1 ;
1112import rx .functions .Functions ;
1718 *
1819 * @author gscampbell
1920 */
20- public class DebugHook extends RxJavaObservableExecutionHook {
21+ public class DebugHook < C > extends RxJavaObservableExecutionHook {
2122 private final Func1 onNextHook ;
22- private final Action1 <DebugNotification > events ;
23+ private final Func1 <DebugNotification , C > start ;
24+ private final Action1 <C > complete ;
25+ private final Action2 <C , Throwable > error ;
2326
2427 /**
2528 * Creates a new instance of the DebugHook RxJava plug-in that can be passed into
@@ -31,18 +34,26 @@ public class DebugHook extends RxJavaObservableExecutionHook {
3134 * @param events
3235 * This action is invoked as each notification is generated
3336 */
34- public DebugHook (Func1 onNextDataHook , Action1 <DebugNotification > events ) {
37+ public DebugHook (Func1 onNextDataHook , Func1 <DebugNotification , C > start , Action1 <C > complete , Action2 <C , Throwable > error ) {
38+ this .complete = complete ;
39+ this .error = error ;
3540 this .onNextHook = onNextDataHook == null ? Functions .identity () : onNextDataHook ;
36- this .events = events == null ? Actions .empty () : events ;
41+ this .start = ( Func1 < DebugNotification , C >) ( start == null ? Actions .empty () : start ) ;
3742 }
3843
3944 @ Override
40- public <T > OnSubscribe <T > onSubscribeStart (Observable <? extends T > observableInstance , final OnSubscribe <T > f ) {
45+ public <T > OnSubscribe <T > onSubscribeStart (final Observable <? extends T > observableInstance , final OnSubscribe <T > f ) {
4146 return new OnSubscribe <T >() {
4247 @ Override
4348 public void call (Subscriber <? super T > o ) {
44- events .call (DebugNotification .createSubscribe (o , f ));
45- f .call (wrapOutbound (null , o ));
49+ C context = start .call (DebugNotification .createSubscribe (o , observableInstance , f ));
50+ try {
51+ f .call (wrapOutbound (null , o ));
52+ complete .call (context );
53+ }
54+ catch (Throwable e ) {
55+ error .call (context , e );
56+ }
4657 }
4758 };
4859 }
@@ -54,12 +65,7 @@ public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInst
5465
5566 @ Override
5667 public <T > OnSubscribe <T > onCreate (final OnSubscribe <T > f ) {
57- return new OnSubscribe <T >() {
58- @ Override
59- public void call (Subscriber <? super T > o ) {
60- f .call (wrapInbound (null , o ));
61- }
62- };
68+ return new OnCreateWrapper <T >(f );
6369 }
6470
6571 @ Override
@@ -81,19 +87,36 @@ public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
8187 private <R > Subscriber <? super R > wrapOutbound (Operator <? extends R , ?> bind , Subscriber <? super R > o ) {
8288 if (o instanceof DebugSubscriber ) {
8389 if (bind != null )
84- ((DebugSubscriber <R >) o ).setFrom (bind );
90+ ((DebugSubscriber <R , C >) o ).setFrom (bind );
8591 return o ;
8692 }
87- return new DebugSubscriber <R >(onNextHook , events , o , bind , null );
93+ return new DebugSubscriber <R , C >(onNextHook , start , complete , error , o , bind , null );
8894 }
8995
9096 @ SuppressWarnings ("unchecked" )
9197 private <T > Subscriber <? super T > wrapInbound (Operator <?, ? super T > bind , Subscriber <? super T > o ) {
9298 if (o instanceof DebugSubscriber ) {
9399 if (bind != null )
94- ((DebugSubscriber <T >) o ).setTo (bind );
100+ ((DebugSubscriber <T , C >) o ).setTo (bind );
95101 return o ;
96102 }
97- return new DebugSubscriber <T >(onNextHook , events , o , null , bind );
103+ return new DebugSubscriber <T , C >(onNextHook , start , complete , error , o , null , bind );
104+ }
105+
106+ public final class OnCreateWrapper <T > implements OnSubscribe <T > {
107+ private final OnSubscribe <T > f ;
108+
109+ private OnCreateWrapper (OnSubscribe <T > f ) {
110+ this .f = f ;
111+ }
112+
113+ @ Override
114+ public void call (Subscriber <? super T > o ) {
115+ f .call (wrapInbound (null , o ));
116+ }
117+
118+ public OnSubscribe <T > getActual () {
119+ return f ;
120+ }
98121 }
99122}
0 commit comments