Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support segmentId in sync workflows #2972

Merged
merged 17 commits into from
Apr 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ import { IndexingRepository } from '@crowd/opensearch/src/repo/indexing.repo'

import { svc } from '../../main'

export async function deleteIndexedEntities(entityType: IndexedEntityType): Promise<void> {
export async function deleteIndexedEntities(
entityType: IndexedEntityType,
segmentIds?: string[],
): Promise<void> {
try {
const indexingRepo = new IndexingRepository(svc.postgres.writer, svc.log)
await indexingRepo.deleteIndexedEntities(entityType)
await indexingRepo.deleteIndexedEntities(entityType, segmentIds)
} catch (error) {
svc.log.error(error, 'Error deleting indexed entities')
throw error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import { MemberRepository } from '@crowd/opensearch/src/repo/member.repo'

import { svc } from '../../main'

export async function getMembersForSync(batchSize: number): Promise<string[]> {
export async function getMembersForSync(
batchSize: number,
segmentIds?: string[],
): Promise<string[]> {
try {
const memberRepo = new MemberRepository(svc.redis, svc.postgres.reader, svc.log)
return memberRepo.getMembersForSync(batchSize)
return memberRepo.getMembersForSync(batchSize, segmentIds)
} catch (error) {
svc.log.error(error, 'Error getting members for sync')
throw error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import { OrganizationRepository } from '@crowd/opensearch/src/repo/organization.

import { svc } from '../../main'

export async function getOrganizationsForSync(batchSize: number): Promise<string[]> {
export async function getOrganizationsForSync(
batchSize: number,
segmentIds?: string[],
): Promise<string[]> {
try {
const organizationRepo = new OrganizationRepository(svc.postgres.reader, svc.log)
return organizationRepo.getOrganizationsForSync(batchSize)
return organizationRepo.getOrganizationsForSync(batchSize, null, segmentIds)
} catch (error) {
svc.log.error(error, 'Error getting organizations for sync')
throw error
Expand Down
1 change: 1 addition & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export interface IScriptBatchTestArgs {
}

export interface ISyncArgs extends IScriptBatchTestArgs {
segmentIds?: string[]
chunkSize?: number
clean?: boolean
withAggs?: boolean
Expand Down
37 changes: 16 additions & 21 deletions services/apps/script_executor_worker/src/workflows/sync/members.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,51 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'

import * as entityIndexActivities from '../../activities/sync/entity-index'
import * as memberSyncActivities from '../../activities/sync/member'
import * as activities from '../../activities'
import { ISyncArgs } from '../../types'

const memberSyncActivity = proxyActivities<typeof memberSyncActivities>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

const entityIndexActivity = proxyActivities<typeof entityIndexActivities>({
startToCloseTimeout: '10 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})
const { deleteIndexedEntities, markEntitiesIndexed, getMembersForSync, syncMembersBatch } =
proxyActivities<typeof activities>({
startToCloseTimeout: '30 minutes',
})

export async function syncMembers(args: ISyncArgs): Promise<void> {
const BATCH_SIZE = args.batchSize ?? 100
const WITH_AGGS = args.withAggs ?? true

console.log('Starting syncMembers with args:', { ...args })

if (args.clean) {
await entityIndexActivity.deleteIndexedEntities(IndexedEntityType.MEMBER)
await deleteIndexedEntities(IndexedEntityType.MEMBER, args.segmentIds)
console.log('Deleted indexed entities for members!')
}

const memberIds = await memberSyncActivity.getMembersForSync(BATCH_SIZE)
const memberIds = await getMembersForSync(BATCH_SIZE, args.segmentIds)

if (memberIds.length === 0) {
console.log('No more members to sync!')
return
}

const batchStartTime = new Date()
const { memberCount } = await memberSyncActivity.syncMembersBatch(
memberIds,
WITH_AGGS,
args.chunkSize,
)
const { memberCount } = await syncMembersBatch(memberIds, WITH_AGGS, args.chunkSize)

const diffInSeconds = (new Date().getTime() - batchStartTime.getTime()) / 1000

console.log(
`Synced ${memberCount} members! Speed: ${Math.round(memberCount / diffInSeconds)} members/second!`,
)

await entityIndexActivity.markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds)
await markEntitiesIndexed(IndexedEntityType.MEMBER, memberIds)

if (args.testRun) {
console.log('Test run completed - stopping after first batch!')
return
}

// Continue as new for the next batch
await continueAsNew<typeof syncMembers>(args)
// Continue as new for the next batch, but without the clean flag to avoid infinite cleaning
await continueAsNew<typeof syncMembers>({
...args,
clean: false,
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,38 @@ import { continueAsNew, proxyActivities } from '@temporalio/workflow'

import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data'

import * as entityIndexActivities from '../../activities/sync/entity-index'
import * as orgSyncActivities from '../../activities/sync/organization'
import * as activities from '../../activities'
import { ISyncArgs } from '../../types'

const orgSyncActivity = proxyActivities<typeof orgSyncActivities>({
const {
deleteIndexedEntities,
markEntitiesIndexed,
getOrganizationsForSync,
syncOrganizationsBatch,
} = proxyActivities<typeof activities>({
startToCloseTimeout: '30 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

const entityIndexActivity = proxyActivities<typeof entityIndexActivities>({
startToCloseTimeout: '10 minutes',
retry: { maximumAttempts: 3, backoffCoefficient: 3 },
})

export async function syncOrganizations(args: ISyncArgs): Promise<void> {
const BATCH_SIZE = args.batchSize ?? 100
const WITH_AGGS = args.withAggs ?? true

console.log('Starting syncOrganizations with args:', { ...args })

if (args.clean) {
await entityIndexActivity.deleteIndexedEntities(IndexedEntityType.ORGANIZATION)
await deleteIndexedEntities(IndexedEntityType.ORGANIZATION, args.segmentIds)
console.log('Deleted indexed entities for organizations!')
}

const organizationIds = await orgSyncActivity.getOrganizationsForSync(BATCH_SIZE)
const organizationIds = await getOrganizationsForSync(BATCH_SIZE, args.segmentIds)

if (organizationIds.length === 0) {
console.log('No more organizations to sync!')
return
}

const batchStartTime = new Date()
const { organizationCount } = await orgSyncActivity.syncOrganizationsBatch(
const { organizationCount } = await syncOrganizationsBatch(
organizationIds,
WITH_AGGS,
args.chunkSize,
Expand All @@ -47,13 +47,16 @@ export async function syncOrganizations(args: ISyncArgs): Promise<void> {
)} organizations/second!`,
)

await entityIndexActivity.markEntitiesIndexed(IndexedEntityType.ORGANIZATION, organizationIds)
await markEntitiesIndexed(IndexedEntityType.ORGANIZATION, organizationIds)

if (args.testRun) {
console.log('Test run completed - stopping after first batch!')
return
}

// Continue as new for the next batch
await continueAsNew<typeof syncOrganizations>(args)
// Continue as new for the next batch, but without the clean flag to avoid infinite cleaning
await continueAsNew<typeof syncOrganizations>({
...args,
clean: false,
})
}
27 changes: 24 additions & 3 deletions services/libs/opensearch/src/repo/indexing.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,41 @@ export class IndexingRepository extends RepositoryBase<IndexingRepository> {
super(dbStore, parentLog)
}

public async deleteIndexedEntities(type: IndexedEntityType): Promise<void> {
public async deleteIndexedEntities(
type: IndexedEntityType,
segmentIds?: string[],
): Promise<void> {
let segmentCondition = ''

if (segmentIds) {
const materializedView =
type === IndexedEntityType.MEMBER ? 'member_segments_mv' : 'organization_segments_mv'
const entityColumn = type === IndexedEntityType.MEMBER ? '"memberId"' : '"organizationId"'

segmentCondition = `
USING ${materializedView} mv
WHERE mv.${entityColumn} = indexed_entities.entity_id
AND mv."segmentId" IN ($(segmentIds:csv))
`
}

await this.db().none(
`
delete from indexed_entities where type = $(type)
DELETE FROM indexed_entities
${segmentCondition}
AND indexed_entities.type = $(type)
`,
{
type,
segmentIds,
},
)
}

public async markEntitiesIndexed(type: IndexedEntityType, data: string[]): Promise<void> {
if (data.length > 0) {
const values = data.map((d) => `('${type}', '${d}')`)
const uniqueRecords = [...new Set(data)]
const values = uniqueRecords.map((d) => `('${type}', '${d}')`)
const query = `
insert into indexed_entities(type, entity_id)
values ${values.join(',\n')}
Expand Down
37 changes: 19 additions & 18 deletions services/libs/opensearch/src/repo/member.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ export class MemberRepository extends RepositoryBase<MemberRepository> {
return results
}

public async getMembersForSync(perPage: number): Promise<string[]> {
const results = await this.db().any(
`
SELECT m.id
FROM members m
WHERE NOT EXISTS (
SELECT 1
FROM indexed_entities ie
WHERE ie.entity_id = m.id
AND ie.type = $(type)
)
ORDER BY m.id
LIMIT ${perPage};
`,
{
type: IndexedEntityType.MEMBER,
},
)
public async getMembersForSync(perPage: number, segmentIds?: string[]): Promise<string[]> {
const segmentCondition = segmentIds
? 'INNER JOIN member_segments_mv ms ON ms."memberId" = m.id AND ms."segmentId" in ($(segmentIds:csv))'
: ''

const query = `
SELECT DISTINCT m.id FROM members m
${segmentCondition}
LEFT JOIN indexed_entities ie ON ie.entity_id = m.id AND ie.type = $(type)
WHERE ie.entity_id IS NULL
ORDER BY m.id LIMIT $(perPage);
`

const results = await this.db().query(query, {
segmentIds,
perPage,
type: IndexedEntityType.MEMBER,
})

return results.map((r) => r.id)
}

Expand Down
21 changes: 13 additions & 8 deletions services/libs/opensearch/src/repo/organization.repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,31 @@ export class OrganizationRepository extends RepositoryBase<OrganizationRepositor
public async getOrganizationsForSync(
perPage: number,
previousBatchIds?: string[],
segmentIds?: string[],
): Promise<string[]> {
const notInClause =
previousBatchIds?.length > 0 ? `AND o.id NOT IN ($(previousBatchIds:csv))` : ''

const segmentCondition = segmentIds
? 'INNER JOIN organization_segments_mv osm ON osm."organizationId" = o.id AND osm."segmentId" in ($(segmentIds:csv))'
: ''

const results = await this.db().any(
`
SELECT o.id
FROM organizations o
${segmentCondition}
LEFT JOIN indexed_entities ie ON ie.entity_id = o.id AND ie.type = $(type)
WHERE o."deletedAt" is null
${notInClause}
AND NOT EXISTS (
SELECT 1
FROM indexed_entities ie
WHERE ie.entity_id = o.id
AND ie.type = $(type)
)
AND ie.entity_id IS NULL
${notInClause}
ORDER BY o.id
LIMIT ${perPage}`,
LIMIT $(perPage)`,
{
type: IndexedEntityType.ORGANIZATION,
previousBatchIds,
segmentIds,
perPage,
},
)

Expand Down
Loading