Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7093b09
chore: attach debugger logs to understand deeper
skwowet May 26, 2025
b5352e6
chore: add todo
skwowet May 27, 2025
d5147d4
chore: move utility func to different file
skwowet May 28, 2025
2a7c373
Merge branch 'main' into improve/CM-2310
skwowet Jun 2, 2025
e6598b6
refactor: affiliations [wip]
skwowet Jun 3, 2025
dfbdb44
refactor: clean up logic
skwowet Jun 4, 2025
c10c621
refactor: updateActivities usage in merge and affiliations
skwowet Jun 10, 2025
34ab932
Merge branch 'main' into improve/CM-2310
skwowet Jun 10, 2025
2093699
chore: revert debugger changes and stuff
skwowet Jun 10, 2025
bfd5be2
chore: use existing methods instead of creating new ones
skwowet Jun 23, 2025
a4d547d
chore: revert
skwowet Jun 23, 2025
adb8beb
Merge branch 'main' into improve/CM-2310
skwowet Jun 23, 2025
3fdcabb
Merge branch 'main' into improve/CM-2310
skwowet Jul 2, 2025
c89ca66
Merge branch 'improve/CM-2310' of github.com:CrowdDotDev/crowd.dev in…
skwowet Jul 2, 2025
b17678c
refactor(data-sink-worker): switch memberId lookup to activityRelations
skwowet Jul 3, 2025
717357e
Merge branch 'main' into improve/CM-2310
skwowet Jul 3, 2025
3f108cc
refactor(part-1): read from activityRelations instead of activities (…
skwowet Jul 3, 2025
52d3370
refactor(part-2): read from activityRelations instead of activities (…
skwowet Jul 4, 2025
c4f6ad4
chore: add index for affiliations queries
skwowet Jul 10, 2025
98cab11
refactor(data-sink): simplify activity ingestion and deduplication
skwowet Jul 10, 2025
a3269b8
chore: improve code comment
skwowet Jul 11, 2025
399a308
Merge branch 'main' into improve/CM-2310
skwowet Jul 16, 2025
1127172
fix: finishMemberUnmerging
skwowet Jul 16, 2025
09220f3
chore: rm LOG_LEVEL=trace
skwowet Jul 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions backend/src/bin/jobs/syncActivities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { IDbActivityCreateData } from '@crowd/data-access-layer/src/old/apps/dat
import ActivityRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo'
import { QueryExecutor, formatQuery, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { Logger, logExecutionTimeV2, timer } from '@crowd/logging'
import { getClientSQL } from '@crowd/questdb'
import { PlatformType } from '@crowd/types'

import { DB_CONFIG } from '@/conf'
Expand All @@ -16,16 +15,18 @@ import { retryBackoff } from '../../utils/backoff'

async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
if (!maxUpdatedAt) {
const result = await pgQx.selectOne('SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM activities')
const result = await pgQx.selectOne(
'SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM "activityRelations"',
)
return result?.maxUpdatedAt
}

return maxUpdatedAt
}

async function getTotalActivities(qdbQx: QueryExecutor, whereClause: string): Promise<number> {
const { totalActivities } = await qdbQx.selectOne(
`SELECT COUNT(1) AS "totalActivities" FROM activities WHERE ${whereClause}`,
async function getTotalActivities(pgQx: QueryExecutor, whereClause: string): Promise<number> {
const { totalActivities } = await pgQx.selectOne(
`SELECT COUNT(1) AS "totalActivities" FROM "activityRelations" WHERE ${whereClause}`,
)
return totalActivities
}
Expand Down Expand Up @@ -78,7 +79,6 @@ async function syncActivitiesBatch({
export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
logger.info(`Syncing activities from ${maxUpdatedAt}`)

const qdb = await getClientSQL()
const db = await getDbConnection({
host: DB_CONFIG.writeHost,
port: DB_CONFIG.port,
Expand All @@ -88,7 +88,6 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
})

const pgQx = pgpQx(db)
const qdbQx = pgpQx(qdb)
const activityRepo = new ActivityRepository(new DbStore(logger, db, undefined, true), logger)

let updatedAt = await logExecutionTimeV2(
Expand All @@ -100,7 +99,7 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
const whereClause = createWhereClause(updatedAt)

const totalActivities = await logExecutionTimeV2(
() => getTotalActivities(qdbQx, whereClause),
() => getTotalActivities(pgQx, whereClause),
logger,
'get total activities',
)
Expand All @@ -114,10 +113,10 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
// eslint-disable-next-line @typescript-eslint/no-loop-func
() =>
retryBackoff(() =>
qdbQx.select(
pgQx.select(
`
SELECT *
FROM activities
FROM "activityRelations"
WHERE "updatedAt" > $(updatedAt)
ORDER BY "updatedAt"
LIMIT 1000;
Expand All @@ -126,7 +125,7 @@ export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
),
),
logger,
`getting activities with updatedAt > ${updatedAt}`,
`getting activityRelations with updatedAt > ${updatedAt}`,
)

if (result.length === 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS "ix_activityRelations_memberId_segmentId_timestamp"
ON "activityRelations" ("memberId", "segmentId", "timestamp");
35 changes: 24 additions & 11 deletions backend/src/database/repositories/memberRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import {
groupBy,
} from '@crowd/common'
import {
countMembersWithActivities,
getActiveMembers,
getLastActivitiesForMembers,
queryActivityRelations,
setMemberDataToActivities,
} from '@crowd/data-access-layer'
import { findManyLfxMemberships } from '@crowd/data-access-layer/src/lfx_memberships'
Expand Down Expand Up @@ -1437,7 +1437,9 @@ class MemberRepository {
segments = [originalSegment]
}

const activeMemberResults = await getActiveMembers(options.qdb, {
const qx = SequelizeRepository.getQueryExecutor(options)

const activeMemberResults = await getActiveMembers(qx, {
timestampFrom: new Date(Date.parse(filter.activityTimestampFrom)).toISOString(),
timestampTo: new Date(Date.parse(filter.activityTimestampTo)).toISOString(),
isContribution: filter.activityIsContribution === true ? true : undefined,
Expand Down Expand Up @@ -1565,13 +1567,21 @@ class MemberRepository {
}

static async countMembersPerSegment(options: IRepositoryOptions, segmentIds: string[]) {
const countResults = await countMembersWithActivities(options.qdb, {
segmentIds,
const qx = SequelizeRepository.getQueryExecutor(options)
const result = await queryActivityRelations(qx, {
filter: {
and: [
{
segmentId: {
in: segmentIds,
},
},
],
},
countOnly: true,
})
return countResults.reduce((acc, curr: any) => {
acc[curr.segmentId] = parseInt(curr.totalCount, 10)
return acc
}, {})

return result.count
}

static async countMembers(options: IRepositoryOptions, segmentIds: string[]) {
Expand Down Expand Up @@ -1710,9 +1720,10 @@ class MemberRepository {
row.activityCount = parseInt(row.activityCount, 10)
}

const qx = SequelizeRepository.getQueryExecutor(options)

const memberIds = translatedRows.map((r) => r.id)
if (memberIds.length > 0) {
const qx = SequelizeRepository.getQueryExecutor(options)
const organizationIds = uniq(
translatedRows.reduce((acc, r) => {
acc.push(...r.organizations.map((o) => o.id))
Expand All @@ -1729,7 +1740,7 @@ class MemberRepository {
}
}

const lastActivities = await getLastActivitiesForMembers(options.qdb, memberIds, segments)
const lastActivities = await getLastActivitiesForMembers(qx, options.qdb, memberIds, segments)

for (const row of translatedRows) {
const r = row as any
Expand Down Expand Up @@ -2114,7 +2125,9 @@ class MemberRepository {
})

if (memberIds.length > 0) {
const lastActivities = await getLastActivitiesForMembers(options.qdb, memberIds, [segmentId])
const lastActivities = await getLastActivitiesForMembers(qx, options.qdb, memberIds, [
segmentId,
])

rows.forEach((r) => {
r.lastActivity = lastActivities.find((a) => a.memberId === r.id)
Expand Down
32 changes: 20 additions & 12 deletions backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import {
} from '@crowd/audit-logs'
import { Error400, Error404, Error409, PageData, RawQueryParser } from '@crowd/common'
import {
countMembersWithActivities,
getActiveOrganizations,
queryActivities,
queryActivityRelations,
} from '@crowd/data-access-layer'
import { findManyLfxMemberships } from '@crowd/data-access-layer/src/lfx_memberships'
import {
Expand Down Expand Up @@ -1404,7 +1404,9 @@ class OrganizationRepository {
segments = [originalSegment]
}

const activeOrgsResults = await getActiveOrganizations(options.qdb, {
const qx = SequelizeRepository.getQueryExecutor(options)

const activeOrgsResults = await getActiveOrganizations(qx, {
timestampFrom: new Date(Date.parse(filter.activityTimestampFrom)),
timestampTo: new Date(Date.parse(filter.activityTimestampTo)),
platforms: filter.platforms ? filter.platforms : undefined,
Expand Down Expand Up @@ -1491,7 +1493,6 @@ class OrganizationRepository {
options,
)

const qx = SequelizeRepository.getQueryExecutor(options)
const lfxMemberships = await findManyLfxMemberships(qx, {
organizationIds,
})
Expand Down Expand Up @@ -1876,17 +1877,24 @@ class OrganizationRepository {
platform: string,
options: IRepositoryOptions,
): Promise<number> {
const rows = await countMembersWithActivities(options.qdb, {
organizationId,
platform,
})

let count = 0
rows.forEach((row) => {
count += Number(row.count)
const qx = SequelizeRepository.getQueryExecutor(options)
const rows = await queryActivityRelations(qx, {
filter: {
and: [
{
organizationId: {
eq: organizationId,
},
platform: {
eq: platform,
},
},
],
},
countOnly: true,
})

return count
return rows.count
}

static async removeIdentitiesFromOrganization(
Expand Down
34 changes: 0 additions & 34 deletions backend/src/services/memberService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
} from '@crowd/data-access-layer/src/members'
import { findMergeAction } from '@crowd/data-access-layer/src/mergeActions/repo'
import { QueryExecutor, optionsQx } from '@crowd/data-access-layer/src/queryExecutor'
// import { getActivityCountOfMemberIdentities } from '@crowd/data-access-layer'
import { fetchManySegments } from '@crowd/data-access-layer/src/segments'
import { LoggerBase } from '@crowd/logging'
import {
Expand Down Expand Up @@ -1045,20 +1044,6 @@ export default class MemberService extends LoggerBase {
)
member.memberOrganizations = unmergedRoles as IMemberRoleWithOrganization[]

const secondaryActivityCount = 0
const primaryActivityCount = 0
// activity count
// const secondaryActivityCount = await getActivityCountOfMemberIdentities(
// this.options.qdb,
// member.id,
// secondaryBackup.identities,
// )
// const primaryActivityCount = await getActivityCountOfMemberIdentities(
// this.options.qdb,
// member.id,
// member.identities,
// )

return {
primary: {
...lodash.pick(member, MemberService.MEMBER_MERGE_FIELDS),
Expand All @@ -1068,15 +1053,13 @@ export default class MemberService extends LoggerBase {
member.memberOrganizations,
),
username: MemberRepository.getUsernameFromIdentities(member.identities),
activityCount: primaryActivityCount,
numberOfOpenSourceContributions: member.contributions?.length || 0,
},
secondary: {
...secondaryBackup,
organizations: OrganizationRepository.calculateRenderFriendlyOrganizations(
secondaryBackup.memberOrganizations,
),
activityCount: secondaryActivityCount,
numberOfOpenSourceContributions: secondaryBackup.contributions?.length || 0,
},
}
Expand Down Expand Up @@ -1108,21 +1091,6 @@ export default class MemberService extends LoggerBase {
throw new Error400(this.options.language, 'merge.errors.noIdentities')
}

const secondaryActivityCount = 0
const primaryActivityCount = 0

// const secondaryActivityCount = await getActivityCountOfMemberIdentities(
// this.options.qdb,
// member.id,
// secondaryIdentities,
// )
//
// const primaryActivityCount = await getActivityCountOfMemberIdentities(
// this.options.qdb,
// member.id,
// primaryIdentities,
// )

const primaryMemberRoles = await MemberOrganizationRepository.findMemberRoles(
member.id,
this.options,
Expand All @@ -1136,7 +1104,6 @@ export default class MemberService extends LoggerBase {
organizations:
OrganizationRepository.calculateRenderFriendlyOrganizations(primaryMemberRoles),
username: MemberRepository.getUsernameFromIdentities(primaryIdentities),
activityCount: primaryActivityCount,
numberOfOpenSourceContributions: member.contributions?.length || 0,
},
secondary: {
Expand All @@ -1155,7 +1122,6 @@ export default class MemberService extends LoggerBase {
contributions: [],
manuallyCreated: true,
manuallyChangedFields: [],
activityCount: secondaryActivityCount,
numberOfOpenSourceContributions: 0,
},
}
Expand Down
16 changes: 16 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
activitiesTimeseries,
getTimeseriesOfActiveMembers,
getTimeseriesOfNewMembers,
queryActivities,
queryActivityRelations,
} from '@crowd/data-access-layer'
import { DbStore } from '@crowd/data-access-layer/src/database'
import ActivityRepository from '@crowd/data-access-layer/src/old/apps/cache_worker/activity.repo'
Expand All @@ -15,7 +15,7 @@ import {
getTimeseriesOfActiveOrganizations,
getTimeseriesOfNewOrganizations,
} from '@crowd/data-access-layer/src/organizations'
import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor'
import { dbStoreQx, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { RedisCache } from '@crowd/redis'
import {
DashboardTimeframe,
Expand Down Expand Up @@ -109,7 +109,8 @@ export async function getActivitiesNumber(params: IQueryTimeseriesParams): Promi
})
}

const res = await queryActivities(svc.questdbSQL, {
const qx = pgpQx(svc.postgres.reader.connection())
const res = await queryActivityRelations(qx, {
segmentIds: params.segmentIds,
countOnly: true,
filter: {
Expand Down
Loading
Loading