-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathqueue.go
More file actions
313 lines (259 loc) · 6.97 KB
/
queue.go
File metadata and controls
313 lines (259 loc) · 6.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
package sqliteq
import (
"database/sql"
"fmt"
"sync/atomic"
"time"
"github.com/lucsky/cuid"
_ "github.com/mattn/go-sqlite3"
)
// Queue implements the Queue interface using SQLite as the storage backend.
type Queue struct {
	// client is the database handle every query and transaction runs on.
	client *sql.DB
	// tableName is the table that stores the queue rows; it is quoted with
	// quoteIdent before being interpolated into SQL.
	tableName string
	// removeOnComplete selects what Acknowledge does with a row: delete it
	// (true) or keep it and mark it as completed (false).
	removeOnComplete bool
	// closed is set by Close; Enqueue and the dequeue path refuse to run
	// once it is true.
	closed atomic.Bool
}
// newQueue builds a SQLite-backed Queue on top of db, storing rows in
// tableName.
//
// The queue starts with removeOnComplete enabled; each opt is then applied in
// order and may override that. After the options run, the table and its
// indexes are created if missing and rows left in 'processing' without an
// acknowledgment are moved back to 'pending'.
//
// If table initialization fails, db is closed and a wrapped error is returned.
func newQueue(db *sql.DB, tableName string, opts ...Option) (*Queue, error) {
	queue := &Queue{
		client:           db,
		tableName:        tableName,
		removeOnComplete: true, // completed items are deleted unless an option says otherwise
	}

	for i := range opts {
		opts[i](queue)
	}

	err := queue.initTable()
	if err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to initialize table: %w", err)
	}

	// Recover work that was handed out but never acknowledged.
	queue.RequeueNoAckRows()

	return queue, nil
}
// initTable creates the queue table and its three indexes when they do not
// already exist. Index names are derived from the table name by appending
// "_status_idx", "_status_ack_idx" and "_ack_id_idx". It returns whatever
// error the Exec call reports.
func (q *Queue) initTable() error {
	table := quoteIdent(q.tableName)
	statusIdx := quoteIdent(q.tableName + "_status_idx")
	statusAckIdx := quoteIdent(q.tableName + "_status_ack_idx")
	ackIDIdx := quoteIdent(q.tableName + "_ack_id_idx")

	// The raw string is kept at column 0 so the SQL text sent to the
	// database is unchanged.
	const schema = `
CREATE TABLE IF NOT EXISTS %[1]s (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data BLOB NOT NULL,
status TEXT NOT NULL,
ack_id TEXT UNIQUE,
ack BOOLEAN DEFAULT 0,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
CREATE INDEX IF NOT EXISTS %[2]s ON %[1]s (status, created_at);
CREATE INDEX IF NOT EXISTS %[3]s ON %[1]s (status, ack);
CREATE INDEX IF NOT EXISTS %[4]s ON %[1]s (ack_id);
`
	ddl := fmt.Sprintf(schema, table, statusIdx, statusAckIdx, ackIDIdx)

	if _, err := q.client.Exec(ddl); err != nil {
		return err
	}
	return nil
}
// RequeueNoAckRows moves every row that is still in 'processing' with ack = 0
// back to 'pending' and refreshes its updated_at timestamp, so items that
// were dequeued but never acknowledged become available again.
//
// The method is best-effort and reports nothing to the caller: if the
// transaction cannot be started it returns without touching the table, and if
// the UPDATE or the commit fails the transaction is rolled back.
func (q *Queue) RequeueNoAckRows() {
	tx, err := q.client.Begin()
	if err != nil {
		// No transaction was opened; tx is nil and must not be used.
		return
	}
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	_, err = tx.Exec(
		fmt.Sprintf("UPDATE %s SET status = 'pending', updated_at = ? WHERE status = 'processing' AND ack = 0",
			quoteIdent(q.tableName)),
		time.Now().UTC(),
	)
	if err != nil {
		// Do not commit after a failed UPDATE; the deferred rollback runs.
		return
	}

	err = tx.Commit()
}
// Enqueue inserts item into the queue table as a new 'pending' row with
// ack = 0 and created_at/updated_at both set to the current UTC time.
//
// item is handed to the database driver as the bound value for the data
// column; no serialization is performed here.
//
// It returns true only when the insert was committed. It returns false when
// the queue has been closed or when beginning the transaction, executing the
// insert, or committing fails; a failed transaction is rolled back.
func (q *Queue) Enqueue(item any) bool {
	if q.closed.Load() {
		return false
	}

	stamp := time.Now().UTC()

	tx, err := q.client.Begin()
	if err != nil {
		return false
	}
	// err is assigned (never shadowed) below so this sees the final outcome.
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	insert := fmt.Sprintf("INSERT INTO %s (data, status, ack, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
		quoteIdent(q.tableName))
	if _, err = tx.Exec(insert, item, "pending", 0, stamp, stamp); err != nil {
		return false
	}

	err = tx.Commit()
	return err == nil
}
// dequeueInternal is the shared implementation behind Dequeue and
// DequeueWithAckId. Inside one transaction it selects the oldest pending row
// and then either deletes it (withAckId == false) or marks it as 'processing'
// and stores an ack ID on it (withAckId == true).
//
// Rows are ordered by created_at and then by id, so rows whose timestamps
// compare equal are still handed out in insertion order (id is the
// AUTOINCREMENT primary key).
//
// Returns:
//   - item: the row's data column as a []byte, or nil on failure.
//   - success: false when the queue is closed, when no pending row could be
//     read, or when any database step fails.
//   - ackID: the row's ack ID. When withAckId is true and the row has none
//     yet, a new one is generated with cuid.New; an ack ID already present on
//     the row is reused. Empty on failure.
//
// Any failure after the transaction has begun rolls it back.
func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID string) {
	if q.closed.Load() {
		return nil, false, ""
	}

	tx, err := q.client.Begin()
	if err != nil {
		return nil, false, ""
	}
	// err is only ever assigned below, so this observes the final outcome.
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	var (
		id        int64
		data      []byte
		nullAckID sql.NullString // ack_id is NULL for rows never dequeued with an ack ID
	)

	// Oldest pending row first; id breaks ties between equal created_at values.
	row := tx.QueryRow(fmt.Sprintf(
		"SELECT id, data, ack_id FROM %s WHERE status = 'pending' ORDER BY created_at ASC, id ASC LIMIT 1",
		quoteIdent(q.tableName),
	))
	err = row.Scan(&id, &data, &nullAckID)
	if err != nil {
		// Includes sql.ErrNoRows when nothing is pending.
		return nil, false, ""
	}
	if nullAckID.Valid {
		ackID = nullAckID.String
	}

	now := time.Now().UTC()
	if withAckId {
		if ackID == "" {
			ackID = cuid.New()
		}
		// Keep the row until it is acknowledged; just mark it as in flight.
		_, err = tx.Exec(
			fmt.Sprintf("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?",
				quoteIdent(q.tableName)),
			ackID, now, id,
		)
	} else {
		// Plain Dequeue has no acknowledgment step, so the row is removed now.
		_, err = tx.Exec(
			fmt.Sprintf("DELETE FROM %s WHERE id = ?", quoteIdent(q.tableName)),
			id,
		)
	}
	if err != nil {
		return nil, false, ""
	}

	err = tx.Commit()
	if err != nil {
		return nil, false, ""
	}

	return data, true, ackID
}
// Dequeue takes the next pending item off the queue and deletes its row.
// It returns the item and true on success, or nil and false when nothing
// could be dequeued. No acknowledgment ID is exposed to the caller.
func (q *Queue) Dequeue() (any, bool) {
	payload, ok, _ := q.dequeueInternal(false)
	return payload, ok
}
// DequeueWithAckId hands out the next pending item and marks its row as
// 'processing' instead of deleting it. It returns the item, whether the
// operation succeeded, and the acknowledgment ID to pass to Acknowledge.
func (q *Queue) DequeueWithAckId() (any, bool, string) {
	payload, ok, ackID := q.dequeueInternal(true)
	return payload, ok, ackID
}
// Acknowledge completes the item identified by ackID.
//
// When removeOnComplete is set the matching row is deleted; otherwise it is
// kept with status 'completed', ack = 1 and a refreshed updated_at.
//
// It returns true only if exactly this change affected at least one row and
// the transaction committed. If the statement fails, no row matches ackID, or
// the commit fails, the transaction is rolled back and false is returned.
func (q *Queue) Acknowledge(ackID string) bool {
	tx, err := q.client.Begin()
	if err != nil {
		return false
	}

	var affected int64
	// Roll back on any error and also when nothing matched the ack ID.
	defer func() {
		if err != nil || affected == 0 {
			tx.Rollback()
		}
	}()

	// Pick the statement first, then run it through a single Exec call.
	var (
		stmt string
		args []any
	)
	if q.removeOnComplete {
		stmt = fmt.Sprintf("DELETE FROM %s WHERE ack_id = ? ", quoteIdent(q.tableName))
		args = []any{ackID}
	} else {
		// ack = 1 is how the boolean true is stored in this table.
		stmt = fmt.Sprintf("UPDATE %s SET status = 'completed', ack = 1, updated_at = ? WHERE ack_id = ?", quoteIdent(q.tableName))
		args = []any{time.Now().UTC(), ackID}
	}

	var res sql.Result
	if res, err = tx.Exec(stmt, args...); err != nil {
		return false
	}

	if affected, err = res.RowsAffected(); err != nil || affected == 0 {
		return false
	}

	err = tx.Commit()
	return err == nil
}
// Len reports how many rows currently have status 'pending'.
// It returns 0 if the count cannot be read.
func (q *Queue) Len() int {
	query := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE status = 'pending'", quoteIdent(q.tableName))

	var pending int
	if q.client.QueryRow(query).Scan(&pending) != nil {
		return 0
	}
	return pending
}
// Values returns the data of every pending row, oldest created_at first.
// Each element is the raw []byte read from the data column. Rows that fail
// to scan are skipped, and nil is returned if the query itself fails.
func (q *Queue) Values() []any {
	query := fmt.Sprintf("SELECT data FROM %s WHERE status = 'pending' ORDER BY created_at ASC", quoteIdent(q.tableName))

	rows, err := q.client.Query(query)
	if err != nil {
		return nil
	}
	defer rows.Close()

	var pending []any
	for rows.Next() {
		var payload []byte
		// The stored bytes are returned as-is; nothing is decoded here.
		if scanErr := rows.Scan(&payload); scanErr == nil {
			pending = append(pending, payload)
		}
	}
	return pending
}
// Purge deletes every row in the queue table, regardless of status, inside a
// single transaction. Failures are not reported to the caller; if the DELETE
// or the commit fails the transaction is rolled back.
func (q *Queue) Purge() {
	tx, err := q.client.Begin()
	if err != nil {
		return
	}
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	wipe := fmt.Sprintf("DELETE FROM %s", quoteIdent(q.tableName))
	if _, err = tx.Exec(wipe); err == nil {
		err = tx.Commit()
	}
}
// Close marks the queue as closed so that later Enqueue and dequeue calls
// return without touching the database. The underlying *sql.DB is left open
// by this method. It always returns nil.
func (q *Queue) Close() error {
	q.closed.Store(true)

	return nil
}