Skip to content

Commit

Permalink
Misc refactoring of the MQ middleware
Browse files Browse the repository at this point in the history
no refs

Refactorings include:

- Improved comments on not so obvious code
- Improved usage of logging
- Slight reordering of code for better performance (i.e dont validate data when not needed)
- Consolidation of some input validation
- Naming of anonymous functions
  • Loading branch information
mike182uk committed Jan 8, 2025
1 parent 266e98d commit e4fbda3
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 36 deletions.
7 changes: 5 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ import { KnexKvStore } from './knex.kvstore';
import { scopeKvStore } from './kv-helpers';
import {
GCloudPubSubPushMessageQueue,
createHandlePushMessageMiddleware,
createMessageQueue,
handlePushMessage,
} from './mq/gcloud-pubsub-push/mq';

const logging = getLogger(['activitypub']);
Expand Down Expand Up @@ -569,7 +569,10 @@ app.use(async (ctx, next) => {
// This needs to go before the middleware which loads the site
// because this endpoint does not require the site to exist
if (queue instanceof GCloudPubSubPushMessageQueue) {
app.post('/.ghost/activitypub/mq', spanWrapper(handlePushMessage(queue)));
app.post(
'/.ghost/activitypub/mq',
spanWrapper(createHandlePushMessageMiddleware(queue, logging)),
);
}

app.use(async (ctx, next) => {
Expand Down
45 changes: 28 additions & 17 deletions src/mq/gcloud-pubsub-push/mq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,46 +244,57 @@ const IncomingPushMessageSchema = z.object({
* Hono middleware to handle an incoming message from a Pub/Sub push subscription
*
* @param mq {GCloudPubSubPushMessageQueue} Message queue instance
* @param logger {Logger} Logger instance
* @returns {function}
*
* @example
* ```
* import { createMessageQueue, handlePushMessage } from './mq/gcloud-pubsub-push';
* import { createHandlePushMessageMiddleware, createMessageQueue } from './mq/gcloud-pubsub-push';
*
* const queue = await createMessageQueue(...);
*
* app.post('/mq', handlePushMessage(queue));
* app.post('/mq', createHandlePushMessageMiddleware(queue, logging));
* ```
*/
export function handlePushMessage(
export function createHandlePushMessageMiddleware(
mq: GCloudPubSubPushMessageQueue,
logger: Logger,
): (ctx: Context) => Promise<Response> {
return async (ctx: Context) => {
// Check that the incoming JSON is valid
let json: z.infer<typeof IncomingPushMessageSchema>;

try {
json = IncomingPushMessageSchema.parse(
(await ctx.req.json()) as unknown,
return async function handlePushMessage(ctx: Context) {
// Check that the message queue is listening and if not, return a non-200
// response to instruct GCloud Pub/Sub to back off from pushing messages to
// this endpoint - See https://cloud.google.com/pubsub/docs/push#push_backoff
if (mq.isListening === false) {
logger.info(
'Message queue is not listening, cannot handle message',
);
} catch (error) {
return new Response(JSON.stringify(error), { status: 400 });
}

// Check that the message queue is listening
if (mq.isListening === false) {
return new Response(null, { status: 429 });
}

// Validate the incoming data
let json: z.infer<typeof IncomingPushMessageSchema>;
let data: FedifyMessage;

// Attempt to parse the incoming message data
try {
json = IncomingPushMessageSchema.parse(
(await ctx.req.json()) as unknown,
);

// We expect the message data to be base64 encoded JSON - See
// - https://cloud.google.com/pubsub/docs/publish-message-overview#about-messages
// - https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
// (we use https://github.com/googleapis/nodejs-pubsub to publish
// messages which uses the REST API)
data = JSON.parse(
Buffer.from(json.message.data, 'base64').toString(),
);
} catch (error) {
return new Response(null, { status: 500 });
logger.error(`Invalid incoming push message received: ${error}`, {
error,
});

return new Response(null, { status: 400 });
}

// Handle the message
Expand Down
59 changes: 42 additions & 17 deletions src/mq/gcloud-pubsub-push/mq.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest';

import {
GCloudPubSubPushMessageQueue,
createHandlePushMessageMiddleware,
createMessageQueue,
handlePushMessage,
} from './mq';

vi.mock('@google-cloud/pubsub', () => ({
Expand Down Expand Up @@ -289,37 +289,47 @@ describe('handlePushMessage', () => {
} as unknown as PubSub;
});

it('should return a 400 response if the incoming message data is invalid', async () => {
it('should return a 429 response if the message queue is not listening', async () => {
const mq = new GCloudPubSubPushMessageQueue(
mockLogger,
mockPubSubClient,
TOPIC,
);

(ctx.req.json as Mock).mockResolvedValue({
foo: 'bar',
});

mq.listen(vi.fn());
const handlePushMessage = createHandlePushMessageMiddleware(
mq,
mockLogger,
);

const result = await handlePushMessage(mq)(ctx);
const result = await handlePushMessage(ctx);

expect(result.status).toBe(400);
expect(result.status).toBe(429);
});

it('should return a 429 response if the message queue is not listening', async () => {
it('should return a 400 response if the incoming message data is invalid', async () => {
const mq = new GCloudPubSubPushMessageQueue(
mockLogger,
mockPubSubClient,
TOPIC,
);

const result = await handlePushMessage(mq)(ctx);
(ctx.req.json as Mock).mockResolvedValue({
foo: 'bar',
});

expect(result.status).toBe(429);
mq.listen(vi.fn());

const handlePushMessage = createHandlePushMessageMiddleware(
mq,
mockLogger,
);

const result = await handlePushMessage(ctx);

expect(result.status).toBe(400);
});

it('should return a 500 response if the incoming message data cannot be parsed', async () => {
it('should return a 400 response if the incoming message data cannot be parsed', async () => {
const mq = new GCloudPubSubPushMessageQueue(
mockLogger,
mockPubSubClient,
Expand All @@ -336,9 +346,14 @@ describe('handlePushMessage', () => {

mq.listen(vi.fn());

const result = await handlePushMessage(mq)(ctx);
const handlePushMessage = createHandlePushMessageMiddleware(
mq,
mockLogger,
);

expect(result.status).toBe(500);
const result = await handlePushMessage(ctx);

expect(result.status).toBe(400);
});

it('should return a 200 response if the incoming message is successfully handled', async () => {
Expand All @@ -350,7 +365,12 @@ describe('handlePushMessage', () => {

mq.listen(vi.fn().mockResolvedValue(undefined));

const result = await handlePushMessage(mq)(ctx);
const handlePushMessage = createHandlePushMessageMiddleware(
mq,
mockLogger,
);

const result = await handlePushMessage(ctx);

expect(result.status).toBe(200);
});
Expand All @@ -366,7 +386,12 @@ describe('handlePushMessage', () => {
vi.fn().mockRejectedValue(new Error('Failed to handle message')),
);

const result = await handlePushMessage(mq)(ctx);
const handlePushMessage = createHandlePushMessageMiddleware(
mq,
mockLogger,
);

const result = await handlePushMessage(ctx);

expect(result.status).toBe(500);
});
Expand Down

0 comments on commit e4fbda3

Please sign in to comment.