Skip to content

Commit 760663a

Browse files
authored
feat: op-batcher auto switch to economic DA type (#209)
1 parent 40cf16b commit 760663a

File tree

9 files changed

+291
-9
lines changed

9 files changed

+291
-9
lines changed

op-batcher/batcher/channel_manager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"sync"
88

9+
"github.com/ethereum-optimism/optimism/op-batcher/flags"
910
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
1011
"github.com/ethereum-optimism/optimism/op-node/rollup"
1112
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
@@ -225,6 +226,21 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
225226
return nil
226227
}
227228

229+
func (s *channelManager) SwitchDAType(targetDAType flags.DataAvailabilityType) {
230+
s.mu.Lock()
231+
defer s.mu.Unlock()
232+
switch targetDAType {
233+
case flags.BlobsType:
234+
s.cfg.MaxFrameSize = eth.MaxBlobDataSize - 1
235+
s.cfg.MultiFrameTxs = true
236+
case flags.CalldataType:
237+
s.cfg.MaxFrameSize = CallDataMaxTxSize - 1
238+
s.cfg.MultiFrameTxs = false
239+
default:
240+
s.log.Crit("channel manager switch to a invalid DA type", "targetDAType", targetDAType.String())
241+
}
242+
}
243+
228244
// registerL1Block registers the given block at the pending channel.
229245
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
230246
s.currentChannel.CheckTimeout(l1Head.Number)

op-batcher/batcher/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (c *CLIConfig) Check() error {
118118
if c.BatchType > 1 {
119119
return fmt.Errorf("unknown batch type: %v", c.BatchType)
120120
}
121-
if c.DataAvailabilityType == flags.BlobsType && c.TargetNumFrames > 6 {
121+
if (c.DataAvailabilityType == flags.BlobsType || c.DataAvailabilityType == flags.AutoType) && c.TargetNumFrames > 6 {
122122
return errors.New("too many frames for blob transactions, max 6")
123123
}
124124
if !flags.ValidDataAvailabilityType(c.DataAvailabilityType) {

op-batcher/batcher/driver.go

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,39 @@ import (
77
"io"
88
"math/big"
99
_ "net/http/pprof"
10+
"strings"
1011
"sync"
12+
"sync/atomic"
1113
"time"
1214

15+
"github.com/ethereum-optimism/optimism/op-batcher/flags"
1316
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
1417
"github.com/ethereum-optimism/optimism/op-node/rollup"
1518
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
1619
plasma "github.com/ethereum-optimism/optimism/op-plasma"
1720
"github.com/ethereum-optimism/optimism/op-service/dial"
1821
"github.com/ethereum-optimism/optimism/op-service/eth"
1922
"github.com/ethereum-optimism/optimism/op-service/txmgr"
23+
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
2024
"github.com/ethereum/go-ethereum/core"
2125
"github.com/ethereum/go-ethereum/core/types"
2226
"github.com/ethereum/go-ethereum/log"
27+
"github.com/ethereum/go-ethereum/params"
2328
)
2429

30+
const LimitLoadBlocksOneTime uint64 = 30
31+
32+
// Auto DA params
33+
const DATypeSwitchThrehold int = 5
34+
const CallDataMaxTxSize uint64 = 120000
35+
const ApproximateGasPerCallDataTx int64 = 1934892
36+
const MaxBlobsNumberPerTx int64 = 6
37+
2538
var ErrBatcherNotRunning = errors.New("batcher is not running")
2639

2740
type L1Client interface {
2841
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
42+
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
2943
}
3044

3145
type L2Client interface {
@@ -47,6 +61,7 @@ type DriverSetup struct {
4761
EndpointProvider dial.L2EndpointProvider
4862
ChannelConfig ChannelConfig
4963
PlasmaDA *plasma.DAClient
64+
AutoSwitchDA bool
5065
}
5166

5267
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
@@ -68,6 +83,9 @@ type BatchSubmitter struct {
6883
lastStoredBlock eth.BlockID
6984
lastL1Tip eth.L1BlockRef
7085

86+
// addressReservedError is recorded from L1 txpool, which may occur when switch DA type
87+
addressReservedError atomic.Bool
88+
7189
state *channelManager
7290
}
7391

@@ -155,10 +173,15 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
155173
} else if start.Number >= end.Number {
156174
return errors.New("start number is >= end number")
157175
}
176+
// Limit the max loaded blocks one time
177+
endNumber := end.Number
178+
if endNumber-start.Number > LimitLoadBlocksOneTime {
179+
endNumber = start.Number + LimitLoadBlocksOneTime
180+
}
158181

159182
var latestBlock *types.Block
160183
// Add all blocks to "state"
161-
for i := start.Number + 1; i < end.Number+1; i++ {
184+
for i := start.Number + 1; i < endNumber+1; i++ {
162185
block, err := l.loadBlockIntoState(ctx, i)
163186
if errors.Is(err, ErrReorg) {
164187
l.Log.Warn("Found L2 reorg", "block_number", i)
@@ -272,6 +295,78 @@ func (l *BatchSubmitter) loop() {
272295
}
273296
}()
274297

298+
economicDATypeCh := make(chan flags.DataAvailabilityType)
299+
waitSwitchDACh := make(chan struct{})
300+
if l.AutoSwitchDA {
301+
// start auto choose economic DA type processing loop
302+
economicDALoopDone := make(chan struct{})
303+
defer close(economicDALoopDone) // shut down auto DA loop
304+
go func() {
305+
economicDAType := flags.BlobsType
306+
l.Metr.RecordAutoChoosedDAType(economicDAType)
307+
switchCount := 0
308+
economicDATicker := time.NewTicker(5 * time.Second)
309+
defer economicDATicker.Stop()
310+
addressReservedErrorTicker := time.NewTicker(time.Second)
311+
defer addressReservedErrorTicker.Stop()
312+
for {
313+
select {
314+
case <-economicDATicker.C:
315+
newEconomicDAType, err := l.getEconomicDAType(l.shutdownCtx)
316+
if err != nil {
317+
l.Log.Error("getEconomicDAType failed: %w", err)
318+
continue
319+
}
320+
if newEconomicDAType != economicDAType {
321+
switchCount++
322+
} else {
323+
switchCount = 0
324+
}
325+
threhold := DATypeSwitchThrehold
326+
if economicDAType == flags.CalldataType {
327+
threhold = 20 * DATypeSwitchThrehold
328+
}
329+
if switchCount >= threhold {
330+
l.Log.Info("start economic switch", "from type", economicDAType.String(), "to type", newEconomicDAType.String())
331+
start := time.Now()
332+
economicDAType = newEconomicDAType
333+
switchCount = 0
334+
economicDATypeCh <- economicDAType
335+
<-waitSwitchDACh
336+
l.Log.Info("finish economic switch", "duration", time.Since(start))
337+
l.Metr.RecordAutoChoosedDAType(economicDAType)
338+
l.Metr.RecordEconomicAutoSwitchCount()
339+
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
340+
}
341+
case <-addressReservedErrorTicker.C:
342+
if l.addressReservedError.Load() {
343+
if economicDAType == flags.BlobsType {
344+
economicDAType = flags.CalldataType
345+
l.Log.Info("start resolve addressReservedError switch", "from type", flags.BlobsType.String(), "to type", flags.CalldataType.String())
346+
} else if economicDAType == flags.CalldataType {
347+
economicDAType = flags.BlobsType
348+
l.Log.Info("start resolve addressReservedError switch", "from type", flags.CalldataType.String(), "to type", flags.BlobsType.String())
349+
} else {
350+
l.Log.Crit("invalid DA type in economic switch loop", "invalid type", economicDAType.String())
351+
}
352+
switchCount = 0
353+
start := time.Now()
354+
economicDATypeCh <- economicDAType
355+
<-waitSwitchDACh
356+
l.Log.Info("finish resolve addressReservedError switch", "duration", time.Since(start))
357+
l.Metr.RecordAutoChoosedDAType(economicDAType)
358+
l.Metr.RecordReservedErrorSwitchCount()
359+
l.Metr.RecordAutoSwitchTimeDuration(time.Since(start))
360+
l.addressReservedError.Store(false)
361+
}
362+
case <-economicDALoopDone:
363+
l.Log.Info("auto DA processing loop done")
364+
return
365+
}
366+
}
367+
}()
368+
}
369+
275370
ticker := time.NewTicker(l.Config.PollInterval)
276371
defer ticker.Stop()
277372

@@ -302,6 +397,26 @@ func (l *BatchSubmitter) loop() {
302397
continue
303398
}
304399
l.publishStateToL1(queue, receiptsCh)
400+
case targetDAType := <-economicDATypeCh:
401+
l.lastStoredBlock = eth.BlockID{}
402+
// close current state to prepare for switch
403+
err := l.state.Close()
404+
if err != nil {
405+
if errors.Is(err, ErrPendingAfterClose) {
406+
l.Log.Warn("Closed channel manager to handle DA type switch with pending channel(s) remaining - submitting")
407+
} else {
408+
l.Log.Error("Error closing the channel manager to handle a DA type switch", "err", err)
409+
}
410+
}
411+
// on DA type switch we want to publish all pending state then wait until each result clears before resetting
412+
// the state.
413+
publishAndWait()
414+
l.clearState(l.shutdownCtx)
415+
// switch action after clear state
416+
l.switchDAType(targetDAType)
417+
time.Sleep(time.Minute) // wait op-node derivation published DA data
418+
waitSwitchDACh <- struct{}{}
419+
continue
305420
case <-l.shutdownCtx.Done():
306421
if l.Txmgr.IsClosed() {
307422
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
@@ -324,6 +439,54 @@ func (l *BatchSubmitter) loop() {
324439
}
325440
}
326441

442+
func (l *BatchSubmitter) getEconomicDAType(ctx context.Context) (flags.DataAvailabilityType, error) {
443+
sCtx, sCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
444+
defer sCancel()
445+
gasPrice, err := l.L1Client.SuggestGasTipCap(sCtx)
446+
if err != nil {
447+
return "", fmt.Errorf("getEconomicDAType failed to fetch the suggested gas tip cap: %w", err)
448+
}
449+
calldataCost := big.NewInt(0).Mul(big.NewInt(MaxBlobsNumberPerTx*ApproximateGasPerCallDataTx), gasPrice)
450+
451+
hCtx, hCancel := context.WithTimeout(ctx, l.Config.NetworkTimeout)
452+
defer hCancel()
453+
header, err := l.L1Client.HeaderByNumber(hCtx, nil)
454+
if err != nil {
455+
return "", fmt.Errorf("getEconomicDAType failed to fetch the latest header: %w", err)
456+
}
457+
if header.ExcessBlobGas == nil {
458+
return "", fmt.Errorf("getEconomicDAType fetched header with nil ExcessBlobGas: %v", header)
459+
}
460+
blobGasPrice := eip4844.CalcBlobFee(*header.ExcessBlobGas)
461+
blobCost := big.NewInt(0).Add(big.NewInt(0).Mul(big.NewInt(int64(params.TxGas)), gasPrice), big.NewInt(0).Mul(big.NewInt(params.MaxBlobGasPerBlock), blobGasPrice))
462+
463+
l.Metr.RecordEstimatedCalldataTypeFee(calldataCost)
464+
l.Metr.RecordEstimatedBlobTypeFee(blobCost)
465+
if calldataCost.Cmp(blobCost) < 0 {
466+
l.Log.Info("Economic DA type is calldata", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
467+
return flags.CalldataType, nil
468+
}
469+
l.Log.Info("Economic DA type is blobs", "gas price", gasPrice, "calldata cost", calldataCost, "blob gas price", blobGasPrice, "blob cost", blobCost)
470+
return flags.BlobsType, nil
471+
}
472+
473+
func (l *BatchSubmitter) switchDAType(targetDAType flags.DataAvailabilityType) {
474+
switch targetDAType {
475+
case flags.BlobsType:
476+
l.Config.UseBlobs = true
477+
l.ChannelConfig.MaxFrameSize = eth.MaxBlobDataSize - 1
478+
l.ChannelConfig.MultiFrameTxs = true
479+
l.state.SwitchDAType(targetDAType)
480+
case flags.CalldataType:
481+
l.Config.UseBlobs = false
482+
l.ChannelConfig.MaxFrameSize = CallDataMaxTxSize - 1
483+
l.ChannelConfig.MultiFrameTxs = false
484+
l.state.SwitchDAType(targetDAType)
485+
default:
486+
l.Log.Crit("batch submitter switch to a invalid DA type", "targetDAType", targetDAType.String())
487+
}
488+
}
489+
327490
// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
328491
// no more data to queue for publishing or if there was an error queing the data.
329492
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txID], receiptsCh chan txmgr.TxReceipt[txID]) {
@@ -525,6 +688,10 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
525688
func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
526689
l.Log.Warn("Transaction failed to send", logFields(id, err)...)
527690
l.state.TxFailed(id)
691+
if errStringMatch(err, txmgr.ErrAlreadyReserved) && l.AutoSwitchDA {
692+
l.Log.Warn("Encounter ErrAlreadyReserved", "id", id.String())
693+
l.addressReservedError.Store(true)
694+
}
528695
}
529696

530697
func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
@@ -560,3 +727,12 @@ func logFields(xs ...any) (fs []any) {
560727
}
561728
return fs
562729
}
730+
731+
func errStringMatch(err, target error) bool {
732+
if err == nil && target == nil {
733+
return true
734+
} else if err == nil || target == nil {
735+
return false
736+
}
737+
return strings.Contains(err.Error(), target.Error())
738+
}

op-batcher/batcher/service.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
120120
if err := bs.initPlasmaDA(cfg); err != nil {
121121
return fmt.Errorf("failed to init plasma DA: %w", err)
122122
}
123-
bs.initDriver()
123+
bs.initDriver(cfg)
124124
if err := bs.initRPCServer(cfg); err != nil {
125125
return fmt.Errorf("failed to start RPC server: %w", err)
126126
}
@@ -198,7 +198,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
198198
}
199199

200200
switch cfg.DataAvailabilityType {
201-
case flags.BlobsType:
201+
case flags.BlobsType, flags.AutoType:
202202
if !cfg.TestUseMaxTxSizeForBlobs {
203203
// account for version byte prefix
204204
cc.MaxFrameSize = eth.MaxBlobDataSize - 1
@@ -228,6 +228,7 @@ func (bs *BatcherService) initChannelConfig(cfg *CLIConfig) error {
228228
return fmt.Errorf("invalid channel configuration: %w", err)
229229
}
230230
bs.Log.Info("Initialized channel-config",
231+
"da_type", cfg.DataAvailabilityType.String(),
231232
"use_blobs", bs.UseBlobs,
232233
"use_plasma", bs.UsePlasma,
233234
"max_frame_size", cc.MaxFrameSize,
@@ -286,7 +287,7 @@ func (bs *BatcherService) initMetricsServer(cfg *CLIConfig) error {
286287
return nil
287288
}
288289

289-
func (bs *BatcherService) initDriver() {
290+
func (bs *BatcherService) initDriver(cfg *CLIConfig) {
290291
bs.driver = NewBatchSubmitter(DriverSetup{
291292
Log: bs.Log,
292293
Metr: bs.Metrics,
@@ -297,6 +298,7 @@ func (bs *BatcherService) initDriver() {
297298
EndpointProvider: bs.EndpointProvider,
298299
ChannelConfig: bs.ChannelConfig,
299300
PlasmaDA: bs.PlasmaDA,
301+
AutoSwitchDA: cfg.DataAvailabilityType == flags.AutoType,
300302
})
301303
}
302304

op-batcher/flags/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ const (
88
// data availability types
99
CalldataType DataAvailabilityType = "calldata"
1010
BlobsType DataAvailabilityType = "blobs"
11+
AutoType DataAvailabilityType = "auto"
1112
)
1213

1314
var DataAvailabilityTypes = []DataAvailabilityType{
1415
CalldataType,
1516
BlobsType,
17+
AutoType,
1618
}
1719

1820
func (kind DataAvailabilityType) String() string {

0 commit comments

Comments
 (0)