Skip to content
6 changes: 4 additions & 2 deletions internal/data/statechanges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,10 @@ func TestStateChangeModel_BatchInsert(t *testing.T) {

mockMetricsService := metrics.NewMockMetricsService()
mockMetricsService.
On("ObserveDBQueryDuration", "BatchInsert", "state_changes", mock.Anything).Return().Once().
On("ObserveDBBatchSize", "BatchInsert", "state_changes", mock.Anything).Return().Once().
On("ObserveDBQueryDuration", "BatchInsert", "state_changes", mock.Anything).Return().Once()
mockMetricsService.
On("ObserveDBBatchSize", "BatchInsert", "state_changes", mock.Anything).Return().Once()
mockMetricsService.
On("IncDBQuery", "BatchInsert", "state_changes").Return().Once()

m := &StateChangeModel{
Expand Down
14 changes: 8 additions & 6 deletions internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,20 @@ type Indexer struct {
tokenTransferProcessor TokenTransferProcessorInterface
processors []OperationProcessorInterface
pool pond.Pool
metricsService processors.MetricsServiceInterface
}

func NewIndexer(networkPassphrase string, ledgerEntryProvider processors.LedgerEntryProvider, pool pond.Pool) *Indexer {
func NewIndexer(networkPassphrase string, ledgerEntryProvider processors.LedgerEntryProvider, pool pond.Pool, metricsService processors.MetricsServiceInterface) *Indexer {
return &Indexer{
participantsProcessor: processors.NewParticipantsProcessor(networkPassphrase),
tokenTransferProcessor: processors.NewTokenTransferProcessor(networkPassphrase),
tokenTransferProcessor: processors.NewTokenTransferProcessor(networkPassphrase, metricsService),
processors: []OperationProcessorInterface{
processors.NewEffectsProcessor(networkPassphrase, ledgerEntryProvider),
processors.NewContractDeployProcessor(networkPassphrase),
contract_processors.NewSACEventsProcessor(networkPassphrase),
processors.NewEffectsProcessor(networkPassphrase, ledgerEntryProvider, metricsService),
processors.NewContractDeployProcessor(networkPassphrase, metricsService),
contract_processors.NewSACEventsProcessor(networkPassphrase, metricsService),
},
pool: pool,
pool: pool,
metricsService: metricsService,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestIndexer_NewIndexer(t *testing.T) {
mockLedgerEntryProvider := &MockLedgerEntryProvider{}
pool := pond.NewPool(runtime.NumCPU())

indexer := NewIndexer(networkPassphrase, mockLedgerEntryProvider, pool)
indexer := NewIndexer(networkPassphrase, mockLedgerEntryProvider, pool, nil)

require.NotNil(t, indexer)
assert.NotNil(t, indexer.participantsProcessor)
Expand Down
19 changes: 16 additions & 3 deletions internal/indexer/processors/contract_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processors
import (
"context"
"fmt"
"time"

operation_processor "github.com/stellar/go/processors/operation"
"github.com/stellar/go/xdr"
Expand All @@ -13,10 +14,14 @@ import (
// ContractDeployProcessor emits state changes for contract deployments.
type ContractDeployProcessor struct {
networkPassphrase string
metricsService MetricsServiceInterface
}

func NewContractDeployProcessor(networkPassphrase string) *ContractDeployProcessor {
return &ContractDeployProcessor{networkPassphrase: networkPassphrase}
func NewContractDeployProcessor(networkPassphrase string, metricsService MetricsServiceInterface) *ContractDeployProcessor {
return &ContractDeployProcessor{
networkPassphrase: networkPassphrase,
metricsService: metricsService,
}
}

func (p *ContractDeployProcessor) Name() string {
Expand All @@ -25,13 +30,21 @@ func (p *ContractDeployProcessor) Name() string {

// ProcessOperation emits a state change for each contract deployment (including subinvocations).
func (p *ContractDeployProcessor) ProcessOperation(_ context.Context, op *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error) {
startTime := time.Now()
defer func() {
if p.metricsService != nil {
duration := time.Since(startTime).Seconds()
p.metricsService.ObserveStateChangeProcessingDuration("ContractDeployProcessor", duration)
}
}()

if op.OperationType() != xdr.OperationTypeInvokeHostFunction {
return nil, ErrInvalidOpType
}
invokeHostOp := op.Operation.Body.MustInvokeHostFunctionOp()

opID := op.ID()
builder := NewStateChangeBuilder(op.Transaction.Ledger.LedgerSequence(), op.LedgerClosed.Unix(), op.Transaction.Hash.HexString(), op.TransactionID()).
builder := NewStateChangeBuilder(op.Transaction.Ledger.LedgerSequence(), op.LedgerClosed.Unix(), op.Transaction.Hash.HexString(), op.TransactionID(), p.metricsService).
WithOperationID(opID).
WithCategory(types.StateChangeCategoryAccount).
WithReason(types.StateChangeReasonCreate)
Expand Down
10 changes: 5 additions & 5 deletions internal/indexer/processors/contract_deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func Test_ContractDeployProcessor_Process_invalidOpType(t *testing.T) {
ctx := context.Background()
proc := NewContractDeployProcessor(network.TestNetworkPassphrase)
proc := NewContractDeployProcessor(network.TestNetworkPassphrase, nil)

op := &operation_processor.TransactionOperationWrapper{
Operation: xdr.Operation{Body: xdr.OperationBody{Type: xdr.OperationTypePayment}},
Expand All @@ -37,7 +37,7 @@ func Test_ContractDeployProcessor_Process_createContract(t *testing.T) {

ctx := context.Background()

builder := NewStateChangeBuilder(12345, closeTime.Unix(), "0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20", 53021371269120).
builder := NewStateChangeBuilder(12345, closeTime.Unix(), "0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20", 53021371269120, nil).
WithOperationID(53021371269121).
WithReason(types.StateChangeReasonCreate).
WithCategory(types.StateChangeCategoryAccount)
Expand Down Expand Up @@ -136,7 +136,7 @@ func Test_ContractDeployProcessor_Process_createContract(t *testing.T) {
}
}

proc := NewContractDeployProcessor(network.TestNetworkPassphrase)
proc := NewContractDeployProcessor(network.TestNetworkPassphrase, nil)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stateChanges, err := proc.ProcessOperation(ctx, tc.op)
Expand Down Expand Up @@ -188,7 +188,7 @@ func Test_ContractDeployProcessor_Process_invokeContract(t *testing.T) {
return op
}

builder := NewStateChangeBuilder(12345, closeTime.Unix(), "0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20", 53021371269120).
builder := NewStateChangeBuilder(12345, closeTime.Unix(), "0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20", 53021371269120, nil).
WithOperationID(53021371269121).
WithReason(types.StateChangeReasonCreate).
WithCategory(types.StateChangeCategoryAccount)
Expand Down Expand Up @@ -257,7 +257,7 @@ func Test_ContractDeployProcessor_Process_invokeContract(t *testing.T) {
}
}

proc := NewContractDeployProcessor(network.TestNetworkPassphrase)
proc := NewContractDeployProcessor(network.TestNetworkPassphrase, nil)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
stateChanges, err := proc.ProcessOperation(ctx, tc.op)
Expand Down
15 changes: 13 additions & 2 deletions internal/indexer/processors/contracts/sac.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

operation_processor "github.com/stellar/go/processors/operation"
"github.com/stellar/go/strkey"
Expand Down Expand Up @@ -34,11 +35,13 @@ const (

type SACEventsProcessor struct {
networkPassphrase string
metricsService processors.MetricsServiceInterface
}

func NewSACEventsProcessor(networkPassphrase string) *SACEventsProcessor {
func NewSACEventsProcessor(networkPassphrase string, metricsService processors.MetricsServiceInterface) *SACEventsProcessor {
return &SACEventsProcessor{
networkPassphrase: networkPassphrase,
metricsService: metricsService,
}
}

Expand All @@ -48,6 +51,14 @@ func (p *SACEventsProcessor) Name() string {

// ProcessOperation processes contract events and converts them into state changes.
func (p *SACEventsProcessor) ProcessOperation(_ context.Context, opWrapper *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error) {
startTime := time.Now()
defer func() {
if p.metricsService != nil {
duration := time.Since(startTime).Seconds()
p.metricsService.ObserveStateChangeProcessingDuration("SACEventsProcessor", duration)
}
}()

if opWrapper.OperationType() != xdr.OperationTypeInvokeHostFunction {
return nil, processors.ErrInvalidOpType
}
Expand All @@ -70,7 +81,7 @@ func (p *SACEventsProcessor) ProcessOperation(_ context.Context, opWrapper *oper
}

stateChanges := make([]types.StateChange, 0)
builder := processors.NewStateChangeBuilder(ledgerNumber, ledgerCloseTime, txHash, txID).WithOperationID(opWrapper.ID())
builder := processors.NewStateChangeBuilder(ledgerNumber, ledgerCloseTime, txHash, txID, p.metricsService).WithOperationID(opWrapper.ID())
for _, event := range contractEvents {
// Validate basic contract contractEvent structure
if event.Type != xdr.ContractEventTypeContract || event.ContractId == nil || event.Body.V != 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/indexer/processors/contracts/sac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

func TestSACEventsProcessor_ProcessOperation(t *testing.T) {
processor := NewSACEventsProcessor(networkPassphrase)
processor := NewSACEventsProcessor(networkPassphrase, nil)

t.Run("V3 Format - set_authorized = true (was unauthorized)", func(t *testing.T) {
admin := keypair.MustRandom().Address()
Expand Down
15 changes: 13 additions & 2 deletions internal/indexer/processors/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/base64"
"fmt"
"strconv"
"time"

"github.com/stellar/go/ingest"
effects "github.com/stellar/go/processors/effects"
Expand Down Expand Up @@ -63,13 +64,15 @@ var (
type EffectsProcessor struct {
networkPassphrase string
ledgerEntryProvider LedgerEntryProvider // Provider for ledger entry data to avoid import cycles
metricsService MetricsServiceInterface
}

// NewEffectsProcessor creates a new effects processor for the specified Stellar network.
func NewEffectsProcessor(networkPassphrase string, ledgerEntryProvider LedgerEntryProvider) *EffectsProcessor {
func NewEffectsProcessor(networkPassphrase string, ledgerEntryProvider LedgerEntryProvider, metricsService MetricsServiceInterface) *EffectsProcessor {
return &EffectsProcessor{
networkPassphrase: networkPassphrase,
ledgerEntryProvider: ledgerEntryProvider,
metricsService: metricsService,
}
}

Expand All @@ -82,6 +85,14 @@ func (p *EffectsProcessor) Name() string {
// home domain updates, data entry changes, and sponsorship relationship modifications.
// Returns a slice of state changes representing various account state changes.
func (p *EffectsProcessor) ProcessOperation(_ context.Context, opWrapper *operation_processor.TransactionOperationWrapper) ([]types.StateChange, error) {
startTime := time.Now()
defer func() {
if p.metricsService != nil {
duration := time.Since(startTime).Seconds()
p.metricsService.ObserveStateChangeProcessingDuration("EffectsProcessor", duration)
}
}()

ledgerCloseTime := opWrapper.Transaction.Ledger.LedgerCloseTime()
ledgerNumber := opWrapper.Transaction.Ledger.LedgerSequence()
txHash := opWrapper.Transaction.Result.TransactionHash.HexString()
Expand All @@ -100,7 +111,7 @@ func (p *EffectsProcessor) ProcessOperation(_ context.Context, opWrapper *operat
}

var stateChanges []types.StateChange
masterBuilder := NewStateChangeBuilder(ledgerNumber, ledgerCloseTime, txHash, txID).WithOperationID(opWrapper.ID())
masterBuilder := NewStateChangeBuilder(ledgerNumber, ledgerCloseTime, txHash, txID, p.metricsService).WithOperationID(opWrapper.ID())
// Process each effect and convert to our internal state change representation
for _, effect := range effectOutputs {
changeBuilder := masterBuilder.Clone().WithAccount(effect.Address)
Expand Down
20 changes: 10 additions & 10 deletions internal/indexer/processors/effects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {

op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
transaction := createTx(setTrustlineFlagsOp, nil, nil, false)
op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {

op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {

op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {

op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
)
op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
},
},
}, nil)
processor := NewEffectsProcessor(networkPassphrase, &mockProvider)
processor := NewEffectsProcessor(networkPassphrase, &mockProvider, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
)
op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
)
op, found := transaction.GetOperation(0)
require.True(t, found)
processor := NewEffectsProcessor(networkPassphrase, nil)
processor := NewEffectsProcessor(networkPassphrase, nil, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestEffects_ProcessTransaction(t *testing.T) {
fmt.Errorf("mock RPC error for testing error handling"),
)

processor := NewEffectsProcessor(networkPassphrase, &mockProvider)
processor := NewEffectsProcessor(networkPassphrase, &mockProvider, nil)
opWrapper := &operation_processor.TransactionOperationWrapper{
Index: 0,
Operation: op,
Expand Down
6 changes: 6 additions & 0 deletions internal/indexer/processors/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package processors

// MetricsServiceInterface defines the metrics operations needed by processors
type MetricsServiceInterface interface {
ObserveStateChangeProcessingDuration(processor string, duration float64)
}
10 changes: 7 additions & 3 deletions internal/indexer/processors/state_change_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (

// StateChangeBuilder provides a fluent interface for creating state changes
type StateChangeBuilder struct {
base types.StateChange
base types.StateChange
metricsService MetricsServiceInterface
}

// NewStateChangeBuilder creates a new builder with base state change fields
func NewStateChangeBuilder(ledgerNumber uint32, ledgerCloseTime int64, txHash string, txID int64) *StateChangeBuilder {
func NewStateChangeBuilder(ledgerNumber uint32, ledgerCloseTime int64, txHash string, txID int64, metricsService MetricsServiceInterface) *StateChangeBuilder {
return &StateChangeBuilder{
base: types.StateChange{
LedgerNumber: ledgerNumber,
Expand All @@ -26,6 +27,7 @@ func NewStateChangeBuilder(ledgerNumber uint32, ledgerCloseTime int64, txHash st
TxHash: txHash,
TxID: txID,
},
metricsService: metricsService,
}
}

Expand Down Expand Up @@ -128,6 +130,7 @@ func (b *StateChangeBuilder) Build() types.StateChange {
b.base.ToID = b.base.TxID
}
b.base.SortKey = b.generateSortKey()

return b.base
}

Expand Down Expand Up @@ -178,6 +181,7 @@ func (b *StateChangeBuilder) generateSortKey() string {

func (b *StateChangeBuilder) Clone() *StateChangeBuilder {
return &StateChangeBuilder{
base: b.base,
base: b.base,
metricsService: b.metricsService,
}
}
Loading
Loading