Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 4fdd926

Browse files
committed
feat: add transaction for all models
1 parent 8b14483 commit 4fdd926

File tree

19 files changed

+1040
-200
lines changed

19 files changed

+1040
-200
lines changed

Diff for: config/default.js

+10
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ module.exports = {
3131

3232
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
3333

34+
TOPCODER_GROUP_API: process.env.TOPCODER_GROUP_API || 'https://api.topcoder-dev.com/v5/groups',
35+
3436
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
3537
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',
3638

@@ -63,6 +65,14 @@ module.exports = {
6365
password: process.env.ELASTICCLOUD_PASSWORD
6466
},
6567

68+
USER_ACHIEVEMENT_PROPERTY_NAME: process.env.USER_ACHIEVEMENT_PROPERTY_NAME || 'achievements',
69+
USER_EXTERNALPROFILE_PROPERTY_NAME: process.env.USER_EXTERNALPROFILE_PROPERTY_NAME || 'externalProfiles',
70+
USER_ATTRIBUTE_PROPERTY_NAME: process.env.USER_ATTRIBUTE_PROPERTY_NAME || 'attributes',
71+
USER_ROLE_PROPERTY_NAME: process.env.USER_ROLE_PROPERTY_NAME || 'roles',
72+
USER_SKILL_PROPERTY_NAME: process.env.USER_SKILL_PROPERTY_NAME || 'skills',
73+
USER_GROUP_PROPERTY_NAME: process.env.USER_GROUP_PROPERTY_NAME || 'groups',
74+
75+
ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME: process.env.ORGANIZATION_SKILLPROVIDER_PROPERTY_NAME || 'skillProviders',
6676
// es mapping: _index, _type, _id
6777
DOCUMENTS: {
6878
achievementprovider: {

Diff for: src/common/es-helper.js

+255-17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ const helper = require('../common/helper')
66
const appConst = require('../consts')
77
const esClient = require('./es-client').getESClient()
88

9+
const {
10+
TopResources,
11+
UserResources,
12+
OrganizationResources
13+
} = appConst
14+
915
const DOCUMENTS = config.ES.DOCUMENTS
1016
const RESOURCES = Object.keys(DOCUMENTS)
1117

@@ -283,21 +289,213 @@ function escapeRegex (str) {
283289
/* eslint-enable no-useless-escape */
284290
}
285291

292+
/**
293+
* Function to get user from es
294+
* @param {String} userId
295+
* @returns {Object} user
296+
*/
297+
async function getUser (userId) {
298+
const { body: user } = await esClient.get({ index: TopResources.user.index, type: TopResources.user.type, id: userId })
299+
return { seqNo: user._seq_no, primaryTerm: user._primary_term, user: user._source }
300+
}
301+
302+
/**
303+
* Function to update es user
304+
* @param {String} userId
305+
* @param {Number} seqNo
306+
* @param {Number} primaryTerm
307+
* @param {Object} body
308+
*/
309+
async function updateUser (userId, body, seqNo, primaryTerm) {
310+
try {
311+
await esClient.index({
312+
index: TopResources.user.index,
313+
type: TopResources.user.type,
314+
id: userId,
315+
body,
316+
if_seq_no: seqNo,
317+
if_primary_term: primaryTerm,
318+
pipeline: TopResources.user.ingest.pipeline.id,
319+
refresh: 'wait_for'
320+
})
321+
logger.debug('Update user completed')
322+
} catch (err) {
323+
if (err && err.meta && err.meta.body && err.meta.body.error) {
324+
logger.debug(JSON.stringify(err.meta.body.error, null, 4))
325+
}
326+
logger.debug(JSON.stringify(err))
327+
throw err
328+
}
329+
}
330+
331+
/**
332+
* Function to get org from es
333+
* @param {String} organizationId
334+
* @returns {Object} organization
335+
*/
336+
async function getOrg (organizationId) {
337+
const { body: org } = await esClient.get({ index: TopResources.organization.index, type: TopResources.organization.type, id: organizationId })
338+
return { seqNo: org._seq_no, primaryTerm: org._primary_term, org: org._source }
339+
}
340+
341+
/**
342+
* Function to update es organization
343+
* @param {String} organizationId
344+
* @param {Number} seqNo
345+
* @param {Number} primaryTerm
346+
* @param {Object} body
347+
*/
348+
async function updateOrg (organizationId, body, seqNo, primaryTerm) {
349+
await esClient.index({
350+
index: TopResources.organization.index,
351+
type: TopResources.organization.type,
352+
id: organizationId,
353+
body,
354+
if_seq_no: seqNo,
355+
if_primary_term: primaryTerm,
356+
refresh: 'wait_for'
357+
})
358+
await esClient.enrich.executePolicy({ name: TopResources.organization.enrich.policyName })
359+
}
360+
286361
/**
287362
* Process create entity
288363
* @param {String} resource resource name
289364
* @param {Object} entity entity object
290365
*/
291366
async function processCreate (resource, entity) {
292-
helper.validProperties(entity, ['id'])
293-
await esClient.index({
294-
index: DOCUMENTS[resource].index,
295-
type: DOCUMENTS[resource].type,
296-
id: entity.id,
297-
body: entity,
298-
refresh: 'true'
299-
})
300-
logger.info(`Insert in Elasticsearch resource ${resource} entity, , ${JSON.stringify(entity, null, 2)}`)
367+
if (_.includes(_.keys(TopResources), resource)) {
368+
// process the top resources such as user, skill...
369+
helper.validProperties(entity, ['id'])
370+
await esClient.index({
371+
index: TopResources[resource].index,
372+
type: TopResources[resource].type,
373+
id: entity.id,
374+
body: _.omit(entity, ['resource', 'originalTopic']),
375+
pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
376+
refresh: 'true'
377+
})
378+
if (TopResources[resource].enrich) {
379+
await esClient.enrich.executePolicy({
380+
name: TopResources[resource].enrich.policyName
381+
})
382+
}
383+
} else if (_.includes(_.keys(UserResources), resource)) {
384+
// process user resources such as userSkill, userAttribute...
385+
const userResource = UserResources[resource]
386+
userResource.validate(entity)
387+
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
388+
const relateId = entity[userResource.relateKey]
389+
if (!user[userResource.propertyName]) {
390+
user[userResource.propertyName] = []
391+
}
392+
393+
// import groups for a new user
394+
if (resource === 'externalprofile' && entity.externalId) {
395+
const userGroups = await helper.getUserGroup(entity.externalId)
396+
user[config.get('ES.USER_GROUP_PROPERTY_NAME')] = _.unionBy(user[config.get('ES.USER_GROUP_PROPERTY_NAME')], userGroups, 'id')
397+
}
398+
399+
// check the resource does not exist
400+
if (_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
401+
logger.error(`Can't create existed ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId}`)
402+
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
403+
} else {
404+
user[userResource.propertyName].push(entity)
405+
await updateUser(entity.userId, user, seqNo, primaryTerm)
406+
}
407+
} else if (_.includes(_.keys(OrganizationResources), resource)) {
408+
// process org resources such as org skill provider
409+
const orgResources = OrganizationResources[resource]
410+
orgResources.validate(entity)
411+
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
412+
const relateId = entity[orgResources.relateKey]
413+
if (!org[orgResources.propertyName]) {
414+
org[orgResources.propertyName] = []
415+
}
416+
417+
// check the resource does not exist
418+
if (_.some(org[orgResources.propertyName], [orgResources.relateKey, relateId])) {
419+
logger.error(`Can't create existing ${resource} with the ${orgResources.relateKey}: ${relateId}, organizationId: ${entity.organizationId}`)
420+
throw helper.getErrorWithStatus('[version_conflict_engine_exception]', 409)
421+
} else {
422+
org[orgResources.propertyName].push(entity)
423+
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
424+
}
425+
} else {
426+
logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
427+
}
428+
}
429+
430+
/**
431+
* Process update entity
432+
* @param {String} resource resource name
433+
* @param {Object} entity entity object
434+
*/
435+
async function processUpdate (resource, entity) {
436+
if (_.includes(_.keys(TopResources), resource)) {
437+
logger.info(`Processing top level resource: ${resource}`)
438+
// process the top resources such as user, skill...
439+
helper.validProperties(entity, ['id'])
440+
const { index, type } = TopResources[resource]
441+
const id = entity.id
442+
const { body: source } = await esClient.get({ index, type, id })
443+
await esClient.index({
444+
index,
445+
type,
446+
id,
447+
body: _.assign(source._source, _.omit(entity, ['resource', 'originalTopic'])),
448+
pipeline: TopResources[resource].ingest ? TopResources[resource].ingest.pipeline.id : undefined,
449+
if_seq_no: source._seq_no,
450+
if_primary_term: source._primary_term,
451+
refresh: 'true'
452+
})
453+
if (TopResources[resource].enrich) {
454+
await esClient.enrich.executePolicy({
455+
name: TopResources[resource].enrich.policyName
456+
})
457+
}
458+
} else if (_.includes(_.keys(UserResources), resource)) {
459+
// process user resources such as userSkill, userAttribute...
460+
const userResource = UserResources[resource]
461+
const relateId = entity[userResource.relateKey]
462+
logger.info(`Processing user level resource: ${resource}:${relateId}`)
463+
userResource.validate(entity)
464+
logger.info(`Resource validated for ${relateId}`)
465+
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
466+
logger.info(`User fetched ${user.id} and ${relateId}`)
467+
468+
// check the resource exist
469+
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
470+
logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
471+
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
472+
} else {
473+
const updateIndex = _.findIndex(user[userResource.propertyName], [userResource.relateKey, relateId])
474+
user[userResource.propertyName].splice(updateIndex, 1, entity)
475+
logger.info(`Updating ${user.id} and ${relateId}`)
476+
await updateUser(entity.userId, user, seqNo, primaryTerm)
477+
logger.info(`Updated ${user.id} and ${relateId}`)
478+
}
479+
} else if (_.includes(_.keys(OrganizationResources), resource)) {
480+
logger.info(`Processing org level resource: ${resource}`)
481+
// process org resources such as org skill providers
482+
const orgResource = OrganizationResources[resource]
483+
orgResource.validate(entity)
484+
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
485+
const relateId = entity[orgResource.relateKey]
486+
487+
// check the resource exist
488+
if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
489+
logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
490+
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
491+
} else {
492+
const updateIndex = _.findIndex(org[orgResource.propertyName], [orgResource.relateKey, relateId])
493+
org[orgResource.propertyName].splice(updateIndex, 1, entity)
494+
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
495+
}
496+
} else {
497+
logger.info(`Ignore this message since resource is not in [${_.union(_.values(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
498+
}
301499
}
302500

303501
/**
@@ -306,13 +504,53 @@ async function processCreate (resource, entity) {
306504
* @param {Object} entity entity object
307505
*/
308506
async function processDelete (resource, entity) {
309-
helper.validProperties(entity, ['id'])
310-
await esClient.delete({
311-
index: DOCUMENTS[resource].index,
312-
type: DOCUMENTS[resource].type,
313-
id: entity.id,
314-
refresh: 'wait_for'
315-
})
507+
if (_.includes(_.keys(TopResources), resource)) {
508+
// process the top resources such as user, skill...
509+
helper.validProperties(entity, ['id'])
510+
await esClient.delete({
511+
index: TopResources[resource].index,
512+
type: TopResources[resource].type,
513+
id: entity.id,
514+
refresh: 'wait_for'
515+
})
516+
if (TopResources[resource].enrich) {
517+
await esClient.enrich.executePolicy({
518+
name: TopResources[resource].enrich.policyName
519+
})
520+
}
521+
} else if (_.includes(_.keys(UserResources), resource)) {
522+
// process user resources such as userSkill, userAttribute...
523+
const userResource = UserResources[resource]
524+
userResource.validate(entity)
525+
const { seqNo, primaryTerm, user } = await getUser(entity.userId)
526+
const relateId = entity[userResource.relateKey]
527+
528+
// check the resource exist
529+
if (!user[userResource.propertyName] || !_.some(user[userResource.propertyName], [userResource.relateKey, relateId])) {
530+
logger.error(`The ${resource} with the ${userResource.relateKey}: ${relateId}, userId: ${entity.userId} not exist`)
531+
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
532+
} else {
533+
_.remove(user[userResource.propertyName], [userResource.relateKey, relateId])
534+
await updateUser(entity.userId, user, seqNo, primaryTerm)
535+
}
536+
} else if (_.includes(_.keys(OrganizationResources), resource)) {
537+
// process user resources such as org skill provider
538+
const orgResource = OrganizationResources[resource]
539+
orgResource.validate(entity)
540+
const { seqNo, primaryTerm, org } = await getOrg(entity.organizationId)
541+
const relateId = entity[orgResource.relateKey]
542+
543+
// check the resource exist
544+
if (!org[orgResource.propertyName] || !_.some(org[orgResource.propertyName], [orgResource.relateKey, relateId])) {
545+
logger.error(`The ${resource} with the ${orgResource.relateKey}: ${relateId}, organizationId: ${entity.organizationId} not exist`)
546+
throw helper.getErrorWithStatus('[resource_not_found_exception]', 404)
547+
} else {
548+
_.remove(org[orgResource.propertyName], [orgResource.relateKey, relateId])
549+
await updateOrg(entity.organizationId, org, seqNo, primaryTerm)
550+
}
551+
} else {
552+
logger.info(`Ignore this message since resource is not in [${_.union(_.keys(TopResources), _.keys(UserResources), _.keys(OrganizationResources))}]`)
553+
}
316554
}
317555

318556
async function getOrganizationId (handle) {
@@ -1487,7 +1725,7 @@ async function searchAchievementValues ({ organizationId, keyword }) {
14871725

14881726
module.exports = {
14891727
processCreate,
1490-
processUpdate: processCreate,
1728+
processUpdate,
14911729
processDelete,
14921730
searchElasticSearch,
14931731
getFromElasticSearch,

0 commit comments

Comments
 (0)