Skip to content

Commit 25c51fa

Browse files
committed
add tests and other improvements
1 parent 586392c commit 25c51fa

File tree

6 files changed

+213
-129
lines changed

6 files changed

+213
-129
lines changed

ingest/change.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,13 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
154154
ChangeType: entryChange.Type,
155155
})
156156
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
157+
// Update entries always have a previous state entry [state, updated]
158+
// except for contract entries that are restored and updated within the same
159+
// transaction, which appear as [restored, updated]
160+
// For details, see https://github.com/stellar/stellar-protocol/blob/master/core/cap-0062.md
157161
updated := entryChange.MustUpdated()
158-
var state xdr.LedgerEntry
159-
if _, ok := ledgerEntryChanges[i-1].GetState(); ok {
160-
state = ledgerEntryChanges[i-1].MustState()
161-
} else {
162+
state, ok := ledgerEntryChanges[i-1].GetState()
163+
if !ok {
162164
state = ledgerEntryChanges[i-1].MustRestored()
163165
}
164166
changes = append(changes, Change{
@@ -168,10 +170,12 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
168170
ChangeType: entryChange.Type,
169171
})
170172
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
171-
var state xdr.LedgerEntry
172-
if _, ok := ledgerEntryChanges[i-1].GetState(); ok {
173-
state = ledgerEntryChanges[i-1].MustState()
174-
} else {
173+
// Removed entries always have an associated state entry [state, updated]
174+
// except for contract entries that are restored and removed within the same
175+
// transaction, which appear as [restored, removed]
176+
// For details, see https://github.com/stellar/stellar-protocol/blob/master/core/cap-0062.md
177+
state, ok := ledgerEntryChanges[i-1].GetState()
178+
if !ok {
175179
state = ledgerEntryChanges[i-1].MustRestored()
176180
}
177181
changes = append(changes, Change{

ingest/change_compactor.go

+55-53
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ import (
4444
// b. UPDATED we simply update it with the new value.
4545
// c. REMOVED it means that at this point in the ledger the entry is removed
4646
// so updating it returns an error.
47-
// d. RESTORED we update it with the new value but keep the change type as RESTORED.
47+
// d. RESTORED we update it with the new value but keep the change type as
48+
// RESTORED.
4849
// 3. If the change is REMOVE it checks if any change connected to given entry
4950
// is already in the cache. If not, it adds REMOVE change. Otherwise, if
5051
// existing change is:
@@ -56,26 +57,37 @@ import (
5657
// change means the entry exists in a DB.
5758
// c. REMOVED it returns error because we can't remove an entry that was
5859
// already removed.
59-
// d. RESTORED it means the entry doesn't exist in the DB, so it's a noop
60-
// so remove the entry from the cache.
61-
// 4. If the change is RESTORED it checks if any change related to the given entry
62-
// already exists in the cache. If not, it adds the RESTORED change. Otherwise,
63-
// returns an error since restoration is only possible for previously archived/evicted
64-
// entries. If the entry was created, updated or removed within the same ledger, restoration
65-
// is not possible.
60+
// d. RESTORED depending on the change compactor's configuration, we may or
61+
// may not emit a REMOVE change type for an entry that was restored earlier
62+
// in the ledger.
63+
// 4. If the change is RESTORED it checks if any change related to the given
64+
// entry already exists in the cache. If not, it adds the RESTORED change.
65+
// Otherwise, it returns an error because only expired entries can be
66+
// restored. If the entry was created, updated or removed in the same
67+
// ledger, it's not expired.
6668
type ChangeCompactor struct {
6769
// ledger key => Change
68-
cache map[string]Change
69-
encodingBuffer *xdr.EncodingBuffer
70-
emitExpiredEntriesRemovedChange bool
70+
cache map[string]Change
71+
encodingBuffer *xdr.EncodingBuffer
72+
config *ChangeCompactorConfig
73+
}
74+
75+
type ChangeCompactorConfig struct {
76+
EmitExpiredEntryRemovedChange bool
77+
}
78+
79+
func NewChangeCompactorDefaultConfig() *ChangeCompactorConfig {
80+
return &ChangeCompactorConfig{
81+
EmitExpiredEntryRemovedChange: true,
82+
}
7183
}
7284

7385
// NewChangeCompactor returns a new ChangeCompactor.
74-
func NewChangeCompactor(emitExpiredEntriesRemovedChange bool) *ChangeCompactor {
86+
func NewChangeCompactor(config *ChangeCompactorConfig) *ChangeCompactor {
7587
return &ChangeCompactor{
76-
cache: make(map[string]Change),
77-
encodingBuffer: xdr.NewEncodingBuffer(),
78-
emitExpiredEntriesRemovedChange: emitExpiredEntriesRemovedChange,
88+
cache: make(map[string]Change),
89+
encodingBuffer: xdr.NewEncodingBuffer(),
90+
config: config,
7991
}
8092
}
8193

@@ -104,16 +116,10 @@ func (c *ChangeCompactor) AddChange(change Change) error {
104116
// addCreatedChange adds a change to the cache, but returns an error if create
105117
// change is unexpected.
106118
func (c *ChangeCompactor) addCreatedChange(change Change) error {
107-
// safe, since we later cast to string (causing a copy)
108-
key, err := change.Post.LedgerKey()
119+
ledgerKey, err := c.getLedgerKey(change.Post)
109120
if err != nil {
110-
return errors.Wrap(err, "error getting ledger key for new entry")
121+
return err
111122
}
112-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
113-
if err != nil {
114-
return errors.Wrap(err, "error marshaling ledger key for new entry")
115-
}
116-
117123
ledgerKeyString := string(ledgerKey)
118124

119125
existingChange, exist := c.cache[ledgerKeyString]
@@ -136,7 +142,7 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
136142
// If existing type is removed it means that this entry does exist
137143
// in a DB so we update entry change.
138144
c.cache[ledgerKeyString] = Change{
139-
Type: key.Type,
145+
Type: change.Type,
140146
Pre: existingChange.Pre,
141147
Post: change.Post,
142148
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
@@ -148,19 +154,26 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
148154
return nil
149155
}
150156

151-
// addUpdatedChange adds a change to the cache, but returns an error if update
152-
// change is unexpected.
153-
func (c *ChangeCompactor) addUpdatedChange(change Change) error {
157+
func (c *ChangeCompactor) getLedgerKey(ledgerEntry *xdr.LedgerEntry) ([]byte, error) {
154158
// safe, since we later cast to string (causing a copy)
155-
key, err := change.Post.LedgerKey()
159+
key, err := ledgerEntry.LedgerKey()
156160
if err != nil {
157-
return errors.Wrap(err, "error getting ledger key for updated entry")
161+
return nil, errors.Wrap(err, "error getting ledger key for new entry")
158162
}
159163
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
160164
if err != nil {
161-
return errors.Wrap(err, "error marshaling ledger key for updated entry")
165+
return nil, errors.Wrap(err, "error marshaling ledger key for new entry")
162166
}
167+
return ledgerKey, nil
168+
}
163169

170+
// addUpdatedChange adds a change to the cache, but returns an error if update
171+
// change is unexpected.
172+
func (c *ChangeCompactor) addUpdatedChange(change Change) error {
173+
ledgerKey, err := c.getLedgerKey(change.Post)
174+
if err != nil {
175+
return err
176+
}
164177
ledgerKeyString := string(ledgerKey)
165178

166179
existingChange, exist := c.cache[ledgerKeyString]
@@ -174,15 +187,16 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
174187
// If existing type is created it means that this entry does not
175188
// exist in a DB so we update entry change.
176189
c.cache[ledgerKeyString] = Change{
177-
Type: key.Type,
178-
Pre: existingChange.Pre, // = nil
179-
Post: change.Post,
190+
Type: change.Type,
191+
Pre: existingChange.Pre, // = nil
192+
Post: change.Post,
193+
ChangeType: existingChange.ChangeType,
180194
}
181195
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
182196
fallthrough
183197
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
184198
c.cache[ledgerKeyString] = Change{
185-
Type: key.Type,
199+
Type: change.Type,
186200
Pre: existingChange.Pre,
187201
Post: change.Post,
188202
ChangeType: existingChange.ChangeType, //keep the existing change type
@@ -202,16 +216,10 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
202216
// addRemovedChange adds a change to the cache, but returns an error if remove
203217
// change is unexpected.
204218
func (c *ChangeCompactor) addRemovedChange(change Change) error {
205-
// safe, since we later cast to string (causing a copy)
206-
key, err := change.Pre.LedgerKey()
219+
ledgerKey, err := c.getLedgerKey(change.Pre)
207220
if err != nil {
208-
return errors.Wrap(err, "error getting ledger key for removed entry")
221+
return err
209222
}
210-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
211-
if err != nil {
212-
return errors.Wrap(err, "error marshaling ledger key for removed entry")
213-
}
214-
215223
ledgerKeyString := string(ledgerKey)
216224

217225
existingChange, exist := c.cache[ledgerKeyString]
@@ -227,7 +235,7 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
227235
delete(c.cache, ledgerKeyString)
228236
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
229237
c.cache[ledgerKeyString] = Change{
230-
Type: key.Type,
238+
Type: change.Type,
231239
Pre: existingChange.Pre,
232240
Post: nil,
233241
ChangeType: change.ChangeType,
@@ -238,9 +246,9 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
238246
base64.StdEncoding.EncodeToString(ledgerKey),
239247
))
240248
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
241-
if c.emitExpiredEntriesRemovedChange {
249+
if c.config.EmitExpiredEntryRemovedChange {
242250
c.cache[ledgerKeyString] = Change{
243-
Type: key.Type,
251+
Type: change.Type,
244252
Pre: change.Pre,
245253
Post: nil,
246254
ChangeType: change.ChangeType,
@@ -259,16 +267,10 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
259267
// addRestoredChange adds a change to the cache, but returns an error if the restore
260268
// change is unexpected.
261269
func (c *ChangeCompactor) addRestoredChange(change Change) error {
262-
// safe, since we later cast to string (causing a copy)
263-
key, err := change.Post.LedgerKey()
270+
ledgerKey, err := c.getLedgerKey(change.Post)
264271
if err != nil {
265-
return errors.Wrap(err, "error getting ledger key for updated entry")
272+
return err
266273
}
267-
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
268-
if err != nil {
269-
return errors.Wrap(err, "error marshaling ledger key for updated entry")
270-
}
271-
272274
ledgerKeyString := string(ledgerKey)
273275

274276
if _, exist := c.cache[ledgerKeyString]; exist {

ingest/change_compactor_test.go

+15-19
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type TestChangeCompactorExistingCreatedSuite struct {
2323
}
2424

2525
func (s *TestChangeCompactorExistingCreatedSuite) SetupTest() {
26-
s.cache = NewChangeCompactor(true)
26+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
2727

2828
change := Change{
2929
Type: xdr.LedgerEntryTypeAccount,
@@ -148,7 +148,7 @@ type TestChangeCompactorExistingUpdatedSuite struct {
148148
}
149149

150150
func (s *TestChangeCompactorExistingUpdatedSuite) SetupTest() {
151-
s.cache = NewChangeCompactor(true)
151+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
152152

153153
change := Change{
154154
Type: xdr.LedgerEntryTypeAccount,
@@ -283,7 +283,7 @@ type TestChangeCompactorExistingRemovedSuite struct {
283283
}
284284

285285
func (s *TestChangeCompactorExistingRemovedSuite) SetupTest() {
286-
s.cache = NewChangeCompactor(true)
286+
s.cache = NewChangeCompactor(NewChangeCompactorDefaultConfig())
287287

288288
change := Change{
289289
Type: xdr.LedgerEntryTypeAccount,
@@ -400,19 +400,24 @@ func (s *TestChangeCompactorExistingRemovedSuite) TestChangeRestored() {
400400
}
401401

402402
func TestChangeCompactorExistingRestored(t *testing.T) {
403-
suite.Run(t, new(TestChangeCompactorExistingRestoredSuite))
403+
for _, emitExpiredRemoved := range []bool{true, false} {
404+
s := new(TestChangeCompactorExistingRestoredSuite)
405+
s.emitExpiredEntryRemovedChange = emitExpiredRemoved
406+
suite.Run(t, s)
407+
}
404408
}
405409

406410
// TestChangeCompactorExistingRestoredSuite tests transitions from existing
407411
// RESTORED state in the cache.
408412
type TestChangeCompactorExistingRestoredSuite struct {
409413
suite.Suite
410-
cache *ChangeCompactor
411-
contractDataEntry xdr.LedgerEntry
414+
cache *ChangeCompactor
415+
contractDataEntry xdr.LedgerEntry
416+
emitExpiredEntryRemovedChange bool
412417
}
413418

414-
func (s *TestChangeCompactorExistingRestoredSuite) SetupWithEmitRemovalFlag(emitRemoval bool) {
415-
s.cache = NewChangeCompactor(emitRemoval)
419+
func (s *TestChangeCompactorExistingRestoredSuite) SetupTest() {
420+
s.cache = NewChangeCompactor(&ChangeCompactorConfig{EmitExpiredEntryRemovedChange: s.emitExpiredEntryRemovedChange})
416421
val := true
417422
s.contractDataEntry = xdr.LedgerEntry{
418423
LastModifiedLedgerSeq: 1,
@@ -443,14 +448,6 @@ func (s *TestChangeCompactorExistingRestoredSuite) SetupWithEmitRemovalFlag(emit
443448
s.Assert().EqualValues(&s.contractDataEntry, changes[0].Post)
444449
}
445450

446-
func (s *TestChangeCompactorExistingRestoredSuite) SetupTest() {
447-
for _, flag := range []bool{true, false} {
448-
s.Run(fmt.Sprintf("EmitExpiredEntriesRemovedChange_%v", flag), func() {
449-
s.SetupWithEmitRemovalFlag(flag)
450-
})
451-
}
452-
}
453-
454451
func (s *TestChangeCompactorExistingRestoredSuite) getLedgerKeyString(entry *xdr.LedgerEntry) string {
455452
lk, err := entry.LedgerKey()
456453
s.Require().NoError(err)
@@ -502,14 +499,13 @@ func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRemoved() {
502499
s.Assert().NoError(s.cache.AddChange(change))
503500
changes := s.cache.GetChanges()
504501

505-
if s.cache.emitExpiredEntriesRemovedChange {
502+
if s.cache.config.EmitExpiredEntryRemovedChange {
506503
s.Assert().Len(changes, 1)
507504
s.Assert().Equal(xdr.LedgerEntryChangeTypeLedgerEntryRemoved, changes[0].ChangeType)
508505
s.Assert().EqualValues(&s.contractDataEntry, changes[0].Pre)
509506
} else {
510507
s.Assert().Len(changes, 0)
511508
}
512-
513509
}
514510

515511
func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRestored() {
@@ -534,7 +530,7 @@ func (s *TestChangeCompactorExistingRestoredSuite) TestChangeRestored() {
534530
// GAJ2T6NQ6TDZRVRSNWM3JC7L3TG4H7UBCVK3GUHKP3TQ5NQ3LM4JGBTJ sends money
535531
// GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML receives money
536532
func TestChangeCompactorSquashMultiplePayments(t *testing.T) {
537-
cache := NewChangeCompactor(true)
533+
cache := NewChangeCompactor(NewChangeCompactorDefaultConfig())
538534

539535
for i := 1; i <= 1000; i++ {
540536
change := Change{

0 commit comments

Comments
 (0)