Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Add support for ledger entry change type "restore" for Protocol 23 #5587

Merged
merged 19 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_USE_DB: true
PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 22.1.0-2194.0241e79f7.focal
PROTOCOL_22_CORE_DOCKER_IMG: stellar/stellar-core:22.1.0-2194.0241e79f7.focal
PROTOCOL_22_STELLAR_RPC_DOCKER_IMG: stellar/stellar-rpc:22.1.1
PROTOCOL_22_STELLAR_RPC_DOCKER_IMG: stellar/stellar-rpc:22.1.2
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
Expand Down
54 changes: 35 additions & 19 deletions ingest/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type Change struct {
// The type of the ledger entry being changed.
Type xdr.LedgerEntryType

// The specific type of change, such as Created, Updated, Removed or Restored.
ChangeType xdr.LedgerEntryChangeType

// The state of the LedgerEntry before the change. This will be nil if the entry was created.
Pre *xdr.LedgerEntry

Expand Down Expand Up @@ -144,24 +147,46 @@ func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges)
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
created := entryChange.MustCreated()
changes = append(changes, Change{
Type: created.Data.Type,
Pre: nil,
Post: &created,
Type: created.Data.Type,
Pre: nil,
Post: &created,
ChangeType: entryChange.Type,
})
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
state := ledgerEntryChanges[i-1].MustState()
updated := entryChange.MustUpdated()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state,
Post: &updated,
Type: state.Data.Type,
Pre: &state,
Post: &updated,
ChangeType: entryChange.Type,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
state := ledgerEntryChanges[i-1].MustState()
changes = append(changes, Change{
Type: state.Data.Type,
Pre: &state,
Post: nil,
Type: state.Data.Type,
Pre: &state,
Post: nil,
ChangeType: entryChange.Type,
})
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
restored := entryChange.MustRestored()
if i > 0 {
if state, ok := ledgerEntryChanges[i-1].GetState(); ok {
changes = append(changes, Change{
Type: restored.Data.Type,
Pre: &state,
Post: &restored,
ChangeType: entryChange.Type,
})
continue
}
}
changes = append(changes, Change{
Type: restored.Data.Type,
Pre: nil,
Post: &restored,
ChangeType: entryChange.Type,
})
case xdr.LedgerEntryChangeTypeLedgerEntryState:
continue
Expand Down Expand Up @@ -223,16 +248,7 @@ func sortChanges(changes []Change) {

// LedgerEntryChangeType returns type in terms of LedgerEntryChangeType.
func (c Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType {
switch {
case c.Pre == nil && c.Post != nil:
return xdr.LedgerEntryChangeTypeLedgerEntryCreated
case c.Pre != nil && c.Post == nil:
return xdr.LedgerEntryChangeTypeLedgerEntryRemoved
case c.Pre != nil && c.Post != nil:
return xdr.LedgerEntryChangeTypeLedgerEntryUpdated
default:
panic("Invalid state of Change (Pre == nil && Post == nil)")
}
return c.ChangeType
}

// getLiquidityPool gets the most recent state of the LiquidityPool that exists or existed.
Expand Down
208 changes: 160 additions & 48 deletions ingest/change_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,58 @@ import (
//
// 1. If the change is CREATED it checks if any change connected to given entry
// is already in the cache. If not, it adds CREATED change. Otherwise, if
// existing change is:
// a. CREATED it returns error because we can't add an entry that already
// existing change is
// a. CREATED: return an error because we can't add an entry that already exists.
// b. UPDATED: return an error because we can't add an entry that already exists.
// c. REMOVED: entry exists in the DB but was marked for removal; change the type
// to UPDATED and update the new value.
// d. RESTORED: return an error as the RESTORED change indicates the entry already
// exists.
// b. UPDATED it returns error because we can't add an entry that already
// exists.
// c. REMOVED it means that due to previous transitions we want to remove
// this from a DB what means that it already exists in a DB so we need to
// update the type of change to UPDATED.
//
// 2. If the change is UPDATE it checks if any change connected to given entry
// is already in the cache. If not, it adds UPDATE change. Otherwise, if
// existing change is:
// a. CREATED it means that due to previous transitions we want to create
// this in a DB what means that it doesn't exist in a DB so we need to
// update the entry but stay with CREATED type.
// b. UPDATED we simply update it with the new value.
// existing change is
// a. CREATED: We want to create this in a DB which means that it doesn't exist
// in a DB so we need to update the entry but stay with CREATED type.
// b. UPDATED: update it with the new value.
// c. REMOVED it means that at this point in the ledger the entry is removed
// so updating it returns an error.
// 3. If the change is REMOVE it checks if any change connected to given entry
// is already in the cache. If not, it adds REMOVE change. Otherwise, if
// existing change is:
// a. CREATED it means that due to previous transitions we want to create
// this in a DB what means that it doesn't exist in a DB. If it was
// created and removed in the same ledger it's a noop so we remove entry
// from the cache.
// b. UPDATED we simply update it to be a REMOVE change because the UPDATE
// change means the entry exists in a DB.
// c. REMOVED it returns error because we can't remove an entry that was
// already removed.
// d. RESTORED: update it with the new value but keep the type as RESTORED.
//
// 3. If the change is REMOVED, it checks if any change related to the given entry
// already exists in the cache. If not, it adds the `REMOVED` change. Otherwise,
// if existing change is
// a. CREATED: due to previous transitions we want to create
// this in a DB which means that it doesn't exist in a DB. If it was created and
// removed in the same ledger it's a noop so we remove the entry from the cache.
// b. UPDATED: update it to be a REMOVE change because the UPDATE change means
// the entry exists in a DB.
// c. REMOVED: return an error because we can't remove an entry that was already
// removed.
// d. RESTORED: if the item was previously restored from an archived state, it means
// it already exists in the DB, so change it to REMOVED type. If the restored item
// was evicted, it doesn't exist in the DB, so it's a noop so remove the entry from
// the cache.
//
// 4. If the change is RESTORED for an evicted entry (pre is nil), it checks if any
// change related to the given entry already exists in the cache. If not, it adds
// the RESTORED change. Otherwise, if existing change is
// a. CREATED: return an error because we can't restore and entry that already exists.
// b. UPDATED: return an error because we can't restore an entry that already exists.
// c. REMOVED: entry exists in the DB but was marked for removal; change the
// type to RESTORED and update the new value.
// d. RESTORED: return an error as the RESTORED change indicates the entry
// already exists.
//
// 5. If the change is RESTORED for an archived entry (pre and post not nil), it checks
// if any change related to the given entry already exists in the cache. If not,
// it adds the RESTORED change. Otherwise, if existing change is
// a. CREATED: it means that it doesn't exist in the DB so we need to update the
// entry but stay with CREATED type.
// b. UPDATED: update it with the new value and change the type to RESTORED.
// c. REMOVED: return an error because we can not RESTORE an entry that was already
// removed.
// d. RESTORED: update it with the new value.
type ChangeCompactor struct {
// ledger key => Change
cache map[string]Change
Expand All @@ -75,13 +99,15 @@ func NewChangeCompactor() *ChangeCompactor {
// cache takes too much memory, you apply changes returned by GetChanges and
// create a new ChangeCompactor object to continue ingestion.
func (c *ChangeCompactor) AddChange(change Change) error {
switch {
case change.Pre == nil && change.Post != nil:
switch change.ChangeType {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
return c.addCreatedChange(change)
case change.Pre != nil && change.Post != nil:
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
return c.addUpdatedChange(change)
case change.Pre != nil && change.Post == nil:
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return c.addRemovedChange(change)
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
return c.addRestoredChange(change)
default:
return errors.New("Unknown entry change state")
}
Expand Down Expand Up @@ -110,11 +136,10 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
return NewStateError(errors.Errorf(
"can't create an entry that already exists (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
))
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
return NewStateError(errors.Errorf(
"can't create an entry that already exists (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
Expand All @@ -123,9 +148,10 @@ func (c *ChangeCompactor) addCreatedChange(change Change) error {
// If existing type is removed it means that this entry does exist
// in a DB so we update entry change.
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated,
}
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
Expand Down Expand Up @@ -157,26 +183,25 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error {

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
// If existing type is created it means that this entry does not
// exist in a DB so we update entry change.
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre, // = nil
Post: change.Post,
}
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
Type: key.Type,
Pre: existingChange.Pre, // = nil for created type
Post: change.Post,
ChangeType: existingChange.ChangeType,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return NewStateError(errors.Errorf(
"can't update an entry that was previously removed (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
))
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
}

return nil
Expand Down Expand Up @@ -210,22 +235,109 @@ func (c *ChangeCompactor) addRemovedChange(change Change) error {
delete(c.cache, ledgerKeyString)
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: nil,
Type: key.Type,
Pre: existingChange.Pre,
Post: nil,
ChangeType: change.ChangeType,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return NewStateError(errors.Errorf(
"can't remove an entry that was previously removed (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
))
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
if existingChange.Pre == nil {
// Entry was created and removed in the same ledger; deleting it is effectively a noop.
delete(c.cache, ledgerKeyString)
} else {
// If the entry exists, we mark it as removed by setting Post to nil.
c.cache[ledgerKeyString] = Change{
Type: existingChange.Type,
Pre: existingChange.Pre,
Post: nil,
ChangeType: change.ChangeType,
}
}
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.Type)
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
}

return nil
}

// addRestoredChange adds a change to the cache, but returns an error if the restore
// change is unexpected.
func (c *ChangeCompactor) addRestoredChange(change Change) error {
// safe, since we later cast to string (causing a copy)
key, err := change.Post.LedgerKey()
if err != nil {
return errors.Wrap(err, "error getting ledger key for updated entry")
}
ledgerKey, err := c.encodingBuffer.UnsafeMarshalBinary(key)
if err != nil {
return errors.Wrap(err, "error marshaling ledger key for updated entry")
}

ledgerKeyString := string(ledgerKey)

existingChange, exist := c.cache[ledgerKeyString]
if !exist {
c.cache[ledgerKeyString] = change
return nil
}
// If 'Pre' is nil, it indicates that an item previously *evicted* is being restored.
if change.Pre == nil {
switch existingChange.ChangeType {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
return NewStateError(errors.Errorf(
"can't restore an entry that already exists (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
))
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
ChangeType: change.ChangeType,
}
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
}
} else {
// If 'Pre' is not nil, it indicates that an item previously *archived* is being restored.
switch existingChange.ChangeType {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: nil,
Post: change.Post,
ChangeType: existingChange.ChangeType,
}
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
fallthrough
case xdr.LedgerEntryChangeTypeLedgerEntryRestored:
c.cache[ledgerKeyString] = Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
ChangeType: change.ChangeType,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return NewStateError(errors.Errorf(
"can't restore an entry that was previously removed (ledger key = %s)",
base64.StdEncoding.EncodeToString(ledgerKey),
))
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
}
}
return nil
}

// GetChanges returns a slice of Changes in the cache. The order of changes is
// random but each change is connected to a separate entry.
func (c *ChangeCompactor) GetChanges() []Change {
Expand Down
Loading
Loading