16
16
17
17
package com .rabbitmq .utility ;
18
18
19
+ import java .util .concurrent .CountDownLatch ;
19
20
import java .util .concurrent .TimeoutException ;
21
+ import java .util .concurrent .atomic .AtomicReference ;
22
+
23
+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
20
24
21
25
/**
22
26
* Simple one-shot IPC mechanism. Essentially a one-place buffer that cannot be emptied once filled.
23
27
*/
24
28
public class BlockingCell <T > {
25
- /** Indicator of not-yet-filledness */
26
- private boolean _filled = false ;
27
29
28
- /** Will be null until a value is supplied, and possibly still then. */
29
- private T _value ;
30
+ private final AtomicReference < T > value = new AtomicReference <>();
31
+ private final CountDownLatch latch = new CountDownLatch ( 1 ) ;
30
32
31
33
private static final long NANOS_IN_MILLI = 1000L * 1000L ;
32
34
33
35
private static final long INFINITY = -1 ;
34
36
35
- /** Instantiate a new BlockingCell waiting for a value of the specified type. */
36
- public BlockingCell () {
37
- // no special handling required in default constructor
38
- }
39
-
40
37
/**
41
38
* Wait for a value, and when one arrives, return it (without clearing it). If there's already a value present, there's no need to wait - the existing value
42
39
* is returned.
43
40
* @return the waited-for value
44
41
*
45
42
* @throws InterruptedException if this thread is interrupted
46
43
*/
47
- public synchronized T get () throws InterruptedException {
48
- while (!_filled ) {
49
- wait ();
50
- }
51
- return _value ;
44
+ public T get () throws InterruptedException {
45
+ this .latch .await ();
46
+ return this .value .get ();
52
47
}
53
48
54
49
/**
@@ -60,30 +55,27 @@ public synchronized T get() throws InterruptedException {
60
55
* @return the waited-for value
61
56
* @throws InterruptedException if this thread is interrupted
62
57
*/
63
- public synchronized T get (long timeout ) throws InterruptedException , TimeoutException {
58
+ public T get (long timeout ) throws InterruptedException , TimeoutException {
64
59
if (timeout == INFINITY ) return get ();
65
60
66
61
if (timeout < 0 ) {
67
62
throw new IllegalArgumentException ("Timeout cannot be less than zero" );
68
63
}
69
64
70
- long now = System .nanoTime () / NANOS_IN_MILLI ;
71
- long maxTime = now + timeout ;
72
- while (!_filled && (now = (System .nanoTime () / NANOS_IN_MILLI )) < maxTime ) {
73
- wait (maxTime - now );
74
- }
65
+ boolean done = this .latch .await (10 , MILLISECONDS );
75
66
76
- if (!_filled )
67
+ if (!done ) {
77
68
throw new TimeoutException ();
69
+ }
78
70
79
- return _value ;
71
+ return this . value . get () ;
80
72
}
81
73
82
74
/**
83
75
* As get(), but catches and ignores InterruptedException, retrying until a value appears.
84
76
* @return the waited-for value
85
77
*/
86
- public synchronized T uninterruptibleGet () {
78
+ public T uninterruptibleGet () {
87
79
boolean wasInterrupted = false ;
88
80
try {
89
81
while (true ) {
@@ -128,21 +120,19 @@ public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
128
120
Thread .currentThread ().interrupt ();
129
121
}
130
122
}
131
-
132
123
throw new TimeoutException ();
133
124
}
134
125
135
126
/**
136
127
* Store a value in this BlockingCell, throwing {@link IllegalStateException} if the cell already has a value.
137
128
* @param newValue the new value to store
138
129
*/
139
- public synchronized void set (T newValue ) {
140
- if (_filled ) {
130
+ public void set (T newValue ) {
131
+ if (this .value .compareAndSet (null , newValue )) {
132
+ this .latch .countDown ();
133
+ } else {
141
134
throw new IllegalStateException ("BlockingCell can only be set once" );
142
135
}
143
- _value = newValue ;
144
- _filled = true ;
145
- notifyAll ();
146
136
}
147
137
148
138
/**
@@ -151,10 +141,11 @@ public synchronized void set(T newValue) {
151
141
* @param newValue the new value to store
152
142
*/
153
143
public synchronized boolean setIfUnset (T newValue ) {
154
- if (_filled ) {
144
+ if (this .value .compareAndSet (null , newValue )) {
145
+ this .latch .countDown ();
146
+ return true ;
147
+ } else {
155
148
return false ;
156
149
}
157
- set (newValue );
158
- return true ;
159
150
}
160
151
}
0 commit comments