@@ -3,8 +3,9 @@ import type {
3
3
MessageQueueEnqueueOptions ,
4
4
MessageQueueListenOptions ,
5
5
} from "@fedify/fedify" ;
6
- import type { Sql } from "postgres" ;
6
+ import type { JSONValue , Parameter , Sql } from "postgres" ;
7
7
import postgres from "postgres" ;
8
+ import { driverSerializesJson } from "./utils.ts" ;
8
9
9
10
/**
10
11
* Options for the PostgreSQL message queue.
@@ -61,6 +62,7 @@ export class PostgresMessageQueue implements MessageQueue {
61
62
readonly #channelName: string ;
62
63
readonly #pollIntervalMs: number ;
63
64
#initialized: boolean ;
65
+ #driverSerializesJson = false ;
64
66
65
67
constructor (
66
68
// deno-lint-ignore ban-types
@@ -86,7 +88,7 @@ export class PostgresMessageQueue implements MessageQueue {
86
88
await this . #sql`
87
89
INSERT INTO ${ this . #sql( this . #tableName) } (message, delay)
88
90
VALUES (
89
- ${ this . #sql . json ( message ) } ,
91
+ ${ this . #json( message ) } ,
90
92
${ delay . toString ( ) }
91
93
);
92
94
` ;
@@ -181,6 +183,7 @@ export class PostgresMessageQueue implements MessageQueue {
181
183
throw e ;
182
184
}
183
185
}
186
+ this . #driverSerializesJson = await driverSerializesJson ( this . #sql) ;
184
187
this . #initialized = true ;
185
188
}
186
189
@@ -190,6 +193,11 @@ export class PostgresMessageQueue implements MessageQueue {
190
193
async drop ( ) : Promise < void > {
191
194
await this . #sql`DROP TABLE IF EXISTS ${ this . #sql( this . #tableName) } ;` ;
192
195
}
196
+
197
+ #json( value : unknown ) : Parameter {
198
+ if ( this . #driverSerializesJson) return this . #sql. json ( value as JSONValue ) ;
199
+ return this . #sql. json ( JSON . stringify ( value ) ) ;
200
+ }
193
201
}
194
202
195
203
// cSpell: ignore typname
0 commit comments