-
Notifications
You must be signed in to change notification settings - Fork 884
[codex] Harden multiversion iterator validation #3656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -95,9 +95,48 @@ func (store *Store) newMVSValidationIterator( | |
| } | ||
| } | ||
|
|
||
| func (vi *validationIterator) hasCurrentValue(key []byte) bool { | ||
| strKey := string(key) | ||
| if _, ok := vi.writeset[strKey]; ok { | ||
| return true | ||
| } | ||
| if _, ok := vi.readCache[strKey]; ok { | ||
| return true | ||
| } | ||
| return vi.mvStore.GetLatestBeforeIndex(vi.index, key) != nil | ||
| } | ||
|
|
||
| func (vi *validationIterator) skipRemovedKeys() { | ||
| for vi.Iterator.Valid() && !vi.hasCurrentValue(vi.Iterator.Key()) { | ||
| vi.Iterator.Next() | ||
| } | ||
| } | ||
|
|
||
| func (vi *validationIterator) Valid() bool { | ||
| vi.skipRemovedKeys() | ||
| return vi.Iterator.Valid() | ||
| } | ||
|
|
||
| func (vi *validationIterator) Next() { | ||
| vi.Iterator.Next() | ||
| vi.skipRemovedKeys() | ||
| } | ||
|
|
||
| func (vi *validationIterator) Key() []byte { | ||
| vi.skipRemovedKeys() | ||
| return vi.Iterator.Key() | ||
| } | ||
|
|
||
| func (vi *validationIterator) WriteAbort(abort occtypes.Abort) { | ||
| select { | ||
| case vi.abortChannel <- abort: | ||
| default: | ||
| } | ||
| } | ||
|
|
||
| // try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator | ||
| func (vi *validationIterator) Value() []byte { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] |
||
| key := vi.Key() | ||
| key := vi.Iterator.Key() | ||
|
|
||
| // try fetch from writeset - return if exists | ||
| if val, ok := vi.writeset[string(key)]; ok { | ||
|
|
@@ -110,10 +149,13 @@ func (vi *validationIterator) Value() []byte { | |
|
|
||
| // get the value from the multiversion store | ||
| val := vi.mvStore.GetLatestBeforeIndex(vi.index, key) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [suggestion] TOCTOU noted by Codex: skipRemovedKeys()/hasCurrentValue() (called from the overridden Valid()/Key()) decide this key still has a current value via GetLatestBeforeIndex, but Value() re-reads GetLatestBeforeIndex here. If a lower-index transaction removes the MV entry between those two reads, this returns nil, which mvsMergeIterator.skipUntilExistsOrInvalid treats as a real delete and skips the key. Because hasCurrentValue does not populate readCache, there is no consistency guarantee for the first Value() of a key. In practice this generally biases toward fewer foundKeys → validation failure (conservative), and the underlying MV store is concurrently mutable regardless, so this is not blocking — but consider caching the hasCurrentValue determination (or the materialized value) so the skip decision and Value() cannot disagree.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @codchen this is worth fixing in a follow up PR you think? |
||
| if val == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // if we have an estimate, write to abort channel | ||
| if val.IsEstimate() { | ||
| vi.abortChannel <- occtypes.NewEstimateAbort(val.Index()) | ||
| vi.WriteAbort(occtypes.NewEstimateAbort(val.Index())) | ||
| } | ||
|
|
||
| // if we have a deleted value, return nil | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,14 +2,18 @@ package multiversion | |
|
|
||
| import ( | ||
| "bytes" | ||
| "runtime/debug" | ||
| "sort" | ||
| "sync" | ||
|
|
||
| "github.com/sei-protocol/sei-chain/sei-cosmos/store/types" | ||
| "github.com/sei-protocol/sei-chain/sei-cosmos/types/occ" | ||
| "github.com/sei-protocol/seilog" | ||
| db "github.com/tendermint/tm-db" | ||
| ) | ||
|
|
||
| var logger = seilog.NewLogger("cosmos", "store", "multiversion") | ||
|
|
||
| type MultiVersionStore interface { | ||
| GetLatest(key []byte) (value MultiVersionValueItem) | ||
| GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem) | ||
|
|
@@ -269,51 +273,104 @@ func (s *Store) validateIterator(index int, tracker iterationTracker) bool { | |
| validChannel := make(chan bool, 1) | ||
| abortChannel := make(chan occ.Abort, 1) | ||
|
|
||
| // listen for abort while iterating | ||
| // Run validation in a goroutine so unexpected iterator panics can be | ||
| // contained and converted into validation failure. | ||
| go func(iterationTracker iterationTracker, items *db.MemDB, returnChan chan bool, abortChan chan occ.Abort) { | ||
| var parentIter types.Iterator | ||
| expectedKeys := iterationTracker.iteratedKeys | ||
| foundKeys := 0 | ||
| iter := s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, items, iterationTracker.ascending, iterationTracker.writeset, abortChan) | ||
| if iterationTracker.ascending { | ||
| parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey) | ||
| } else { | ||
| parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey) | ||
| } | ||
| // create a new MVSMergeiterator | ||
| mergeIterator := NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{}) | ||
| defer func() { _ = mergeIterator.Close() }() | ||
| for ; mergeIterator.Valid(); mergeIterator.Next() { | ||
| if (len(expectedKeys) - foundKeys) == 0 { | ||
| // if we have no more expected keys, then the iterator is invalid | ||
| returnChan <- false | ||
| return | ||
| } | ||
| key := mergeIterator.Key() | ||
| // TODO: is this ok to not delete the key since we shouldnt have duplicate keys? | ||
| if _, ok := expectedKeys[string(key)]; !ok { | ||
| // if key isn't found | ||
| returnChan <- false | ||
| return | ||
| valid := false | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| logger.Error( | ||
| "panic during multiversion iterator validation", | ||
| "panic", r, | ||
| "tx_index", index, | ||
| "start", iterationTracker.startKey, | ||
| "end", iterationTracker.endKey, | ||
| "ascending", iterationTracker.ascending, | ||
| "stack", string(debug.Stack()), | ||
| ) | ||
| } | ||
| // remove from expected keys | ||
| foundKeys += 1 | ||
| // delete(expectedKeys, string(key)) | ||
| returnChan <- valid | ||
| }() | ||
|
|
||
| // if our iterator key was the early stop, then we can break | ||
| if bytes.Equal(key, iterationTracker.earlyStopKey) { | ||
| break | ||
| } | ||
| } | ||
| // return whether we found the exact number of expected keys | ||
| returnChan <- foundKeys >= len(expectedKeys) | ||
| valid = s.validateIteratorReplay(index, iterationTracker, items, abortChan) | ||
| }(tracker, sortedItems, validChannel, abortChannel) | ||
|
|
||
| return <-validChannel | ||
| } | ||
|
|
||
| func (s *Store) validateIteratorReplay(index int, iterationTracker iterationTracker, items *db.MemDB, abortChan chan occ.Abort) bool { | ||
| var parentIter types.Iterator | ||
| var iter types.Iterator | ||
| var mergeIterator *mvsMergeIterator | ||
| expectedKeys := iterationTracker.iteratedKeys | ||
| foundKeys := 0 | ||
| defer func() { | ||
| if mergeIterator != nil { | ||
| _ = mergeIterator.Close() | ||
| return | ||
| } | ||
| if parentIter != nil { | ||
| _ = parentIter.Close() | ||
| } | ||
| if iter != nil { | ||
| _ = iter.Close() | ||
| } | ||
| }() | ||
|
|
||
| if iterationTracker.ascending { | ||
| parentIter = s.parentStore.Iterator(iterationTracker.startKey, iterationTracker.endKey) | ||
| } else { | ||
| parentIter = s.parentStore.ReverseIterator(iterationTracker.startKey, iterationTracker.endKey) | ||
| } | ||
| iter = s.newMVSValidationIterator(index, iterationTracker.startKey, iterationTracker.endKey, items, iterationTracker.ascending, iterationTracker.writeset, abortChan) | ||
| // create a new MVSMergeiterator | ||
| mergeIterator = NewMVSMergeIterator(parentIter, iter, iterationTracker.ascending, NoOpHandler{}) | ||
| for { | ||
| if iteratorValidationAborted(abortChan) { | ||
| return false | ||
| } | ||
| if !mergeIterator.Valid() { | ||
| break | ||
| } | ||
| if iteratorValidationAborted(abortChan) { | ||
| return false | ||
| } | ||
| if (len(expectedKeys) - foundKeys) == 0 { | ||
| // if we have no more expected keys, then the iterator is invalid | ||
| return false | ||
| } | ||
| key := mergeIterator.Key() | ||
| if iteratorValidationAborted(abortChan) { | ||
| return false | ||
| } | ||
| // TODO: is this ok to not delete the key since we shouldnt have duplicate keys? | ||
| if _, ok := expectedKeys[string(key)]; !ok { | ||
| // if key isn't found | ||
| return false | ||
| } | ||
| // remove from expected keys | ||
| foundKeys += 1 | ||
| // delete(expectedKeys, string(key)) | ||
|
|
||
| // if our iterator key was the early stop, then we can break | ||
| if bytes.Equal(key, iterationTracker.earlyStopKey) { | ||
| break | ||
| } | ||
| mergeIterator.Next() | ||
| } | ||
| if iteratorValidationAborted(abortChan) { | ||
| return false | ||
| } | ||
| // return whether we found the exact number of expected keys | ||
| return foundKeys >= len(expectedKeys) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] |
||
| } | ||
|
|
||
| func iteratorValidationAborted(abortChan <-chan occ.Abort) bool { | ||
| select { | ||
| case <-abortChannel: | ||
| // if we get an abort, then we know that the iterator is invalid | ||
| case <-abortChan: | ||
| return true | ||
| default: | ||
| return false | ||
| case valid := <-validChannel: | ||
| return valid | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Good fix: the non-blocking WriteAbort() prevents the validation goroutine from blocking when multiple estimates are encountered against the size-1 buffered abort channel (previously a direct
vi.abortChannel <- ...could block once a prior abort was buffered). The added regression test TestMVSIteratorValidationMultipleEstimatesDoNotBlock covers this.