Skip to content

Commit 63122a5

Browse files
authored
move all dirty related methods to cacheMixin (#162)
* move all dirty related methods to cacheMixin * move code to mixins * restore setMemory() * v0.49.14 * clean * rename CacheMixin -> DirtyMixin * rename back: Dirty -> Cache * rename back: Dirty -> Cache * finish dirty payload code * 1.3.1 * fix pack testing * 1.3.2
1 parent b4f3136 commit 63122a5

17 files changed

+293
-140
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "wechaty-puppet",
3-
"version": "1.2.12",
3+
"version": "1.3.2",
44
"description": "Abstract Puppet for Wechaty",
55
"type": "module",
66
"exports": {
@@ -100,7 +100,7 @@
100100
"brolog": "^1.13.6",
101101
"clone-class": "^1.0.2",
102102
"file-box": "^1.2.3",
103-
"gerror": "^1.0.2",
103+
"gerror": "^1.0.6",
104104
"memory-card": "^0.12.2",
105105
"state-switch": "^1.1.14",
106106
"typed-emitter": "^1.4.0",

src/agents/cache-agent.spec.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,5 @@ import { test } from 'tstest'
55
import { CacheAgent } from './cache-agent.js'
66

77
test('CacheAgent roomMemberId() restart', async t => {
8-
const payloadCache = new CacheAgent()
9-
const roomMemberId = payloadCache.roomMemberId('roomId', 'userId')
10-
t.equal(roomMemberId, 'roomId-userId', 'should get right id')
8+
t.ok(CacheAgent, 'tbw')
119
})

src/agents/cache-agent.ts

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,23 @@ import type {
2929

3030
type PayloadCacheOptions = Required<PuppetOptions>['cache']
3131

32+
interface LruRoomMemberPayload {
33+
[memberContactId: string]: RoomMemberPayload
34+
}
35+
3236
class CacheAgent {
3337

3438
readonly contact : QuickLru<string, ContactPayload>
3539
readonly friendship : QuickLru<string, FriendshipPayload>
3640
readonly message : QuickLru<string, MessagePayload>
3741
readonly room : QuickLru<string, RoomPayload>
3842
readonly roomInvitation : QuickLru<string, RoomInvitationPayload>
39-
readonly roomMember : QuickLru<string, RoomMemberPayload>
43+
readonly roomMember : QuickLru<string, LruRoomMemberPayload>
4044

4145
constructor (
4246
protected options?: PayloadCacheOptions,
4347
) {
44-
log.verbose('CacheAgent', 'constructor(%s)',
48+
log.verbose('PuppetCacheAgent', 'constructor(%s)',
4549
options
4650
? JSON.stringify(options)
4751
: '',
@@ -67,7 +71,7 @@ class CacheAgent {
6771
this.roomInvitation = new QuickLru<string, RoomInvitationPayload>(lruOptions(
6872
envVars.WECHATY_PUPPET_LRU_CACHE_SIZE_ROOM_INVITATION(options?.roomInvitation)),
6973
)
70-
this.roomMember = new QuickLru<string, RoomMemberPayload>(lruOptions(
74+
this.roomMember = new QuickLru<string, LruRoomMemberPayload>(lruOptions(
7175
envVars.WECHATY_PUPPET_LRU_CACHE_SIZE_ROOM_MEMBER(options?.roomMember)),
7276
)
7377
this.room = new QuickLru<string, RoomPayload>(lruOptions(
@@ -77,12 +81,12 @@ class CacheAgent {
7781
}
7882

7983
start (): void {
80-
log.verbose('CacheAgent', 'start()')
84+
log.verbose('PuppetCacheAgent', 'start()')
8185
this.clear()
8286
}
8387

8488
stop (): void {
85-
log.verbose('CacheAgent', 'stop()')
89+
log.verbose('PuppetCacheAgent', 'stop()')
8690
this.clear()
8791
}
8892

@@ -97,7 +101,7 @@ class CacheAgent {
97101
* Huan(2021-08-28): clear the cache when stop
98102
*/
99103
clear (): void {
100-
log.verbose('CacheAgent', 'clear()')
104+
log.verbose('PuppetCacheAgent', 'clear()')
101105

102106
this.contact.clear()
103107
this.friendship.clear()
@@ -107,16 +111,6 @@ class CacheAgent {
107111
this.roomMember.clear()
108112
}
109113

110-
/**
111-
* Concat roomId & contactId to one string
112-
*/
113-
roomMemberId (
114-
roomId : string,
115-
memberId : string,
116-
): string {
117-
return roomId + '-' + memberId
118-
}
119-
120114
}
121115

122116
export type { PayloadCacheOptions }

src/agents/watchdog-agent.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {
33
WatchdogFood,
44
} from 'watchdog'
55

6-
import type { PuppetSkelton } from '../puppet/puppet-skelton.js'
6+
import type { PuppetSkelton } from '../puppet/mod.js'
77
import type { ServiceMixin } from '../mixins/service-mixin.js'
88

99
import {
@@ -21,7 +21,7 @@ class WatchdogAgent {
2121
constructor (
2222
protected readonly puppet: PuppetSkelton & InstanceType<ServiceMixin>,
2323
) {
24-
log.verbose('WatchdogAgent', 'constructor(%s)', puppet.id)
24+
log.verbose('PuppetWatchdogAgent', 'constructor(%s)', puppet.id)
2525

2626
this.cleanCallbackList = []
2727

@@ -31,15 +31,15 @@ class WatchdogAgent {
3131
* feed the watchdog by `this.emit('heartbeat', ...)`
3232
*/
3333
const timeoutSeconds = puppet.options.timeoutSeconds || DEFAULT_WATCHDOG_TIMEOUT_SECONDS
34-
log.verbose('WatchdogAgent', 'constructor() watchdog timeout set to %d seconds', timeoutSeconds)
34+
log.verbose('PuppetWatchdogAgent', 'constructor() timeout %d seconds', timeoutSeconds)
3535
this.watchdog = new Watchdog(1000 * timeoutSeconds, 'Puppet')
3636

3737
// /**
3838
// * 2. Setup `reset` Event via a 1 second Throttle Queue:
3939
// */
4040
// this.resetThrottleQueue = new ThrottleQueue<string>(1000)
4141
// this.resetThrottleQueue.subscribe(reason => {
42-
// log.silly('WatchdogAgent', 'constructor() resetThrottleQueue.subscribe() reason: "%s"', reason)
42+
// log.silly('PuppetWatchdogAgent', 'constructor() resetThrottleQueue.subscribe() reason: "%s"', reason)
4343
// puppet.reset(reason)
4444
// })
4545
}
@@ -50,20 +50,30 @@ class WatchdogAgent {
5050
*/
5151
const feed = (food: WatchdogFood) => { this.watchdog.feed(food) }
5252
this.puppet.on('heartbeat', feed)
53-
this.cleanCallbackList.push(() => this.puppet.off('heartbeat', feed))
53+
log.verbose('PuppetWatchdogAgent', 'start() "heartbeat" event listener added')
54+
55+
this.cleanCallbackList.push(() => {
56+
this.puppet.off('heartbeat', feed)
57+
log.verbose('PuppetWatchdogAgent', 'start() "heartbeat" event listener removed')
58+
})
5459

5560
/**
5661
* watchdog event `reset` to reset() puppet
5762
*/
5863
const reset = (lastFood: WatchdogFood) => {
59-
log.warn('WatchdogAgent', 'start() reset() reason: %s', JSON.stringify(lastFood))
64+
log.warn('PuppetWatchdogAgent', 'start() reset() reason: %s', JSON.stringify(lastFood))
6065
this.puppet.emit('error', new Error(
6166
`WatchdogAgent reset: lastFood: "${JSON.stringify(lastFood)}"`,
6267
))
6368
this.puppet.wrapAsync(this.puppet.reset())
6469
}
6570
this.watchdog.on('reset', reset)
66-
this.cleanCallbackList.push(() => this.puppet.off('reset', reset))
71+
log.verbose('PuppetWatchdogAgent', 'start() "reset" event listener added')
72+
73+
this.cleanCallbackList.push(() => {
74+
this.puppet.off('reset', reset)
75+
log.verbose('PuppetWatchdogAgent', 'start() "reset" event listener removed')
76+
})
6777

6878
// this.puppet.on('reset', this.throttleReset)
6979
}

src/env-vars.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ const DEFAULT_LRU_CACHE_SIZE_FRIENDSHIP = 100
33
const DEFAULT_LRU_CACHE_SIZE_MESSAGE = 500
44
const DEFAULT_LRU_CACHE_SIZE_ROOM = 100
55
const DEFAULT_LRU_CACHE_SIZE_ROOM_INVITATION = 100
6-
const DEFAULT_LRU_CACHE_SIZE_ROOM_MEMBER = 500
6+
const DEFAULT_LRU_CACHE_SIZE_ROOM_MEMBER = 50
77

88
const getNumberEnv = (env: typeof process.env) => (
99
varName : string,

src/mixins/cache-mixin.ts

Lines changed: 145 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,32 @@
1-
import {
2-
log,
3-
} from '../config.js'
1+
import { log } from '../config.js'
2+
3+
import type {
4+
PuppetOptions,
5+
EventDirtyPayload,
6+
} from '../schemas/mod.js'
7+
import { PayloadType } from '../schemas/mod.js'
48

5-
import type { PuppetOptions } from '../schemas/puppet.js'
69
import { CacheAgent } from '../agents/mod.js'
710

8-
import type { PuppetSkelton } from '../puppet/puppet-skelton.js'
11+
import type { PuppetSkelton } from '../puppet/mod.js'
12+
import { timeoutPromise } from 'gerror'
913

14+
/**
15+
*
16+
* Huan(202111) Issue #158 - Refactoring the 'dirty' event, dirtyPayload(),
17+
* and XXXPayloadDirty() methods logic & spec
18+
*
19+
* @see https://github.com/wechaty/puppet/issues/158
20+
*
21+
*/
1022
const cacheMixin = <MixinBase extends typeof PuppetSkelton>(mixinBase: MixinBase) => {
1123

1224
abstract class CacheMixin extends mixinBase {
1325

1426
cache: CacheAgent
1527

28+
_cacheMixinCleanCallbackList: Function[]
29+
1630
constructor (...args: any[]) {
1731
super(...args)
1832
log.verbose('PuppetCacheMixin', 'constructor(%s)',
@@ -23,21 +37,144 @@ const cacheMixin = <MixinBase extends typeof PuppetSkelton>(mixinBase: MixinBase
2337

2438
const options: PuppetOptions = args[0] || {}
2539

40+
this._cacheMixinCleanCallbackList = []
2641
this.cache = new CacheAgent(options.cache)
2742
}
2843

2944
override async start (): Promise<void> {
3045
log.verbose('PuppetCacheMixin', 'start()')
3146
await super.start()
3247
this.cache.start()
48+
49+
const onDirty = this.onDirty.bind(this)
50+
51+
this.on('dirty', onDirty)
52+
log.verbose('PuppetCacheMixin', 'start() "dirty" event listener added')
53+
54+
const cleanFn = () => {
55+
this.off('dirty', onDirty)
56+
log.verbose('PuppetCacheMixin', 'start() "dirty" event listener removed')
57+
}
58+
this._cacheMixinCleanCallbackList.push(cleanFn)
3359
}
3460

3561
override async stop (): Promise<void> {
3662
log.verbose('PuppetCacheMixin', 'stop()')
3763
this.cache.stop()
64+
65+
while (this._cacheMixinCleanCallbackList.length) {
66+
const fn = this._cacheMixinCleanCallbackList.shift()
67+
if (fn) {
68+
fn()
69+
}
70+
}
71+
3872
await super.stop()
3973
}
4074

75+
/**
76+
*
77+
* @windmemory(202008): add dirty payload methods
78+
*
79+
* @see https://github.com/wechaty/grpc/pull/79
80+
*
81+
* Call this method when you want to notify the server that the data cache need to be invalidated.
82+
*/
83+
dirtyPayload (
84+
type : PayloadType,
85+
id : string,
86+
): void {
87+
log.verbose('PuppetCacheMixin', 'dirtyPayload(%s<%s>, %s)', PayloadType[type], type, id)
88+
this.emit('dirty', {
89+
payloadId : id,
90+
payloadType : type,
91+
})
92+
}
93+
94+
/**
95+
* OnDirty will be registered as a `dirty` event listener,
96+
* and it is in charge of invalidating the cache.
97+
*/
98+
async onDirty ({ payloadType, payloadId }: EventDirtyPayload) {
99+
log.verbose('PuppetCacheMixin', 'onDirty(%s<%s>, %s)', PayloadType[payloadType], payloadType, payloadId)
100+
101+
const dirtyMap = {
102+
[PayloadType.Contact]: (id: string) => this.cache.contact.delete(id),
103+
[PayloadType.Friendship]: (id: string) => this.cache.friendship.delete(id),
104+
[PayloadType.Message]: (id: string) => this.cache.message.delete(id),
105+
[PayloadType.Room]: (id: string) => this.cache.room.delete(id),
106+
[PayloadType.RoomMember]: (id: string) => this.cache.roomMember.delete(id),
107+
[PayloadType.Unspecified]: (id: string) => { throw new Error('Unspecified type with id: ' + id) },
108+
}
109+
110+
/**
111+
* Huan(202111) use `!` to force throw exception when there's any unknown `PayloadType`
112+
*/
113+
dirtyMap[payloadType]!(payloadId)
114+
}
115+
116+
/**
117+
* When we are using PuppetService, the `dirty` event will be emitted from the server,
118+
* and we need to wait for the `dirty` event so we can make sure the cache has been invalidated.
119+
*/
120+
async dirtyPayloadAwait (
121+
type : PayloadType,
122+
id : string,
123+
): Promise<void> {
124+
log.verbose('PuppetCacheMixin', 'dirtyPayloadAwait(%s<%s>, %s)', PayloadType[type], type, id)
125+
126+
const isCurrentDirtyEvent = (event: EventDirtyPayload) =>
127+
event.payloadId === id && event.payloadType === type
128+
129+
const onDirtyResolve = (resolve: () => void) => {
130+
const onDirty = (event: EventDirtyPayload) => {
131+
if (isCurrentDirtyEvent(event)) {
132+
resolve()
133+
}
134+
}
135+
return onDirty
136+
}
137+
138+
let onDirty: ReturnType<typeof onDirtyResolve>
139+
140+
const future = new Promise<void>(resolve => {
141+
onDirty = onDirtyResolve(resolve)
142+
this.on('dirty', onDirty)
143+
})
144+
145+
/**
146+
* 1. call for sending the `dirty` event
147+
*/
148+
this.dirtyPayload(type, id)
149+
150+
/**
151+
* 2. wait for the `dirty` event arrive, with a 5 seconds timeout
152+
*/
153+
try {
154+
await timeoutPromise(future, 5 * 1000)
155+
.finally(() => this.off('dirty', onDirty))
156+
157+
} catch (e) {
158+
// timeout, log warning & ignore it
159+
log.warn('PuppetCacheMixin',
160+
[
161+
'dirtyPayloadAwait() timeout.',
162+
'The `dirty` event should be received but no one found.',
163+
'Learn more from https://github.com/wechaty/puppet/issues/158',
164+
'payloadType: %s(%s)',
165+
'payloadId: %s',
166+
'error: %s',
167+
'stack: %s',
168+
].join('\n,'),
169+
PayloadType[type],
170+
type,
171+
id,
172+
(e as Error).message,
173+
(e as Error).stack,
174+
)
175+
}
176+
}
177+
41178
}
42179

43180
return CacheMixin
@@ -47,6 +184,9 @@ type CacheMixin = ReturnType<typeof cacheMixin>
47184

48185
type ProtectedPropertyCacheMixin =
49186
| 'cache'
187+
| 'onDirty'
188+
| '_cacheMixinCleanCallbackList'
189+
| 'dirtyPayloadAwait'
50190

51191
export type {
52192
CacheMixin,

0 commit comments

Comments
 (0)