Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(eventindexer): add disperser log, remove unused stats from previous testnets #16938

Merged
merged 8 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions packages/eventindexer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error {
return err
}

statRepository, err := repo.NewStatRepository(db)
if err != nil {
return err
}

nftBalanceRepository, err := repo.NewNFTBalanceRepository(db)
if err != nil {
return err
Expand All @@ -79,7 +74,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error {

srv, err := http.NewServer(http.NewServerOpts{
EventRepo: eventRepository,
StatRepo: statRepository,
NFTBalanceRepo: nftBalanceRepository,
ChartRepo: chartRepository,
Echo: echo.New(),
Expand Down
2 changes: 2 additions & 0 deletions packages/eventindexer/disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (d *Disperser) Start() error {
return err
}

slog.Info("addresses", "addresses", addresses)

for _, address := range addresses {
slog.Info("dispersing to", "address", address)

Expand Down
104 changes: 0 additions & 104 deletions packages/eventindexer/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,110 +403,6 @@ func (g *Generator) queryByTask(task string, date time.Time) error {

// return early for array processing data
return nil
case tasks.TotalProofRewards:
var feeTokenAddresses []string = make([]string, 0)
// get unique fee token addresses
query := "SELECT DISTINCT(fee_token_address) FROM stats WHERE stat_type = ?"

err = g.db.GormDB().
Raw(query, eventindexer.StatTypeProofReward).
Scan(&feeTokenAddresses).Error
if err != nil {
return err
}

slog.Info("feeTokenAddresses", "addresses", feeTokenAddresses)

for _, feeTokenAddress := range feeTokenAddresses {
f := feeTokenAddress

var dailyProofRewards decimal.NullDecimal

// nolint: lll
query := "SELECT COALESCE(SUM(proof_reward), 0) FROM events WHERE event = ? AND DATE(transacted_at) = ? AND fee_token_address = ?"
err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned, dateString, f).
Scan(&dailyProofRewards).Error

if err != nil {
return err
}

tsdResult, err := g.previousDayTsdResultByTask(task, date, &f, nil)
if err != nil {
return err
}

result := tsdResult.Decimal.Add(dailyProofRewards.Decimal)

slog.Info("Query successful",
"task", task,
"date", dateString,
"result", result.String(),
"feeTokenAddress", f,
)

insertStmt := `
INSERT INTO time_series_data(task, value, date, fee_token_address)
VALUES (?, ?, ?, ?)`

err = g.db.GormDB().Exec(insertStmt, task, result, dateString, f).Error
if err != nil {
slog.Info("Insert failed", "task", task, "date", dateString, "error", err.Error())
return err
}
}

// return early for array processing data
return nil
case tasks.ProofRewardsPerDay:
var feeTokenAddresses []string = make([]string, 0)
// get unique fee token addresses
query := "SELECT DISTINCT(fee_token_address) FROM stats WHERE stat_type = ?"

err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned).
Scan(&feeTokenAddresses).Error
if err != nil {
return err
}

for _, feeTokenAddress := range feeTokenAddresses {
f := feeTokenAddress

var result decimal.Decimal

// nolint: lll
query := `SELECT COALESCE(SUM(proof_reward), 0) FROM events WHERE event = ? AND DATE(transacted_at) = ? AND fee_token_address = ?`
err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned, dateString, f).
Scan(&result).Error

if err != nil {
return err
}

slog.Info("Query successful",
"task", task,
"date", dateString,
"result", result.String(),
"feeTokenAddress", f,
)

insertStmt := `
INSERT INTO time_series_data(task, value, date, fee_token_address)
VALUES (?, ?, ?, ?)`

err = g.db.GormDB().Exec(insertStmt, task, result, dateString, f).Error
if err != nil {
slog.Info("Insert failed", "task", task, "date", dateString, "error", err.Error())
return err
}
}

// return early for array processing data
return nil

case tasks.BridgeMessagesSentPerDay:
err = g.eventCount(task, date, eventindexer.EventNameMessageSent, &result)
case tasks.TotalBridgeMessagesSent:
Expand Down
2 changes: 1 addition & 1 deletion packages/eventindexer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
L1TaikoAddress: common.HexToAddress(c.String(flags.L1TaikoAddress.Name)),
BridgeAddress: common.HexToAddress(c.String(flags.BridgeAddress.Name)),
AssignmentHookAddress: common.HexToAddress(c.String(flags.AssignmentHookAddress.Name)),
SgxVerifierAddress: common.HexToAddress(flags.SgxVerifierAddress.Name),
SgxVerifierAddress: common.HexToAddress(c.String(flags.SgxVerifierAddress.Name)),
SwapAddresses: swaps,
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
Expand Down
7 changes: 0 additions & 7 deletions packages/eventindexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ var (
type Indexer struct {
accountRepo eventindexer.AccountRepository
eventRepo eventindexer.EventRepository
statRepo eventindexer.StatRepository
nftBalanceRepo eventindexer.NFTBalanceRepository
txRepo eventindexer.TransactionRepository

Expand Down Expand Up @@ -132,11 +131,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
return err
}

statRepository, err := repo.NewStatRepository(db)
if err != nil {
return err
}

nftBalanceRepository, err := repo.NewNFTBalanceRepository(db)
if err != nil {
return err
Expand Down Expand Up @@ -209,7 +203,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
i.blockSaveMutex = &sync.Mutex{}
i.accountRepo = accountRepository
i.eventRepo = eventRepository
i.statRepo = statRepository
i.nftBalanceRepo = nftBalanceRepository
i.txRepo = txRepository

Expand Down
70 changes: 2 additions & 68 deletions packages/eventindexer/indexer/save_block_assigned_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ func (i *Indexer) saveBlockAssignedEvent(
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

proverReward, err := i.updateAverageProverReward(ctx, event)
if err != nil {
return errors.Wrap(err, "i.updateAverageProverReward")
}

feeToken := event.Assignment.FeeToken.Hex()

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Expand All @@ -72,8 +67,8 @@ func (i *Indexer) saveBlockAssignedEvent(
Address: "",
AssignedProver: &assignedProver,
TransactedAt: time.Unix(int64(block.Time()), 0).UTC(),
Amount: proverReward,
ProofReward: proverReward,
Amount: big.NewInt(0),
ProofReward: big.NewInt(0),
FeeTokenAddress: &feeToken,
EmittedBlockID: event.Raw.BlockNumber,
})
Expand All @@ -85,64 +80,3 @@ func (i *Indexer) saveBlockAssignedEvent(

return nil
}

func (i *Indexer) updateAverageProverReward(
ctx context.Context,
event *assignmenthook.AssignmentHookBlockAssigned,
) (*big.Int, error) {
feeToken := event.Assignment.FeeToken.Hex()

stat, err := i.statRepo.Find(ctx, eventindexer.StatTypeProofReward, &feeToken)
if err != nil {
return nil, errors.Wrap(err, "i.statRepo.Find")
}

avg, ok := new(big.Int).SetString(stat.AverageProofReward, 10)
if !ok {
return nil, errors.New("unable to convert average proof time to string")
}

var proverFee *big.Int

tiers := event.Assignment.TierFees
minTier := event.Meta.MinTier

for _, tier := range tiers {
if tier.Tier == minTier {
proverFee = tier.Fee
break
}
}

newAverageProofReward := calcNewAverage(
avg,
new(big.Int).SetUint64(stat.NumProofs),
proverFee,
)

slog.Info("newAverageProofReward update",
"prover",
event.AssignedProver.Hex(),
"proverFee",
proverFee.String(),
"tiers",
event.Assignment.TierFees,
"minTier",
event.Meta.MinTier,
"avg",
avg.String(),
"newAvg",
newAverageProofReward.String(),
)

_, err = i.statRepo.Save(ctx, eventindexer.SaveStatOpts{
ProofReward: newAverageProofReward,
StatType: eventindexer.StatTypeProofReward,
FeeTokenAddress: &feeToken,
})
if err != nil {
return nil, errors.Wrap(err, "i.statRepo.Save")
}

return big.NewInt(0), nil
}
84 changes: 0 additions & 84 deletions packages/eventindexer/indexer/save_transition_proved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@ import (

"log/slog"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/taikol1"
)

var (
systemProver = common.HexToAddress("0x0000000000000000000000000000000000000001")
oracleProver = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

func (i *Indexer) saveTransitionProvedEvents(
ctx context.Context,
chainID *big.Int,
Expand Down Expand Up @@ -82,83 +76,5 @@ func (i *Indexer) saveTransitionProvedEvent(

eventindexer.TransitionProvedEventsProcessed.Inc()

if event.Prover.Hex() != systemProver.Hex() && event.Prover.Hex() != oracleProver.Hex() {
if err := i.updateAverageProofTime(ctx, event); err != nil {
return errors.Wrap(err, "i.updateAverageProofTime")
}
}

return nil
}

func (i *Indexer) updateAverageProofTime(ctx context.Context, event *taikol1.TaikoL1TransitionProved) error {
block, err := i.taikol1.GetBlock(nil, event.BlockId.Uint64())
// will be unable to GetBlock for older blocks, just return nil, we dont
// care about averageProofTime that much to be honest for older blocks
if err != nil {
slog.Error("getBlock error", "err", err.Error())

return nil
}

eventBlock, err := i.ethClient.BlockByHash(ctx, event.Raw.BlockHash)
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByHash")
}

stat, err := i.statRepo.Find(ctx, eventindexer.StatTypeProofTime, nil)
if err != nil {
return errors.Wrap(err, "i.statRepo.Find")
}

proposedAt := block.ProposedAt

provenAt := eventBlock.Time()

proofTime := provenAt - proposedAt

avg, ok := new(big.Int).SetString(stat.AverageProofTime, 10)
if !ok {
return errors.New("unable to convert average proof time to string")
}

newAverageProofTime := calcNewAverage(
avg,
new(big.Int).SetUint64(stat.NumProofs),
new(big.Int).SetUint64(proofTime),
)

slog.Info("avgProofWindow update",
"id",
event.BlockId.Int64(),
"prover",
event.Prover.Hex(),
"proposedAt",
proposedAt,
"provenAt",
provenAt,
"proofTime",
proofTime,
"avg",
avg.String(),
"newAvg",
newAverageProofTime.String(),
)

_, err = i.statRepo.Save(ctx, eventindexer.SaveStatOpts{
ProofTime: newAverageProofTime,
StatType: eventindexer.StatTypeProofTime,
})
if err != nil {
return errors.Wrap(err, "i.statRepo.Save")
}

return nil
}

func calcNewAverage(a, t, n *big.Int) *big.Int {
m := new(big.Int).Mul(a, t)
added := new(big.Int).Add(m, n)

return new(big.Int).Div(added, t.Add(t, big.NewInt(1)))
}
20 changes: 0 additions & 20 deletions packages/eventindexer/migrations/1666650701_create_stats_table.sql

This file was deleted.

Loading
Loading