Skip to content

Commit 52f98f4

Browse files
committed
Added GCloud Pub/Sub message queue implementation
refs [AP-566](https://linear.app/ghost/issue/AP-566/implement-a-pubsub-backed-queue-for-fedify) Added and configured a GCloud Pub/Sub message queue implementation for Fedify
1 parent b694767 commit 52f98f4

File tree

7 files changed

+572
-2
lines changed

7 files changed

+572
-2
lines changed

docker-compose.yml

+24-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ services:
1313
- NODE_ENV=testing
1414
- ALLOW_PRIVATE_ADDRESS=true
1515
- SKIP_SIGNATURE_VERIFICATION=true
16+
- USE_MQ=true
17+
- MQ_PUBSUB_PROJECT_ID=activitypub
18+
- MQ_PUBSUB_HOST=pubsub:8085
19+
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
20+
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
1621
command: yarn build:watch
1722
depends_on:
1823
migrate:
@@ -74,7 +79,13 @@ services:
7479
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
7580
ports:
7681
- "8085:8085"
77-
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
82+
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
83+
volumes:
84+
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
85+
environment:
86+
- PROJECT_ID=activitypub
87+
- TOPIC_NAME=activitypub_topic_changeme
88+
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
7889
healthcheck:
7990
test: "curl -f http://localhost:8085 || exit 1"
8091
interval: 1s
@@ -98,6 +109,11 @@ services:
98109
- SKIP_SIGNATURE_VERIFICATION=true
99110
- ALLOW_PRIVATE_ADDRESS=true
100111
- NODE_TLS_REJECT_UNAUTHORIZED=0
112+
- USE_MQ=true
113+
- MQ_PUBSUB_PROJECT_ID=activitypub
114+
- MQ_PUBSUB_HOST=pubsub-testing:8085
115+
- MQ_PUBSUB_TOPIC_NAME=activitypub_topic_changeme
116+
- MQ_PUBSUB_SUBSCRIPTION_NAME=activitypub_subscription_changeme
101117
command: yarn build:watch
102118
depends_on:
103119
mysql-testing:
@@ -157,7 +173,13 @@ services:
157173
image: gcr.io/google.com/cloudsdktool/google-cloud-cli:499.0.0-emulators
158174
ports:
159175
- "8086:8085"
160-
command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085 --project=activitypub
176+
command: /bin/bash -c "/opt/activitypub/start-pubsub.sh"
177+
volumes:
178+
- ./pubsub/start.sh:/opt/activitypub/start-pubsub.sh
179+
environment:
180+
- PROJECT_ID=activitypub
181+
- TOPIC_NAME=activitypub_topic_changeme
182+
- SUBSCRIPTION_NAME=activitypub_subscription_changeme
161183
healthcheck:
162184
test: "curl -f http://localhost:8085 || exit 1"
163185
interval: 1s

pubsub/start.sh

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#!/bin/bash
2+
3+
# This script is used to start the Pub/Sub emulator and create the required
4+
# topic and subscription upfront (defined in the environment variables)
5+
#
6+
# See:
7+
# https://cloud.google.com/pubsub/docs/emulator
8+
# https://cloud.google.com/pubsub/docs/create-topic#pubsub_create_topic-rest
9+
# https://cloud.google.com/pubsub/docs/create-push-subscription#pubsub_create_push_subscription-rest
10+
11+
# Ensure we explicitly set the host to 0.0.0.0:8085 so that the emulator will
12+
# listen on all ip addresses and not just IPv6 (which is the default)
13+
HOST=0.0.0.0:8085
14+
15+
# Start the emulator
16+
gcloud beta emulators pubsub start --host-port=${HOST} --project=${PROJECT_ID} &
17+
18+
# Wait for the emulator to be ready
19+
until curl -f http://${HOST}; do
20+
echo "Waiting for Pub/Sub emulator to start..."
21+
22+
sleep 1
23+
done
24+
25+
# Create the topic via REST API
26+
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/topics/${TOPIC_NAME} | grep -q "200"; then
27+
echo "Topic created: ${TOPIC_NAME}"
28+
else
29+
echo "Failed to create topic: ${TOPIC_NAME}"
30+
exit 1
31+
fi
32+
33+
# Create the subscription via REST API
34+
if curl -s -o /dev/null -w "%{http_code}" -X PUT http://${HOST}/v1/projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_NAME} \
35+
-H "Content-Type: application/json" \
36+
-d '{
37+
"topic": "projects/'${PROJECT_ID}'/topics/'${TOPIC_NAME}'"
38+
}' | grep -q "200"; then
39+
echo "Subscription created: ${SUBSCRIPTION_NAME}"
40+
else
41+
echo "Failed to create subscription: ${SUBSCRIPTION_NAME}"
42+
exit 1
43+
fi
44+
45+
# Keep the container running
46+
tail -f /dev/null

src/app.ts

+57
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
Follow,
1212
type KvStore,
1313
Like,
14+
type MessageQueue,
1415
Note,
1516
Undo,
1617
Update,
@@ -75,6 +76,7 @@ import {
7576
undoDispatcher,
7677
updateDispatcher,
7778
} from './dispatchers';
79+
import { GCloudPubSubMessageQueue } from './fedify/mq/gcloud-pubsub-mq';
7880
import { KnexKvStore } from './knex.kvstore';
7981
import { scopeKvStore } from './kv-helpers';
8082

@@ -88,6 +90,13 @@ import {
8890
unlikeAction,
8991
} from './handlers';
9092

93+
import { PubSub } from '@google-cloud/pubsub';
94+
import {
95+
getFullSubscriptionIdentifier,
96+
getFullTopicIdentifier,
97+
subscriptionExists,
98+
topicExists,
99+
} from 'helpers/gcloud-pubsub';
91100
import { getTraceAndSpanId } from './helpers/context-header';
92101
import { getRequestData } from './helpers/request-data';
93102
import { spanWrapper } from './instrumentation';
@@ -153,8 +162,56 @@ export type ContextData = {
153162

154163
const fedifyKv = await KnexKvStore.create(client, 'key_value');
155164

165+
let messageQueue: MessageQueue | undefined;
166+
167+
if (process.env.USE_MQ === 'true') {
168+
logging.info('Message queue is enabled');
169+
170+
try {
171+
const pubSubClient = new PubSub({
172+
projectId: process.env.MQ_PUBSUB_PROJECT_ID,
173+
apiEndpoint: process.env.MQ_PUBSUB_HOST,
174+
emulatorMode: process.env.NODE_ENV !== 'production',
175+
});
176+
177+
const topicName = process.env.MQ_PUBSUB_TOPIC_NAME ?? 'unknown_topic';
178+
const subscriptionName =
179+
process.env.MQ_PUBSUB_SUBSCRIPTION_NAME ?? 'unknown_subscription';
180+
181+
const topicIdentifier = getFullTopicIdentifier(pubSubClient, topicName);
182+
const subscriptionIdentifier = getFullSubscriptionIdentifier(
183+
pubSubClient,
184+
subscriptionName,
185+
);
186+
187+
if (!(await topicExists(pubSubClient, topicIdentifier))) {
188+
throw new Error(`Topic does not exist: ${topicName}`);
189+
}
190+
191+
if (!(await subscriptionExists(pubSubClient, subscriptionIdentifier))) {
192+
throw new Error(`Subscription does not exist: ${subscriptionName}`);
193+
}
194+
195+
messageQueue = new GCloudPubSubMessageQueue(
196+
pubSubClient,
197+
topicIdentifier,
198+
subscriptionIdentifier,
199+
logging,
200+
);
201+
} catch (err) {
202+
logging.error('Failed to initialise message queue {error}', {
203+
error: err,
204+
});
205+
206+
process.exit(1);
207+
}
208+
} else {
209+
logging.info('Message queue is disabled');
210+
}
211+
156212
export const fedify = createFederation<ContextData>({
157213
kv: fedifyKv,
214+
queue: messageQueue,
158215
skipSignatureVerification:
159216
process.env.SKIP_SIGNATURE_VERIFICATION === 'true' &&
160217
process.env.NODE_ENV === 'testing',

src/fedify/mq/gcloud-pubsub-mq.ts

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import type { Message, PubSub } from '@google-cloud/pubsub';
2+
3+
import type {
4+
MessageQueue,
5+
MessageQueueEnqueueOptions,
6+
MessageQueueListenOptions,
7+
} from '@fedify/fedify';
8+
import type { Logger } from '@logtape/logtape';
9+
10+
export class GCloudPubSubMessageQueue implements MessageQueue {
11+
private pubSubClient: PubSub;
12+
private topicIdentifier: string;
13+
private subscriptionIdentifier: string;
14+
private logger: Logger;
15+
16+
constructor(
17+
pubSubClient: PubSub,
18+
topicIdentifier: string,
19+
subscriptionIdentifier: string,
20+
logger: Logger,
21+
) {
22+
this.pubSubClient = pubSubClient;
23+
this.topicIdentifier = topicIdentifier;
24+
this.subscriptionIdentifier = subscriptionIdentifier;
25+
this.logger = logger;
26+
}
27+
28+
async enqueue(
29+
message: any,
30+
options?: MessageQueueEnqueueOptions,
31+
): Promise<void> {
32+
const delay = options?.delay?.total('millisecond') ?? 0;
33+
34+
this.logger.info(
35+
`Enqueuing message [FedifyID: ${message.id}] with delay: ${delay}ms`,
36+
);
37+
38+
if (delay > 0) {
39+
await new Promise((resolve) => setTimeout(resolve, delay));
40+
}
41+
42+
try {
43+
const messageId = await this.pubSubClient
44+
.topic(this.topicIdentifier)
45+
.publishMessage({
46+
json: message,
47+
attributes: {
48+
fedifyId: message.id,
49+
},
50+
});
51+
52+
this.logger.info(
53+
`Message [FedifyID: ${message.id}] was enqueued [PubSubID: ${messageId}]`,
54+
);
55+
} catch (error) {
56+
this.logger.error(
57+
`Failed to enqueue message [FedifyID: ${message.id}]: ${error}`,
58+
);
59+
}
60+
}
61+
62+
async listen(
63+
handler: (message: any) => Promise<void> | void,
64+
options: MessageQueueListenOptions = {},
65+
): Promise<void> {
66+
const subscription = this.pubSubClient.subscription(
67+
this.subscriptionIdentifier,
68+
);
69+
70+
subscription.on('message', async (message: Message) => {
71+
const fedifyId = message.attributes.fedifyId ?? 'unknown';
72+
73+
this.logger.info(
74+
`Handling message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`,
75+
);
76+
77+
try {
78+
const json = JSON.parse(message.data.toString());
79+
80+
await handler(json);
81+
82+
message.ack();
83+
84+
this.logger.info(
85+
`Acknowledged message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]`,
86+
);
87+
} catch (error) {
88+
message.nack();
89+
90+
this.logger.error(
91+
`Failed to handle message [FedifyID: ${fedifyId}, PubSubID: ${message.id}]: ${error}`,
92+
);
93+
}
94+
});
95+
96+
return await new Promise((resolve) => {
97+
options.signal?.addEventListener('abort', () => {
98+
subscription
99+
.removeAllListeners()
100+
.close()
101+
.then(() => resolve());
102+
});
103+
});
104+
}
105+
}

0 commit comments

Comments
 (0)