From ad50c6e7b53e9a4cae6e5f3130585ba301e0de2c Mon Sep 17 00:00:00 2001 From: spamegg Date: Mon, 9 Sep 2024 15:26:40 +0300 Subject: [PATCH] ch08 done! - no idea what's going on --- README.md | 9 +- data.txt | 4 + src/main/scala/ch08/common/Pipe.scala | 8 ++ src/main/scala/ch08/common/otherPipes.scala | 10 +- .../scala/ch08/filePipe/FileOutputPipe.scala | 19 ++-- src/main/scala/ch08/filePipe/examples.scala | 100 +++++++----------- src/main/scala/ch08/syncPipe/main.scala | 17 +++ 7 files changed, 89 insertions(+), 78 deletions(-) create mode 100644 data.txt diff --git a/README.md b/README.md index eeb72aa..08cb041 100644 --- a/README.md +++ b/README.md @@ -285,7 +285,14 @@ which does nothing. There is also val pid = unistd.getpid() ``` -which is never used. There are lots of other examples. There are also many unused / unnecessary imports in the files. Whenever I ran into these, I removed them. +which is never used. There are also illegal things like: + +```scala +val p = SyncPipe(0) +val p = FilePipe(c"./data.txt") +``` + +There are lots of other examples. There are also many unused / unnecessary imports in the files. Whenever I ran into these, I removed them. There is also a lot of code duplication, I suppose, to make each individual file "runnable" by itself. I removed redundant code by adding package declarations, then importing the duplicated code from other files instead. diff --git a/data.txt b/data.txt new file mode 100644 index 0000000..cf0b8fd --- /dev/null +++ b/data.txt @@ -0,0 +1,4 @@ +foo +1 +2 +-4095 diff --git a/src/main/scala/ch08/common/Pipe.scala b/src/main/scala/ch08/common/Pipe.scala index 3651cd3..348b5fd 100644 --- a/src/main/scala/ch08/common/Pipe.scala +++ b/src/main/scala/ch08/common/Pipe.scala @@ -21,6 +21,14 @@ trait Pipe[T, U]: def mapConcat[V](g: U => Seq[V]): Pipe[U, V] = addDestination(ConcatPipe(g)) def mapOption[V](g: U => Option[V]): Pipe[U, V] = addDestination(OptionPipe(g)) + // not sure about this one, the book has Pipe[T] which doesn't exist. + def filter(f: U => Boolean): Pipe[U, U] = addDestination( + mapOption: u => + f(u) match + case true => Some(u) + case false => None + ) + def mapAsync[V](g: U => Future[V])(using ec: ExecutionContext): Pipe[U, V] = addDestination(AsyncPipe(g)) diff --git a/src/main/scala/ch08/common/otherPipes.scala b/src/main/scala/ch08/common/otherPipes.scala index 824cb21..af9d86f 100644 --- a/src/main/scala/ch08/common/otherPipes.scala +++ b/src/main/scala/ch08/common/otherPipes.scala @@ -39,13 +39,13 @@ case class Tokenizer(separator: String) extends Pipe[String, String]: def scan(input: String): Seq[String] = println(s"scanning: '$input'") - buffer = buffer + input - var o: Seq[String] = Seq() + buffer += input + var o = Seq[String]() while buffer.contains(separator) do - val space_position = buffer.indexOf(separator) - val word = buffer.substring(0, space_position) + val spacePosition = buffer.indexOf(separator) + val word = buffer.substring(0, spacePosition) o = o :+ word - buffer = buffer.substring(space_position + 1) + buffer = buffer.substring(spacePosition + 1) o override def feed(input: String): Unit = diff --git a/src/main/scala/ch08/filePipe/FileOutputPipe.scala b/src/main/scala/ch08/filePipe/FileOutputPipe.scala index 6985d8c..f6343e9 100644 --- a/src/main/scala/ch08/filePipe/FileOutputPipe.scala +++ b/src/main/scala/ch08/filePipe/FileOutputPipe.scala @@ -18,10 +18,10 @@ case class FileOutputPipe(fd: Int, serial: Int, async: Boolean) override def feed(input: String): Unit = val outputSize = input.size - val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] + val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] // writeCB frees - val outputBuffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] - outputBuffer._1 = malloc(outputSize.toUSize) // 0.5 + val outputBuffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] // freed where? + outputBuffer._1 = malloc(outputSize) Zone: val outputString = toCString(input) strncpy(outputBuffer._1, outputString, outputSize.toUSize) // 0.5 @@ -29,12 +29,13 @@ case class FileOutputPipe(fd: Int, serial: Int, async: Boolean) outputBuffer._2 = outputSize.toUSize // 0.5 !req = outputBuffer.asInstanceOf[Ptr[Byte]] + // presumably, this frees outputBuffer uv_fs_write(ch07.EventLoop.loop, req, fd, outputBuffer, 1, offset, writeCB) offset += outputSize override def done(): Unit = - val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] - uv_fs_close(ch07.EventLoop.loop, req, fd, null) + val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] // freed where? + uv_fs_close(ch07.EventLoop.loop, req, fd, null) // here? FileOutputPipe.activeStreams -= serial object FileOutputPipe: @@ -55,10 +56,10 @@ object FileOutputPipe: val writeCB = CFuncPtr1.fromScalaFunction[FSReq, Unit]: (req: FSReq) => println("write completed") - val resp_buffer = (!req).asInstanceOf[Ptr[Buffer]] - stdlib.free(resp_buffer._1) - stdlib.free(resp_buffer.asInstanceOf[Ptr[Byte]]) - stdlib.free(req.asInstanceOf[Ptr[Byte]]) + val respBuffer = (!req).asInstanceOf[Ptr[Buffer]] + stdlib.free(respBuffer._1) + stdlib.free(respBuffer.asInstanceOf[Ptr[Byte]]) + stdlib.free(req.asInstanceOf[Ptr[Byte]]) // it's freed here! // def onShutdown(shutdownReq: ShutdownReq, status: Int): Unit = // val client = (!shutdownReq).cast[PipeHandle] diff --git a/src/main/scala/ch08/filePipe/examples.scala b/src/main/scala/ch08/filePipe/examples.scala index 0b96aa1..d2f9673 100644 --- a/src/main/scala/ch08/filePipe/examples.scala +++ b/src/main/scala/ch08/filePipe/examples.scala @@ -2,13 +2,16 @@ package ch08 package filePipe package examples -import scalanative.unsafe.CQuote -import scala.util.Try +import scalanative.unsafe.{CQuote, CString} +import scala.util.{Try, Success, Failure} import ch07.LibUV.*, ch07.LibUVConstants.* +import ch07.examples.ExecutionContext +import ch07.EventLoop @main def fileInputPipeExample: Unit = - val p = FilePipe(c"./data.txt") + val path = c"../data.txt" // replace this with your own path + val p = FilePipe(path) .map: d => println(s"consumed $d") val parsed = Try(d.toInt) @@ -22,10 +25,7 @@ def fileInputPipeExample: Unit = @main def fileOutputPipeExample: Unit = println("hello!") - // val p = SyncPipe(0) - val p = FilePipe(c"./data.txt") - - val q = p + val p = FilePipe(c"../data.txt") .map: d => println(s"consumed $d") val parsed = Try(d.toInt) @@ -36,63 +36,37 @@ def fileOutputPipeExample: Unit = uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) println("done") -// object Stuff: -// import filePipe.* -// import ch07.LibUV.*, ch07.LibUVConstants.* - -// def filter[T](f: T => Boolean): Pipe[T] = -// addDestination( -// mapOption: t => -// f(t) match -// case true => Some(t) -// case false => None -// ) - -// @main -// def stuff: Unit = -// val p1: Pipe[String, String] = ??? -// var counter = 0 -// p1.map: i => -// counter += 1 -// i - -// // ... -// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) -// println(s"saw $counter elements") - -// val p2: Pipe[String] = ??? -// val c = p2.addDestination(Counter()) -// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) -// println(s"saw ${c.counter} elements") +@main +def asyncPipeExample: Unit = + val p4: Pipe[String, CString] = ??? + p4.mapAsync(url => ch07.Curl.get(url))(using EventLoop) + .map(response => println(s"got back result: $response")) -// val p3: Pipe[String] = ??? -// p3.mapConcat(content => content.split("\n")) -// .mapConcat(line => line.split(" ")) -// .map(word => println(s"saw word: ${word}")) +@main +def statefulProcessorExample: Unit = + val p1: Pipe[String, String] = ??? + var counter = 0 + p1.map: i => + counter += 1 + i + // ... + uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) + println(s"saw $counter elements") -// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) -// println(s"saw ${c.counter} elements") +@main +def counterSinkExample: Unit = + // val p2: Pipe[String, String] = ??? // not sure how to make this work. + // val c = p2.addDestination(CounterSink()) + val c = CounterSink() + uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) + println(s"saw ${c.counter} elements") -// SyncPipe(0) -// .map(d => -// println(s"consumed $d") -// d -// ) -// .map(d => -// val parsed = Try { -// d.toInt -// } -// ) -// .filter { -// case Success(i) => -// println(s"saw number $i") -// true -// case Failure(f) => -// println(s"error: $f") -// false -// } -// // ... +@main +def tokenizerExample: Unit = + val p3: Pipe[String, String] = ??? + p3.mapConcat(content => content.split("\n")) + .mapConcat(line => line.split(" ")) + .map(word => println(s"saw word: ${word}")) -// val p4: Pipe[String] = ??? -// p4.mapAsync(url => Curl.get(url)) -// .map(response => println(s"got back result: $response")) + uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT) + // println(s"saw ${c.counter} elements") diff --git a/src/main/scala/ch08/syncPipe/main.scala b/src/main/scala/ch08/syncPipe/main.scala index 590d67f..cb09561 100644 --- a/src/main/scala/ch08/syncPipe/main.scala +++ b/src/main/scala/ch08/syncPipe/main.scala @@ -4,6 +4,23 @@ package syncPipe import scala.util.{Try, Success, Failure} import ch07.LibUV.uv_run, ch07.LibUVConstants.UV_RUN_DEFAULT +@main +def pipeChainFilterExample: Unit = + SyncPipe(0) + .map: d => + println(s"consumed $d") + d + .map: d => + Try(d.toInt) + .filter: + case Success(i) => + println(s"saw number $i") + true + case Failure(f) => + println(s"error: $f") + false + // ... + @main def run: Unit = println("hello!")