Skip to content

Commit 1bcd33b

Browse files
committed
added one-time channel
1 parent 4434eb1 commit 1bcd33b

File tree

1 file changed

+47
-0
lines changed

1 file changed

+47
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package gopher.channels
2+
3+
import scala.concurrent._
4+
import gopher._
5+
import java.util.concurrent.atomic._
6+
7+
/**
8+
* channel, in which only one message can be written,
9+
* after which it is automatically closed
10+
*/
11+
class OneTimeChannel[T](override val api:GopherAPI) extends Channel[T]
12+
{
13+
val p = Promise[T]()
14+
val readed = new AtomicBoolean(false)
15+
16+
def cbread[B](f: ContRead[T,B] => Option[ContRead.In[T] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit =
17+
{
18+
p.future.foreach{ a =>
19+
f(ContRead(f,this,ft)) foreach { g =>
20+
if (readed.compareAndSet(false,true)) {
21+
api.continue(g(ContRead.Value(a)),ft)
22+
} else{
23+
api.continue(g(ContRead.Skip),ft)
24+
}
25+
}
26+
}(api.executionContext)
27+
}
28+
29+
def cbwrite[B](f: ContWrite[T,B] => Option[(T, Future[Continuated[B]])],ft: FlowTermination[B]): Unit =
30+
{
31+
if (p.isCompleted) {
32+
throw new ChannelClosedException()
33+
} else {
34+
f(ContWrite(f,this,ft)) foreach { case (a, next) =>
35+
if (!p.trySuccess(a)) {
36+
throw new ChannelClosedException()
37+
}
38+
api.continue(next,ft)
39+
}
40+
}
41+
}
42+
43+
44+
def close(): Unit =
45+
p failure new ChannelClosedException()
46+
47+
}

0 commit comments

Comments
 (0)