Skip to content

Commit 47a0f6e

Browse files
committed
added test for one-time channel
1 parent 1bcd33b commit 47a0f6e

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@ libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.4"
2323
//libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "0.9"
2424

2525
//testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-n", "Now")
26+
//fork in Test := true
27+
//javaOptions in Test += s"""-javaagent:${System.getProperty("user.home")}/.ivy2/local/com.github.rssh/trackedfuture_2.11/0.3/jars/trackedfuture_2.11-assembly.jar"""
2628

2729
version:="0.99.7-SNAPSHOT"
2830

2931

32+
3033
publishMavenStyle := true
3134

3235
publishTo <<= version { (v: String) =>

src/main/scala/gopher/channels/OneTimeChannel.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@ import java.util.concurrent.atomic._
77
/**
88
* channel, in which only one message can be written,
99
* after which it is automatically closed
10+
*
11+
* Writer is not waiting for reader to start.
1012
*/
1113
class OneTimeChannel[T](override val api:GopherAPI) extends Channel[T]
1214
{
13-
val p = Promise[T]()
14-
val readed = new AtomicBoolean(false)
15+
private[this] val p = Promise[T]()
16+
private[this] val readed = new AtomicBoolean(false)
17+
18+
def future = p.future
19+
def promise = p
1520

1621
def cbread[B](f: ContRead[T,B] => Option[ContRead.In[T] => Future[Continuated[B]]],ft: FlowTermination[B]): Unit =
1722
{
@@ -29,11 +34,11 @@ class OneTimeChannel[T](override val api:GopherAPI) extends Channel[T]
2934
def cbwrite[B](f: ContWrite[T,B] => Option[(T, Future[Continuated[B]])],ft: FlowTermination[B]): Unit =
3035
{
3136
if (p.isCompleted) {
32-
throw new ChannelClosedException()
37+
ft.doThrow(new ChannelClosedException())
3338
} else {
3439
f(ContWrite(f,this,ft)) foreach { case (a, next) =>
3540
if (!p.trySuccess(a)) {
36-
throw new ChannelClosedException()
41+
ft.doThrow(throw new ChannelClosedException())
3742
}
3843
api.continue(next,ft)
3944
}
@@ -45,3 +50,11 @@ class OneTimeChannel[T](override val api:GopherAPI) extends Channel[T]
4550
p failure new ChannelClosedException()
4651

4752
}
53+
54+
object OneTimeChannel
55+
{
56+
57+
def apply[A]()(implicit api:GopherAPI): OneTimeChannel[A] =
58+
new OneTimeChannel[A](api)
59+
60+
}

src/test/scala/gopher/channels/MacroSelectSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,16 @@ class MacroSelectSuite extends FunSuite
324324
assert(s1==3 || s1==10)
325325
}
326326

327+
test("one-time channel make") {
328+
import gopherApi._
329+
val ch = gopherApi.make[OneTimeChannel[Int]]()
330+
val f1 = ch.awrite(1)
331+
val f2 = ch.awrite(2)
332+
val x = Await.result(ch.aread, 10 seconds)
333+
val x2 = Await.result(f2.failed, 10 seconds)
334+
assert(x==1)
335+
assert(x2.isInstanceOf[ChannelClosedException])
336+
}
327337

328338
lazy val gopherApi = CommonTestObjects.gopherApi
329339

0 commit comments

Comments
 (0)