-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasb_test.go
118 lines (94 loc) · 3.28 KB
/
asb_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"context"
"os"
"testing"
)
const (
// TestQueueName is the name of the queue to use for testing
TestNamespace = "DEDAZ_TEST_NAMESPACE"
TestQueueName = "DEDAZ_TEST_QUEUE_NAME"
)
func TestSendMessagesIntegration(t *testing.T) {
client, queueName := setup(t)
testMessages := generateTestMessages(100)
res, err := client.SendMessages(context.Background(), queueName, testMessages)
if err != nil {
t.Errorf("Error sending messages to queue %s: %v", queueName, err)
}
if res != len(testMessages) {
t.Errorf("Failed to send all messages to queue %s", queueName)
}
}
func TestGetAndDeleteMessagesIntegration(t *testing.T) {
client, queueName := setup(t)
expectedMsgCount := 250
testMessages := generateTestMessages(expectedMsgCount)
seededCount, err := client.SeedTestMessages(context.Background(), queueName, testMessages)
if err != nil {
t.Errorf("Error seeding messages to queue %s: %v", queueName, err)
}
if seededCount != expectedMsgCount {
t.Errorf("Failed to seed all messages to queue %s", queueName)
}
ctx := context.Background()
messages, err := client.PeekMessages(ctx, queueName)
if err != nil {
t.Fatalf("Could not peek messages: %v", err)
}
if len(messages) != expectedMsgCount {
t.Errorf("Expected %d messages, got %d", expectedMsgCount, len(messages))
}
sequenceNumbers := make([]int64, 0, len(messages))
for _, msg := range messages {
sequenceNumbers = append(sequenceNumbers, *msg.SequenceNumber)
}
deletedMsgs, err := client.DeleteMessages(ctx, queueName, sequenceNumbers)
if err != nil {
t.Fatalf("Could not delete messages from queue %s: %v", queueName, err)
}
if len(deletedMsgs) != len(messages) {
t.Errorf("Expected %d messages to be deleted, got %d", len(messages), len(deletedMsgs))
}
}
func setup(t *testing.T) (*AsbClient, string) {
ns := os.Getenv(TestNamespace)
qn := os.Getenv(TestQueueName)
if testing.Short() {
t.Skip("skipping integration test")
}
if ns == "" || qn == "" {
t.Skipf("Skipping integration test; %s and %s must be set to run integration tests", TestNamespace, TestQueueName)
}
client, err := NewClient(ns)
if err != nil {
t.Fatalf("Unexpected error when setting up client on namespace %s: %v", ns, err)
}
ctx := context.Background()
props, err := client.getQueueRuntimeProperties(ctx, qn)
if err != nil {
t.Fatalf("Unexpected error when accessing queue %s on namespace %s: %v", qn, ns, err)
}
if props == nil {
t.Fatalf("Could not find queue %s on namespace %s", qn, ns)
}
activeMessages := int(props.ActiveMessageCount)
deadLetteredMessages := int(props.DeadLetterMessageCount)
t.Logf("Active messages: %d, Deadlettered messages: %d", activeMessages, deadLetteredMessages)
if activeMessages != 0 {
mainPurgedCount, err := client.purgeQueue(ctx, qn, activeMessages, MainQueue)
if err != nil {
t.Fatalf("Failed to purge active messages: %v", err)
}
if mainPurgedCount < activeMessages {
t.Fatalf("Failed to purge all active messages; got %d, expected %d", mainPurgedCount, activeMessages)
}
}
if deadLetteredMessages != 0 {
dlqPurgedCount, err := client.purgeQueue(ctx, qn, deadLetteredMessages, DeadLetterQueue)
if err != nil || dlqPurgedCount != deadLetteredMessages {
t.Fatalf("Could not delete messages from deadletter queue: %v", err)
}
}
return client, qn
}