Skip to content

Commit a954a67

Browse files
committed
add tests and other improvements
1 parent 586392c commit a954a67

File tree

6 files changed

+218
-129
lines changed

6 files changed

+218
-129
lines changed

ingest/change.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,13 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
154154
ChangeType: entryChange.Type,
155155
})
156156
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
157+
// Update entries always have a previous state entry [state, updated]
158+
// except for contract entries that are restored and updated within the same
159+
// transaction, which appear as [restored, updated]
160+
// For details, see https://github.com/stellar/stellar-protocol/blob/master/core/cap-0062.md
157161
updated := entryChange.MustUpdated()
158-
var state xdr.LedgerEntry
159-
if _, ok := ledgerEntryChanges[i-1].GetState(); ok {
160-
state = ledgerEntryChanges[i-1].MustState()
161-
} else {
162+
state, ok := ledgerEntryChanges[i-1].GetState()
163+
if !ok {
162164
state = ledgerEntryChanges[i-1].MustRestored()
163165
}
164166
changes = append(changes, Change{
@@ -168,10 +170,12 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
168170
ChangeType: entryChange.Type,
169171
})
170172
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
171-
var state xdr.LedgerEntry
172-
if _, ok := ledgerEntryChanges[i-1].GetState(); ok {
173-
state = ledgerEntryChanges[i-1].MustState()
174-
} else {
173+
// Removed entries always have an associated state entry [state, updated]
174+
// except for contract entries that are restored and removed within the same
175+
// transaction, which appear as [restored, removed]
176+
// For details, see https://github.com/stellar/stellar-protocol/blob/master/core/cap-0062.md
177+
state, ok := ledgerEntryChanges[i-1].GetState()
178+
if !ok {
175179
state = ledgerEntryChanges[i-1].MustRestored()
176180
}
177181
changes = append(changes, Change{

ingest/change_compactor.go

+60-53
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ import (
4444
// b. UPDATED we simply update it with the new value.
4545
// c. REMOVED it means that at this point in the ledger the entry is removed
4646
// so updating it returns an error.
47-
// d. RESTORED we update it with the new value but keep the change type as RESTORED.
47+
// d. RESTORED we update it with the new value but keep the change type as
48+
// RESTORED.
4849
// 3. If the change is REMOVE it checks if any change connected to given entry
4950
// is already in the cache. If not, it adds REMOVE change. Otherwise, if
5051
// existing change is:
@@ -56,26 +57,42 @@ import (
5657
// change means the entry exists in a DB.
5758
// c. REMOVED it returns error because we can't remove an entry that was
5859
// already removed.
59-
// d. RESTORED it means the entry doesn't exist in the DB, so it's a noop
60-
// so remove the entry from the cache.
61-
// 4. If the change is RESTORED it checks if any change related to the given entry
62-
// already exists in the cache. If not, it adds the RESTORED change. Otherwise,
63-
// returns an error since restoration is only possible for previously archived/evicted
64-
// entries. If the entry was created, updated or removed within the same ledger, restoration
65-
// is not possible.
60+
// d. RESTORED depending on the change compactor's configuration, we may or
61+
// may not emit a REMOVE change type for an entry that was restored earlier
62+
// in the ledger.
63+
// 4. If the change is RESTORED it checks if any change related to the given
64+
// entry already exists in the cache. If not, it adds the RESTORED change.
65+
// Otherwise, it returns an error because only expired entries can be
66+
// restored. If the entry was created, updated or removed in the same
67+
// ledger, it's not expired.
6668
type ChangeCompactor struct {
6769
// ledger key => Change
68-
cache map[string]Change
69-
encodingBuffer *xdr.EncodingBuffer
70-
emitExpiredEntriesRemovedChange bool
70+
cache map[string]Change
71+
encodingBuffer *xdr.EncodingBuffer
72+
config *ChangeCompactorConfig
73+
}
74+
75+
type ChangeCompactorConfig struct {
76+
// Determines whether the change compactor emits a REMOVED change when an expired entry
77+
// is restored and then removed within the same ledger.
78+
// If set to true, a REMOVED change is emitted; if false, REMOVED change is suppressed.
79+
EmitExpiredEntryRemovedChange bool
80+
}
81+
82+
func NewChangeCompactorDefaultConfig() *ChangeCompactorConfig {
83+
return &ChangeCompactorConfig{
84+
// By default, set it to true to enable the change compactor
85+
// to emit REMOVED change for expired entries.
86+
EmitExpiredEntryRemovedChange: true,
87+
}
7188
}
7289

7390
// NewChangeCompactor returns a new ChangeCompactor.
74-
func NewChangeCompactor(emitExpiredEntriesRemovedChange bool) *ChangeCompactor {
91+
func NewChangeCompactor(config *ChangeCompactorConfig) *ChangeCompactor {
7592
return &ChangeCompactor{
76-
cache: make(map[string]Change),
77-
encodingBuffer: xdr.NewEncodingBuffer(),
78-
emitExpiredEntriesRemovedChange: emitExpiredEntriesRemovedChange,
93+
cache: make(map[string]Change),
94+
encodingBuffer: xdr.NewEncodingBuffer(),
95+
config: config,
7996
}
8097
}
8198

@@ -104,16 +121,10 @@ func (c *ChangeCompactor) AddChange(change Change) error {
104121
// addCreatedChange adds a change to the cache, but returns an error if create
105122
// change is unexpected.
106123
func (c *ChangeCompactor) addCreatedChange(change Change) error {
107-
// safe, since we later cast to string (causing a copy)
108-
key, err := change.Post.LedgerKey()
124+
ledgerKey, err := c.getLedgerKey(change.Post)
109125
if err != nil {
110-
return errors.Wrap(err, "error getting ledger key for new entry")
126+
return err
111127
}
112-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
113-
if err != nil {
114-
return errors.Wrap(err, "error marshaling ledger key for new entry")
115-
}
116-
117128
ledgerKeyString := string(ledgerKey)
118129

119130
existingChange, exist := c.cache[ledgerKeyString]
@@ -136,7 +147,7 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
136147
// If existing type is removed it means that this entry does exist
137148
// in a DB so we update entry change.
138149
c.cache[ledgerKeyString] = Change{
139-
Type: key.Type,
150+
Type: change.Type,
140151
Pre: existingChange.Pre,
141152
Post: change.Post,
142153
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
@@ -148,19 +159,26 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
148159
return nil
149160
}
150161

151-
// addUpdatedChange adds a change to the cache, but returns an error if update
152-
// change is unexpected.
153-
func (c *ChangeCompactor) addUpdatedChange(change Change) error {
162+
func (c *ChangeCompactor) getLedgerKey(ledgerEntry *xdr.LedgerEntry) ([]byte, error) {
154163
// safe, since we later cast to string (causing a copy)
155-
key, err := change.Post.LedgerKey()
164+
key, err := ledgerEntry.LedgerKey()
156165
if err != nil {
157-
return errors.Wrap(err, "error getting ledger key for updated entry")
166+
return nil, errors.Wrap(err, "error getting ledger key for new entry")
158167
}
159168
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
160169
if err != nil {
161-
return errors.Wrap(err, "error marshaling ledger key for updated entry")
170+
return nil, errors.Wrap(err, "error marshaling ledger key for new entry")
162171
}
172+
return ledgerKey, nil
173+
}
163174

175+
// addUpdatedChange adds a change to the cache, but returns an error if update
176+
// change is unexpected.
177+
func (c *ChangeCompactor) addUpdatedChange(change Change) error {
178+
ledgerKey, err := c.getLedgerKey(change.Post)
179+
if err != nil {
180+
return err
181+
}
164182
ledgerKeyString := string(ledgerKey)
165183

166184
existingChange, exist := c.cache[ledgerKeyString]
@@ -174,15 +192,16 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
174192
// If existing type is created it means that this entry does not
175193
// exist in a DB so we update entry change.
176194
c.cache[ledgerKeyString] = Change{
177-
Type: key.Type,
178-
Pre: existingChange.Pre, // = nil
179-
Post: change.Post,
195+
Type: change.Type,
196+
Pre: existingChange.Pre, // = nil
197+
Post: change.Post,
198+
ChangeType: existingChange.ChangeType,
180199
}
181200
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
182201
fallthrough
183202
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
184203
c.cache[ledgerKeyString] = Change{
185-
Type: key.Type,
204+
Type: change.Type,
186205
Pre: existingChange.Pre,
187206
Post: change.Post,
188207
ChangeType: existingChange.ChangeType, //keep the existing change type
@@ -202,16 +221,10 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
202221
// addRemovedChange adds a change to the cache, but returns an error if remove
203222
// change is unexpected.
204223
func (c *ChangeCompactor) addRemovedChange(change Change) error {
205-
// safe, since we later cast to string (causing a copy)
206-
key, err := change.Pre.LedgerKey()
224+
ledgerKey, err := c.getLedgerKey(change.Pre)
207225
if err != nil {
208-
return errors.Wrap(err, "error getting ledger key for removed entry")
226+
return err
209227
}
210-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
211-
if err != nil {
212-
return errors.Wrap(err, "error marshaling ledger key for removed entry")
213-
}
214-
215228
ledgerKeyString := string(ledgerKey)
216229

217230
existingChange, exist := c.cache[ledgerKeyString]
@@ -227,7 +240,7 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
227240
delete(c.cache, ledgerKeyString)
228241
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
229242
c.cache[ledgerKeyString] = Change{
230-
Type: key.Type,
243+
Type: change.Type,
231244
Pre: existingChange.Pre,
232245
Post: nil,
233246
ChangeType: change.ChangeType,
@@ -238,9 +251,9 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
238251
base64.StdEncoding.EncodeToString(ledgerKey),
239252
))
240253
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
241-
if c.emitExpiredEntriesRemovedChange {
254+
if c.config.EmitExpiredEntryRemovedChange {
242255
c.cache[ledgerKeyString] = Change{
243-
Type: key.Type,
256+
Type: change.Type,
244257
Pre: change.Pre,
245258
Post: nil,
246259
ChangeType: change.ChangeType,
@@ -259,16 +272,10 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
259272
// addRestoredChange adds a change to the cache, but returns an error if the restore
260273
// change is unexpected.
261274
func (c *ChangeCompactor) addRestoredChange(change Change) error {
262-
// safe, since we later cast to string (causing a copy)
263-
key, err := change.Post.LedgerKey()
275+
ledgerKey, err := c.getLedgerKey(change.Post)
264276
if err != nil {
265-
return errors.Wrap(err, "error getting ledger key for updated entry")
277+
return err
266278
}
267-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
268-
if err != nil {
269-
return errors.Wrap(err, "error marshaling ledger key for updated entry")
270-
}
271-
272279
ledgerKeyString := string(ledgerKey)
273280

274281
if _, exist := c.cache[ledgerKeyString]; exist {

ingest/change_compactor_test.go

+15-19
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type TestChangeCompactorExistingCreatedSuite struct {
2323
}
2424

2525
func (s *TestChangeCompactorExistingCreatedSuite) SetupTest() {
26-
s.cache = NewChangeCompactor(true)
26+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
2727

2828
change := Change{
2929
Type: xdr.LedgerEntryTypeAccount,
@@ -148,7 +148,7 @@ type TestChangeCompactorExistingUpdatedSuite struct {
148148
}
149149

150150
func (s *TestChangeCompactorExistingUpdatedSuite) SetupTest() {
151-
s.cache = NewChangeCompactor(true)
151+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
152152

153153
change := Change{
154154
Type: xdr.LedgerEntryTypeAccount,
@@ -283,7 +283,7 @@ type TestChangeCompactorExistingRemovedSuite struct {
283283
}
284284

285285
func (s *TestChangeCompactorExistingRemovedSuite) SetupTest() {
286-
s.cache = NewChangeCompactor(true)
286+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
287287

288288
change := Change{
289289
Type: xdr.LedgerEntryTypeAccount,
@@ -400,19 +400,24 @@ func (s *TestChangeCompactorExistingRemovedSuite) TestChangeRestored() {
400400
}
401401

402402
func TestChangeCompactorExistingRestored(t *testing.T) {
403-
suite.Run(t, new(TestChangeCompactorExistingRestoredSuite))
403+
for _, emitExpiredRemoved := range []bool{true, false} {
404+
s := new(TestChangeCompactorExistingRestoredSuite)
405+
s.emitExpiredEntryRemovedChange = emitExpiredRemoved
406+
suite.Run(t, s)
407+
}
404408
}
405409

406410
// TestChangeCompactorExistingRestoredSuite tests transitions from existing
407411
// RESTORED state in the cache.
408412
type TestChangeCompactorExistingRestoredSuite struct {
409413
suite.Suite
410-
cache *ChangeCompactor
411-
contractDataEntry xdr.LedgerEntry
414+
cache *ChangeCompactor
415+
contractDataEntry xdr.LedgerEntry
416+
emitExpiredEntryRemovedChange bool
412417
}
413418

414-
func (s *TestChangeCompactorExistingRestoredSuite) SetupWithEmitRemovalFlag(emitRemoval bool) {
415-
s.cache = NewChangeCompactor(emitRemoval)
419+
func (s *TestChangeCompactorExistingRestoredSuite) SetupTest() {
420+
s.cache = NewChangeCompactor(&ChangeCompactorConfig{EmitExpiredEntryRemovedChange: s.emitExpiredEntryRemovedChange})
416421
val := true
417422
s.contractDataEntry = xdr.LedgerEntry{
418423
LastModifiedLedgerSeq: 1,
@@ -443,14 +448,6 @@ func (s *TestChangeCompactorExistingRestoredSuite) SetupWithEmitRemovalFlag(emit
443448
s.Assert().EqualValues(&s.contractDataEntry, changes[0].Post)
444449
}
445450

446-
func (s *TestChangeCompactorExistingRestoredSuite) SetupTest() {
447-
for _, flag := range []bool{true, false} {
448-
s.Run(fmt.Sprintf("EmitExpiredEntriesRemovedChange_%v", flag), func() {
449-
s.SetupWithEmitRemovalFlag(flag)
450-
})
451-
}
452-
}
453-
454451
func (s *TestChangeCompactorExistingRestoredSuite) getLedgerKeyString(entry *xdr.LedgerEntry) string {
455452
lk, err := entry.LedgerKey()
456453
s.Require().NoError(err)
@@ -502,14 +499,13 @@ func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRemoved() {
502499
s.Assert().NoError(s.cache.AddChange(change))
503500
changes := s.cache.GetChanges()
504501

505-
if s.cache.emitExpiredEntriesRemovedChange {
502+
if s.cache.config.EmitExpiredEntryRemovedChange {
506503
s.Assert().Len(changes, 1)
507504
s.Assert().Equal(xdr.LedgerEntryChangeTypeLedgerEntryRemoved, changes[0].ChangeType)
508505
s.Assert().EqualValues(&s.contractDataEntry, changes[0].Pre)
509506
} else {
510507
s.Assert().Len(changes, 0)
511508
}
512-
513509
}
514510

515511
func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRestored() {
@@ -534,7 +530,7 @@ func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRestored() {
534530
// GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ sends money
535531
// GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML receives money
536532
func TestChangeCompactorSquashMultiplePayments(t *testing.T) {
537-
cache := NewChangeCompactor(true)
533+
cache := NewChangeCompactor(NewChangeCompactorDefaultConfig())
538534

539535
for i := 1; i <= 1000; i++ {
540536
change := Change{

0 commit comments

Comments
 (0)