Skip to content

Commit 1136592

Browse files
author
jeronimoalbi
committed
feat: initial implementation
1 parent 4259eb3 commit 1136592

File tree

9 files changed

+637
-2
lines changed

9 files changed

+637
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
build

Makefile

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
.PHONY: tailws
2+
all: build
3+
4+
build: tailws
5+
6+
install: install_tailws
7+
8+
tailws:
9+
@echo "Building tail-ws"
10+
go build -o build/tail-ws ./cmd/tail-ws
11+
12+
install_tailws:
13+
@echo "Installing tail-ws"
14+
go install ./cmd/tail-ws
15+
16+
clean:
17+
rm -rf build

README.md

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,31 @@
1-
Broadcaster for appended file lines
2-
===================================
1+
WebSocket broadcaster for appended file lines
2+
=============================================
33

4+
[![Go Report Card](https://goreportcard.com/badge/github.com/jeronimoalbi/tail-ws)](https://goreportcard.com/report/github.com/jeronimoalbi/tail-ws)
45
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
6+
7+
Installation
8+
------------
9+
10+
Install the binary by running:
11+
12+
```
13+
go install github.com/jeronimoalbi/tail-ws/cmd/tail-ws@latest
14+
```
15+
16+
or alternatively:
17+
18+
```
19+
make install
20+
```
21+
22+
Run
23+
---
24+
25+
To start broadcasting appended lines run:
26+
27+
```
28+
tail-ws FILE
29+
```
30+
31+
New lines are broadcasted by default from the address `ws://127.0.0.1:8080`.

broadcast/connections.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package broadcast
2+
3+
import (
4+
"sync"
5+
6+
"github.com/gorilla/websocket"
7+
)
8+
9+
// NewConnections create a new Websocket connections registry.
10+
func NewConnections() *Connections {
11+
return &Connections{
12+
registry: make(map[*websocket.Conn]struct{}),
13+
}
14+
}
15+
16+
// Connections keeps track of active Websocket connections.
17+
type Connections struct {
18+
mu sync.RWMutex
19+
registry map[*websocket.Conn]struct{}
20+
}
21+
22+
// IsEmpty checks if there are registered connections.
23+
func (c *Connections) IsEmpty() bool {
24+
c.mu.RLock()
25+
defer c.mu.RUnlock()
26+
27+
return len(c.registry) == 0
28+
}
29+
30+
// Add adds a new Websocket connection to the registry.
31+
func (c *Connections) Add(ws *websocket.Conn) {
32+
c.mu.Lock()
33+
c.registry[ws] = struct{}{}
34+
c.mu.Unlock()
35+
}
36+
37+
// Delete removes a Websocket connection from the registry.
38+
// Connections are closed after being removed.
39+
func (c *Connections) Delete(ws *websocket.Conn) error {
40+
c.mu.Lock()
41+
defer c.mu.Unlock()
42+
43+
delete(c.registry, ws)
44+
return ws.Close()
45+
}
46+
47+
// Close closes all connections.
48+
func (c *Connections) Close() {
49+
c.Iter(func(ws *websocket.Conn) bool {
50+
ws.Close()
51+
return true
52+
})
53+
}
54+
55+
// Iter allows iterating the current connections.
56+
// Iteration stops when when false is returned.
57+
func (c *Connections) Iter(fn func(*websocket.Conn) bool) {
58+
c.mu.RLock()
59+
for ws := range c.registry {
60+
if !fn(ws) {
61+
return
62+
}
63+
}
64+
c.mu.RUnlock()
65+
}

broadcast/server.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package broadcast
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"errors"
7+
"log"
8+
"net"
9+
"net/http"
10+
"time"
11+
12+
"github.com/gorilla/websocket"
13+
"golang.org/x/sync/errgroup"
14+
15+
"github.com/jeronimoalbi/tail-ws/watch"
16+
)
17+
18+
var (
19+
// DefaultAddr defines the default Websocket server address.
20+
DefaultAddr = "127.0.0.1:8080"
21+
22+
maxMessageSize int64 = 1024
23+
pingPeriod = (pongWait * 9) / 10
24+
pongWait = 60 * time.Second
25+
writeWait = 8 * time.Second
26+
)
27+
28+
// Option configures transaction broadcast servers.
29+
type Option func(*Server)
30+
31+
// Address sets the server address.
32+
func Address(addr string) Option {
33+
return func(s *Server) {
34+
s.addr = addr
35+
}
36+
}
37+
38+
// Origin sets the allowed origin for incoming requests.
39+
func Origin(origin string) Option {
40+
return func(s *Server) {
41+
s.origin = origin
42+
}
43+
}
44+
45+
// Secure enables secure Websockets (WSS).
46+
func Secure(certFile, keyFile string) Option {
47+
return func(s *Server) {
48+
s.certFile = certFile
49+
s.keyFile = keyFile
50+
}
51+
}
52+
53+
// NewServer creates a new transactions broadcast server.
54+
func NewServer(options ...Option) *Server {
55+
s := Server{
56+
addr: DefaultAddr,
57+
connections: NewConnections(),
58+
}
59+
60+
for _, apply := range options {
61+
apply(&s)
62+
}
63+
64+
s.upgrader.CheckOrigin = func(r *http.Request) bool {
65+
if s.origin != "" {
66+
return s.origin == r.Header.Get("Origin")
67+
}
68+
return true
69+
}
70+
71+
return &s
72+
}
73+
74+
// Server handles Websocket connections and broadcasts new transactions.
75+
// It watches the transactions head file and when new transactions are indexed
76+
// it pushes the new entries to the connected clients.
77+
type Server struct {
78+
addr, origin, certFile, keyFile string
79+
reader watch.Reader
80+
connections *Connections
81+
upgrader websocket.Upgrader
82+
}
83+
84+
// HandleWS is an HTTP handler that upgrades incoming connections to WS or WSS.
85+
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
86+
// TODO: Add authentication support
87+
log.Printf("connection stablished with %s", r.RemoteAddr)
88+
89+
ws, err := s.upgrader.Upgrade(w, r, nil)
90+
if err != nil {
91+
// Upgrade already returns the error to the client on failure
92+
log.Printf("connection from %s failed: %v", r.RemoteAddr, err)
93+
return
94+
}
95+
96+
ws.SetReadLimit(maxMessageSize)
97+
98+
// Prepare keep alive protocol for the new connection
99+
ws.SetReadDeadline(time.Now().Add(pongWait))
100+
ws.SetPongHandler(func(string) error {
101+
ws.SetReadDeadline(time.Now().Add(pongWait))
102+
return nil
103+
})
104+
105+
// Launch a gopher to keep connection alive
106+
ctx, cancel := context.WithCancel(context.Background())
107+
108+
go func() {
109+
ticker := time.NewTicker(pingPeriod)
110+
defer ticker.Stop()
111+
112+
for {
113+
select {
114+
case <-ticker.C:
115+
err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
116+
if err != nil {
117+
log.Printf("error sending ping: %v", err)
118+
ws.Close()
119+
}
120+
case <-ctx.Done():
121+
return
122+
}
123+
}
124+
}()
125+
126+
// Make sure to cleanup connection when closed
127+
ws.SetCloseHandler(func(int, string) error {
128+
log.Printf("closing connextion %s", ws.RemoteAddr())
129+
cancel()
130+
return s.connections.Delete(ws)
131+
})
132+
133+
s.connections.Add(ws)
134+
}
135+
136+
// Start starts a new HTTP server to listen for incoming WS or WSS connections.
137+
func (s *Server) Start(ctx context.Context) error {
138+
g, ctx := errgroup.WithContext(ctx)
139+
server := &http.Server{
140+
Addr: s.addr,
141+
Handler: http.HandlerFunc(s.HandleWS),
142+
BaseContext: func(l net.Listener) context.Context {
143+
return ctx
144+
},
145+
}
146+
147+
g.Go(func() error {
148+
<-ctx.Done()
149+
s.connections.Close()
150+
return server.Close()
151+
})
152+
153+
g.Go(func() error {
154+
var err error
155+
if s.certFile != "" && s.keyFile != "" {
156+
log.Printf("listening for connections -> wss://%s", s.addr)
157+
err = server.ListenAndServeTLS(s.certFile, s.keyFile)
158+
} else {
159+
log.Printf("listening for connections -> ws://%s", s.addr)
160+
err = server.ListenAndServe()
161+
}
162+
163+
if errors.Is(err, http.ErrServerClosed) {
164+
return nil
165+
}
166+
return err
167+
})
168+
169+
return g.Wait()
170+
}
171+
172+
// Watch starts watching a transaction head file and broadcasts
173+
// the newly indexed transactions to all connected peers.
174+
func (s *Server) Watch(ctx context.Context, name string) error {
175+
r := watch.NewReader(watch.SeekEnd())
176+
g, ctx := errgroup.WithContext(ctx)
177+
178+
g.Go(func() error {
179+
scanner := bufio.NewScanner(r)
180+
for scanner.Scan() {
181+
s.broadcast(scanner.Bytes())
182+
}
183+
184+
return scanner.Err()
185+
})
186+
187+
g.Go(func() error {
188+
defer r.Close()
189+
190+
for {
191+
// Keep watching when the file is ovewritten
192+
if err := r.Watch(ctx, name); err != watch.ErrFileOverwritten {
193+
return err
194+
}
195+
}
196+
})
197+
198+
return g.Wait()
199+
}
200+
201+
func (s Server) broadcast(tx []byte) {
202+
s.connections.Iter(func(ws *websocket.Conn) bool {
203+
go func() {
204+
ws.SetWriteDeadline(time.Now().Add(writeWait))
205+
206+
if err := ws.WriteMessage(websocket.BinaryMessage, tx); err != nil {
207+
log.Printf("tx broadcast failed: %v", err)
208+
ws.Close()
209+
}
210+
}()
211+
212+
return true
213+
})
214+
}

0 commit comments

Comments
 (0)