Skip to content

Commit a2162f2

Browse files
Merge branch 'debug' of github.com:abersnaze/RxJava into rxjava-debug
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 485c22a + 35e4507 commit a2162f2

File tree

13 files changed

+641
-5
lines changed

13 files changed

+641
-5
lines changed
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
apply plugin: 'osgi'
2+
3+
sourceCompatibility = JavaVersion.VERSION_1_6
4+
targetCompatibility = JavaVersion.VERSION_1_6
5+
6+
dependencies {
7+
compile project(':rxjava-core')
8+
testCompile project(":rxjava-core").sourceSets.test.output
9+
provided 'junit:junit-dep:4.10'
10+
provided 'org.mockito:mockito-core:1.8.5'
11+
}
12+
13+
javadoc {
14+
options {
15+
doclet = "org.benjchristensen.doclet.DocletExclude"
16+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
17+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
18+
windowTitle = "RxJava Javadoc ${project.version}"
19+
}
20+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
21+
}
22+
23+
jar {
24+
manifest {
25+
name = 'rxjava-debug'
26+
instruction 'Bundle-Vendor', 'Netflix'
27+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
28+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package rx.operators;
2+
3+
import rx.Observer;
4+
import rx.Subscriber;
5+
import rx.plugins.DebugNotification;
6+
import rx.util.functions.Action1;
7+
import rx.util.functions.Func1;
8+
9+
public final class DebugSubscriber<T> extends Subscriber<T> {
10+
private final Func1<T, T> onNextHook;
11+
final Action1<DebugNotification> events;
12+
final Observer<? super T> o;
13+
Operator<T, ?> from = null;
14+
Operator<?, T> to = null;
15+
16+
public DebugSubscriber(
17+
Func1<T, T> onNextHook,
18+
Action1<DebugNotification> _events,
19+
Subscriber<? super T> _o,
20+
Operator<T, ?> _out,
21+
Operator<?, T> _in) {
22+
super(_o);
23+
this.events = _events;
24+
this.o = _o;
25+
this.onNextHook = onNextHook;
26+
this.from = _out;
27+
this.to = _in;
28+
this.add(new DebugSubscription<T>(this));
29+
}
30+
31+
@Override
32+
public void onCompleted() {
33+
events.call(DebugNotification.createOnCompleted(o, from, to));
34+
o.onCompleted();
35+
}
36+
37+
@Override
38+
public void onError(Throwable e) {
39+
events.call(DebugNotification.createOnError(o, from, e, to));
40+
o.onError(e);
41+
}
42+
43+
@Override
44+
public void onNext(T t) {
45+
events.call(DebugNotification.createOnNext(o, from, t, to));
46+
o.onNext(onNextHook.call(t));
47+
}
48+
49+
public Operator<T, ?> getFrom() {
50+
return from;
51+
}
52+
53+
public void setFrom(Operator<T, ?> op) {
54+
this.from = op;
55+
}
56+
57+
public Operator<?, T> getTo() {
58+
return to;
59+
}
60+
61+
public void setTo(Operator<?, T> op) {
62+
this.to = op;
63+
}
64+
65+
public Observer<? super T> getActual() {
66+
return o;
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package rx.operators;
2+
3+
import rx.Subscription;
4+
import rx.plugins.DebugNotification;
5+
6+
final class DebugSubscription<T> implements Subscription {
7+
private final DebugSubscriber<T> debugObserver;
8+
9+
DebugSubscription(DebugSubscriber<T> debugObserver) {
10+
this.debugObserver = debugObserver;
11+
}
12+
13+
@Override
14+
public void unsubscribe() {
15+
debugObserver.events.call(DebugNotification.<T> createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to));
16+
debugObserver.unsubscribe();
17+
}
18+
19+
@Override
20+
public boolean isUnsubscribed() {
21+
return debugObserver.isUnsubscribed();
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package rx.plugins;
2+
3+
import rx.Observable;
4+
import rx.Observable.OnSubscribe;
5+
import rx.Subscriber;
6+
import rx.Subscription;
7+
import rx.operators.DebugSubscriber;
8+
import rx.operators.Operator;
9+
import rx.util.functions.Action1;
10+
import rx.util.functions.Actions;
11+
import rx.util.functions.Func1;
12+
import rx.util.functions.Functions;
13+
14+
/**
15+
* Implements hooks into the {@link Observable} chain to emit a detailed account of all the events
16+
* that happened.
17+
*
18+
* @author gscampbell
19+
*/
20+
public class DebugHook extends RxJavaObservableExecutionHook {
21+
private final Func1 onNextHook;
22+
private final Action1<DebugNotification> events;
23+
24+
/**
25+
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
26+
* {@link RxJavaPlugins} registerObservableExecutionHook(hook) method.
27+
*
28+
* @param onNextDataHook
29+
* all of the onNext values are passed through this function to allow for
30+
* manipulation of the values
31+
* @param events
32+
* This action is invoked as each notification is generated
33+
*/
34+
public DebugHook(Func1 onNextDataHook, Action1<DebugNotification> events) {
35+
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
36+
this.events = events == null ? Actions.empty() : events;
37+
}
38+
39+
@Override
40+
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
41+
return new OnSubscribe<T>() {
42+
@Override
43+
public void call(Subscriber<? super T> o) {
44+
events.call(DebugNotification.createSubscribe(o, f));
45+
f.call(wrapOutbound(null, o));
46+
}
47+
};
48+
}
49+
50+
@Override
51+
public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInstance, Subscription subscription) {
52+
return subscription;
53+
}
54+
55+
@Override
56+
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
57+
return new OnSubscribe<T>() {
58+
@Override
59+
public void call(Subscriber<? super T> o) {
60+
f.call(wrapInbound(null, o));
61+
}
62+
};
63+
}
64+
65+
@Override
66+
public <T, R> Operator<R, T> onLift(final Operator<R, T> bind) {
67+
return new Operator<R, T>() {
68+
@Override
69+
public Subscriber<? super T> call(final Subscriber<? super R> o) {
70+
return wrapInbound(bind, bind.call(wrapOutbound(bind, o)));
71+
}
72+
};
73+
}
74+
75+
@Override
76+
public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
77+
return s;
78+
}
79+
80+
@SuppressWarnings("unchecked")
81+
private <R> Subscriber<? super R> wrapOutbound(Operator<R, ?> bind, Subscriber<? super R> o) {
82+
if (o instanceof DebugSubscriber) {
83+
if (bind != null)
84+
((DebugSubscriber<R>) o).setFrom(bind);
85+
return o;
86+
}
87+
return new DebugSubscriber<R>(onNextHook, events, o, bind, null);
88+
}
89+
90+
@SuppressWarnings("unchecked")
91+
private <T> Subscriber<? super T> wrapInbound(Operator<?, T> bind, Subscriber<? super T> o) {
92+
if (o instanceof DebugSubscriber) {
93+
if (bind != null)
94+
((DebugSubscriber<T>) o).setTo(bind);
95+
return o;
96+
}
97+
return new DebugSubscriber<T>(onNextHook, events, o, null, bind);
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package rx.plugins;
2+
3+
import rx.Notification;
4+
import rx.Observable.OnSubscribe;
5+
import rx.Observer;
6+
import rx.observers.SafeSubscriber;
7+
import rx.operators.DebugSubscriber;
8+
import rx.operators.Operator;
9+
import rx.plugins.DebugNotification.Kind;
10+
11+
public class DebugNotification<T> {
12+
public static enum Kind {
13+
OnNext, OnError, OnCompleted, Subscribe, Unsubscribe
14+
}
15+
16+
private final OnSubscribe<T> source;
17+
private final Operator<T, ?> from;
18+
private final Kind kind;
19+
private final Notification<T> notification;
20+
private final Operator<?, T> to;
21+
private final long nanoTime;
22+
private final long threadId;
23+
private Observer o;
24+
25+
public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
26+
Operator<?, T> to = null;
27+
Operator<T, ?> from = null;
28+
if (o instanceof DebugSubscriber) {
29+
to = ((DebugSubscriber<T>) o).getTo();
30+
from = ((DebugSubscriber<T>) o).getFrom();
31+
o = ((DebugSubscriber) o).getActual();
32+
}
33+
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
34+
}
35+
36+
public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<T, ?> from, T t, Operator<?, T> to) {
37+
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
38+
}
39+
40+
public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<T, ?> from, Throwable e, Operator<?, T> to) {
41+
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
42+
}
43+
44+
public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
45+
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
46+
}
47+
48+
public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<T, ?> from, Operator<?, T> to) {
49+
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
50+
}
51+
52+
private DebugNotification(Observer o, Operator<T, ?> from, Kind kind, Notification<T> notification, Operator<?, T> to, OnSubscribe<T> source) {
53+
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
54+
this.from = from;
55+
this.kind = kind;
56+
this.notification = notification;
57+
this.to = to;
58+
this.source = source;
59+
this.nanoTime = System.nanoTime();
60+
this.threadId = Thread.currentThread().getId();
61+
}
62+
63+
public Operator<T, ?> getFrom() {
64+
return from;
65+
}
66+
67+
public Notification<T> getNotification() {
68+
return notification;
69+
}
70+
71+
public Operator<?, T> getTo() {
72+
return to;
73+
}
74+
75+
public long getNanoTime() {
76+
return nanoTime;
77+
}
78+
79+
public long getThreadId() {
80+
return threadId;
81+
}
82+
83+
public Kind getKind() {
84+
return kind;
85+
}
86+
87+
@Override
88+
public String toString() {
89+
final StringBuilder s = new StringBuilder("{");
90+
s.append(" \"nano\": ").append(nanoTime);
91+
s.append(", \"thread\": ").append(threadId);
92+
s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\"");
93+
s.append(", \"type\": \"").append(kind).append("\"");
94+
if (notification != null) {
95+
if (notification.hasValue())
96+
s.append(", \"value\": \"").append(notification.getValue()).append("\"");
97+
if (notification.hasThrowable())
98+
s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
99+
}
100+
if (source != null)
101+
s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\"");
102+
if (from != null)
103+
s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\"");
104+
if (to != null)
105+
s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\"");
106+
s.append("}");
107+
return s.toString();
108+
}
109+
}

0 commit comments

Comments
 (0)