
Commit 3c04223

Merge branch 'main' into groupspill
2 parents 2130640 + 62c1148

12 files changed, +597 -96 lines

pkg/fileservice/fifocache/data_cache.go

Lines changed: 3 additions & 4 deletions
@@ -44,6 +44,8 @@ var seed = maphash.MakeSeed()
 func shardCacheKey(key fscache.CacheKey) uint64 {
     var hasher maphash.Hash
     hasher.SetSeed(seed)
+    // Offset and Path is good enough to distribute keys to shards.
+    // TestShardCacheKey will fail if shardCacheKey is not good enough
     hasher.Write(util.UnsafeToBytes(&key.Offset))
     hasher.WriteString(key.Path)
     return hasher.Sum64()
@@ -80,10 +82,7 @@ func (d *DataCache) deletePath(ctx context.Context, shardIndex int, path string)
     for key, item := range shard.values {
         if key.Path == path {
             delete(shard.values, key)
-            // key deleted, call postEvict
-            if d.fifo.postEvict != nil {
-                d.fifo.postEvict(ctx, item.key, item.value, item.size)
-            }
+            d.fifo.purgeItemValue(ctx, item)
         }
     }
 }
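The change above folds the inline postEvict call into fifo.purgeItemValue, which this commit adds in fifo.go below. The point of the helper is idempotency: once an item's value has been handed to the evict callback, later purges (from Delete, deletePath, or the move into the ghost queue) become no-ops. Below is a minimal sketch of that pattern only; cacheEntry, purgeOnce, and onEvict are illustrative names, not the fifocache identifiers.

package fifosketch

// cacheEntry stands in for fifocache's _CacheItem: a value plus a flag that
// records whether the value is still owned by the cache.
type cacheEntry[V any] struct {
    value   V
    valueOK bool
}

// purgeOnce mirrors the shape of purgeItemValue: fire the eviction callback at
// most once, then clear the value so repeated purges are harmless.
func purgeOnce[V any](e *cacheEntry[V], onEvict func(V)) {
    if !e.valueOK {
        return // already purged elsewhere
    }
    if onEvict != nil {
        onEvict(e.value)
    }
    e.valueOK = false
    var zero V
    e.value = zero // release the reference so it can be freed
}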

pkg/fileservice/fifocache/fifo.go

Lines changed: 107 additions & 38 deletions
@@ -20,15 +20,13 @@ import (
     "sync"
     "sync/atomic"

-    "golang.org/x/sys/cpu"
-
     "github.com/matrixorigin/matrixone/pkg/fileservice/fscache"
+    "golang.org/x/sys/cpu"
 )

 const numShards = 256

 // Cache implements an in-memory cache with FIFO-based eviction
-// it's mostly like the S3-fifo, only without the ghost queue part
 type Cache[K comparable, V any] struct {
     capacity  fscache.CapacityFunc
     capacity1 fscache.CapacityFunc
@@ -51,15 +49,18 @@ type Cache[K comparable, V any] struct {
     queue1 Queue[*_CacheItem[K, V]]
     used2  int64
     queue2 Queue[*_CacheItem[K, V]]
+    ghostSize int64
+    ghost     Queue[*_CacheItem[K, V]]

     capacityCut atomic.Int64
 }

 type _CacheItem[K comparable, V any] struct {
-    key   K
-    value V
-    size  int64
-    count atomic.Int32
+    key     K
+    value   V
+    valueOK bool
+    size    int64
+    count   atomic.Int32
 }

 func (c *_CacheItem[K, V]) inc() {
@@ -101,6 +102,7 @@ func New[K comparable, V any](
         itemQueue:    make(chan *_CacheItem[K, V], runtime.GOMAXPROCS(0)*2),
         queue1:       *NewQueue[*_CacheItem[K, V]](),
         queue2:       *NewQueue[*_CacheItem[K, V]](),
+        ghost:        *NewQueue[*_CacheItem[K, V]](),
         keyShardFunc: keyShardFunc,
         postSet:      postSet,
         postGet:      postGet,
@@ -116,16 +118,37 @@ func (c *Cache[K, V]) set(ctx context.Context, key K, value V, size int64) *_CacheItem[K, V] {
     shard := &c.shards[c.keyShardFunc(key)%numShards]
     shard.Lock()
     defer shard.Unlock()
-    _, ok := shard.values[key]
+
+    oldItem, ok := shard.values[key]
     if ok {
-        // existed
+
+        // ghost item
+        if !oldItem.valueOK {
+            // insert new item
+            item := &_CacheItem[K, V]{
+                key:     key,
+                value:   value,
+                valueOK: true,
+                size:    size,
+            }
+            item.count.Store(oldItem.count.Load())
+            // replacing the oldItem. oldItem will be evicted from ghost queue eventually.
+            shard.values[key] = item
+            if c.postSet != nil {
+                c.postSet(ctx, key, value, size)
+            }
+            return item
+        }
+
+        // existed and value ok, skip set
         return nil
     }

     item := &_CacheItem[K, V]{
-        key:   key,
-        value: value,
-        size:  size,
+        key:     key,
+        value:   value,
+        valueOK: true,
+        size:    size,
     }
     shard.values[key] = item
     if c.postSet != nil {
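The ghost-hit branch above is the core of the change: when a key that only survives as a ghost entry is set again, the fresh item inherits the access count the key accumulated while it was a ghost, so the eviction pass can treat it as a proven-hot entry rather than a cold newcomer (the promotion check itself lives in evict1, outside this hunk). A tiny sketch of just that hand-off, using illustrative types rather than the real _CacheItem:

package fifosketch

import "sync/atomic"

// ghostItem stands in for a _CacheItem whose value has been purged but whose
// key and access count are still tracked by the ghost queue.
type ghostItem[V any] struct {
    value   V
    valueOK bool
    count   atomic.Int32
}

// reviveGhost sketches set's ghost-hit path: build a fresh, valueOK item and
// carry over the hits recorded while the key was only a ghost.
func reviveGhost[V any](ghost *ghostItem[V], value V) *ghostItem[V] {
    fresh := &ghostItem[V]{value: value, valueOK: true}
    fresh.count.Store(ghost.count.Load())
    return fresh
}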
@@ -137,6 +160,7 @@ func (c *Cache[K, V]) set(ctx context.Context, key K, value V, size int64) *_CacheItem[K, V] {

 func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) {
     if item := c.set(ctx, key, value, size); item != nil {
+        // item inserted, enqueue
         c.enqueue(item)
         c.Evict(ctx, nil, 0)
     }
@@ -175,19 +199,33 @@ func (c *Cache[K, V]) enqueue(item *_CacheItem[K, V]) {
 }

 func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) {
+    var item *_CacheItem[K, V]
+    defer func() {
+        // item ok, increase count
+        if item != nil {
+            item.inc()
+        }
+    }()
+
     shard := &c.shards[c.keyShardFunc(key)%numShards]
     shard.Lock()
-    var item *_CacheItem[K, V]
+    defer shard.Unlock()
+
     item, ok = shard.values[key]
     if !ok {
-        shard.Unlock()
+        // not exist
+        return
+    }
+
+    // ghost item
+    if !item.valueOK {
+        ok = false
         return
     }
+
     if c.postGet != nil {
         c.postGet(ctx, item.key, item.value, item.size)
     }
-    shard.Unlock()
-    item.inc()
     return item.value, true
 }
@@ -200,11 +238,9 @@ func (c *Cache[K, V]) Delete(ctx context.Context, key K) {
         return
     }
     delete(shard.values, key)
-    // key deleted, call postEvict
-    if c.postEvict != nil {
-        c.postEvict(ctx, item.key, item.value, item.size)
-    }
-    // queues will be update in evict
+    c.purgeItemValue(ctx, item)
+    // we do not update queues here, to reduce cost
+    // deleted item in queue will be evicted eventually.
 }

 func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut int64) {
@@ -279,27 +315,15 @@ func (c *Cache[K, V]) evict1(ctx context.Context) {
             c.used1 -= item.size
             c.used2 += item.size
         } else {
-            // evict
-            c.deleteItem(ctx, item)
+            // put ghost
+            c.enqueueGhost(ctx, item)
+            c.evictGhost()
             c.used1 -= item.size
             return
         }
     }
 }

-func (c *Cache[K, V]) deleteItem(ctx context.Context, item *_CacheItem[K, V]) {
-    shard := &c.shards[c.keyShardFunc(item.key)%numShards]
-    shard.Lock()
-    defer shard.Unlock()
-    if _, ok := shard.values[item.key]; ok {
-        delete(shard.values, item.key)
-        // key deleted, call postEvict
-        if c.postEvict != nil {
-            c.postEvict(ctx, item.key, item.value, item.size)
-        }
-    }
-}
-
 func (c *Cache[K, V]) evict2(ctx context.Context) {
     // queue 2
     for {
@@ -313,10 +337,55 @@ func (c *Cache[K, V]) evict2(ctx context.Context) {
             c.queue2.enqueue(item)
             item.dec()
         } else {
-            // evict
-            c.deleteItem(ctx, item)
+            // put ghost
+            c.enqueueGhost(ctx, item)
+            c.evictGhost()
             c.used2 -= item.size
             return
         }
     }
 }
+
+func (c *Cache[K, V]) enqueueGhost(ctx context.Context, item *_CacheItem[K, V]) {
+    c.ghost.enqueue(item)
+    c.ghostSize += item.size
+
+    shard := &c.shards[c.keyShardFunc(item.key)%numShards]
+    shard.Lock()
+    defer shard.Unlock()
+    c.purgeItemValue(ctx, item)
+}
+
+func (c *Cache[K, V]) purgeItemValue(ctx context.Context, item *_CacheItem[K, V]) {
+    if !item.valueOK {
+        return
+    }
+    if c.postEvict != nil {
+        c.postEvict(ctx, item.key, item.value, item.size)
+    }
+    item.valueOK = false
+    var zero V
+    item.value = zero
+}
+
+func (c *Cache[K, V]) evictGhost() {
+    ghostCapacity := c.capacity() - c.capacity1() // same to queue2 capacity
+    for c.ghostSize > ghostCapacity {
+        item, ok := c.ghost.dequeue()
+        if !ok {
+            break
+        }
+        c.ghostSize -= item.size
+        c.deleteItem(item)
+    }
+}
+
+func (c *Cache[K, V]) deleteItem(item *_CacheItem[K, V]) {
+    shard := &c.shards[c.keyShardFunc(item.key)%numShards]
+    shard.Lock()
+    defer shard.Unlock()
+    // item may be replaced in set, check before delete
+    if shard.values[item.key] == item {
+        delete(shard.values, item.key)
+    }
+}
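Taken together, the hunks above turn the cache into a full S3-FIFO layout: a small probationary queue (queue1), a main queue (queue2), and a ghost queue that keeps only keys and access counts of recently evicted items, so a quick re-reference can be promoted instead of starting cold. The following is a deliberately simplified, single-threaded sketch of that idea for orientation only: it counts capacity in items rather than bytes, uses container/list instead of the package's Queue and shard locks, and none of its names are fifocache APIs.

package main

import (
    "container/list"
    "fmt"
)

// entry is a simplified stand-in for _CacheItem; fifocache additionally tracks
// a byte size and uses an atomic counter.
type entry struct {
    key, value int
    freq       int
    ghost      bool // true once the value has been purged
}

// s3fifo illustrates the small/main/ghost queue layout.
type s3fifo struct {
    items                       map[int]*entry
    small, main, ghost          *list.List
    smallCap, mainCap, ghostCap int
}

func newS3FIFO(smallCap, mainCap, ghostCap int) *s3fifo {
    return &s3fifo{
        items: map[int]*entry{},
        small: list.New(), main: list.New(), ghost: list.New(),
        smallCap: smallCap, mainCap: mainCap, ghostCap: ghostCap,
    }
}

func (c *s3fifo) Get(key int) (int, bool) {
    e, ok := c.items[key]
    if !ok {
        return 0, false
    }
    e.freq++ // like fifocache, a ghost hit still records interest in the key
    if e.ghost {
        return 0, false
    }
    return e.value, true
}

func (c *s3fifo) Set(key, value int) {
    if old, ok := c.items[key]; ok {
        if !old.ghost {
            return // resident entries are not overwritten, mirroring set()
        }
        // Ghost hit: install a fresh entry that inherits the accumulated
        // frequency; the stale entry drains out of the ghost list later.
        e := &entry{key: key, value: value, freq: old.freq}
        c.items[key] = e
        c.small.PushBack(e)
    } else {
        e := &entry{key: key, value: value}
        c.items[key] = e
        c.small.PushBack(e)
    }
    c.evict()
}

func (c *s3fifo) evict() {
    for c.small.Len() > c.smallCap {
        e := c.small.Remove(c.small.Front()).(*entry)
        if e.freq > 0 {
            e.freq = 0
            c.main.PushBack(e) // touched while probationary: promote
        } else {
            e.ghost, e.value = true, 0 // purge the value, remember the key
            c.ghost.PushBack(e)
        }
    }
    for c.main.Len() > c.mainCap {
        e := c.main.Remove(c.main.Front()).(*entry)
        if e.freq > 0 {
            e.freq--
            c.main.PushBack(e) // second chance, like queue2's re-enqueue
        } else {
            e.ghost, e.value = true, 0
            c.ghost.PushBack(e)
        }
    }
    for c.ghost.Len() > c.ghostCap {
        e := c.ghost.Remove(c.ghost.Front()).(*entry)
        if c.items[e.key] == e { // may have been replaced by a ghost-hit Set
            delete(c.items, e.key)
        }
    }
}

func main() {
    c := newS3FIFO(1, 2, 2)
    c.Set(1, 10)
    c.Set(2, 20)          // key 1 overflows the small queue into the ghost queue
    fmt.Println(c.Get(1)) // 0 false: a ghost hit, but the interest is recorded
    c.Set(1, 10)          // ghost hit on Set: value restored, frequency kept
    c.Set(3, 30)
    fmt.Println(c.Get(1)) // 10 true: promoted to the main queue, not evicted
}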

pkg/fileservice/fifocache/fifo_test.go

Lines changed: 31 additions & 1 deletion
@@ -83,7 +83,8 @@ func TestCacheEvict2(t *testing.T) {
 func TestCacheEvict3(t *testing.T) {
     ctx := context.Background()
     var nEvict, nGet, nSet int
-    cache := New(fscache.ConstCapacity(1024),
+    cache := New(
+        fscache.ConstCapacity(1024),
         ShardInt[int],
         func(_ context.Context, _ int, _ bool, _ int64) {
             nSet++
@@ -135,3 +136,32 @@ func TestDoubleFree(t *testing.T) {
     // check
     assert.Equal(t, 1, evicts[1])
 }
+
+func TestGhostQueue(t *testing.T) {
+    numSet := make(map[int]int)
+    numEvict := make(map[int]int)
+    cache := New(
+        fscache.ConstCapacity(1),
+        ShardInt,
+        func(ctx context.Context, key int, value int, size int64) {
+            numSet[key]++
+        },
+        nil,
+        func(ctx context.Context, key int, value int, size int64) {
+            numEvict[key]++
+        },
+    )
+    cache.Set(t.Context(), 1, 1, 1)
+    cache.Set(t.Context(), 2, 2, 1)
+    assert.Equal(t, 1, numEvict[1])
+    // 1 is in the ghost queue now
+    _, ok := cache.Get(t.Context(), 1)
+    if ok {
+        t.Fatal()
+    }
+    cache.Set(t.Context(), 1, 1, 1)
+    assert.Equal(t, 2, numSet[1])
+    // 2 is in the ghost queue now
+    cache.Set(t.Context(), 3, 3, 1)
+    // 2 was evicted from ghost queue
+}
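For orientation, the expected counts in TestGhostQueue follow directly from the new code paths: setting key 2 overflows the tiny capacity, so key 1 is drained from queue1 into the ghost queue and postEvict fires once (numEvict[1] == 1); the Get that follows finds the ghost entry, sees valueOK == false, and reports a miss; re-setting key 1 takes the ghost-hit branch of set, installing a fresh item and firing postSet a second time (numSet[1] == 2); and setting key 3 later pushes key 2's ghost entry out via evictGhost, as the trailing comments note.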
