-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathintegration_test.go
207 lines (166 loc) · 5.16 KB
/
integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package test_local
import (
"context"
"encoding/gob"
"os"
"testing"
"github.com/joho/godotenv"
"github.com/soheilhy/cmux"
"github.com/stretchr/testify/assert"
amtest "github.com/pancsta/asyncmachine-go/internal/testing"
ssTest "github.com/pancsta/asyncmachine-go/internal/testing/states"
"github.com/pancsta/asyncmachine-go/internal/testing/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/telemetry"
"github.com/pancsta/asyncmachine-go/tools/debugger"
"github.com/pancsta/asyncmachine-go/tools/debugger/server"
ss "github.com/pancsta/asyncmachine-go/tools/debugger/states"
)
// worker is a local worker with imported data, which listens for new
// telemetry connections
var worker *debugger.Debugger
// make sure these ports aren't used in other tests
var workerAddr = "localhost:" + utils.RandPort(52001, 53000)
func init() {
_ = godotenv.Load()
if os.Getenv(am.EnvAmTestDebug) != "" {
amhelp.EnableDebugging(false)
os.Setenv(am.EnvAmLogFile, "1")
}
var err error
gob.Register(server.GetField(0))
// worker
worker, err = amtest.NewDbgWorker(false, debugger.Opts{
ID: "loc-worker"})
if err != nil {
panic(err)
}
// init am-dbg telemetry server
muxCh := make(chan cmux.CMux, 1)
defer close(muxCh)
go server.StartRpc(worker.Mach, workerAddr, muxCh, nil)
// wait for mux
<-muxCh
}
func TestUserFwd(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mach := worker.Mach
// fixtures
cursorTx := 20
amhelp.Add1AsyncBlock(ctx, mach, ss.SwitchedClientTx, ss.SwitchingClientTx,
am.A{"Client.id": "sim", "Client.cursorTx": cursorTx})
// test
res := amhelp.Add1Block(ctx, mach, ss.UserFwd, nil)
// assert
assert.NotEqual(t, res, am.Canceled)
assert.Equal(t, cursorTx+1, worker.C.CursorTx1)
}
func TestUserFwd100(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mach := worker.Mach
// fixtures
cursorTx := 20
amhelp.Add1AsyncBlock(ctx, mach, ss.SwitchedClientTx, ss.SwitchingClientTx,
am.A{"Client.id": "sim", "Client.cursorTx": cursorTx})
// test
// add ss.UserFwd 100 times in a series
for i := 0; i < 100; i++ {
// wait for Fwd to de-activate each time (Fwd is implied by UserFwd)
when := mach.WhenTicks(ss.Fwd, 2, ctx)
res := mach.Add1(ss.UserFwd, nil)
if res == am.Canceled {
t.Fatal(res)
}
<-when
}
// assert that at least 100 txs got scrolled (more bc of def filters)
assert.GreaterOrEqual(t, worker.C.CursorTx1, cursorTx+100)
}
func TestTailModeFLAKY(t *testing.T) {
// TODO flaky
if os.Getenv(amhelp.EnvAmTestRunner) != "" {
t.Skip("FLAKY")
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create a listener for ConnectEvent
whenConn := worker.Mach.WhenTicks(ss.ConnectEvent, 1, ctx)
// fixture machine
mach := utils.NewRels(t, nil)
mach.SetLoggerEmpty(am.LogOps)
err := telemetry.TransitionsToDbg(mach, workerAddr)
if err != nil {
t.Fatal(err)
}
// wait for the fixture machine to connect
<-whenConn
whenDelivered := worker.Mach.WhenTicks(ss.ClientMsg, 2, ctx)
// generate fixture events
// mach.SetLoggerSimple(t.Logf, am.LogOps)
mach.Add1(ssTest.C, nil)
mach.Add1(ssTest.D, nil)
mach.Add1(ssTest.A, nil)
// wait for the msg
for {
<-whenDelivered
// because of receive batching, sometimes txs come in 1, sometimes in 2
// msgs
c := len(worker.C.MsgTxs)
if c >= 3 {
break
}
// wait more
t.Logf("waiting for more txs (%d)", c)
whenDelivered = worker.Mach.WhenTicks(ss.ClientMsg, 2, ctx)
}
// go back 2 txs
worker.Mach.Add1(ss.UserBack, nil)
worker.Mach.Add1(ss.UserBack, nil)
// switch to tail mode
worker.Mach.Add1(ss.TailMode, nil)
// assert.NotEqual(t, res, am.Canceled)
assert.Equal(t, 4, len(worker.C.MsgTxs), "tx count")
assert.Equal(t, 4, worker.C.CursorTx1, "cursorTx")
// TODO assert tree clocks
// TODO assert log highlight
}
func TestUserBack(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mach := worker.Mach
// fixtures
cursorTx := 20
amhelp.Add1AsyncBlock(ctx, mach, ss.SwitchedClientTx, ss.SwitchingClientTx,
am.A{"Client.id": "sim", "Client.cursorTx": cursorTx})
// test
res := amhelp.Add1Block(ctx, mach, ss.UserBack, nil)
// assert
assert.NotEqual(t, res, am.Canceled)
assert.Equal(t, cursorTx-1, worker.C.CursorTx1)
}
func TestStepsResetAfterStateJump(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mach := worker.Mach
// fixtures
state := "PublishMessage"
cursorTx := 20
amhelp.Add1AsyncBlock(ctx, mach, ss.SwitchedClientTx, ss.SwitchingClientTx,
am.A{"Client.id": "ps-2", "Client.cursorTx": cursorTx})
// test
amhelp.Add1Block(ctx, mach, ss.StateNameSelected, am.A{"state": state})
amhelp.Add1Block(ctx, mach, ss.UserFwdStep, nil)
amhelp.Add1Block(ctx, mach, ss.UserFwdStep, nil)
// trigger a state jump and wait for the next scroll
amhelp.Add1AsyncBlock(ctx, mach, ss.ScrollToTx, ss.ScrollToMutTx, am.A{
"state": state,
"fwd": true,
})
// assert
assert.Equal(t, 0, worker.C.CursorStep, "Steps timeline should reset")
// TODO assert not playing
}