Skip to content

Commit e00f715

Browse files
committed
sstable: fixes to prefix replacing iterator
We add more logic inside the prefix replacing iterators: - pass through `SeekGE` calls even when not strictly necessary, to enable optimizations indicated by the flags. - keep track of whether we are positioned before/after the synthetic prefix range, to enable expected behaviors like `SeekGE` after the range followed by `Prev` (or `SeekLT` to before the range followed by `Next`)
1 parent 0b94619 commit e00f715

File tree

3 files changed

+126
-48
lines changed

3 files changed

+126
-48
lines changed

sstable/prefix_replacing_iterator.go

Lines changed: 107 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,36 @@ type prefixReplacingIterator struct {
2323
contentPrefix []byte
2424
syntheticPrefix []byte
2525

26-
// keyInRange is a valid key in the logical range that has the syntheticPrefix.
27-
// When an argument to a seek function does not have the syntheticPrefix,
28-
// keyInRange is used to determine if the argument key is before or after the
29-
// range of keys produced by the iterator.
30-
keyInRange []byte
26+
// lower is a valid key that has syntheticPreficx and is a lower bound for all
27+
// keys produced by i. It is used to determine if the argument key is before
28+
// or after the range of keys produced by the iterator.
29+
lower []byte
3130

3231
// arg and arg2 are buffers that are used to avoid allocations when rewriting
3332
// keys that are provided as arguments. They always start with contentPrefix.
3433
arg, arg2 []byte
3534

3635
// res is used to avoid allocations when rewriting result keys. It always
3736
// starts with syntheticPrefix.
38-
res InternalKey
39-
err error
40-
// empty is set after a seek operation that returns no keys.
41-
empty bool
42-
}
43-
44-
func errInputPrefixMismatch() error {
45-
return errors.AssertionFailedf("key argument does not have prefix required for replacement")
46-
}
37+
res InternalKey
38+
err error
39+
state iteratorState
40+
}
41+
42+
type iteratorState int8
43+
44+
const (
45+
// inSync indicates that the prefix replacing iterator is "in sync" with the
46+
// underlying iterator; any Next/Prev calls can be passed through.
47+
inSync iteratorState = iota
48+
// afterRange indicates that our iterator is positioned after the synthetic
49+
// prefix range. A Prev() call should return the last key/span in the range.
50+
afterRange
51+
// beforeRange indicates that our iterator is positioned after the synthetic
52+
// prefix range. A Next() call should return the first key/span in the range.
53+
beforeRange
54+
empty
55+
)
4756

4857
func errOutputPrefixMismatch() error {
4958
return errors.AssertionFailedf("key returned does not have prefix required for replacement")
@@ -63,22 +72,22 @@ var _ Iterator = (*prefixReplacingIterator)(nil)
6372
//
6473
// INVARIANT: len(syntheticPrefix) > 0 && keyInRange stars with syntheticPrefix.
6574
func newPrefixReplacingIterator(
66-
i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare,
75+
i Iterator, contentPrefix, syntheticPrefix []byte, lower []byte, cmp base.Compare,
6776
) Iterator {
6877
if invariants.Enabled {
6978
if len(syntheticPrefix) == 0 {
7079
panic("newPrefixReplacingIterator called without synthetic prefix")
7180
}
72-
if !bytes.HasPrefix(keyInRange, syntheticPrefix) {
73-
panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix))
81+
if !bytes.HasPrefix(lower, syntheticPrefix) {
82+
panic(fmt.Sprintf("lower %q does not have synthetic prefix %q", lower, syntheticPrefix))
7483
}
7584
}
7685
return &prefixReplacingIterator{
7786
i: i,
7887
cmp: cmp,
7988
contentPrefix: contentPrefix,
8089
syntheticPrefix: syntheticPrefix,
81-
keyInRange: keyInRange,
90+
lower: lower,
8291
arg: slices.Clone(contentPrefix),
8392
arg2: slices.Clone(contentPrefix),
8493
res: InternalKey{UserKey: slices.Clone(syntheticPrefix)},
@@ -121,14 +130,16 @@ func (p *prefixReplacingIterator) rewriteResult(
121130
func (p *prefixReplacingIterator) SeekGE(
122131
key []byte, flags base.SeekGEFlags,
123132
) (*InternalKey, base.LazyValue) {
124-
p.empty = false
133+
p.state = inSync
125134
if !bytes.HasPrefix(key, p.syntheticPrefix) {
126-
if p.cmp(key, p.keyInRange) > 0 {
127-
p.empty = true
135+
if p.cmp(key, p.lower) > 0 {
136+
p.state = afterRange
128137
return nil, base.LazyValue{}
129138
}
130-
// Key must be before the range; use First instead.
131-
return p.rewriteResult(p.i.First())
139+
// Key must be before the range; seek to the lower bound instead.
140+
// We don't use First because we may miss out on optimizations passed
141+
// through SeekEFlags.
142+
key = p.lower
132143
}
133144
return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
134145
}
@@ -137,13 +148,15 @@ func (p *prefixReplacingIterator) SeekGE(
137148
func (p *prefixReplacingIterator) SeekPrefixGE(
138149
prefix, key []byte, flags base.SeekGEFlags,
139150
) (*InternalKey, base.LazyValue) {
140-
p.empty = false
141-
if invariants.Enabled && !bytes.HasPrefix(key, prefix) {
142-
panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix))
143-
}
144-
if !bytes.HasPrefix(prefix, p.syntheticPrefix) {
151+
p.state = inSync
152+
if !bytes.HasPrefix(prefix, p.syntheticPrefix) || !bytes.HasPrefix(key, p.syntheticPrefix) {
145153
// We never produce keys with this prefix; we can return nil.
146-
p.empty = true
154+
if p.cmp(prefix, p.lower) < 0 {
155+
// We still want to seek the underlying iterator to potentially enable
156+
// optimizations passed through flags.
157+
p.i.SeekGE(p.rewriteArg(p.lower), flags)
158+
}
159+
p.state = empty
147160
return nil, base.LazyValue{}
148161
}
149162
return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
@@ -153,52 +166,75 @@ func (p *prefixReplacingIterator) SeekPrefixGE(
153166
func (p *prefixReplacingIterator) SeekLT(
154167
key []byte, flags base.SeekLTFlags,
155168
) (*InternalKey, base.LazyValue) {
156-
p.empty = false
169+
p.state = inSync
157170
if !bytes.HasPrefix(key, p.syntheticPrefix) {
158-
if p.cmp(key, p.keyInRange) < 0 {
159-
p.empty = true
171+
if p.cmp(key, p.lower) < 0 {
172+
// Key before the range; no results.
173+
p.state = beforeRange
160174
return nil, base.LazyValue{}
161175
}
162176
// Key must be after the range. Use Last instead.
163177
return p.rewriteResult(p.i.Last())
164178
}
179+
165180
return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
166181
}
167182

168183
// First implements the Iterator interface.
169184
func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
170-
p.empty = false
185+
p.state = inSync
171186
return p.rewriteResult(p.i.First())
172187
}
173188

174189
// Last implements the Iterator interface.
175190
func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
176-
p.empty = false
191+
p.state = inSync
177192
return p.rewriteResult(p.i.Last())
178193
}
179194

180195
// Next implements the Iterator interface.
181196
func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
182-
if p.empty {
197+
switch p.state {
198+
case empty, afterRange:
183199
return nil, base.LazyValue{}
200+
case beforeRange:
201+
p.state = inSync
202+
return p.rewriteResult(p.i.First())
203+
case inSync:
204+
return p.rewriteResult(p.i.Next())
205+
default:
206+
panic("invalid iterator state")
184207
}
185-
return p.rewriteResult(p.i.Next())
186208
}
187209

188210
// NextPrefix implements the Iterator interface.
189211
func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
190-
if p.empty {
212+
switch p.state {
213+
case empty, afterRange:
191214
return nil, base.LazyValue{}
215+
case beforeRange:
216+
p.state = inSync
217+
return p.rewriteResult(p.i.First())
218+
case inSync:
219+
return p.rewriteResult(p.i.NextPrefix(succKey))
220+
default:
221+
panic("invalid iterator state")
192222
}
193-
return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
194223
}
195224

196225
// Prev implements the Iterator interface.
197226
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
198-
if p.empty {
227+
switch p.state {
228+
case empty, beforeRange:
199229
return nil, base.LazyValue{}
230+
case afterRange:
231+
p.state = inSync
232+
return p.rewriteResult(p.i.Last())
233+
case inSync:
234+
return p.rewriteResult(p.i.Prev())
235+
default:
236+
panic("invalid iterator state")
200237
}
201-
return p.rewriteResult(p.i.Prev())
202238
}
203239

204240
// Error implements the Iterator interface.
@@ -260,6 +296,8 @@ type prefixReplacingFragmentIterator struct {
260296

261297
arg []byte
262298
out1, out2 []byte
299+
300+
state iteratorState
263301
}
264302

265303
// newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
@@ -284,9 +322,6 @@ func newPrefixReplacingFragmentIterator(
284322
}
285323

286324
func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
287-
if !bytes.HasPrefix(key, p.syntheticPrefix) {
288-
return nil, errInputPrefixMismatch()
289-
}
290325
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
291326
return p.arg, nil
292327
}
@@ -307,8 +342,10 @@ func (p *prefixReplacingFragmentIterator) rewriteSpan(
307342

308343
// SeekGE implements the FragmentIterator interface.
309344
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
345+
p.state = inSync
310346
if !bytes.HasPrefix(key, p.syntheticPrefix) {
311347
if p.cmp(key, p.keyInRange) > 0 {
348+
p.state = afterRange
312349
return nil, nil
313350
}
314351
// Key must be before the range; use First instead.
@@ -323,8 +360,10 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err
323360

324361
// SeekLT implements the FragmentIterator interface.
325362
func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
363+
p.state = inSync
326364
if !bytes.HasPrefix(key, p.syntheticPrefix) {
327365
if p.cmp(key, p.keyInRange) < 0 {
366+
p.state = beforeRange
328367
return nil, nil
329368
}
330369
// Key must be after the range; use Last instead.
@@ -339,22 +378,44 @@ func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, err
339378

340379
// First implements the FragmentIterator interface.
341380
func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
381+
p.state = inSync
342382
return p.rewriteSpan(p.i.First())
343383
}
344384

345385
// Last implements the FragmentIterator interface.
346386
func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
387+
p.state = inSync
347388
return p.rewriteSpan(p.i.Last())
348389
}
349390

350-
// Close implements the FragmentIterator interface.
391+
// Next implements the FragmentIterator interface.
351392
func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
352-
return p.rewriteSpan(p.i.Next())
393+
switch p.state {
394+
case empty, afterRange:
395+
return nil, nil
396+
case beforeRange:
397+
p.state = inSync
398+
return p.rewriteSpan(p.i.First())
399+
case inSync:
400+
return p.rewriteSpan(p.i.Next())
401+
default:
402+
panic("invalid iterator state")
403+
}
353404
}
354405

355406
// Prev implements the FragmentIterator interface.
356407
func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
357-
return p.rewriteSpan(p.i.Prev())
408+
switch p.state {
409+
case empty, beforeRange:
410+
return nil, nil
411+
case afterRange:
412+
p.state = inSync
413+
return p.rewriteSpan(p.i.Last())
414+
case inSync:
415+
return p.rewriteSpan(p.i.Prev())
416+
default:
417+
panic("invalid iterator state")
418+
}
358419
}
359420

360421
// Close implements the FragmentIterator interface.

sstable/prefix_replacing_iterator_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,14 @@ func TestPrefixReplacingIterator(t *testing.T) {
9696
got, _ = it.SeekGE(k(100), base.SeekGEFlagsNone)
9797
require.Nil(t, got)
9898

99+
got, _ = it.Prev()
100+
require.Equal(t, k(19), got.UserKey)
101+
99102
got, _ = it.SeekGE(kMax, base.SeekGEFlagsNone)
100103
require.Nil(t, got)
104+
105+
got, _ = it.Prev()
106+
require.Equal(t, k(19), got.UserKey)
101107
})
102108

103109
t.Run("SeekPrefixGE", func(t *testing.T) {
@@ -112,12 +118,18 @@ func TestPrefixReplacingIterator(t *testing.T) {
112118

113119
got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone)
114120
require.Nil(t, got)
121+
122+
got, _ = it.Prev()
123+
require.Equal(t, k(19), got.UserKey)
115124
})
116125

117126
t.Run("SeekLT", func(t *testing.T) {
118127
got, _ = it.SeekLT(kMin, base.SeekLTFlagsNone)
119128
require.Nil(t, got)
120129

130+
got, _ = it.Next()
131+
require.Equal(t, k(0), got.UserKey)
132+
121133
got, _ = it.SeekLT(k(0), base.SeekLTFlagsNone)
122134
require.Nil(t, got)
123135

sstable/reader_virtual.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ var _ CommonReader = (*VirtualReader)(nil)
2929

3030
// Lightweight virtual sstable state which can be passed to sstable iterators.
3131
type virtualState struct {
32-
lower InternalKey
33-
upper InternalKey
32+
// Bounds for the virtual table. Note that when prefixChange is set, these
33+
// bounds are the logical bounds which start with the SyntheticPrefix.
34+
lower InternalKey
35+
upper InternalKey
36+
37+
// Virtual table number, for informative purposes.
3438
fileNum base.FileNum
3539
Compare Compare
3640
isSharedIngested bool
@@ -104,6 +108,7 @@ func (v *VirtualReader) NewCompactionIter(
104108
i, err := v.reader.newCompactionIter(
105109
transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
106110
if err == nil && v.vState.prefixChange != nil {
111+
107112
i = newPrefixReplacingIterator(
108113
i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
109114
v.vState.lower.UserKey, v.reader.Compare,

0 commit comments

Comments
 (0)