Skip to content

Commit a190f72

Browse files
authored
Support segmentId in sync workflows (#2972)
1 parent 5e719f4 commit a190f72

File tree

9 files changed

+106
-71
lines changed

9 files changed

+106
-71
lines changed

services/apps/script_executor_worker/src/activities/sync/entity-index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo'
33

44
import { svc } from '../../main'
55

6-
export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise<void> {
6+
export async function deleteIndexedEntities(
7+
entityType: IndexedEntityType,
8+
segmentIds?: string[],
9+
): Promise<void> {
710
try {
811
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
9-
await indexingRepo.deleteIndexedEntities(entityType)
12+
await indexingRepo.deleteIndexedEntities(entityType, segmentIds)
1013
} catch (error) {
1114
svc.log.error(error, 'Error deleting indexed entities')
1215
throw error

services/apps/script_executor_worker/src/activities/sync/member.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo'
55

66
import { svc } from '../../main'
77

8-
export async function getMembersForSync(batchSize: number): Promise<string[]> {
8+
export async function getMembersForSync(
9+
batchSize: number,
10+
segmentIds?: string[],
11+
): Promise<string[]> {
912
try {
1013
const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log)
11-
return memberRepo.getMembersForSync(batchSize)
14+
return memberRepo.getMembersForSync(batchSize, segmentIds)
1215
} catch (error) {
1316
svc.log.error(error, 'Error getting members for sync')
1417
throw error

services/apps/script_executor_worker/src/activities/sync/organization.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ import { OrganizationRepository } from '@crowd/opensearch/src/repo/organization.
55

66
import { svc } from '../../main'
77

8-
export async function getOrganizationsForSync(batchSize: number): Promise<string[]> {
8+
export async function getOrganizationsForSync(
9+
batchSize: number,
10+
segmentIds?: string[],
11+
): Promise<string[]> {
912
try {
1013
const organizationRepo = new OrganizationRepository(svc.postgres.reader, svc.log)
11-
return organizationRepo.getOrganizationsForSync(batchSize)
14+
return organizationRepo.getOrganizationsForSync(batchSize, null, segmentIds)
1215
} catch (error) {
1316
svc.log.error(error, 'Error getting organizations for sync')
1417
throw error

services/apps/script_executor_worker/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export interface IScriptBatchTestArgs {
3535
}
3636

3737
export interface ISyncArgs extends IScriptBatchTestArgs {
38+
segmentIds?: string[]
3839
chunkSize?: number
3940
clean?: boolean
4041
withAggs?: boolean

services/apps/script_executor_worker/src/workflows/sync/members.ts

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,51 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'
22

33
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
44

5-
import * as entityIndexActivities from '../../activities/sync/entity-index'
6-
import * as memberSyncActivities from '../../activities/sync/member'
5+
import * as activities from '../../activities'
76
import { ISyncArgs } from '../../types'
87

9-
const memberSyncActivity = proxyActivities<typeof memberSyncActivities>({
10-
startToCloseTimeout: '30 minutes',
11-
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
12-
})
13-
14-
const entityIndexActivity = proxyActivities<typeof entityIndexActivities>({
15-
startToCloseTimeout: '10 minutes',
16-
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
17-
})
8+
const { deleteIndexedEntities, markEntitiesIndexed, getMembersForSync, syncMembersBatch } =
9+
proxyActivities<typeof activities>({
10+
startToCloseTimeout: '30 minutes',
11+
})
1812

1913
export async function syncMembers(args: ISyncArgs): Promise<void> {
2014
const BATCH_SIZE = args.batchSize ?? 100
2115
const WITH_AGGS = args.withAggs ?? true
2216

17+
console.log('Starting syncMembers with args:', { ...args })
18+
2319
if (args.clean) {
24-
await entityIndexActivity.deleteIndexedEntities(IndexedEntityType.MEMBER)
20+
await deleteIndexedEntities(IndexedEntityType.MEMBER, args.segmentIds)
2521
console.log('Deleted indexed entities for members!')
2622
}
2723

28-
const memberIds = await memberSyncActivity.getMembersForSync(BATCH_SIZE)
24+
const memberIds = await getMembersForSync(BATCH_SIZE, args.segmentIds)
2925

3026
if (memberIds.length === 0) {
3127
console.log('No more members to sync!')
3228
return
3329
}
3430

3531
const batchStartTime = new Date()
36-
const { memberCount } = await memberSyncActivity.syncMembersBatch(
37-
memberIds,
38-
WITH_AGGS,
39-
args.chunkSize,
40-
)
32+
const { memberCount } = await syncMembersBatch(memberIds, WITH_AGGS, args.chunkSize)
4133

4234
const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000
4335

4436
console.log(
4537
`Synced ${memberCount} members! Speed: ${Math.round(memberCount / diffInSeconds)} members/second!`,
4638
)
4739

48-
await entityIndexActivity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds)
40+
await markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds)
4941

5042
if (args.testRun) {
5143
console.log('Test run completed - stopping after first batch!')
5244
return
5345
}
5446

55-
// Continue as new for the next batch
56-
await continueAsNew<typeof syncMembers>(args)
47+
// Continue as new for the next batch, but without the clean flag to avoid infinite cleaning
48+
await continueAsNew<typeof syncMembers>({
49+
...args,
50+
clean: false,
51+
})
5752
}

services/apps/script_executor_worker/src/workflows/sync/organizations.ts

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,38 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'
22

33
import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'
44

5-
import * as entityIndexActivities from '../../activities/sync/entity-index'
6-
import * as orgSyncActivities from '../../activities/sync/organization'
5+
import * as activities from '../../activities'
76
import { ISyncArgs } from '../../types'
87

9-
const orgSyncActivity = proxyActivities<typeof orgSyncActivities>({
8+
const {
9+
deleteIndexedEntities,
10+
markEntitiesIndexed,
11+
getOrganizationsForSync,
12+
syncOrganizationsBatch,
13+
} = proxyActivities<typeof activities>({
1014
startToCloseTimeout: '30 minutes',
11-
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
12-
})
13-
14-
const entityIndexActivity = proxyActivities<typeof entityIndexActivities>({
15-
startToCloseTimeout: '10 minutes',
16-
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
1715
})
1816

1917
export async function syncOrganizations(args: ISyncArgs): Promise<void> {
2018
const BATCH_SIZE = args.batchSize ?? 100
2119
const WITH_AGGS = args.withAggs ?? true
2220

21+
console.log('Starting syncOrganizations with args:', { ...args })
22+
2323
if (args.clean) {
24-
await entityIndexActivity.deleteIndexedEntities(IndexedEntityType.ORGANIZATION)
24+
await deleteIndexedEntities(IndexedEntityType.ORGANIZATION, args.segmentIds)
2525
console.log('Deleted indexed entities for organizations!')
2626
}
2727

28-
const organizationIds = await orgSyncActivity.getOrganizationsForSync(BATCH_SIZE)
28+
const organizationIds = await getOrganizationsForSync(BATCH_SIZE, args.segmentIds)
2929

3030
if (organizationIds.length === 0) {
3131
console.log('No more organizations to sync!')
3232
return
3333
}
3434

3535
const batchStartTime = new Date()
36-
const { organizationCount } = await orgSyncActivity.syncOrganizationsBatch(
36+
const { organizationCount } = await syncOrganizationsBatch(
3737
organizationIds,
3838
WITH_AGGS,
3939
args.chunkSize,
@@ -47,13 +47,16 @@ export async function syncOrganizations(args: ISyncArgs): Promise<void> {
4747
)} organizations/second!`,
4848
)
4949

50-
await entityIndexActivity.markEntitiesIndexed(IndexedEntityType.ORGANIZATION, organizationIds)
50+
await markEntitiesIndexed(IndexedEntityType.ORGANIZATION, organizationIds)
5151

5252
if (args.testRun) {
5353
console.log('Test run completed - stopping after first batch!')
5454
return
5555
}
5656

57-
// Continue as new for the next batch
58-
await continueAsNew<typeof syncOrganizations>(args)
57+
// Continue as new for the next batch, but without the clean flag to avoid infinite cleaning
58+
await continueAsNew<typeof syncOrganizations>({
59+
...args,
60+
clean: false,
61+
})
5962
}

services/libs/opensearch/src/repo/indexing.repo.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,41 @@ export class IndexingRepository extends RepositoryBase<IndexingRepository> {
88
super(dbStore, parentLog)
99
}
1010

11-
public async deleteIndexedEntities(type: IndexedEntityType): Promise<void> {
11+
public async deleteIndexedEntities(
12+
type: IndexedEntityType,
13+
segmentIds?: string[],
14+
): Promise<void> {
15+
let segmentCondition = ''
16+
17+
if (segmentIds) {
18+
const materializedView =
19+
type === IndexedEntityType.MEMBER ? 'member_segments_mv' : 'organization_segments_mv'
20+
const entityColumn = type === IndexedEntityType.MEMBER ? '"memberId"' : '"organizationId"'
21+
22+
segmentCondition = `
23+
USING ${materializedView} mv
24+
WHERE mv.${entityColumn} = indexed_entities.entity_id
25+
AND mv."segmentId" IN ($(segmentIds:csv))
26+
`
27+
}
28+
1229
await this.db().none(
1330
`
14-
delete from indexed_entities where type = $(type)
31+
DELETE FROM indexed_entities
32+
${segmentCondition}
33+
AND indexed_entities.type = $(type)
1534
`,
1635
{
1736
type,
37+
segmentIds,
1838
},
1939
)
2040
}
2141

2242
public async markEntitiesIndexed(type: IndexedEntityType, data: string[]): Promise<void> {
2343
if (data.length > 0) {
24-
const values = data.map((d) => `('${type}', '${d}')`)
44+
const uniqueRecords = [...new Set(data)]
45+
const values = uniqueRecords.map((d) => `('${type}', '${d}')`)
2546
const query = `
2647
insert into indexed_entities(type, entity_id)
2748
values ${values.join(',\n')}

services/libs/opensearch/src/repo/member.repo.ts

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,25 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
3232
return results
3333
}
3434

35-
public async getMembersForSync(perPage: number): Promise<string[]> {
36-
const results = await this.db().any(
37-
`
38-
SELECT m.id
39-
FROM members m
40-
WHERE NOT EXISTS (
41-
SELECT 1
42-
FROM indexed_entities ie
43-
WHERE ie.entity_id = m.id
44-
AND ie.type = $(type)
45-
)
46-
ORDER BY m.id
47-
LIMIT ${perPage};
48-
`,
49-
{
50-
type: IndexedEntityType.MEMBER,
51-
},
52-
)
35+
public async getMembersForSync(perPage: number, segmentIds?: string[]): Promise<string[]> {
36+
const segmentCondition = segmentIds
37+
? 'INNER JOIN member_segments_mv ms ON ms."memberId" = m.id AND ms."segmentId" in ($(segmentIds:csv))'
38+
: ''
39+
40+
const query = `
41+
SELECT DISTINCT m.id FROM members m
42+
${segmentCondition}
43+
LEFT JOIN indexed_entities ie ON ie.entity_id = m.id AND ie.type = $(type)
44+
WHERE ie.entity_id IS NULL
45+
ORDER BY m.id LIMIT $(perPage);
46+
`
47+
48+
const results = await this.db().query(query, {
49+
segmentIds,
50+
perPage,
51+
type: IndexedEntityType.MEMBER,
52+
})
53+
5354
return results.map((r) => r.id)
5455
}
5556

services/libs/opensearch/src/repo/organization.repo.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,31 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
2929
public async getOrganizationsForSync(
3030
perPage: number,
3131
previousBatchIds?: string[],
32+
segmentIds?: string[],
3233
): Promise<string[]> {
3334
const notInClause =
3435
previousBatchIds?.length > 0 ? `AND o.id NOT IN ($(previousBatchIds:csv))` : ''
36+
37+
const segmentCondition = segmentIds
38+
? 'INNER JOIN organization_segments_mv osm ON osm."organizationId" = o.id AND osm."segmentId" in ($(segmentIds:csv))'
39+
: ''
40+
3541
const results = await this.db().any(
3642
`
3743
SELECT o.id
3844
FROM organizations o
45+
${segmentCondition}
46+
LEFT JOIN indexed_entities ie ON ie.entity_id = o.id AND ie.type = $(type)
3947
WHERE o."deletedAt" is null
40-
${notInClause}
41-
AND NOT EXISTS (
42-
SELECT 1
43-
FROM indexed_entities ie
44-
WHERE ie.entity_id = o.id
45-
AND ie.type = $(type)
46-
)
48+
AND ie.entity_id IS NULL
49+
${notInClause}
4750
ORDER BY o.id
48-
LIMIT ${perPage}`,
51+
LIMIT $(perPage)`,
4952
{
5053
type: IndexedEntityType.ORGANIZATION,
5154
previousBatchIds,
55+
segmentIds,
56+
perPage,
5257
},
5358
)
5459

0 commit comments

Comments
 (0)