Skip to content

Commit c1f33a2

Browse files
committed
Merge pull request #6972 from petermattis/pmattis/engine-cleanup
storage/engine: split up Engine interface
2 parents 39b451b + cc23094 commit c1f33a2

17 files changed

+274
-394
lines changed

storage/abort_cache.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,20 @@ func (sc *AbortCache) max() roachpb.Key {
7575

7676
// ClearData removes all persisted items stored in the cache.
7777
func (sc *AbortCache) ClearData(e engine.Engine) error {
78-
_, err := engine.ClearRange(e, engine.MakeMVCCMetadataKey(sc.min()), engine.MakeMVCCMetadataKey(sc.max()))
79-
return err
78+
b := e.NewBatch()
79+
defer b.Close()
80+
_, err := engine.ClearRange(b, engine.MakeMVCCMetadataKey(sc.min()), engine.MakeMVCCMetadataKey(sc.max()))
81+
if err != nil {
82+
return err
83+
}
84+
return b.Commit()
8085
}
8186

8287
// Get looks up an abort cache entry recorded for this transaction ID.
8388
// Returns whether an abort record was found and any error.
8489
func (sc *AbortCache) Get(
8590
ctx context.Context,
86-
e engine.Engine,
91+
e engine.Reader,
8792
txnID *uuid.UUID,
8893
entry *roachpb.AbortCacheEntry,
8994
) (bool, error) {
@@ -103,7 +108,7 @@ func (sc *AbortCache) Get(
103108
// TODO(tschottdorf): should not use a pointer to UUID.
104109
func (sc *AbortCache) Iterate(
105110
ctx context.Context,
106-
e engine.Engine,
111+
e engine.Reader,
107112
f func([]byte, *uuid.UUID, roachpb.AbortCacheEntry),
108113
) {
109114
_, _ = engine.MVCCIterate(ctx, e, sc.min(), sc.max(), roachpb.ZeroTimestamp,
@@ -123,7 +128,7 @@ func (sc *AbortCache) Iterate(
123128
}
124129

125130
func copySeqCache(
126-
e engine.Engine,
131+
e engine.ReadWriter,
127132
ms *engine.MVCCStats,
128133
srcID, dstID roachpb.RangeID,
129134
keyMin, keyMax engine.MVCCKey,
@@ -171,7 +176,7 @@ func copySeqCache(
171176
// abort cache. Failures decoding individual cache entries return an error.
172177
// On success, returns the number of entries (key-value pairs) copied.
173178
func (sc *AbortCache) CopyInto(
174-
e engine.Engine,
179+
e engine.ReadWriter,
175180
ms *engine.MVCCStats,
176181
destRangeID roachpb.RangeID,
177182
) (int, error) {
@@ -187,7 +192,7 @@ func (sc *AbortCache) CopyInto(
187192
// On success, returns the number of entries (key-value pairs) copied.
188193
func (sc *AbortCache) CopyFrom(
189194
ctx context.Context,
190-
e engine.Engine,
195+
e engine.ReadWriter,
191196
ms *engine.MVCCStats,
192197
originRangeID roachpb.RangeID,
193198
) (int, error) {
@@ -199,7 +204,7 @@ func (sc *AbortCache) CopyFrom(
199204
// Del removes all abort cache entries for the given transaction.
200205
func (sc *AbortCache) Del(
201206
ctx context.Context,
202-
e engine.Engine,
207+
e engine.ReadWriter,
203208
ms *engine.MVCCStats,
204209
txnID *uuid.UUID,
205210
) error {
@@ -210,7 +215,7 @@ func (sc *AbortCache) Del(
210215
// Put writes an entry for the specified transaction ID.
211216
func (sc *AbortCache) Put(
212217
ctx context.Context,
213-
e engine.Engine,
218+
e engine.ReadWriter,
214219
ms *engine.MVCCStats,
215220
txnID *uuid.UUID,
216221
entry *roachpb.AbortCacheEntry,

storage/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ func TestSortRangeDescByAge(t *testing.T) {
920920
}
921921
}
922922

923-
func verifyRangeStats(eng engine.Engine, rangeID roachpb.RangeID, expMS engine.MVCCStats) error {
923+
func verifyRangeStats(eng engine.Reader, rangeID roachpb.RangeID, expMS engine.MVCCStats) error {
924924
var ms engine.MVCCStats
925925
if err := engine.MVCCGetRangeStats(context.Background(), eng, rangeID, &ms); err != nil {
926926
return err
@@ -933,7 +933,9 @@ func verifyRangeStats(eng engine.Engine, rangeID roachpb.RangeID, expMS engine.M
933933
return nil
934934
}
935935

936-
func verifyRecomputedStats(eng engine.Engine, d *roachpb.RangeDescriptor, expMS engine.MVCCStats, nowNanos int64) error {
936+
func verifyRecomputedStats(
937+
eng engine.Reader, d *roachpb.RangeDescriptor, expMS engine.MVCCStats, nowNanos int64,
938+
) error {
937939
if ms, err := storage.ComputeStatsForRange(d, eng, nowNanos); err != nil {
938940
return err
939941
} else if expMS != ms {

storage/engine/batch_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func mvccKey(k interface{}) MVCCKey {
4747
}
4848
}
4949

50-
func testBatchBasics(t *testing.T, commit func(e, b Engine) error) {
50+
func testBatchBasics(t *testing.T, commit func(e Engine, b Batch) error) {
5151
stopper := stop.NewStopper()
5252
defer stopper.Stop()
5353
e := NewInMem(roachpb.Attributes{}, 1<<20, stopper)
@@ -118,14 +118,14 @@ func testBatchBasics(t *testing.T, commit func(e, b Engine) error) {
118118
// visible until commit, and then are all visible after commit.
119119
func TestBatchBasics(t *testing.T) {
120120
defer leaktest.AfterTest(t)()
121-
testBatchBasics(t, func(e, b Engine) error {
121+
testBatchBasics(t, func(e Engine, b Batch) error {
122122
return b.Commit()
123123
})
124124
}
125125

126126
func TestBatchRepr(t *testing.T) {
127127
defer leaktest.AfterTest(t)()
128-
testBatchBasics(t, func(e, b Engine) error {
128+
testBatchBasics(t, func(e Engine, b Batch) error {
129129
repr := b.Repr()
130130

131131
// Simple sanity checks about the format of the batch representation. This

storage/engine/engine.go

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,14 @@ type Iterator interface {
8787
ComputeStats(start, end MVCCKey, nowNanos int64) (MVCCStats, error)
8888
}
8989

90-
// Engine is the interface that wraps the core operations of a
91-
// key/value store.
92-
type Engine interface {
93-
// Open initializes the engine.
94-
Open() error
95-
// Close closes the engine, freeing up any outstanding resources.
90+
// Reader is the read interface to an engine's data.
91+
type Reader interface {
92+
// Close closes the reader, freeing up any outstanding resources.
9693
Close()
97-
// Attrs returns the engine/store attributes.
98-
Attrs() roachpb.Attributes
99-
// Put sets the given key to the value provided.
100-
Put(key MVCCKey, value []byte) error
94+
// closed returns true if the reader has been closed or is not usable.
95+
// Objects backed by this reader (e.g. Iterators) can check this to ensure
96+
// that they are not using a closed engine.
97+
closed() bool
10198
// Get returns the value for the given key, nil otherwise.
10299
Get(key MVCCKey) ([]byte, error)
103100
// GetProto fetches the value at the specified key and unmarshals it
@@ -111,6 +108,21 @@ type Engine interface {
111108
// an error, the iteration will stop and return the error.
112109
// If the first result of f is true, the iteration stops.
113110
Iterate(start, end MVCCKey, f func(MVCCKeyValue) (bool, error)) error
111+
// NewIterator returns a new instance of an Iterator over this engine. When
112+
// prefix is true, Seek will use the user-key prefix of the supplied MVCC key
113+
// to restrict which sstables are searched, but iteration (using Next) over
114+
// keys without the same user-key prefix will not work correctly (keys may be
115+
// skipped). The caller must invoke Iterator.Close() when finished with the
116+
// iterator to free resources.
117+
NewIterator(prefix bool) Iterator
118+
}
119+
120+
// Writer is the write interface to an engine's data.
121+
type Writer interface {
122+
// ApplyBatchRepr atomically applies a set of batched updates. Created by
123+
// calling Repr() on a batch. Using this method is equivalent to constructing
124+
// and committing a batch whose Repr() equals repr.
125+
ApplyBatchRepr(repr []byte) error
114126
// Clear removes the item from the db with the given key.
115127
// Note that clear actually removes entries from the storage
116128
// engine, rather than inserting tombstones.
@@ -129,67 +141,63 @@ type Engine interface {
129141
//
130142
// The logic for merges is written in db.cc in order to be compatible with RocksDB.
131143
Merge(key MVCCKey, value []byte) error
144+
// Put sets the given key to the value provided.
145+
Put(key MVCCKey, value []byte) error
146+
}
147+
148+
// ReadWriter is the read/write interface to an engine's data.
149+
type ReadWriter interface {
150+
Reader
151+
Writer
152+
}
153+
154+
// Engine is the interface that wraps the core operations of a key/value store.
155+
type Engine interface {
156+
ReadWriter
157+
// Attrs returns the engine/store attributes.
158+
Attrs() roachpb.Attributes
132159
// Capacity returns capacity details for the engine's available storage.
133160
Capacity() (roachpb.StoreCapacity, error)
134161
// Flush causes the engine to write all in-memory data to disk
135162
// immediately.
136163
Flush() error
137-
// NewIterator returns a new instance of an Iterator over this engine. When
138-
// prefix is true, Seek will use the user-key prefix of the supplied MVCC key
139-
// to restrict which sstables are searched, but iteration (using Next) over
140-
// keys without the same user-key prefix will not work correctly (keys may be
141-
// skipped). The caller must invoke Iterator.Close() when finished with the
142-
// iterator to free resources.
143-
NewIterator(prefix bool) Iterator
164+
// GetStats retrieves stats from the engine.
165+
GetStats() (*Stats, error)
166+
// NewBatch returns a new instance of a batched engine which wraps
167+
// this engine. Batched engines accumulate all mutations and apply
168+
// them atomically on a call to Commit().
169+
NewBatch() Batch
144170
// NewSnapshot returns a new instance of a read-only snapshot
145171
// engine. Snapshots are instantaneous and, as long as they're
146172
// released relatively quickly, inexpensive. Snapshots are released
147173
// by invoking Close(). Note that snapshots must not be used after the
148174
// original engine has been stopped.
149-
NewSnapshot() Engine
150-
// NewBatch returns a new instance of a batched engine which wraps
151-
// this engine. Batched engines accumulate all mutations and apply
152-
// them atomically on a call to Commit().
153-
NewBatch() Batch
154-
// Commit atomically applies any batched updates to the underlying
155-
// engine. This is a noop unless the engine was created via NewBatch().
156-
Commit() error
157-
// ApplyBatchRepr atomically applies a set of batched updates. Created by
158-
// calling Repr() on a batch. Using this method is equivalent to constructing
159-
// and committing a batch whose Repr() equals repr.
160-
ApplyBatchRepr(repr []byte) error
161-
// Repr returns the underlying representation of the batch and can be used to
162-
// reconstitute the batch on a remote node using
163-
// Engine.NewBatchFromRepr(). This method is only valid on engines created
164-
// via NewBatch().
165-
Repr() []byte
166-
// Defer adds a callback to be run after the batch commits
167-
// successfully. If Commit() fails (or if this engine was not
168-
// created via NewBatch()), deferred callbacks are not called. As
169-
// with the defer statement, the last callback to be deferred is the
170-
// first to be executed.
171-
Defer(fn func())
172-
// Closed returns true if the engine has been close or not usable.
173-
// Objects backed by this engine (e.g. Iterators) can check this to ensure
174-
// that they are not using an closed engine.
175-
Closed() bool
176-
// GetStats retrieves stats from the engine.
177-
GetStats() (*Stats, error)
175+
NewSnapshot() Reader
176+
// Open initializes the engine.
177+
Open() error
178178
}
179179

180180
// Batch is the interface for batch specific operations.
181-
//
182-
// TODO(peter): Move various methods of Engine to Batch, such as Commit() and Repr().
183181
type Batch interface {
184-
Engine
182+
ReadWriter
183+
// Commit atomically applies any batched updates to the underlying
184+
// engine. This is a noop unless the engine was created via NewBatch().
185+
Commit() error
186+
// Defer adds a callback to be run after the batch commits successfully. If
187+
// Commit() fails deferred callbacks are not called. As with the defer
188+
// statement, the last callback to be deferred is the first to be executed.
189+
Defer(fn func())
185190
// Distinct returns a view of the existing batch which passes reads directly
186191
// to the underlying engine (the one the batch was created from). That is,
187192
// the returned batch will not read its own writes. This is used as an
188193
// optimization to avoid flushing mutations buffered by the batch in
189194
// situations where we know all of the batched operations are for distinct
190195
// keys. Closing/committing the returned engine is equivalent to
191196
// closing/committing the original batch.
192-
Distinct() Engine
197+
Distinct() ReadWriter
198+
// Repr returns the underlying representation of the batch and can be used to
199+
// reconstitute the batch on a remote node using Writer.ApplyBatchRepr().
200+
Repr() []byte
193201
}
194202

195203
// Stats is a set of RocksDB stats. These are all described in RocksDB
@@ -220,7 +228,7 @@ type Stats struct {
220228
// PutProto sets the given key to the protobuf-serialized byte string
221229
// of msg and the provided timestamp. Returns the length in bytes of
222230
// key and the value.
223-
func PutProto(engine Engine, key MVCCKey, msg proto.Message) (keyBytes, valBytes int64, err error) {
231+
func PutProto(engine Writer, key MVCCKey, msg proto.Message) (keyBytes, valBytes int64, err error) {
224232
bytes, err := protoutil.Marshal(msg)
225233
if err != nil {
226234
return 0, 0, err
@@ -236,7 +244,7 @@ func PutProto(engine Engine, key MVCCKey, msg proto.Message) (keyBytes, valBytes
236244
// Scan returns up to max key/value objects starting from
237245
// start (inclusive) and ending at end (non-inclusive).
238246
// Specify max=0 for unbounded scans.
239-
func Scan(engine Engine, start, end MVCCKey, max int64) ([]MVCCKeyValue, error) {
247+
func Scan(engine Reader, start, end MVCCKey, max int64) ([]MVCCKeyValue, error) {
240248
var kvs []MVCCKeyValue
241249
err := engine.Iterate(start, end, func(kv MVCCKeyValue) (bool, error) {
242250
if max != 0 && int64(len(kvs)) >= max {
@@ -254,18 +262,16 @@ func Scan(engine Engine, start, end MVCCKey, max int64) ([]MVCCKeyValue, error)
254262
// none, and an error will be returned. Note that this function
255263
// actually removes entries from the storage engine, rather than
256264
// inserting tombstones, as with deletion through the MVCC.
257-
func ClearRange(engine Engine, start, end MVCCKey) (int, error) {
258-
b := engine.NewBatch()
259-
defer b.Close()
265+
func ClearRange(engine ReadWriter, start, end MVCCKey) (int, error) {
260266
count := 0
261267
if err := engine.Iterate(start, end, func(kv MVCCKeyValue) (bool, error) {
262-
if err := b.Clear(kv.Key); err != nil {
268+
if err := engine.Clear(kv.Key); err != nil {
263269
return false, err
264270
}
265271
count++
266272
return false, nil
267273
}); err != nil {
268274
return 0, err
269275
}
270-
return count, b.Commit()
276+
return count, nil
271277
}

0 commit comments

Comments
 (0)