Skip to content

Reorganize processors #5594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,46 @@
package processors
package account

import (
"fmt"
"time"

"github.com/guregu/null"
"github.com/guregu/null/zero"
"github.com/stellar/go/ingest"
utils "github.com/stellar/go/ingest/processors/processor_utils"
"github.com/stellar/go/xdr"
)

// AccountOutput is a representation of an account that aligns with the BigQuery table accounts
type AccountOutput struct {
AccountID string `json:"account_id"` // account address
Balance float64 `json:"balance"`
BuyingLiabilities float64 `json:"buying_liabilities"`
SellingLiabilities float64 `json:"selling_liabilities"`
SequenceNumber int64 `json:"sequence_number"`
SequenceLedger zero.Int `json:"sequence_ledger"`
SequenceTime zero.Int `json:"sequence_time"`
NumSubentries uint32 `json:"num_subentries"`
InflationDestination string `json:"inflation_destination"`
Flags uint32 `json:"flags"`
HomeDomain string `json:"home_domain"`
MasterWeight int32 `json:"master_weight"`
ThresholdLow int32 `json:"threshold_low"`
ThresholdMedium int32 `json:"threshold_medium"`
ThresholdHigh int32 `json:"threshold_high"`
Sponsor null.String `json:"sponsor"`
NumSponsored uint32 `json:"num_sponsored"`
NumSponsoring uint32 `json:"num_sponsoring"`
LastModifiedLedger uint32 `json:"last_modified_ledger"`
LedgerEntryChange uint32 `json:"ledger_entry_change"`
Deleted bool `json:"deleted"`
ClosedAt time.Time `json:"closed_at"`
LedgerSequence uint32 `json:"ledger_sequence"`
}

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := ExtractEntryFromChange(ledgerChange)
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
}
Expand Down Expand Up @@ -75,7 +105,7 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}
Expand All @@ -84,9 +114,9 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: ConvertStroopValueToReal(outputBalance),
BuyingLiabilities: ConvertStroopValueToReal(outputBuyingLiabilities),
SellingLiabilities: ConvertStroopValueToReal(outputSellingLiabilities),
Balance: utils.ConvertStroopValueToReal(outputBalance),
BuyingLiabilities: utils.ConvertStroopValueToReal(outputBuyingLiabilities),
SellingLiabilities: utils.ConvertStroopValueToReal(outputSellingLiabilities),
SequenceNumber: outputSequenceNumber,
SequenceLedger: zero.IntFrom(int64(outputSequenceLedger)),
SequenceTime: zero.IntFrom(int64(outputSequenceTime)),
Expand All @@ -99,7 +129,7 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory
ThresholdMedium: outputThreshMed,
ThresholdHigh: outputThreshHigh,
LastModifiedLedger: outputLastModifiedLedger,
Sponsor: ledgerEntrySponsorToNullString(ledgerEntry),
Sponsor: utils.LedgerEntrySponsorToNullString(ledgerEntry),
NumSponsored: uint32(accountEntry.NumSponsored()),
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
package processors
package account

import (
"fmt"
"sort"
"time"

"github.com/guregu/null"
"github.com/stellar/go/ingest"
utils "github.com/stellar/go/ingest/processors/processor_utils"
"github.com/stellar/go/xdr"
)

// AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers
type AccountSignerOutput struct {
AccountID string `json:"account_id"`
Signer string `json:"signer"`
Weight int32 `json:"weight"`
Sponsor null.String `json:"sponsor"`
LastModifiedLedger uint32 `json:"last_modified_ledger"`
LedgerEntryChange uint32 `json:"ledger_entry_change"`
Deleted bool `json:"deleted"`
ClosedAt time.Time `json:"closed_at"`
LedgerSequence uint32 `json:"ledger_sequence"`
}

// TransformAccountSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery
func TransformAccountSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) {
var signers []AccountSignerOutput

ledgerEntry, changeType, outputDeleted, err := ExtractEntryFromChange(ledgerChange)
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return signers, err
}
Expand All @@ -23,7 +38,7 @@ func TransformAccountSigners(ledgerChange ingest.Change, header xdr.LedgerHeader
return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type)
}

closedAt, err := TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return signers, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package processors
package account

import (
"fmt"
Expand All @@ -13,6 +13,13 @@ import (
"github.com/stellar/go/xdr"
)

// a selection of hardcoded accounts with their IDs and addresses
var testAccount1Address = "GCEODJVUUVYVFD5KT4TOEDTMXQ76OPFOQC2EMYYMLPXQCUVPOB6XRWPQ"
var testAccount1ID, _ = xdr.AddressToAccountId(testAccount1Address)

var testAccount2Address = "GAOEOQMXDDXPVJC3HDFX6LZFKANJ4OOLQOD2MNXJ7PGAY5FEO4BRRAQU"
var testAccount2ID, _ = xdr.AddressToAccountId(testAccount2Address)

func TestTransformAccountSigner(t *testing.T) {
type inputStruct struct {
injest ingest.Change
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package processors
package account

import (
"fmt"
Expand All @@ -12,6 +12,13 @@ import (
"github.com/stellar/go/xdr"
)

// a selection of hardcoded accounts with their IDs and addresses
var genericAccountID, _ = xdr.NewAccountId(xdr.PublicKeyTypePublicKeyTypeEd25519, xdr.Uint256([32]byte{}))
var genericAccountAddress, _ = genericAccountID.GetAddress()

var testAccount3Address = "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN"
var testAccount3ID, _ = xdr.AddressToAccountId(testAccount3Address)

func TestTransformAccount(t *testing.T) {
type inputStruct struct {
ledgerChange ingest.Change
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
package processors
package asset

import (
"fmt"
"time"

"github.com/dgryski/go-farm"
utils "github.com/stellar/go/ingest/processors/processor_utils"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"
)

// AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets
type AssetOutput struct {
AssetCode string `json:"asset_code"`
AssetIssuer string `json:"asset_issuer"`
AssetType string `json:"asset_type"`
AssetID int64 `json:"asset_id"`
ClosedAt time.Time `json:"closed_at"`
LedgerSequence uint32 `json:"ledger_sequence"`
}

// TransformAsset converts an asset from a payment operation into a form suitable for BigQuery
func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIndex int32, ledgerSeq int32, lcm xdr.LedgerCloseMeta) (AssetOutput, error) {
operationID := toid.New(ledgerSeq, int32(transactionIndex), operationIndex).ToInt64()
Expand Down Expand Up @@ -35,29 +46,29 @@ func TransformAsset(operation xdr.Operation, operationIndex int32, transactionIn

}

outputAsset, err := transformSingleAsset(asset)
outputAsset, err := TransformSingleAsset(asset)
if err != nil {
return AssetOutput{}, fmt.Errorf("%s (id %d)", err.Error(), operationID)
}

outputCloseTime, err := GetCloseTime(lcm)
outputCloseTime, err := utils.GetCloseTime(lcm)
if err != nil {
return AssetOutput{}, err
}
outputAsset.ClosedAt = outputCloseTime
outputAsset.LedgerSequence = GetLedgerSequence(lcm)
outputAsset.LedgerSequence = utils.GetLedgerSequence(lcm)

return outputAsset, nil
}

func transformSingleAsset(asset xdr.Asset) (AssetOutput, error) {
func TransformSingleAsset(asset xdr.Asset) (AssetOutput, error) {
var outputAssetType, outputAssetCode, outputAssetIssuer string
err := asset.Extract(&outputAssetType, &outputAssetCode, &outputAssetIssuer)
if err != nil {
return AssetOutput{}, fmt.Errorf("could not extract asset from this operation")
}

farmAssetID := FarmHashAsset(outputAssetCode, outputAssetIssuer, outputAssetType)
farmAssetID := utils.FarmHashAsset(outputAssetCode, outputAssetIssuer, outputAssetType)

return AssetOutput{
AssetCode: outputAssetCode,
Expand All @@ -66,10 +77,3 @@ func transformSingleAsset(asset xdr.Asset) (AssetOutput, error) {
AssetID: farmAssetID,
}, nil
}

func FarmHashAsset(assetCode, assetIssuer, assetType string) int64 {
asset := fmt.Sprintf("%s%s%s", assetCode, assetIssuer, assetType)
hash := farm.Fingerprint64([]byte(asset))

return int64(hash)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package processors
package asset

import (
"fmt"
Expand All @@ -8,9 +8,58 @@ import (
"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
utils "github.com/stellar/go/ingest/processors/processor_utils"
"github.com/stellar/go/xdr"
)

var genericSourceAccount, _ = xdr.NewMuxedAccount(xdr.CryptoKeyTypeKeyTypeEd25519, xdr.Uint256([32]byte{}))

var genericBumpOperation = xdr.Operation{
SourceAccount: &genericSourceAccount,
Body: xdr.OperationBody{
Type: xdr.OperationTypeBumpSequence,
BumpSequenceOp: &xdr.BumpSequenceOp{},
},
}
var genericBumpOperationEnvelope = xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
SourceAccount: genericSourceAccount,
Memo: xdr.Memo{},
Operations: []xdr.Operation{
genericBumpOperation,
},
Ext: xdr.TransactionExt{
V: 0,
SorobanData: &xdr.SorobanTransactionData{
Ext: xdr.ExtensionPoint{
V: 0,
},
Resources: xdr.SorobanResources{
Footprint: xdr.LedgerFootprint{
ReadOnly: []xdr.LedgerKey{},
ReadWrite: []xdr.LedgerKey{},
},
},
ResourceFee: 100,
},
},
},
}

var genericLedgerCloseMeta = xdr.LedgerCloseMeta{
V: 0,
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: 2,
ScpValue: xdr.StellarValue{
CloseTime: 10,
},
},
},
},
}

func TestTransformAsset(t *testing.T) {

type assetInput struct {
Expand Down Expand Up @@ -65,6 +114,59 @@ func TestTransformAsset(t *testing.T) {
}
}

var lpAssetA = xdr.Asset{
Type: xdr.AssetTypeAssetTypeNative,
}

var lpAssetB = xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: xdr.AssetCode4([4]byte{0x55, 0x53, 0x53, 0x44}),
Issuer: testAccount4ID,
},
}

var genericTxMeta = utils.CreateSampleTxMeta(29, lpAssetA, lpAssetB)

var genericLedgerTransaction = ingest.LedgerTransaction{
Index: 1,
Envelope: xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &genericBumpOperationEnvelope,
},
Result: utils.CreateSampleResultMeta(true, 10).Result,
UnsafeMeta: xdr.TransactionMeta{
V: 1,
V1: genericTxMeta,
},
}

// a selection of hardcoded accounts with their IDs and addresses
var testAccount1Address = "GCEODJVUUVYVFD5KT4TOEDTMXQ76OPFOQC2EMYYMLPXQCUVPOB6XRWPQ"
var testAccount1ID, _ = xdr.AddressToAccountId(testAccount1Address)
var testAccount1 = testAccount1ID.ToMuxedAccount()

var testAccount2Address = "GAOEOQMXDDXPVJC3HDFX6LZFKANJ4OOLQOD2MNXJ7PGAY5FEO4BRRAQU"
var testAccount2ID, _ = xdr.AddressToAccountId(testAccount2Address)
var testAccount2 = testAccount2ID.ToMuxedAccount()

var testAccount3Address = "GBT4YAEGJQ5YSFUMNKX6BPBUOCPNAIOFAVZOF6MIME2CECBMEIUXFZZN"
var testAccount3ID, _ = xdr.AddressToAccountId(testAccount3Address)
var testAccount3 = testAccount3ID.ToMuxedAccount()

var testAccount4Address = "GBVVRXLMNCJQW3IDDXC3X6XCH35B5Q7QXNMMFPENSOGUPQO7WO7HGZPA"
var testAccount4ID, _ = xdr.AddressToAccountId(testAccount4Address)

var usdtAsset = xdr.Asset{
Type: xdr.AssetTypeAssetTypeCreditAlphanum4,
AlphaNum4: &xdr.AlphaNum4{
AssetCode: xdr.AssetCode4([4]byte{0x55, 0x53, 0x44, 0x54}),
Issuer: testAccount4ID,
},
}

var nativeAsset = xdr.MustNewNativeAsset()

func makeAssetTestInput() (inputTransaction ingest.LedgerTransaction, err error) {
inputTransaction = genericLedgerTransaction
inputEnvelope := genericBumpOperationEnvelope
Expand Down
Loading
Loading