1
1
package rx .observers ;
2
2
3
- import java .util .ArrayList ;
4
-
5
3
import rx .Observer ;
6
- import rx .operators .NotificationLite ;
7
4
8
5
/**
9
6
* Enforce single-threaded, serialized, ordered execution of onNext, onCompleted, onError.
@@ -22,137 +19,160 @@ public class SerializedObserver<T> implements Observer<T> {
22
19
23
20
private boolean emitting = false ;
24
21
private boolean terminated = false ;
25
- private ArrayList <Object > queue = new ArrayList <Object >();
26
- private NotificationLite <T > on = NotificationLite .instance ();
22
+ private FastList queue ;
23
+
24
+ private static final int MAX_DRAIN_ITERATION = 1 ;
25
+ private static final Object NULL_SENTINEL = new Object ();
26
+ private static final Object COMPLETE_SENTINEL = new Object ();
27
+
28
+ static final class FastList {
29
+ Object [] array ;
30
+ int size ;
31
+
32
+ public void add (Object o ) {
33
+ int s = size ;
34
+ Object [] a = array ;
35
+ if (a == null ) {
36
+ a = new Object [16 ];
37
+ array = a ;
38
+ } else if (s == a .length ) {
39
+ Object [] array2 = new Object [s + (s >> 2 )];
40
+ System .arraycopy (a , 0 , array2 , 0 , s );
41
+ a = array2 ;
42
+ array = a ;
43
+ }
44
+ a [s ] = o ;
45
+ size = s + 1 ;
46
+ }
47
+ }
48
+
49
+ private static final class ErrorSentinel {
50
+ final Throwable e ;
51
+
52
+ ErrorSentinel (Throwable e ) {
53
+ this .e = e ;
54
+ }
55
+ }
27
56
28
57
public SerializedObserver (Observer <? super T > s ) {
29
58
this .actual = s ;
30
59
}
31
60
32
61
@ Override
33
62
public void onCompleted () {
34
- boolean canEmit = false ;
35
- ArrayList <Object > list = null ;
63
+ FastList list ;
36
64
synchronized (this ) {
37
65
if (terminated ) {
38
66
return ;
39
67
}
40
68
terminated = true ;
41
- if (!emitting ) {
42
- // emit immediately
43
- emitting = true ;
44
- canEmit = true ;
45
- if (queue .size () > 0 ) {
46
- list = queue ; // copy reference
47
- queue = new ArrayList <Object >(); // new version;
48
- }
49
- } else {
50
- // someone else is already emitting so just queue it
51
- queue .add (on .completed ());
52
- }
53
- }
54
-
55
- if (canEmit ) {
56
- // we won the right to emit
57
- try {
58
- drainQueue (list );
59
- actual .onCompleted ();
60
- } finally {
61
- synchronized (this ) {
62
- emitting = false ;
69
+ if (emitting ) {
70
+ if (queue == null ) {
71
+ queue = new FastList ();
63
72
}
73
+ queue .add (COMPLETE_SENTINEL );
74
+ return ;
64
75
}
76
+ emitting = true ;
77
+ list = queue ;
78
+ queue = null ;
65
79
}
80
+ drainQueue (list );
81
+ actual .onCompleted ();
66
82
}
67
83
68
84
@ Override
69
85
public void onError (final Throwable e ) {
70
- boolean canEmit = false ;
71
- ArrayList <Object > list = null ;
86
+ FastList list ;
72
87
synchronized (this ) {
73
88
if (terminated ) {
74
89
return ;
75
90
}
76
91
terminated = true ;
77
- if (!emitting ) {
78
- // emit immediately
79
- emitting = true ;
80
- canEmit = true ;
81
- if (queue .size () > 0 ) {
82
- list = queue ; // copy reference
83
- queue = new ArrayList <Object >(); // new version;
84
- }
85
- } else {
86
- // someone else is already emitting so just queue it ... after eliminating the queue to shortcut
87
- queue .clear ();
88
- queue .add (on .error (e ));
89
- }
90
- }
91
- if (canEmit ) {
92
- // we won the right to emit
93
- try {
94
- drainQueue (list );
95
- actual .onError (e );
96
- } finally {
97
- synchronized (this ) {
98
- emitting = false ;
92
+ if (emitting ) {
93
+ if (queue == null ) {
94
+ queue = new FastList ();
99
95
}
96
+ queue .add (new ErrorSentinel (e ));
97
+ return ;
100
98
}
99
+ emitting = true ;
100
+ list = queue ;
101
+ queue = null ;
101
102
}
103
+ drainQueue (list );
104
+ actual .onError (e );
102
105
}
103
106
104
107
@ Override
105
108
public void onNext (T t ) {
106
- boolean canEmit = false ;
107
- ArrayList < Object > list = null ;
109
+ FastList list ;
110
+
108
111
synchronized (this ) {
109
112
if (terminated ) {
110
113
return ;
111
114
}
112
- if (!emitting ) {
113
- // emit immediately
114
- emitting = true ;
115
- canEmit = true ;
116
- if (queue .size () > 0 ) {
117
- list = queue ; // copy reference
118
- queue = new ArrayList <Object >(); // new version;
115
+ if (emitting ) {
116
+ if (queue == null ) {
117
+ queue = new FastList ();
119
118
}
120
- } else {
121
- // someone else is already emitting so just queue it
122
- queue .add (on .next (t ));
119
+ queue .add (t != null ? t : NULL_SENTINEL );
120
+ return ;
123
121
}
122
+ emitting = true ;
123
+ list = queue ;
124
+ queue = null ;
124
125
}
125
- if (canEmit ) {
126
- // we won the right to emit
127
- try {
126
+
127
+ try {
128
+ int iter = MAX_DRAIN_ITERATION ;
129
+ do {
128
130
drainQueue (list );
129
- actual .onNext (t );
130
- } finally {
131
- synchronized (this ) {
132
- if (terminated ) {
133
- list = queue ; // copy reference
134
- queue = new ArrayList <Object >(); // new version;
135
- } else {
136
- // release this thread
137
- emitting = false ;
138
- canEmit = false ;
131
+ if (iter == MAX_DRAIN_ITERATION ) {
132
+ actual .onNext (t );
133
+ }
134
+ --iter ;
135
+ if (iter > 0 ) {
136
+ synchronized (this ) {
137
+ list = queue ;
138
+ queue = null ;
139
+ }
140
+ if (list == null ) {
141
+ break ;
139
142
}
140
143
}
144
+ } while (iter > 0 );
145
+ } finally {
146
+ synchronized (this ) {
147
+ if (terminated ) {
148
+ list = queue ;
149
+ queue = null ;
150
+ } else {
151
+ emitting = false ;
152
+ list = null ;
153
+ }
141
154
}
142
- }
143
-
144
- // if terminated this will still be true so let's drain the rest of the queue
145
- if (canEmit ) {
146
155
drainQueue (list );
147
156
}
148
157
}
149
158
150
- public void drainQueue (ArrayList < Object > list ) {
151
- if (list == null || list .size () == 0 ) {
159
+ void drainQueue (FastList list ) {
160
+ if (list == null || list .size == 0 ) {
152
161
return ;
153
162
}
154
- for (Object v : list ) {
155
- on .accept (actual , v );
163
+ for (Object v : list .array ) {
164
+ if (v == null ) {
165
+ break ;
166
+ }
167
+ if (v == NULL_SENTINEL ) {
168
+ actual .onNext (null );
169
+ } else if (v == COMPLETE_SENTINEL ) {
170
+ actual .onCompleted ();
171
+ } else if (v .getClass () == ErrorSentinel .class ) {
172
+ actual .onError (((ErrorSentinel ) v ).e );
173
+ } else {
174
+ actual .onNext ((T ) v );
175
+ }
156
176
}
157
177
}
158
- }
178
+ }
0 commit comments