Skip to content

Commit 570d0eb

Browse files
committed
sstable: fixes to prefix replacing iterator
We add more logic inside the prefix replacing iterators: - pass through `SeekGE` calls even when not strictly necessary, to enable optimizations indicated by the flags. - keep track of whether we are positioned before/after the synthetic prefix range, to enable expected behaviors like `SeekGE` after the range followed by `Prev` (or `SeekLT` to before the range followed by `Next`)
1 parent bde41ee commit 570d0eb

File tree

3 files changed

+136
-48
lines changed

3 files changed

+136
-48
lines changed

sstable/prefix_replacing_iterator.go

Lines changed: 117 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,38 @@ type prefixReplacingIterator struct {
2323
contentPrefix []byte
2424
syntheticPrefix []byte
2525

26-
// keyInRange is a valid key in the logical range that has the syntheticPrefix.
27-
// When an argument to a seek function does not have the syntheticPrefix,
28-
// keyInRange is used to determine if the argument key is before or after the
29-
// range of keys produced by the iterator.
30-
keyInRange []byte
26+
// lower is a valid key that has syntheticPrefix and is a lower bound for all
27+
// keys produced by i. It is used to determine if the argument key is before
28+
// or after the range of keys produced by the iterator.
29+
lower []byte
3130

3231
// arg and arg2 are buffers that are used to avoid allocations when rewriting
3332
// keys that are provided as arguments. They always start with contentPrefix.
3433
arg, arg2 []byte
3534

35+
state prefixReplacingIteratorState
36+
// Last bounds received via SetBounds.
37+
lowerBound, upperBound []byte
3638
// res is used to avoid allocations when rewriting result keys. It always
3739
// starts with syntheticPrefix.
3840
res InternalKey
3941
err error
40-
// empty is set after a seek operation that returns no keys.
41-
empty bool
4242
}
4343

44-
func errInputPrefixMismatch() error {
45-
return errors.AssertionFailedf("key argument does not have prefix required for replacement")
46-
}
44+
type prefixReplacingIteratorState int8
45+
46+
const (
47+
// inRange indicates that the prefix replacing iterator is "in sync" with the
48+
// underlying iterator; any Next/Prev calls can be passed through.
49+
inRange prefixReplacingIteratorState = iota
50+
// afterRange indicates that our iterator is positioned after the synthetic
51+
// prefix range. A Prev() call should return the last key/span in the range.
52+
afterRange
53+
// beforeRange indicates that our iterator is positioned after the synthetic
54+
// prefix range. A Next() call should return the first key/span in the range.
55+
beforeRange
56+
empty
57+
)
4758

4859
func errOutputPrefixMismatch() error {
4960
return errors.AssertionFailedf("key returned does not have prefix required for replacement")
@@ -56,29 +67,28 @@ var _ Iterator = (*prefixReplacingIterator)(nil)
5667
// `syntheticPrefix`. Every key produced by the underlying iterator must have
5768
// `contentPrefix`.
5869
//
59-
// keyInRange is a valid key that starts with syntheticPrefix. When a seek
60-
// function is called with a key that does not start with syntheticPrefix,
61-
// keyInRange is used to determine if the key is before or after the synthetic
62-
// prefix range.
70+
// lower is a valid key that has syntheticPreficx and is a lower bound for all
71+
// keys produced by i. It is used to determine if an argument key is before
72+
// or after the range of keys produced by the iterator.
6373
//
6474
// INVARIANT: len(syntheticPrefix) > 0 && keyInRange stars with syntheticPrefix.
6575
func newPrefixReplacingIterator(
66-
i Iterator, contentPrefix, syntheticPrefix []byte, keyInRange []byte, cmp base.Compare,
76+
i Iterator, contentPrefix, syntheticPrefix []byte, lower []byte, cmp base.Compare,
6777
) Iterator {
6878
if invariants.Enabled {
6979
if len(syntheticPrefix) == 0 {
7080
panic("newPrefixReplacingIterator called without synthetic prefix")
7181
}
72-
if !bytes.HasPrefix(keyInRange, syntheticPrefix) {
73-
panic(fmt.Sprintf("keyInRange %q does not have synthetic prefix %q", keyInRange, syntheticPrefix))
82+
if !bytes.HasPrefix(lower, syntheticPrefix) {
83+
panic(fmt.Sprintf("lower %q does not have synthetic prefix %q", lower, syntheticPrefix))
7484
}
7585
}
7686
return &prefixReplacingIterator{
7787
i: i,
7888
cmp: cmp,
7989
contentPrefix: contentPrefix,
8090
syntheticPrefix: syntheticPrefix,
81-
keyInRange: keyInRange,
91+
lower: lower,
8292
arg: slices.Clone(contentPrefix),
8393
arg2: slices.Clone(contentPrefix),
8494
res: InternalKey{UserKey: slices.Clone(syntheticPrefix)},
@@ -121,14 +131,16 @@ func (p *prefixReplacingIterator) rewriteResult(
121131
func (p *prefixReplacingIterator) SeekGE(
122132
key []byte, flags base.SeekGEFlags,
123133
) (*InternalKey, base.LazyValue) {
124-
p.empty = false
134+
p.state = inRange
125135
if !bytes.HasPrefix(key, p.syntheticPrefix) {
126-
if p.cmp(key, p.keyInRange) > 0 {
127-
p.empty = true
136+
if p.cmp(key, p.lower) > 0 {
137+
p.state = afterRange
128138
return nil, base.LazyValue{}
129139
}
130-
// Key must be before the range; use First instead.
131-
return p.rewriteResult(p.i.First())
140+
// Key must be before the range; seek to the lower bound instead.
141+
// We don't use First because we may miss out on optimizations passed
142+
// through SeekEFlags.
143+
key = p.lower
132144
}
133145
return p.rewriteResult(p.i.SeekGE(p.rewriteArg(key), flags))
134146
}
@@ -137,13 +149,15 @@ func (p *prefixReplacingIterator) SeekGE(
137149
func (p *prefixReplacingIterator) SeekPrefixGE(
138150
prefix, key []byte, flags base.SeekGEFlags,
139151
) (*InternalKey, base.LazyValue) {
140-
p.empty = false
141-
if invariants.Enabled && !bytes.HasPrefix(key, prefix) {
142-
panic(fmt.Sprintf("key %q does not have prefix %q", key, prefix))
143-
}
144-
if !bytes.HasPrefix(prefix, p.syntheticPrefix) {
152+
p.state = inRange
153+
if !bytes.HasPrefix(prefix, p.syntheticPrefix) || !bytes.HasPrefix(key, p.syntheticPrefix) {
145154
// We never produce keys with this prefix; we can return nil.
146-
p.empty = true
155+
if p.cmp(prefix, p.lower) < 0 {
156+
// We still want to seek the underlying iterator to potentially enable
157+
// optimizations passed through flags.
158+
p.i.SeekGE(p.rewriteArg(p.lower), flags)
159+
}
160+
p.state = empty
147161
return nil, base.LazyValue{}
148162
}
149163
return p.rewriteResult(p.i.SeekPrefixGE(p.rewriteArg2(prefix), p.rewriteArg(key), flags))
@@ -153,52 +167,84 @@ func (p *prefixReplacingIterator) SeekPrefixGE(
153167
func (p *prefixReplacingIterator) SeekLT(
154168
key []byte, flags base.SeekLTFlags,
155169
) (*InternalKey, base.LazyValue) {
156-
p.empty = false
170+
p.state = inRange
157171
if !bytes.HasPrefix(key, p.syntheticPrefix) {
158-
if p.cmp(key, p.keyInRange) < 0 {
159-
p.empty = true
172+
if p.cmp(key, p.lower) < 0 {
173+
// Key before the range; no results.
174+
p.state = beforeRange
160175
return nil, base.LazyValue{}
161176
}
162177
// Key must be after the range. Use Last instead.
163178
return p.rewriteResult(p.i.Last())
164179
}
180+
165181
return p.rewriteResult(p.i.SeekLT(p.rewriteArg(key), flags))
166182
}
167183

168184
// First implements the Iterator interface.
169185
func (p *prefixReplacingIterator) First() (*InternalKey, base.LazyValue) {
170-
p.empty = false
186+
p.state = inRange
171187
return p.rewriteResult(p.i.First())
172188
}
173189

174190
// Last implements the Iterator interface.
175191
func (p *prefixReplacingIterator) Last() (*InternalKey, base.LazyValue) {
176-
p.empty = false
192+
p.state = inRange
177193
return p.rewriteResult(p.i.Last())
178194
}
179195

180196
// Next implements the Iterator interface.
181197
func (p *prefixReplacingIterator) Next() (*InternalKey, base.LazyValue) {
182-
if p.empty {
198+
switch p.state {
199+
case empty, afterRange:
183200
return nil, base.LazyValue{}
201+
case beforeRange:
202+
p.state = inRange
203+
if p.lowerBound != nil {
204+
return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone))
205+
}
206+
return p.rewriteResult(p.i.First())
207+
case inRange:
208+
return p.rewriteResult(p.i.Next())
209+
default:
210+
panic("invalid iterator state")
184211
}
185-
return p.rewriteResult(p.i.Next())
186212
}
187213

188214
// NextPrefix implements the Iterator interface.
189215
func (p *prefixReplacingIterator) NextPrefix(succKey []byte) (*InternalKey, base.LazyValue) {
190-
if p.empty {
216+
switch p.state {
217+
case empty, afterRange:
191218
return nil, base.LazyValue{}
219+
case beforeRange:
220+
p.state = inRange
221+
if p.lowerBound != nil {
222+
return p.rewriteResult(p.i.SeekGE(p.lowerBound, base.SeekGEFlagsNone))
223+
}
224+
return p.rewriteResult(p.i.First())
225+
case inRange:
226+
return p.rewriteResult(p.i.NextPrefix(succKey))
227+
default:
228+
panic("invalid iterator state")
192229
}
193-
return p.rewriteResult(p.i.NextPrefix(p.rewriteArg(succKey)))
194230
}
195231

196232
// Prev implements the Iterator interface.
197233
func (p *prefixReplacingIterator) Prev() (*InternalKey, base.LazyValue) {
198-
if p.empty {
234+
switch p.state {
235+
case empty, beforeRange:
199236
return nil, base.LazyValue{}
237+
case afterRange:
238+
p.state = inRange
239+
if p.upperBound != nil {
240+
return p.rewriteResult(p.i.SeekLT(p.upperBound, base.SeekLTFlagsNone))
241+
}
242+
return p.rewriteResult(p.i.Last())
243+
case inRange:
244+
return p.rewriteResult(p.i.Prev())
245+
default:
246+
panic("invalid iterator state")
200247
}
201-
return p.rewriteResult(p.i.Prev())
202248
}
203249

204250
// Error implements the Iterator interface.
@@ -260,6 +306,8 @@ type prefixReplacingFragmentIterator struct {
260306

261307
arg []byte
262308
out1, out2 []byte
309+
310+
state prefixReplacingIteratorState
263311
}
264312

265313
// newPrefixReplacingFragmentIterator wraps a FragmentIterator over some reader
@@ -284,9 +332,6 @@ func newPrefixReplacingFragmentIterator(
284332
}
285333

286334
func (p *prefixReplacingFragmentIterator) rewriteArg(key []byte) ([]byte, error) {
287-
if !bytes.HasPrefix(key, p.syntheticPrefix) {
288-
return nil, errInputPrefixMismatch()
289-
}
290335
p.arg = append(p.arg[:len(p.contentPrefix)], key[len(p.syntheticPrefix):]...)
291336
return p.arg, nil
292337
}
@@ -307,8 +352,10 @@ func (p *prefixReplacingFragmentIterator) rewriteSpan(
307352

308353
// SeekGE implements the FragmentIterator interface.
309354
func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, error) {
355+
p.state = inRange
310356
if !bytes.HasPrefix(key, p.syntheticPrefix) {
311357
if p.cmp(key, p.keyInRange) > 0 {
358+
p.state = afterRange
312359
return nil, nil
313360
}
314361
// Key must be before the range; use First instead.
@@ -323,8 +370,10 @@ func (p *prefixReplacingFragmentIterator) SeekGE(key []byte) (*keyspan.Span, err
323370

324371
// SeekLT implements the FragmentIterator interface.
325372
func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, error) {
373+
p.state = inRange
326374
if !bytes.HasPrefix(key, p.syntheticPrefix) {
327375
if p.cmp(key, p.keyInRange) < 0 {
376+
p.state = beforeRange
328377
return nil, nil
329378
}
330379
// Key must be after the range; use Last instead.
@@ -339,22 +388,44 @@ func (p *prefixReplacingFragmentIterator) SeekLT(key []byte) (*keyspan.Span, err
339388

340389
// First implements the FragmentIterator interface.
341390
func (p *prefixReplacingFragmentIterator) First() (*keyspan.Span, error) {
391+
p.state = inRange
342392
return p.rewriteSpan(p.i.First())
343393
}
344394

345395
// Last implements the FragmentIterator interface.
346396
func (p *prefixReplacingFragmentIterator) Last() (*keyspan.Span, error) {
397+
p.state = inRange
347398
return p.rewriteSpan(p.i.Last())
348399
}
349400

350-
// Close implements the FragmentIterator interface.
401+
// Next implements the FragmentIterator interface.
351402
func (p *prefixReplacingFragmentIterator) Next() (*keyspan.Span, error) {
352-
return p.rewriteSpan(p.i.Next())
403+
switch p.state {
404+
case empty, afterRange:
405+
return nil, nil
406+
case beforeRange:
407+
p.state = inRange
408+
return p.rewriteSpan(p.i.First())
409+
case inRange:
410+
return p.rewriteSpan(p.i.Next())
411+
default:
412+
panic("invalid iterator state")
413+
}
353414
}
354415

355416
// Prev implements the FragmentIterator interface.
356417
func (p *prefixReplacingFragmentIterator) Prev() (*keyspan.Span, error) {
357-
return p.rewriteSpan(p.i.Prev())
418+
switch p.state {
419+
case empty, beforeRange:
420+
return nil, nil
421+
case afterRange:
422+
p.state = inRange
423+
return p.rewriteSpan(p.i.Last())
424+
case inRange:
425+
return p.rewriteSpan(p.i.Prev())
426+
default:
427+
panic("invalid iterator state")
428+
}
358429
}
359430

360431
// Close implements the FragmentIterator interface.

sstable/prefix_replacing_iterator_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,14 @@ func TestPrefixReplacingIterator(t *testing.T) {
9696
got, _ = it.SeekGE(k(100), base.SeekGEFlagsNone)
9797
require.Nil(t, got)
9898

99+
got, _ = it.Prev()
100+
require.Equal(t, k(19), got.UserKey)
101+
99102
got, _ = it.SeekGE(kMax, base.SeekGEFlagsNone)
100103
require.Nil(t, got)
104+
105+
got, _ = it.Prev()
106+
require.Equal(t, k(19), got.UserKey)
101107
})
102108

103109
t.Run("SeekPrefixGE", func(t *testing.T) {
@@ -112,12 +118,18 @@ func TestPrefixReplacingIterator(t *testing.T) {
112118

113119
got, _ = it.SeekPrefixGE(tc.to, k(100), base.SeekGEFlagsNone)
114120
require.Nil(t, got)
121+
122+
got, _ = it.Prev()
123+
require.Equal(t, k(19), got.UserKey)
115124
})
116125

117126
t.Run("SeekLT", func(t *testing.T) {
118127
got, _ = it.SeekLT(kMin, base.SeekLTFlagsNone)
119128
require.Nil(t, got)
120129

130+
got, _ = it.Next()
131+
require.Equal(t, k(0), got.UserKey)
132+
121133
got, _ = it.SeekLT(k(0), base.SeekLTFlagsNone)
122134
require.Nil(t, got)
123135

sstable/reader_virtual.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ var _ CommonReader = (*VirtualReader)(nil)
2929

3030
// Lightweight virtual sstable state which can be passed to sstable iterators.
3131
type virtualState struct {
32-
lower InternalKey
33-
upper InternalKey
32+
// Bounds for the virtual table. Note that when prefixChange is set, these
33+
// bounds are the logical bounds which start with the SyntheticPrefix.
34+
lower InternalKey
35+
upper InternalKey
36+
37+
// Virtual table number, for informative purposes.
3438
fileNum base.FileNum
3539
Compare Compare
3640
isSharedIngested bool
@@ -104,6 +108,7 @@ func (v *VirtualReader) NewCompactionIter(
104108
i, err := v.reader.newCompactionIter(
105109
transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
106110
if err == nil && v.vState.prefixChange != nil {
111+
107112
i = newPrefixReplacingIterator(
108113
i, v.vState.prefixChange.ContentPrefix, v.vState.prefixChange.SyntheticPrefix,
109114
v.vState.lower.UserKey, v.reader.Compare,

0 commit comments

Comments
 (0)