1
- import type { Message , PubSub } from '@google-cloud/pubsub' ;
2
-
3
1
import type {
4
2
MessageQueue ,
5
3
MessageQueueEnqueueOptions ,
6
4
MessageQueueListenOptions ,
7
5
} from '@fedify/fedify' ;
6
+ import type { Message , PubSub } from '@google-cloud/pubsub' ;
8
7
import type { Logger } from '@logtape/logtape' ;
8
+ import * as Sentry from '@sentry/node' ;
9
9
10
10
export class GCloudPubSubMessageQueue implements MessageQueue {
11
11
private pubSubClient : PubSub ;
@@ -56,6 +56,10 @@ export class GCloudPubSubMessageQueue implements MessageQueue {
56
56
this . logger . error (
57
57
`Failed to enqueue message [FedifyID: ${ message . id } ]: ${ error } ` ,
58
58
) ;
59
+
60
+ Sentry . captureException ( error ) ;
61
+
62
+ throw error ;
59
63
}
60
64
}
61
65
@@ -67,31 +71,49 @@ export class GCloudPubSubMessageQueue implements MessageQueue {
67
71
this . subscriptionIdentifier ,
68
72
) ;
69
73
70
- subscription . on ( 'message' , async ( message : Message ) => {
71
- const fedifyId = message . attributes . fedifyId ?? 'unknown' ;
74
+ subscription
75
+ . on ( 'message' , async ( message : Message ) => {
76
+ const fedifyId = message . attributes . fedifyId ?? 'unknown' ;
72
77
73
- this . logger . info (
74
- `Handling message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ]` ,
75
- ) ;
78
+ this . logger . info (
79
+ `Handling message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ]` ,
80
+ ) ;
76
81
77
- try {
78
- const json = JSON . parse ( message . data . toString ( ) ) ;
82
+ try {
83
+ const json = JSON . parse ( message . data . toString ( ) ) ;
79
84
80
- await handler ( json ) ;
85
+ await handler ( json ) ;
81
86
82
- message . ack ( ) ;
87
+ message . ack ( ) ;
83
88
84
- this . logger . info (
85
- `Acknowledged message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ]` ,
86
- ) ;
87
- } catch ( error ) {
88
- message . nack ( ) ;
89
+ this . logger . info (
90
+ `Acknowledged message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ]` ,
91
+ ) ;
92
+ } catch ( error ) {
93
+ message . nack ( ) ;
94
+
95
+ this . logger . error (
96
+ `Failed to handle message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ]: ${ error } ` ,
97
+ ) ;
89
98
99
+ Sentry . captureException ( error ) ;
100
+ }
101
+ } )
102
+ . on ( 'error' , ( error ) => {
90
103
this . logger . error (
91
- `Failed to handle message [FedifyID: ${ fedifyId } , PubSubID: ${ message . id } ] : ${ error } ` ,
104
+ `Subscription [ ${ this . subscriptionIdentifier } ] error occurred : ${ error } ` ,
92
105
) ;
93
- }
94
- } ) ;
106
+
107
+ Sentry . captureException ( error ) ;
108
+
109
+ // This is a fatal error, so we should throw to stop the listener / process
110
+ throw error ;
111
+ } )
112
+ . on ( 'close' , ( ) => {
113
+ this . logger . info (
114
+ `Subscription [${ this . subscriptionIdentifier } ] closed` ,
115
+ ) ;
116
+ } ) ;
95
117
96
118
return await new Promise ( ( resolve ) => {
97
119
options . signal ?. addEventListener ( 'abort' , ( ) => {
0 commit comments