Skip to content

Commit c56db99

Browse files
committed
feat: wip
1 parent ec31a77 commit c56db99

File tree

14 files changed

+413
-71
lines changed

14 files changed

+413
-71
lines changed

.github/workflows/go.yml

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
# This workflow will build a golang project
2-
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go
3-
4-
name: Go
1+
name: Test Akumu
52

63
on:
74
push:
@@ -10,7 +7,8 @@ on:
107
branches: ["main"]
118

129
jobs:
13-
build:
10+
test:
11+
name: Test
1412
runs-on: ubuntu-latest
1513
steps:
1614
- uses: actions/checkout@v3

akumu.go

+80-6
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,97 @@
11
package akumu
22

33
import (
4-
"fmt"
4+
"io"
5+
"log/slog"
56
netHttp "net/http"
67

78
"github.com/go-chi/chi/v5"
9+
"github.com/studiolambda/akumu/event"
810
)
911

12+
type Config struct {
13+
address string
14+
events *event.System
15+
logger *slog.Logger
16+
}
17+
1018
type Akumu struct {
11-
router chi.Router
19+
Config
20+
server netHttp.Server
21+
}
22+
23+
type Builder func(*Config)
24+
25+
const (
26+
EventStart event.Event = "akumu.start"
27+
EventStop event.Event = "akumu.stop"
28+
)
29+
30+
func WithLogger(logger *slog.Logger) Builder {
31+
return func(config *Config) {
32+
config.logger = logger
33+
}
34+
}
35+
36+
func WithAddress(address string) Builder {
37+
return func(config *Config) {
38+
config.address = address
39+
}
40+
}
41+
42+
func WithEvents(events *event.System) Builder {
43+
return func(config *Config) {
44+
config.events = events
45+
}
1246
}
1347

14-
func New() *Akumu {
48+
func New(builders ...Builder) *Akumu {
49+
config := Config{
50+
address: ":3000",
51+
logger: slog.New(
52+
slog.NewTextHandler(io.Discard, nil),
53+
),
54+
}
55+
56+
for _, builder := range builders {
57+
builder(&config)
58+
}
59+
1560
return &Akumu{
16-
router: chi.NewRouter(),
61+
Config: config,
62+
server: netHttp.Server{
63+
Addr: config.address,
64+
Handler: chi.NewRouter(),
65+
DisableGeneralOptionsHandler: false,
66+
ReadTimeout: 0,
67+
ReadHeaderTimeout: 0,
68+
WriteTimeout: 0,
69+
IdleTimeout: 0,
70+
MaxHeaderBytes: 0,
71+
},
1772
}
1873
}
1974

2075
func (akumu *Akumu) Start() {
21-
fmt.Printf("Starting server on: http://localhost:3000\n")
22-
netHttp.ListenAndServe(":3000", akumu.router)
76+
akumu.logger.Info("starting server", "address", akumu.address)
77+
78+
if akumu.events != nil {
79+
akumu.events.Emit(EventStart, nil)
80+
}
81+
82+
if err := akumu.server.ListenAndServe(); err != nil && err != netHttp.ErrServerClosed {
83+
akumu.logger.Error("error while starting server", "error", err)
84+
}
85+
}
86+
87+
func (akumu *Akumu) Stop() {
88+
akumu.logger.Info("stopping server")
89+
90+
if akumu.events != nil {
91+
akumu.events.Emit(EventStop, nil)
92+
}
93+
94+
if err := akumu.server.Close(); err != nil {
95+
akumu.logger.Error("error while stopping server", "error", err)
96+
}
2397
}

event/event.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package event
2+
3+
type Event string
4+
5+
type Payload struct {
6+
Event Event
7+
Data any
8+
}
9+
10+
type Subscriber chan<- Payload

event/service.go

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package event
2+
3+
type Service interface {
4+
Emit(event Event, payload any) error
5+
Subscribe(subscriber Subscriber, events ...Event)
6+
Unsubscribe(subscriber Subscriber, events ...Event)
7+
IsSubscribed(subscriber Subscriber, events ...Event) bool
8+
}

event/system.go

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package event
2+
3+
import (
4+
"context"
5+
"io"
6+
"log/slog"
7+
"sync"
8+
"time"
9+
)
10+
11+
type Config struct {
12+
timeout time.Duration
13+
logger *slog.Logger
14+
}
15+
16+
type System struct {
17+
Config
18+
subscribers map[Event][]Subscriber
19+
mutex sync.RWMutex
20+
}
21+
22+
type Builder func(*Config)
23+
24+
func WithLogger(logger *slog.Logger) Builder {
25+
return func(config *Config) {
26+
config.logger = logger
27+
}
28+
}
29+
30+
func WithTimeout(timeout time.Duration) Builder {
31+
return func(config *Config) {
32+
config.timeout = timeout
33+
}
34+
}
35+
36+
func New(builders ...Builder) *System {
37+
config := Config{
38+
timeout: 5 * time.Second,
39+
logger: slog.New(
40+
slog.NewTextHandler(io.Discard, nil),
41+
),
42+
}
43+
44+
for _, builder := range builders {
45+
builder(&config)
46+
}
47+
48+
return &System{
49+
Config: config,
50+
subscribers: make(map[Event][]Subscriber),
51+
mutex: sync.RWMutex{},
52+
}
53+
}
54+
55+
func (system *System) Emit(event Event, payload any) error {
56+
system.mutex.RLock()
57+
defer system.mutex.RUnlock()
58+
59+
ctx, cancel := context.WithTimeout(context.Background(), system.timeout)
60+
defer cancel()
61+
62+
subscribers := system.subscribers[event]
63+
64+
for _, subscriber := range subscribers {
65+
select {
66+
case subscriber <- Payload{Event: event, Data: payload}:
67+
continue
68+
case <-ctx.Done():
69+
return ctx.Err()
70+
}
71+
}
72+
73+
system.logger.Debug("event emitted", "event", event, "subscribers", len(subscribers))
74+
75+
return nil
76+
}
77+
78+
func (system *System) Subscribe(subscriber Subscriber, events ...Event) {
79+
system.mutex.Lock()
80+
defer system.mutex.Unlock()
81+
82+
for _, event := range events {
83+
system.subscribers[event] = append(system.subscribers[event], subscriber)
84+
}
85+
86+
system.logger.Debug("subscribed", "events", events)
87+
}
88+
89+
func (system *System) Unsubscribe(subscriber Subscriber, events ...Event) {
90+
system.mutex.Lock()
91+
defer system.mutex.Unlock()
92+
93+
for _, event := range events {
94+
for index, sub := range system.subscribers[event] {
95+
if sub == subscriber {
96+
system.subscribers[event] = append(
97+
system.subscribers[event][:index],
98+
system.subscribers[event][index+1:]...,
99+
)
100+
}
101+
}
102+
}
103+
104+
system.logger.Debug("unsubscribed", "events", events)
105+
}
106+
107+
func (system *System) IsSubscribed(subscriber Subscriber, events ...Event) bool {
108+
system.mutex.RLock()
109+
defer system.mutex.RUnlock()
110+
111+
for _, event := range events {
112+
for _, sub := range system.subscribers[event] {
113+
if sub == subscriber {
114+
return true
115+
}
116+
}
117+
}
118+
119+
return false
120+
}

event/system_test.go

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package event_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
"github.com/studiolambda/akumu/event"
8+
)
9+
10+
func TestCanCreateSystem(t *testing.T) {
11+
system := event.New()
12+
13+
require.NotNil(t, system)
14+
}
15+
16+
func TestCanSubscribeToEvents(t *testing.T) {
17+
t.Run("subscribe to one event", func(t *testing.T) {
18+
system := event.New()
19+
subscriber := make(event.Subscriber)
20+
e := event.Event("test.event")
21+
22+
system.Subscribe(subscriber, e)
23+
24+
require.True(t, system.IsSubscribed(subscriber, e))
25+
})
26+
27+
t.Run("subscribe to multiple events", func(t *testing.T) {
28+
system := event.New()
29+
subscriber := make(event.Subscriber)
30+
e1 := event.Event("test.event1")
31+
e2 := event.Event("test.event2")
32+
33+
system.Subscribe(subscriber, e1, e2)
34+
35+
require.True(t, system.IsSubscribed(subscriber, e1))
36+
require.True(t, system.IsSubscribed(subscriber, e2))
37+
})
38+
39+
t.Run("subscribe to same event multiple times", func(t *testing.T) {
40+
system := event.New()
41+
subscriber := make(event.Subscriber)
42+
e := event.Event("test.event")
43+
44+
system.Subscribe(subscriber, e)
45+
system.Subscribe(subscriber, e)
46+
47+
require.True(t, system.IsSubscribed(subscriber, e))
48+
})
49+
50+
t.Run("subscribe to same event multiple times with different subscribers", func(t *testing.T) {
51+
system := event.New()
52+
subscriber1 := make(event.Subscriber)
53+
subscriber2 := make(event.Subscriber)
54+
e := event.Event("test.event")
55+
56+
system.Subscribe(subscriber1, e)
57+
system.Subscribe(subscriber2, e)
58+
59+
require.True(t, system.IsSubscribed(subscriber1, e))
60+
require.True(t, system.IsSubscribed(subscriber2, e))
61+
})
62+
63+
t.Run("dont subscribe", func(t *testing.T) {
64+
system := event.New()
65+
subscriber := make(event.Subscriber)
66+
e := event.Event("test.event")
67+
68+
require.False(t, system.IsSubscribed(subscriber, e))
69+
})
70+
}
71+
72+
func TestCanUnsubscribeFromEvents(t *testing.T) {
73+
t.Run("unsubscribe from one event", func(t *testing.T) {
74+
system := event.New()
75+
subscriber := make(event.Subscriber)
76+
e := event.Event("test.event")
77+
78+
system.Subscribe(subscriber, e)
79+
system.Unsubscribe(subscriber, e)
80+
81+
require.False(t, system.IsSubscribed(subscriber, e))
82+
})
83+
84+
t.Run("unsubscribe from multiple events", func(t *testing.T) {
85+
system := event.New()
86+
subscriber := make(event.Subscriber)
87+
e1 := event.Event("test.event1")
88+
e2 := event.Event("test.event2")
89+
90+
system.Subscribe(subscriber, e1, e2)
91+
system.Unsubscribe(subscriber, e1, e2)
92+
93+
require.False(t, system.IsSubscribed(subscriber, e1))
94+
require.False(t, system.IsSubscribed(subscriber, e2))
95+
})
96+
97+
t.Run("unsubscribe from same event multiple times", func(t *testing.T) {
98+
system := event.New()
99+
subscriber := make(event.Subscriber)
100+
e := event.Event("test.event")
101+
102+
system.Subscribe(subscriber, e)
103+
system.Unsubscribe(subscriber, e)
104+
system.Unsubscribe(subscriber, e)
105+
106+
require.False(t, system.IsSubscribed(subscriber, e))
107+
})
108+
109+
t.Run("unsubscribe from same event multiple times with different subscribers", func(t *testing.T) {
110+
system := event.New()
111+
subscriber1 := make(event.Subscriber)
112+
subscriber2 := make(event.Subscriber)
113+
e := event.Event("test.event")
114+
115+
system.Subscribe(subscriber1, e)
116+
system.Subscribe(subscriber2, e)
117+
system.Unsubscribe(subscriber1, e)
118+
system.Unsubscribe(subscriber2, e)
119+
120+
require.False(t, system.IsSubscribed(subscriber1, e))
121+
require.False(t, system.IsSubscribed(subscriber2, e))
122+
})
123+
124+
t.Run("dont unsubscribe", func(t *testing.T) {
125+
system := event.New()
126+
subscriber := make(event.Subscriber)
127+
e := event.Event("test.event")
128+
129+
system.Unsubscribe(subscriber, e)
130+
131+
require.False(t, system.IsSubscribed(subscriber, e))
132+
})
133+
}

0 commit comments

Comments
 (0)