Skip to content

Commit

Permalink
ch08 done!
Browse files Browse the repository at this point in the history
- no idea what's going on
  • Loading branch information
spamegg1 committed Sep 9, 2024
1 parent 08f3126 commit ad50c6e
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 78 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,14 @@ which does nothing. There is also
val pid = unistd.getpid()
```
which is never used. There are lots of other examples. There are also many unused / unnecessary imports in the files. Whenever I ran into these, I removed them.
which is never used. There are also illegal things like:
```scala
val p = SyncPipe(0)
val p = FilePipe(c"./data.txt")
```
There are lots of other examples. There are also many unused / unnecessary imports in the files. Whenever I ran into these, I removed them.
There is also a lot of code duplication, I suppose, to make each individual file "runnable" by itself. I removed redundant code by adding package declarations, then importing the duplicated code from other files instead.
Expand Down
4 changes: 4 additions & 0 deletions data.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
foo
1
2
-4095
8 changes: 8 additions & 0 deletions src/main/scala/ch08/common/Pipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ trait Pipe[T, U]:
def mapConcat[V](g: U => Seq[V]): Pipe[U, V] = addDestination(ConcatPipe(g))
def mapOption[V](g: U => Option[V]): Pipe[U, V] = addDestination(OptionPipe(g))

// not sure about this one, the book has Pipe[T] which doesn't exist.
def filter(f: U => Boolean): Pipe[U, U] = addDestination(
mapOption: u =>
f(u) match
case true => Some(u)
case false => None
)

def mapAsync[V](g: U => Future[V])(using ec: ExecutionContext): Pipe[U, V] =
addDestination(AsyncPipe(g))

Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/ch08/common/otherPipes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ case class Tokenizer(separator: String) extends Pipe[String, String]:

def scan(input: String): Seq[String] =
println(s"scanning: '$input'")
buffer = buffer + input
var o: Seq[String] = Seq()
buffer += input
var o = Seq[String]()
while buffer.contains(separator) do
val space_position = buffer.indexOf(separator)
val word = buffer.substring(0, space_position)
val spacePosition = buffer.indexOf(separator)
val word = buffer.substring(0, spacePosition)
o = o :+ word
buffer = buffer.substring(space_position + 1)
buffer = buffer.substring(spacePosition + 1)
o

override def feed(input: String): Unit =
Expand Down
19 changes: 10 additions & 9 deletions src/main/scala/ch08/filePipe/FileOutputPipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@ case class FileOutputPipe(fd: Int, serial: Int, async: Boolean)

override def feed(input: String): Unit =
val outputSize = input.size
val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq]
val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] // writeCB frees

val outputBuffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]]
outputBuffer._1 = malloc(outputSize.toUSize) // 0.5
val outputBuffer = malloc(sizeof[Buffer]).asInstanceOf[Ptr[Buffer]] // freed where?
outputBuffer._1 = malloc(outputSize)
Zone:
val outputString = toCString(input)
strncpy(outputBuffer._1, outputString, outputSize.toUSize) // 0.5

outputBuffer._2 = outputSize.toUSize // 0.5
!req = outputBuffer.asInstanceOf[Ptr[Byte]]

// presumably, this frees outputBuffer
uv_fs_write(ch07.EventLoop.loop, req, fd, outputBuffer, 1, offset, writeCB)
offset += outputSize

override def done(): Unit =
val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq]
uv_fs_close(ch07.EventLoop.loop, req, fd, null)
val req = stdlib.malloc(uv_req_size(UV_FS_REQ_T)).asInstanceOf[FSReq] // freed where?
uv_fs_close(ch07.EventLoop.loop, req, fd, null) // here?
FileOutputPipe.activeStreams -= serial

object FileOutputPipe:
Expand All @@ -55,10 +56,10 @@ object FileOutputPipe:

val writeCB = CFuncPtr1.fromScalaFunction[FSReq, Unit]: (req: FSReq) =>
println("write completed")
val resp_buffer = (!req).asInstanceOf[Ptr[Buffer]]
stdlib.free(resp_buffer._1)
stdlib.free(resp_buffer.asInstanceOf[Ptr[Byte]])
stdlib.free(req.asInstanceOf[Ptr[Byte]])
val respBuffer = (!req).asInstanceOf[Ptr[Buffer]]
stdlib.free(respBuffer._1)
stdlib.free(respBuffer.asInstanceOf[Ptr[Byte]])
stdlib.free(req.asInstanceOf[Ptr[Byte]]) // it's freed here!

// def onShutdown(shutdownReq: ShutdownReq, status: Int): Unit =
// val client = (!shutdownReq).cast[PipeHandle]
Expand Down
100 changes: 37 additions & 63 deletions src/main/scala/ch08/filePipe/examples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package ch08
package filePipe
package examples

import scalanative.unsafe.CQuote
import scala.util.Try
import scalanative.unsafe.{CQuote, CString}
import scala.util.{Try, Success, Failure}
import ch07.LibUV.*, ch07.LibUVConstants.*
import ch07.examples.ExecutionContext
import ch07.EventLoop

@main
def fileInputPipeExample: Unit =
val p = FilePipe(c"./data.txt")
val path = c"../data.txt" // replace this with your own path
val p = FilePipe(path)
.map: d =>
println(s"consumed $d")
val parsed = Try(d.toInt)
Expand All @@ -22,10 +25,7 @@ def fileInputPipeExample: Unit =
@main
def fileOutputPipeExample: Unit =
println("hello!")
// val p = SyncPipe(0)
val p = FilePipe(c"./data.txt")

val q = p
val p = FilePipe(c"../data.txt")
.map: d =>
println(s"consumed $d")
val parsed = Try(d.toInt)
Expand All @@ -36,63 +36,37 @@ def fileOutputPipeExample: Unit =
uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
println("done")

// object Stuff:
// import filePipe.*
// import ch07.LibUV.*, ch07.LibUVConstants.*

// def filter[T](f: T => Boolean): Pipe[T] =
// addDestination(
// mapOption: t =>
// f(t) match
// case true => Some(t)
// case false => None
// )

// @main
// def stuff: Unit =
// val p1: Pipe[String, String] = ???
// var counter = 0
// p1.map: i =>
// counter += 1
// i

// // ...
// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
// println(s"saw $counter elements")

// val p2: Pipe[String] = ???
// val c = p2.addDestination(Counter())
// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
// println(s"saw ${c.counter} elements")
@main
def asyncPipeExample: Unit =
val p4: Pipe[String, CString] = ???
p4.mapAsync(url => ch07.Curl.get(url))(using EventLoop)
.map(response => println(s"got back result: $response"))

// val p3: Pipe[String] = ???
// p3.mapConcat(content => content.split("\n"))
// .mapConcat(line => line.split(" "))
// .map(word => println(s"saw word: ${word}"))
@main
def statefulProcessorExample: Unit =
val p1: Pipe[String, String] = ???
var counter = 0
p1.map: i =>
counter += 1
i
// ...
uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
println(s"saw $counter elements")

// uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
// println(s"saw ${c.counter} elements")
@main
def counterSinkExample: Unit =
// val p2: Pipe[String, String] = ??? // not sure how to make this work.
// val c = p2.addDestination(CounterSink())
val c = CounterSink()
uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
println(s"saw ${c.counter} elements")

// SyncPipe(0)
// .map(d =>
// println(s"consumed $d")
// d
// )
// .map(d =>
// val parsed = Try {
// d.toInt
// }
// )
// .filter {
// case Success(i) =>
// println(s"saw number $i")
// true
// case Failure(f) =>
// println(s"error: $f")
// false
// }
// // ...
@main
def tokenizerExample: Unit =
val p3: Pipe[String, String] = ???
p3.mapConcat(content => content.split("\n"))
.mapConcat(line => line.split(" "))
.map(word => println(s"saw word: ${word}"))

// val p4: Pipe[String] = ???
// p4.mapAsync(url => Curl.get(url))
// .map(response => println(s"got back result: $response"))
uv_run(ch07.EventLoop.loop, UV_RUN_DEFAULT)
// println(s"saw ${c.counter} elements")
17 changes: 17 additions & 0 deletions src/main/scala/ch08/syncPipe/main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ package syncPipe
import scala.util.{Try, Success, Failure}
import ch07.LibUV.uv_run, ch07.LibUVConstants.UV_RUN_DEFAULT

@main
def pipeChainFilterExample: Unit =
SyncPipe(0)
.map: d =>
println(s"consumed $d")
d
.map: d =>
Try(d.toInt)
.filter:
case Success(i) =>
println(s"saw number $i")
true
case Failure(f) =>
println(s"error: $f")
false
// ...

@main
def run: Unit =
println("hello!")
Expand Down

0 comments on commit ad50c6e

Please sign in to comment.