Skip to content

Commit d523946

Browse files
committed
Poll many messages at once
1 parent bb96183 commit d523946

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

src/mq.ts

+12-8
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ export class PostgresMessageQueue implements MessageQueue {
9898
await this.initialize();
9999
const { signal } = options;
100100
const poll = async () => {
101-
if (signal?.aborted) return;
102-
const query = this.#sql`
101+
while (!signal?.aborted) {
102+
const query = this.#sql`
103103
DELETE FROM ${this.#sql(this.#tableName)}
104104
WHERE id = (
105105
SELECT id
@@ -110,13 +110,17 @@ export class PostgresMessageQueue implements MessageQueue {
110110
)
111111
RETURNING message;
112112
`.execute();
113-
const cancel = query.cancel.bind(query);
114-
signal?.addEventListener("abort", cancel);
115-
for (const message of await query) {
116-
if (signal?.aborted) return;
117-
await handler(JSON.parse(message.message));
113+
const cancel = query.cancel.bind(query);
114+
signal?.addEventListener("abort", cancel);
115+
let i = 0;
116+
for (const message of await query) {
117+
if (signal?.aborted) return;
118+
await handler(JSON.parse(message.message));
119+
i++;
120+
}
121+
signal?.removeEventListener("abort", cancel);
122+
if (i < 1) break;
118123
}
119-
signal?.removeEventListener("abort", cancel);
120124
};
121125
const timeouts = new Set<ReturnType<typeof setTimeout>>();
122126
const listen = await this.#sql.listen(

0 commit comments

Comments
 (0)