Skip to content

Commit e443da6

Browse files
committed
ingest: Add support for ledger entry change type "restore"
1 parent fd2ebb0 commit e443da6

7 files changed

+612
-113
lines changed

ingest/change.go

+36-19
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type Change struct {
4242
// The type of the ledger entry being changed.
4343
Type xdr.LedgerEntryType
4444

45+
ChangeType xdr.LedgerEntryChangeType
46+
4547
// The state of the LedgerEntry before the change. This will be nil if the entry was created.
4648
Pre *xdr.LedgerEntry
4749

@@ -144,25 +146,49 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
144146
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
145147
created := entryChange.MustCreated()
146148
changes = append(changes, Change{
147-
Type: created.Data.Type,
148-
Pre: nil,
149-
Post: &created,
149+
Type: created.Data.Type,
150+
Pre: nil,
151+
Post: &created,
152+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryCreated,
150153
})
151154
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
152155
state := ledgerEntryChanges[i-1].MustState()
153156
updated := entryChange.MustUpdated()
154157
changes = append(changes, Change{
155-
Type: state.Data.Type,
156-
Pre: &state,
157-
Post: &updated,
158+
Type: state.Data.Type,
159+
Pre: &state,
160+
Post: &updated,
161+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
158162
})
159163
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
160164
state := ledgerEntryChanges[i-1].MustState()
161165
changes = append(changes, Change{
162-
Type: state.Data.Type,
163-
Pre: &state,
164-
Post: nil,
166+
Type: state.Data.Type,
167+
Pre: &state,
168+
Post: nil,
169+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryRemoved,
170+
})
171+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
172+
restored := entryChange.MustRestored()
173+
if i > 0 {
174+
if state, ok := ledgerEntryChanges[i-1].GetState(); ok {
175+
changes = append(changes, Change{
176+
Type: restored.Data.Type,
177+
Pre: &state,
178+
Post: &restored,
179+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryRestored,
180+
})
181+
continue
182+
}
183+
}
184+
185+
changes = append(changes, Change{
186+
Type: restored.Data.Type,
187+
Pre: nil,
188+
Post: &restored,
189+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryRestored,
165190
})
191+
166192
case xdr.LedgerEntryChangeTypeLedgerEntryState:
167193
continue
168194
default:
@@ -223,16 +249,7 @@ func sortChanges(changes []Change) {
223249

224250
// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
225251
func (c Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
226-
switch {
227-
case c.Pre == nil && c.Post != nil:
228-
return xdr.LedgerEntryChangeTypeLedgerEntryCreated
229-
case c.Pre != nil && c.Post == nil:
230-
return xdr.LedgerEntryChangeTypeLedgerEntryRemoved
231-
case c.Pre != nil && c.Post != nil:
232-
return xdr.LedgerEntryChangeTypeLedgerEntryUpdated
233-
default:
234-
panic("Invalid state of Change (Pre == nil && Post == nil)")
235-
}
252+
return c.ChangeType
236253
}
237254

238255
// getLiquidityPool gets the most recent state of the LiquidityPool that exists or existed.

ingest/change_compactor.go

+159-48
Original file line numberDiff line numberDiff line change
@@ -25,34 +25,58 @@ import (
2525
//
2626
// 1. If the change is CREATED it checks if any change connected to given entry
2727
// is already in the cache. If not, it adds CREATED change. Otherwise, if
28-
// existing change is:
29-
// a. CREATED it returns error because we can't add an entry that already
28+
// existing change is
29+
// a. CREATED: return an error because we can't add an entry that already exists.
30+
// b. UPDATED: return an error because we can't add an entry that already exists.
31+
// c. REMOVED: entry exists in the DB but was marked for removal; change the type
32+
// to UPDATED and update the new value.
33+
// d. RESTORED: return an error as the RESTORED change indicates the entry already
3034
// exists.
31-
// b. UPDATED it returns error because we can't add an entry that already
32-
// exists.
33-
// c. REMOVED it means that due to previous transitions we want to remove
34-
// this from a DB what means that it already exists in a DB so we need to
35-
// update the type of change to UPDATED.
35+
//
3636
// 2. If the change is UPDATE it checks if any change connected to given entry
3737
// is already in the cache. If not, it adds UPDATE change. Otherwise, if
38-
// existing change is:
39-
// a. CREATED it means that due to previous transitions we want to create
40-
// this in a DB what means that it doesn't exist in a DB so we need to
41-
// update the entry but stay with CREATED type.
42-
// b. UPDATED we simply update it with the new value.
38+
// existing change is
39+
// a. CREATED: We want to create this in a DB which means that it doesn't exist
40+
// in a DB so we need to update the entry but stay with CREATED type.
41+
// b. UPDATED: update it with the new value.
4342
// c. REMOVED it means that at this point in the ledger the entry is removed
4443
// so updating it returns an error.
45-
// 3. If the change is REMOVE it checks if any change connected to given entry
46-
// is already in the cache. If not, it adds REMOVE change. Otherwise, if
47-
// existing change is:
48-
// a. CREATED it means that due to previous transitions we want to create
49-
// this in a DB what means that it doesn't exist in a DB. If it was
50-
// created and removed in the same ledger it's a noop so we remove entry
51-
// from the cache.
52-
// b. UPDATED we simply update it to be a REMOVE change because the UPDATE
53-
// change means the entry exists in a DB.
54-
// c. REMOVED it returns error because we can't remove an entry that was
55-
// already removed.
44+
// d. RESTORED: update it with the new value but keep the type as RESTORED.
45+
//
46+
// 3. If the change is REMOVED, it checks if any change related to the given entry
47+
// already exists in the cache. If not, it adds the `REMOVED` change. Otherwise,
48+
// if existing change is
49+
// a. CREATED: due to previous transitions we want to create
50+
// this in a DB which means that it doesn't exist in a DB. If it was created and
51+
// removed in the same ledger it's a noop so we remove the entry from the cache.
52+
// b. UPDATED: update it to be a REMOVE change because the UPDATE change means
53+
// the entry exists in a DB.
54+
// c. REMOVED: return an error because we can't remove an entry that was already
55+
// removed.
56+
// d. RESTORED: if the item was previously restored from an archived state, it means
57+
// it already exists in the DB, so change it to REMOVED type. If the restored item
58+
// was evicted, it doesn't exist in the DB, so it's a noop so remove the entry from
59+
// the cache.
60+
//
61+
// 4. If the change is RESTORED for an evicted entry (pre is nil), it checks if any
62+
// change related to the given entry already exists in the cache. If not, it adds
63+
// the RESTORED change. Otherwise, if existing change is
64+
// a. CREATED: return an error because we can't restore and entry that already exists.
65+
// b. UPDATED: return an error because we can't restore an entry that already exists.
66+
// c. REMOVED: entry exists in the DB but was marked for removal; change the
67+
// type to RESTORED and update the new value.
68+
// d. RESTORED: return an error as the RESTORED change indicates the entry
69+
// already exists.
70+
//
71+
// 5. If the change is RESTORED for an archived entry (pre and post not nil), it checks
72+
// if any change related to the given entry already exists in the cache. If not,
73+
// it adds the RESTORED change. Otherwise, if existing change is
74+
// a. CREATED: it means that it doesn't exist in the DB so we need to update the
75+
// entry but stay with CREATED type.
76+
// b. UPDATED: update it with the new value and change the type to RESTORED.
77+
// c. REMOVED: return an error because we can not RESTORE an entry that was already
78+
// removed.
79+
// d. RESTORED: update it with the new value.
5680
type ChangeCompactor struct {
5781
// ledger key => Change
5882
cache map[string]Change
@@ -75,13 +99,15 @@ func NewChangeCompactor() *ChangeCompactor {
7599
// cache takes too much memory, you apply changes returned by GetChanges and
76100
// create a new ChangeCompactor object to continue ingestion.
77101
func (c *ChangeCompactor) AddChange(change Change) error {
78-
switch {
79-
case change.Pre == nil && change.Post != nil:
102+
switch change.ChangeType {
103+
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
80104
return c.addCreatedChange(change)
81-
case change.Pre != nil && change.Post != nil:
105+
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
82106
return c.addUpdatedChange(change)
83-
case change.Pre != nil && change.Post == nil:
107+
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
84108
return c.addRemovedChange(change)
109+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
110+
return c.addRestoredChange(change)
85111
default:
86112
return errors.New("Unknown entry change state")
87113
}
@@ -110,11 +136,10 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
110136

111137
switch existingChange.LedgerEntryChangeType() {
112138
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
113-
return NewStateError(errors.Errorf(
114-
"can't create an entry that already exists (ledger key = %s)",
115-
base64.StdEncoding.EncodeToString(ledgerKey),
116-
))
139+
fallthrough
117140
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
141+
fallthrough
142+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
118143
return NewStateError(errors.Errorf(
119144
"can't create an entry that already exists (ledger key = %s)",
120145
base64.StdEncoding.EncodeToString(ledgerKey),
@@ -123,9 +148,10 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
123148
// If existing type is removed it means that this entry does exist
124149
// in a DB so we update entry change.
125150
c.cache[ledgerKeyString] = Change{
126-
Type: key.Type,
127-
Pre: existingChange.Pre,
128-
Post: change.Post,
151+
Type: key.Type,
152+
Pre: existingChange.Pre,
153+
Post: change.Post,
154+
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
129155
}
130156
default:
131157
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
@@ -157,26 +183,25 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {
157183

158184
switch existingChange.LedgerEntryChangeType() {
159185
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
186+
fallthrough
187+
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
188+
fallthrough
189+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
160190
// If existing type is created it means that this entry does not
161191
// exist in a DB so we update entry change.
162192
c.cache[ledgerKeyString] = Change{
163-
Type: key.Type,
164-
Pre: existingChange.Pre, // = nil
165-
Post: change.Post,
166-
}
167-
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
168-
c.cache[ledgerKeyString] = Change{
169-
Type: key.Type,
170-
Pre: existingChange.Pre,
171-
Post: change.Post,
193+
Type: key.Type,
194+
Pre: existingChange.Pre, // = nil for created type
195+
Post: change.Post,
196+
ChangeType: existingChange.ChangeType,
172197
}
173198
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
174199
return NewStateError(errors.Errorf(
175200
"can't update an entry that was previously removed (ledger key = %s)",
176201
base64.StdEncoding.EncodeToString(ledgerKey),
177202
))
178203
default:
179-
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
204+
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
180205
}
181206

182207
return nil
@@ -210,22 +235,108 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
210235
delete(c.cache, ledgerKeyString)
211236
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
212237
c.cache[ledgerKeyString] = Change{
213-
Type: key.Type,
214-
Pre: existingChange.Pre,
215-
Post: nil,
238+
Type: key.Type,
239+
Pre: existingChange.Pre,
240+
Post: nil,
241+
ChangeType: change.ChangeType,
216242
}
217243
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
218244
return NewStateError(errors.Errorf(
219245
"can't remove an entry that was previously removed (ledger key = %s)",
220246
base64.StdEncoding.EncodeToString(ledgerKey),
221247
))
248+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
249+
if existingChange.Pre == nil {
250+
// Entry was created and removed in the same ledger; deleting it is effectively a noop.
251+
delete(c.cache, ledgerKeyString)
252+
} else {
253+
// If the entry exists, we mark it as removed by setting Post to nil.
254+
c.cache[ledgerKeyString] = Change{
255+
Type: existingChange.Type,
256+
Pre: existingChange.Pre,
257+
Post: nil,
258+
ChangeType: change.ChangeType,
259+
}
260+
}
222261
default:
223-
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
262+
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
224263
}
225264

226265
return nil
227266
}
228267

268+
// addRestoredChange adds a change to the cache, but returns an error if the restore
269+
// change is unexpected.
270+
func (c *ChangeCompactor) addRestoredChange(change Change) error {
271+
// safe, since we later cast to string (causing a copy)
272+
key, err := change.Post.LedgerKey()
273+
if err != nil {
274+
return errors.Wrap(err, "error getting ledger key for updated entry")
275+
}
276+
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
277+
if err != nil {
278+
return errors.Wrap(err, "error marshaling ledger key for updated entry")
279+
}
280+
281+
ledgerKeyString := string(ledgerKey)
282+
283+
existingChange, exist := c.cache[ledgerKeyString]
284+
if !exist {
285+
c.cache[ledgerKeyString] = change
286+
return nil
287+
}
288+
// If 'Pre' is nil, it indicates that an item previously *evicted* is being restored.
289+
if change.Pre == nil {
290+
switch existingChange.ChangeType {
291+
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
292+
fallthrough
293+
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
294+
fallthrough
295+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
296+
return NewStateError(errors.Errorf(
297+
"can't restore an entry that already exists (ledger key = %s)",
298+
base64.StdEncoding.EncodeToString(ledgerKey),
299+
))
300+
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
301+
c.cache[ledgerKeyString] = Change{
302+
Type: key.Type,
303+
Pre: existingChange.Pre,
304+
Post: change.Post,
305+
ChangeType: change.ChangeType,
306+
}
307+
default:
308+
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
309+
}
310+
} else {
311+
// If 'Pre' is not nil, it indicates that an item previously *archived* is being restored.
312+
switch existingChange.ChangeType {
313+
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
314+
c.cache[ledgerKeyString] = Change{
315+
Type: key.Type,
316+
Pre: nil,
317+
Post: change.Post,
318+
ChangeType: existingChange.ChangeType,
319+
}
320+
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
321+
fallthrough
322+
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
323+
c.cache[ledgerKeyString] = Change{
324+
Type: key.Type,
325+
Pre: existingChange.Pre,
326+
Post: change.Post,
327+
ChangeType: change.ChangeType,
328+
}
329+
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
330+
return NewStateError(errors.Errorf(
331+
"can't restore an entry that was previously removed (ledger key = %s)", ledgerKey,
332+
))
333+
default:
334+
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
335+
}
336+
}
337+
return nil
338+
}
339+
229340
// GetChanges returns a slice of Changes in the cache. The order of changes is
230341
// random but each change is connected to a separate entry.
231342
func (c *ChangeCompactor) GetChanges() []Change {

0 commit comments

Comments
 (0)