Skip to content

Commit

Permalink
List regeneration performance improvements (#625)
Browse files Browse the repository at this point in the history
  • Loading branch information
pushchris authored Feb 9, 2025
1 parent be400ec commit a9791ff
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 37 deletions.
2 changes: 1 addition & 1 deletion apps/platform/src/campaigns/CampaignService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ export const generateSendList = async (campaign: SentCampaign) => {
}

const query = recipientQuery(campaign)
await chunk<CampaignSendParams>(query, 100, async (items) => {
await chunk<CampaignSendParams>(query, 25, async (items) => {
await CampaignSend.query()
.insert(items)
.onConflict(['campaign_id', 'user_id', 'reference_id'])
Expand Down
71 changes: 35 additions & 36 deletions apps/platform/src/lists/ListService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { RequestError } from '../core/errors'
import RuleError from '../rules/RuleError'
import ListEvaluateUserJob from './ListEvaluateUserJob'
import ListStatsJob from './ListStatsJob'
import { PassThrough } from 'stream'

export const CacheKeys = {
memberCount: (list: List) => `list:${list.id}:${list.version}:count`,
Expand Down Expand Up @@ -250,15 +251,15 @@ interface UserListEventEvaluation {

interface UserListEvaluation {
list: List
scroll: AsyncGenerator<User[], any, any>
stream: PassThrough & AsyncIterable<unknown>
since?: Date | null
handleRule: (evaluation: UserListEventEvaluation) => Promise<void>
handleEntry: (user: User, result: boolean) => Promise<void>
}

const scrollUserListForEvaluation = async ({
const streamUserListForEvaluation = async ({
list,
scroll,
stream,
since,
handleRule,
handleEntry,
Expand All @@ -267,33 +268,31 @@ const scrollUserListForEvaluation = async ({
const rule = await fetchAndCompileRule(list.rule_id) as RuleTree
const { eventRules, userRules } = splitRuleTree(rule)

for await (const users of scroll) {

// For each user, evaluate parts and batch enqueue
for (const user of users) {

const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules, since)

for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)
await handleRule({
rule_id: rule.id!,
user_id: user.id,
result,
})
parts.push({
...rule,
result,
})
}
// For each user, evaluate parts and batch enqueue
for await (const rawUser of stream) {
const user = User.fromJson(rawUser)

const parts: RuleWithEvaluationResult[] = []
const events = await getUserEventsForRules([user.id], eventRules, since)

const result = checkRules(user, rule, [...parts, ...userRules])
await handleEntry(user, result)
for (const rule of eventRules) {
const result = check({
user: user.flatten(),
events: events.map(e => e.flatten()),
}, rule)
await handleRule({
rule_id: rule.id!,
user_id: user.id,
result,
})
parts.push({
...rule,
result,
})
}

const result = checkRules(user, rule, [...parts, ...userRules])
await handleEntry(user, result)
}
}

Expand Down Expand Up @@ -406,22 +405,22 @@ export const refreshList = async (list: List, types: DateRuleTypes) => {
const { id } = list
await updateListState(id, { state: 'loading' })

const scroll = User.scroll(q =>
q.leftJoin('user_list', 'user_list.user_id', 'users.id')
.where('project_id', list.project_id)
.where('user_list.list_id', list.id)
.select('users.*'),
)
const stream = UserList.query()
.leftJoin('users', 'user_list.user_id', 'users.id')
.where('project_id', list.project_id)
.where('list_id', list.id)
.select('users.*')
.stream()

const userChunker = new Chunker<number>(async userIds => {
await UserList.delete(qb => qb.whereIn('user_id', userIds)
.where('list_id', list.id))
await cacheDecr(App.main.redis, CacheKeys.memberCount(list), userIds.length)
}, 50)

await scrollUserListForEvaluation({
await streamUserListForEvaluation({
list,
scroll,
stream,
since: types.value,
handleRule: async ({ rule_id, user_id, result }) => {
if (!result) {
Expand Down

0 comments on commit a9791ff

Please sign in to comment.