Skip to content

Commit

Permalink
std: add face up down (close #95)
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Jan 27, 2025
1 parent 4fea2eb commit 297df29
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 86 deletions.
44 changes: 44 additions & 0 deletions std/engine/face/base_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ type baseFace struct {
local bool
onPkt func(frame []byte) error
onError func(err error) error
onUp func()
onDown func()
sendMut sync.Mutex
}

func newBaseFace(local bool) baseFace {
return baseFace{
local: local,
onUp: func() {},
onDown: func() {},
}
}

func (f *baseFace) IsRunning() bool {
return f.running.Load()
}
Expand All @@ -29,3 +39,37 @@ func (f *baseFace) OnPacket(onPkt func(frame []byte) error) {
func (f *baseFace) OnError(onError func(err error) error) {
f.onError = onError
}

func (f *baseFace) OnUp(onUp func()) {
f.onUp = onUp
}

func (f *baseFace) OnDown(onDown func()) {
f.onDown = onDown
}

// setStateDown sets the face to down state, and makes the down
// callback if the face was previously up.
func (f *baseFace) setStateDown() {
if f.running.Swap(false) {
if f.onDown != nil {
f.onDown()
}
}
}

// setStateUp sets the face to up state, and makes the up
// callback if the face was previously down.
func (f *baseFace) setStateUp() {
if !f.running.Swap(true) {
if f.onUp != nil {
f.onUp()
}
}
}

// setStateClosed sets the face to closed state without
// making the onDown callback. Returns if the face was running.
func (f *baseFace) setStateClosed() bool {
return f.running.Swap(false)
}
1 change: 1 addition & 0 deletions std/engine/face/dummy_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type DummyFace struct {

func NewDummyFace() *DummyFace {
return &DummyFace{
baseFace: newBaseFace(true),
sendPkts: make([]enc.Buffer, 0),
}
}
Expand Down
10 changes: 9 additions & 1 deletion std/engine/face/face.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,18 @@ type Face interface {
// This function should only be called by engine implementations.
OnError(onError func(err error) error)

// Open starts the face.
// Open starts the face and may blocks until it is up.
Open() error
// Close stops the face.
Close() error
// Send sends a packet frame to the face.
Send(pkt enc.Wire) error

// OnUp sets the callback for the face going up.
// The callback may be called multiple times.
OnUp(onUp func())
// OnDown sets the callback for the face going down.
// The callback may be called multiple times.
// The callback will not be called when the face is closed.
OnDown(onDown func())
}
38 changes: 21 additions & 17 deletions std/engine/face/stream_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"net"
"os"

enc "github.com/named-data/ndnd/std/encoding"
ndn_io "github.com/named-data/ndnd/std/utils/io"
Expand All @@ -18,13 +19,16 @@ type StreamFace struct {
}

func NewStreamFace(network string, addr string, local bool) *StreamFace {
return &StreamFace{
baseFace: baseFace{
local: local,
},
network: network,
addr: addr,
s := &StreamFace{
baseFace: newBaseFace(local),
network: network,
addr: addr,
}

// Quit app by default when stream face fails
s.OnDown(func() { os.Exit(106) })

return s
}

func (f *StreamFace) String() string {
Expand All @@ -36,7 +40,7 @@ func (f *StreamFace) Trait() Face {
}

func (f *StreamFace) Open() error {
if f.running.Load() {
if f.IsRunning() {
return errors.New("face is already running")
}

Expand All @@ -50,27 +54,24 @@ func (f *StreamFace) Open() error {
}

f.conn = c
f.running.Store(true)

f.setStateUp()
go f.receive()

return nil
}

func (f *StreamFace) Close() error {
if !f.running.Swap(false) {
return nil
}

if f.conn != nil {
return f.conn.Close()
if f.setStateClosed() {
if f.conn != nil {
return f.conn.Close()
}
}

return nil
}

func (f *StreamFace) Send(pkt enc.Wire) error {
if !f.running.Load() {
if !f.IsRunning() {
return errors.New("face is not running")
}

Expand All @@ -86,14 +87,17 @@ func (f *StreamFace) Send(pkt enc.Wire) error {
}

func (f *StreamFace) receive() {
defer f.setStateDown()

err := ndn_io.ReadTlvStream(f.conn, func(b []byte) bool {
if err := f.onPkt(b); err != nil {
f.Close() // engine error
return false // break
}
return f.IsRunning()
}, nil)

if f.running.Swap(false) {
if f.IsRunning() {
if err != nil {
f.onError(err)
} else {
Expand Down
47 changes: 21 additions & 26 deletions std/engine/face/wasm_sim_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ type WasmSimFace struct {

func NewWasmSimFace() *WasmSimFace {
return &WasmSimFace{
baseFace: baseFace{
local: true,
},
gosim: js.Null(),
baseFace: newBaseFace(true),
gosim: js.Null(),
}
}

Expand All @@ -32,19 +30,6 @@ func (f *WasmSimFace) Trait() Face {
return f
}

func (f *WasmSimFace) Send(pkt enc.Wire) error {
if !f.running.Load() {
return errors.New("face is not running")
}

l := pkt.Length()
arr := js.Global().Get("Uint8Array").New(int(l))
js.CopyBytesToJS(arr, pkt.Join())
f.gosim.Call("sendPkt", arr)

return nil
}

func (f *WasmSimFace) Open() error {
if f.onError == nil || f.onPkt == nil {
return errors.New("face callbacks are not set")
Expand All @@ -59,21 +44,31 @@ func (f *WasmSimFace) Open() error {
f.gosim.Call("setRecvPktCallback", js.FuncOf(f.receive))

log.Info(f, "Face opened")
f.running.Store(true)
f.setStateUp()

return nil
}

func (f *WasmSimFace) Close() error {
if f.gosim.IsNull() {
if f.setStateClosed() {
f.gosim.Call("setRecvPktCallback", js.FuncOf(func(this js.Value, args []js.Value) any {
return nil
}))
f.gosim = js.Null()
}

return nil
}

func (f *WasmSimFace) Send(pkt enc.Wire) error {
if !f.IsRunning() {
return errors.New("face is not running")
}

f.running.Store(false)
f.gosim.Call("setRecvPktCallback", js.FuncOf(func(this js.Value, args []js.Value) any {
return nil
}))
f.gosim = js.Null()
l := pkt.Length()
arr := js.Global().Get("Uint8Array").New(int(l))
js.CopyBytesToJS(arr, pkt.Join())
f.gosim.Call("sendPkt", arr)

return nil
}
Expand All @@ -86,10 +81,10 @@ func (f *WasmSimFace) receive(this js.Value, args []js.Value) any {

buf := make([]byte, pkt.Get("byteLength").Int())
js.CopyBytesToGo(buf, pkt)

err := f.onPkt(buf)
if err != nil {
f.running.Store(false)
log.Error(f, "Unable to handle packet: %+v", err)
f.Close()
}

return nil
Expand Down
43 changes: 20 additions & 23 deletions std/engine/face/wasm_ws_face.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,55 +26,54 @@ func (f *WasmWsFace) Trait() Face {

func NewWasmWsFace(url string, local bool) *WasmWsFace {
return &WasmWsFace{
baseFace: baseFace{
local: local,
},
url: url,
conn: js.Null(),
baseFace: newBaseFace(local),
url: url,
conn: js.Null(),
}
}

func (f *WasmWsFace) Open() error {
if f.IsRunning() {
return errors.New("face is already running")
}

if f.onError == nil || f.onPkt == nil {
return errors.New("face callbacks are not set")
}

if !f.conn.IsNull() {
return errors.New("face is already running")
}
// It seems now Go cannot handle exceptions thrown by JS
conn := js.Global().Get("WebSocket").New(f.url)
conn.Set("binaryType", "arraybuffer")

ch := make(chan struct{}, 1)
// It seems now Go cannot handle exceptions thrown by JS
f.conn = js.Global().Get("WebSocket").New(f.url)
f.conn.Set("binaryType", "arraybuffer")
f.conn.Call("addEventListener", "open", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
conn.Call("addEventListener", "message", js.FuncOf(f.receive))
conn.Call("addEventListener", "open", js.FuncOf(func(this js.Value, args []js.Value) interface{} {
ch <- struct{}{}
close(ch)
return nil
}))

f.conn.Call("addEventListener", "message", js.FuncOf(f.receive))
log.Info(f, "Waiting for WebSocket connection ...")
<-ch
log.Info(f, "WebSocket connected")

f.running.Store(true)
f.conn = conn
f.setStateUp()

return nil
}

func (f *WasmWsFace) Close() error {
if f.conn.IsNull() {
return errors.New("face is not running")
if f.setStateClosed() {
f.conn.Call("close")
f.conn = js.Null()
}
f.running.Store(false)
f.conn.Call("close")
f.conn = js.Null()

return nil
}

func (f *WasmWsFace) Send(pkt enc.Wire) error {
if !f.running.Load() {
if !f.IsRunning() {
return errors.New("face is not running")
}

Expand All @@ -99,9 +98,7 @@ func (f *WasmWsFace) receive(this js.Value, args []js.Value) any {

err := f.onPkt(buf)
if err != nil {
f.running.Store(false)
f.conn.Call("close")
f.conn = js.Null()
f.Close()
}

return nil
Expand Down
Loading

0 comments on commit 297df29

Please sign in to comment.