Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions __tests__/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
GraphQLTestClient,
GraphQLTestingState,
initializeGraphQLTesting,
invokeTypedNotificationWorker,
MockContext,
saveFixtures,
testMutationErrorCode,
Expand All @@ -19,6 +20,8 @@ import {
NotificationPreferenceSource,
NotificationPreference,
NotificationAttachmentType,
PostType,
UserPost,
} from '../src/entity';
import type { UserNotificationFlags } from '../src/entity/user/User';
import { DataSource } from 'typeorm';
Expand All @@ -45,6 +48,11 @@ import {
NotificationAttachmentV2,
NotificationAvatarV2,
} from '../src/entity';
import { PollPost } from '../src/entity/posts/PollPost';
import { getTableName } from '../src/workers/cdc/common';
import { PollOption } from '../src/entity/polls/PollOption';
import { pollResultAuthorNotification } from '../src/workers/notifications/pollResultAuthorNotification';
import { pollResultNotification } from '../src/workers/notifications/pollResultNotification';

let app: FastifyInstance;
let con: DataSource;
Expand Down Expand Up @@ -1465,3 +1473,150 @@ describe('streamNotificationUsers', () => {
expect(results).toHaveLength(0);
});
});

describe('poll result notifications', () => {
beforeEach(async () => {
await con.getRepository(User).save(usersFixture);
await saveFixtures(con, Source, sourcesFixture);
});

const createPollPost = async (authorId: string, endsAt?: Date) => {
return con.getRepository(Post).save({
id: 'poll-1',
shortId: 'poll-short',
authorId,
sourceId: 'a',
title: 'What is your favorite framework?',
type: PostType.Poll,
createdAt: new Date('2021-09-22T07:15:51.247Z'),
endsAt,
});
};

const createPollOptions = async (pollId: string) => {
return con.getRepository(PollOption).save([
{
id: '01234567-0123-0123-0123-0123456789ab',
postId: pollId,
text: 'React',
order: 1,
numVotes: 0,
},
{
id: '01234567-0123-0123-0123-0123456789ac',
postId: pollId,
text: 'Vue',
order: 2,
numVotes: 0,
},
]);
};

const createPollVotes = async (
pollId: string,
voterIds: string[],
optionId = '01234567-0123-0123-0123-0123456789ab',
) => {
return con.getRepository(UserPost).save(
voterIds.map((userId) => ({
userId,
postId: pollId,
pollVoteOptionId: optionId,
})),
);
};

it('should send notification to poll author when poll expires', async () => {
const poll = await createPollPost('1'); // user '1' is the author

const result =
await invokeTypedNotificationWorker<'api.v1.delayed-notification-reminder'>(
pollResultAuthorNotification,
{
entityId: poll.id,
entityTableName: getTableName(con, PollPost),
scheduledAtMs: Date.now(),
delayMs: 1000,
},
);

expect(result).toHaveLength(1);
expect(result![0].type).toBe(NotificationType.PollResult);
expect(result![0].ctx.userIds).toEqual(['1']);
});

it('should send notifications to poll voters when poll expires', async () => {
const poll = await createPollPost('1'); // user '1' is the author
await createPollOptions(poll.id);
await createPollVotes(poll.id, ['2', '3', '4']); // users 2, 3, 4 voted

const result =
await invokeTypedNotificationWorker<'api.v1.delayed-notification-reminder'>(
pollResultNotification,
{
entityId: poll.id,
entityTableName: getTableName(con, PollPost),
scheduledAtMs: Date.now(),
delayMs: 1000,
},
);

expect(result).toHaveLength(1);
expect(result![0].type).toBe(NotificationType.PollResult);
expect(result![0].ctx.userIds).toEqual(['2', '3', '4']);
});

it('should exclude poll author from voter notifications', async () => {
const poll = await createPollPost('1'); // user '1' is the author
await createPollOptions(poll.id);
await createPollVotes(poll.id, ['1', '2', '3']); // author also voted

const result =
await invokeTypedNotificationWorker<'api.v1.delayed-notification-reminder'>(
pollResultNotification,
{
entityId: poll.id,
entityTableName: getTableName(con, PollPost),
scheduledAtMs: Date.now(),
delayMs: 1000,
},
);

expect(result).toHaveLength(1);
expect(result![0].type).toBe(NotificationType.PollResult);
expect(result![0].ctx.userIds).toEqual(['2', '3']); // author excluded
});

it('should return nothing if no voters exist', async () => {
const poll = await createPollPost('1');
// No votes created

const result =
await invokeTypedNotificationWorker<'api.v1.delayed-notification-reminder'>(
pollResultNotification,
{
entityId: poll.id,
entityTableName: getTableName(con, PollPost),
scheduledAtMs: Date.now(),
delayMs: 1000,
},
);

expect(result).toBeUndefined();
});

it('should return nothing if poll does not exist', async () => {
const result =
await invokeTypedNotificationWorker<'api.v1.delayed-notification-reminder'>(
pollResultNotification,
{
entityId: 'non-existent-poll',
entityTableName: getTableName(con, PollPost),
scheduledAtMs: Date.now(),
delayMs: 1000,
},
);

expect(result).toBeUndefined();
});
});
4 changes: 3 additions & 1 deletion src/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ export default async function app(): Promise<void> {
),
);

const typedPubsub = new PubSub();

typedWorkers.forEach((worker) =>
workerSubscribe(
logger,
pubsub,
typedPubsub,
connection,
worker.subscription,
(message, con, logger, pubsub) => {
Expand Down
100 changes: 55 additions & 45 deletions src/common/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,50 +501,60 @@ export const workerSubscribe = (
) => Promise<void>,
maxMessages = 1,
): void => {
logger.info(`subscribing to ${subscription}`);
const sub = pubsub.subscription(subscription, {
flowControl: {
maxMessages,
},
batching: { maxMilliseconds: 10 },
});
const childLogger = logger.child({ subscription });
// const histogram = meter.createHistogram('message_processing_time', {
// unit: 'ms',
// description: 'time to process a message',
// });
sub.on('message', async (message) =>
runInRootSpan(
`message: ${subscription}`,
async (span) => {
// const startTime = performance.now();
// let success = true;
addPubsubSpanLabels(span, subscription, message);
try {
await runInSpan('handler', async () =>
handler(message, connection, childLogger, pubsub),
);
message.ack();
} catch (err) {
// success = false;
childLogger.error(
{
messageId: message.id,
data: message.data.toString('utf-8'),
err,
},
'failed to process message',
);
message.nack();
}
// histogram.record(performance.now() - startTime, {
// subscription,
// success,
// });
try {
logger.info(`subscribing to ${subscription}`);
const sub = pubsub.subscription(subscription, {
flowControl: {
maxMessages,
},
{
kind: opentelemetry.SpanKind.CONSUMER,
},
),
);
batching: { maxMilliseconds: 10 },
});
const childLogger = logger.child({ subscription });
// const histogram = meter.createHistogram('message_processing_time', {
// unit: 'ms',
// description: 'time to process a message',
// });

logger.info({ subscription }, 'fml worker subscribed almost');

sub.on('message', async (message) =>
runInRootSpan(
`message: ${subscription}`,
async (span) => {
// const startTime = performance.now();
// let success = true;
addPubsubSpanLabels(span, subscription, message);
try {
await runInSpan('handler', async () =>
handler(message, connection, childLogger, pubsub),
);
message.ack();
} catch (err) {
// success = false;
childLogger.error(
{
messageId: message.id,
data: message.data.toString('utf-8'),
err,
},
'failed to process message',
);
message.nack();
}
// histogram.record(performance.now() - startTime, {
// subscription,
// success,
// });
},
{
kind: opentelemetry.SpanKind.CONSUMER,
},
),
);
sub.on('error', (err) => {
logger.error({ err, subscription }, 'fml worker on error');
});
} catch (error) {
logger.error({ err: error, subscription }, 'fml worker try catch');
}
};
4 changes: 4 additions & 0 deletions src/workers/notifications/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import { userBriefReadyNotification } from './userBriefReadyNotification';
import { userFollowNotification } from './userFollowNotification';
import { candidateOpportunityMatchNotification } from './candidateOpportunityMatchNotification';
import { campaignPostAnalyticsNotification } from './campaignPostAnalyticsNotification';
import { pollResultNotification } from './pollResultNotification';
import { pollResultAuthorNotification } from './pollResultAuthorNotification';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyNotificationWorker = NotificationWorker | TypedNotificationWorker<any>;
Expand Down Expand Up @@ -120,6 +122,8 @@ const notificationWorkers: AnyNotificationWorker[] = [
userFollowNotification,
candidateOpportunityMatchNotification,
campaignPostAnalyticsNotification,
pollResultNotification,
pollResultAuthorNotification,
];

export const workers = [...notificationWorkers.map(notificationWorkerToWorker)];
47 changes: 47 additions & 0 deletions src/workers/notifications/pollResultAuthorNotification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { PollPost } from '../../entity/posts/PollPost';
import { NotificationType } from '../../notifications/common';
import type { NotificationPostContext } from '../../notifications';
import { messageToJson, TypedNotificationWorker } from '../worker';
import { buildPostContext } from './utils';

export const pollResultAuthorNotification: TypedNotificationWorker<'api.v1.delayed-notification-reminder'> =
{
subscription: 'api.poll-result-author-notification',
handler: async (data, con) => {
if (
data.entityTableName !== con.getRepository(PollPost).metadata.tableName
) {
return;
}

const poll = await con.getRepository(PollPost).findOne({
where: { id: data.entityId },
select: ['id', 'authorId'],
});

if (!poll?.authorId) {
return;
}

const postCtx = await buildPostContext(con, poll.id);
if (!postCtx) {
return;
}

const notificationCtx: NotificationPostContext = {
...postCtx,
userIds: [poll.authorId],
};

return [
{
type: NotificationType.PollResult,
ctx: notificationCtx,
},
];
},
parseMessage(message) {
// TODO: Clean this once we move all workers to TypedWorkers
return messageToJson(message);
},
};
Loading
Loading