Skip to content

Commit 43231f5

Browse files
authored
use ilp to push activity updates to questdb instead of pg protocol api (#2633)
1 parent 5da42e0 commit 43231f5

File tree

4 files changed

+61
-23
lines changed

4 files changed

+61
-23
lines changed

Diff for: services/apps/data_sink_worker/src/service/activity.service.ts

+56-21
Original file line numberDiff line numberDiff line change
@@ -244,27 +244,62 @@ export default class ActivityService extends LoggerBase {
244244
platform: toUpdate.platform || (original.platform as PlatformType),
245245
})
246246

247-
await updateActivity(this.qdbStore.connection(), id, {
248-
tenantId: tenantId,
249-
segmentId: segmentId,
250-
type: toUpdate.type || original.type,
251-
isContribution: toUpdate.isContribution || original.isContribution,
252-
score: toUpdate.score || original.score,
253-
sourceId: toUpdate.sourceId || original.sourceId,
254-
sourceParentId: toUpdate.sourceParentId || original.sourceParentId,
255-
memberId: toUpdate.memberId || original.memberId,
256-
username: toUpdate.username || original.username,
257-
sentiment: toUpdate.sentiment || original.sentiment,
258-
attributes: toUpdate.attributes || original.attributes,
259-
body: escapeNullByte(toUpdate.body || original.body),
260-
title: escapeNullByte(toUpdate.title || original.title),
261-
channel: toUpdate.channel || original.channel,
262-
url: toUpdate.url || original.url,
263-
organizationId: toUpdate.organizationId || original.organizationId,
264-
platform: toUpdate.platform || (original.platform as PlatformType),
265-
isBotActivity: memberInfo.isBot,
266-
isTeamMemberActivity: memberInfo.isTeamMember,
267-
})
247+
// use insert instead of update to avoid using pg protocol with questdb
248+
try {
249+
await insertActivities([
250+
{
251+
id,
252+
memberId: toUpdate.memberId || original.memberId,
253+
timestamp: original.timestamp,
254+
platform: toUpdate.platform || (original.platform as PlatformType),
255+
type: toUpdate.type || original.type,
256+
isContribution: toUpdate.isContribution || original.isContribution,
257+
score: toUpdate.score || original.score,
258+
sourceId: toUpdate.sourceId || original.sourceId,
259+
sourceParentId: toUpdate.sourceParentId || original.sourceParentId,
260+
tenantId: tenantId,
261+
attributes: toUpdate.attributes || original.attributes,
262+
sentiment: toUpdate.sentiment || original.sentiment,
263+
body: escapeNullByte(toUpdate.body || original.body),
264+
title: escapeNullByte(toUpdate.title || original.title),
265+
channel: toUpdate.channel || original.channel,
266+
url: toUpdate.url || original.url,
267+
username: toUpdate.username || original.username,
268+
objectMemberId: activity.objectMemberId,
269+
objectMemberUsername: activity.objectMemberUsername,
270+
segmentId: segmentId,
271+
organizationId: toUpdate.organizationId || original.organizationId,
272+
isBotActivity: memberInfo.isBot,
273+
isTeamMemberActivity: memberInfo.isTeamMember,
274+
importHash: original.importHash,
275+
},
276+
])
277+
} catch (error) {
278+
this.log.error('Error updating (by inserting) activity in QuestDB:', error)
279+
throw error
280+
}
281+
282+
// await updateActivity(this.qdbStore.connection(), id, {
283+
// tenantId: tenantId,
284+
// segmentId: segmentId,
285+
// type: toUpdate.type || original.type,
286+
// isContribution: toUpdate.isContribution || original.isContribution,
287+
// score: toUpdate.score || original.score,
288+
// sourceId: toUpdate.sourceId || original.sourceId,
289+
// sourceParentId: toUpdate.sourceParentId || original.sourceParentId,
290+
// memberId: toUpdate.memberId || original.memberId,
291+
// username: toUpdate.username || original.username,
292+
// sentiment: toUpdate.sentiment || original.sentiment,
293+
// attributes: toUpdate.attributes || original.attributes,
294+
// body: escapeNullByte(toUpdate.body || original.body),
295+
// title: escapeNullByte(toUpdate.title || original.title),
296+
// channel: toUpdate.channel || original.channel,
297+
// url: toUpdate.url || original.url,
298+
// organizationId: toUpdate.organizationId || original.organizationId,
299+
// platform: toUpdate.platform || (original.platform as PlatformType),
300+
// isBotActivity: memberInfo.isBot,
301+
// isTeamMemberActivity: memberInfo.isTeamMember,
302+
// })
268303

269304
return true
270305
} else {

Diff for: services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface IDbActivity {
1717
objectMemberId?: string
1818
objectMemberUsername?: string
1919
attributes: Record<string, unknown>
20+
importHash?: string
2021
body?: string
2122
title?: string
2223
channel?: string

Diff for: services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
3535
"objectMemberId",
3636
"objectMemberUsername",
3737
attributes,
38+
"importHash",
3839
body,
3940
title,
4041
channel,
@@ -92,6 +93,7 @@ export default class ActivityRepository extends RepositoryBase<ActivityRepositor
9293
"objectMemberId",
9394
"objectMemberUsername",
9495
attributes,
96+
"importHash",
9597
body,
9698
title,
9799
channel,

Diff for: services/libs/database/src/connection.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { getServiceChildLogger } from '@crowd/logging'
22
import pgPromise from 'pg-promise'
33
import { DbConnection, DbInstance, IDatabaseConfig } from './types'
4-
import { IS_CLOUD_ENV } from '@crowd/common'
4+
import { IS_CLOUD_ENV, IS_DEV_ENV } from '@crowd/common'
55

66
const log = getServiceChildLogger('database.connection')
77

@@ -81,7 +81,7 @@ export const getDbConnection = async (
8181
rejectUnauthorized: false,
8282
}
8383
: false,
84-
max: maxPoolSize || 20,
84+
max: maxPoolSize || (IS_DEV_ENV ? 5 : 20),
8585
idleTimeoutMillis: idleTimeoutMillis !== undefined ? idleTimeoutMillis : 10000,
8686
// query_timeout: 30000,
8787
application_name: process.env.SERVICE || 'unknown-app',

0 commit comments

Comments
 (0)