Skip to content

Make output directory optional for collector #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/collect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ var cliFlags = []cli.Flag{
&cli.StringFlag{
Name: "out",
EnvVars: []string{"OUT"},
Required: true,
Usage: "output base directory",
Required: false,
Usage: "output base directory (optional, file writes disabled if not set)",
Category: "Collector Configuration",
},
&cli.StringFlag{
Expand Down Expand Up @@ -165,6 +165,10 @@ func runCollector(cCtx *cli.Context) error {
log.Fatal("No nodes, bloxroute, or eden token set (use -nodes <url1>,<url2> / -blx-token <token> / -eden-token <token>)")
}

if outDir == "" && clickhouseDSN == "" {
log.Fatal("Either --out or --clickhouse-dsn must be specified")
}

log.Infow("Starting mempool-collector", "version", common.Version, "outDir", outDir, "uid", uid, "enablePprof", enablePprof)

aliases := common.SourceAliasesFromEnv()
Expand Down
182 changes: 107 additions & 75 deletions collector/tx_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"runtime"
"slices"
"strings"
"sync"
"time"
Expand All @@ -26,11 +27,14 @@ const (
receiverTimeout = 5 * time.Second
)

var ErrBlobMissingSidecar = errors.New("missing blob sidecar")
var (
errInvalidSender = errors.New("invalid sender")
errBlobMissingSidecar = errors.New("missing blob sidecar")
)

type TxProcessorOpts struct {
Log *zap.SugaredLogger
OutDir string
OutDir string // if empty no files will be written
UID string
Location string // location of the collector, will be stored in sourcelogs
CheckNodeURI string
Expand All @@ -50,7 +54,7 @@ type TxProcessor struct {
txC chan common.TxIn // note: it's important that the value is sent in here instead of a pointer, otherwise there are memory race conditions

outFilesLock sync.RWMutex
outFiles map[int64]*OutFiles
outFiles map[int64]OutFiles
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change from pointer to value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly to avoid nil pointer deref, it was a bit more convenient that way in processTx. E.g. we can have zero value of OutFiles, and then reference its fields, just check that fd is not nil.

Also, it's probably easier on memory, storing struct with three pointers in map is faster (I think?) than storing a pointer to a different location on heap. This doesn't matter that much here, though.


knownTxs map[string]time.Time
knownTxsLock sync.RWMutex
Expand Down Expand Up @@ -91,7 +95,7 @@ func NewTxProcessor(opts TxProcessorOpts) *TxProcessor {
location: opts.Location,

outDir: opts.OutDir,
outFiles: make(map[int64]*OutFiles),
outFiles: make(map[int64]OutFiles),

knownTxs: make(map[string]time.Time),
srcMetrics: NewMetricsCounter(),
Expand Down Expand Up @@ -130,10 +134,12 @@ func (p *TxProcessor) Start() {
}
}

// Ensure output directory exists
err = os.MkdirAll(p.outDir, os.ModePerm)
if err != nil {
p.log.Fatal(err)
// Ensure output directory exists (only if outDir is set)
if p.outDir != "" {
err = os.MkdirAll(p.outDir, os.ModePerm)
if err != nil {
p.log.Fatal(err)
}
}

// start the txn map cleaner background task
Expand Down Expand Up @@ -162,14 +168,7 @@ func (p *TxProcessor) startTransactionReceiver() {
}

func (p *TxProcessor) sendTxToReceivers(txIn common.TxIn) {
sourceOk := false
for _, allowedSource := range p.receiversAllowedSources {
if txIn.Source == allowedSource {
sourceOk = true
break
}
}
if !sourceOk {
if !slices.Contains(p.receiversAllowedSources, txIn.Source) {
return
}

Expand Down Expand Up @@ -202,22 +201,27 @@ func (p *TxProcessor) processTx(txIn common.TxIn) {
p.srcMetrics.Inc(KeyStatsAll, txIn.Source)
p.srcMetrics.IncKey(KeyStatsUnique, txIn.Source, tx.Hash().Hex())

// get output file handles
outFiles, isCreated, err := p.getOutputCSVFiles(txIn.T.Unix())
if err != nil {
log.Errorw("getOutputFiles", "error", err)
return
} else if isCreated {
p.log.Infof("new file created: %s", outFiles.FTxs.Name())
p.log.Infof("new file created: %s", outFiles.FSourcelog.Name())
p.log.Infof("new file created: %s", outFiles.FTrash.Name())
}
// get output file handles (only if outDir is set)
var outFiles OutFiles
var err error
if p.outDir != "" {
var isCreated bool
outFiles, isCreated, err = p.getOutputCSVFiles(txIn.T.Unix())
if err != nil {
log.Errorw("getOutputFiles", "error", err)
return
} else if isCreated {
p.log.Infof("new file created: %s", outFiles.FTxs.Name())
p.log.Infof("new file created: %s", outFiles.FSourcelog.Name())
p.log.Infof("new file created: %s", outFiles.FTrash.Name())
}

// write sourcelog
_, err = fmt.Fprintf(outFiles.FSourcelog, "%d,%s,%s\n", txIn.T.UnixMilli(), txHashLower, txIn.Source)
if err != nil {
log.Errorw("fmt.Fprintf", "error", err)
return
// write sourcelog
_, err = fmt.Fprintf(outFiles.FSourcelog, "%d,%s,%s\n", txIn.T.UnixMilli(), txHashLower, txIn.Source)
if err != nil {
log.Errorw("fmt.Fprintf", "error", err)
return
}
}

if p.clickhouseConn != nil {
Expand All @@ -237,7 +241,8 @@ func (p *TxProcessor) processTx(txIn common.TxIn) {
}

// Sanity check transaction
if err = p.validateTx(outFiles.FTrash, txIn); err != nil {
if err = p.validateTx(txIn); err != nil {
p.writeInvalidTx(outFiles.FTrash, txIn, err)
metrics.IncTxReceivedTrash(txIn.Source)
p.srcMetrics.Inc(KeyStatsTxTrash, txIn.Source)
return
Expand Down Expand Up @@ -276,18 +281,20 @@ func (p *TxProcessor) processTx(txIn common.TxIn) {
p.srcMetrics.Inc(KeyStatsFirst, txIn.Source)
metrics.IncTxReceivedFirst(txIn.Source)

// create tx rlp
rlpHex, err := common.TxToRLPString(tx)
if err != nil {
log.Errorw("failed to encode rlp", "error", err)
return
}
// write the transaction file (only if outDir is set)
if p.outDir != "" {
// create tx rlp
rlpHex, err := common.TxToRLPString(tx)
if err != nil {
log.Errorw("failed to encode rlp", "error", err)
return
}

// write the transaction file
_, err = fmt.Fprintf(outFiles.FTxs, "%d,%s,%s\n", txIn.T.UnixMilli(), txHashLower, rlpHex)
if err != nil {
log.Errorw("fmt.Fprintf", "error", err)
return
_, err = fmt.Fprintf(outFiles.FTxs, "%d,%s,%s\n", txIn.T.UnixMilli(), txHashLower, rlpHex)
if err != nil {
log.Errorw("fmt.Fprintf", "error", err)
return
}
}

// Remember that this transaction was processed
Expand All @@ -297,70 +304,110 @@ func (p *TxProcessor) processTx(txIn common.TxIn) {
}

func (p *TxProcessor) writeTrash(fTrash *os.File, txIn common.TxIn, message, notes string) {
if fTrash == nil {
return // skip writing if file handle is nil (no-write mode)
}

txHashLower := strings.ToLower(txIn.Tx.Hash().Hex())
_, err := fmt.Fprintf(fTrash, "%d,%s,%s,%s,%s\n", txIn.T.UnixMilli(), txHashLower, txIn.Source, message, notes)
if err != nil {
p.log.With("tx_hash", txHashLower).With("source", txIn.Source).Errorw("fmt.Fprintf", "error", err)
}
}

func (p *TxProcessor) validateTx(fTrash *os.File, txIn common.TxIn) error { // inspired by https://github.com/flashbots/suave-geth/blob/dd3875eccde5b11feb621f10d9aae6417c98bdb0/core/txpool/txpool.go#L600
func (p *TxProcessor) writeInvalidTx(fTrash *os.File, txIn common.TxIn, err error) {
if fTrash == nil || err == nil {
return // skip writing if file handle is nil (no-write mode) or no error
}

var message, notes string
switch {
case errors.Is(err, common.ErrChainIDNotSet):
message = "chainId not set"
case errors.Is(err, errInvalidSender):
message = common.TrashTxSignatureError
case errors.Is(err, txpool.ErrNegativeValue):
message = "negative value"
case errors.Is(err, core.ErrFeeCapVeryHigh):
message = "extremely large gasFeeCap"
case errors.Is(err, core.ErrTipVeryHigh):
message = "extremely large gasTipCap"
case errors.Is(err, core.ErrTipAboveFeeCap):
message = "gasFeeCap lower than gasTipCap"
case errors.Is(err, errBlobMissingSidecar):
message = "invalid blob transaction"
default:
message = "validation error"
notes = err.Error()
}
p.writeTrash(fTrash, txIn, message, notes)
}

func (p *TxProcessor) validateTx(txIn common.TxIn) error { // inspired by https://github.com/flashbots/suave-geth/blob/dd3875eccde5b11feb621f10d9aae6417c98bdb0/core/txpool/txpool.go#L600
tx := txIn.Tx
txHashLower := strings.ToLower(tx.Hash().Hex())
log := p.log.With("tx_hash", txHashLower).With("source", txIn.Source)

if tx.ChainId().Sign() <= 0 {
log.Debugf("error: invalid chainID %w", tx.ChainId())
p.writeTrash(fTrash, txIn, "chainId not set", "")
return common.ErrChainIDNotSet
}

// Make sure the transaction is signed properly.
if _, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx); err != nil {
log.Debugw("error: transaction signature incorrect")
p.writeTrash(fTrash, txIn, common.TrashTxSignatureError, "")
return err
return errors.Join(errInvalidSender, err)
}

if tx.Value().Sign() < 0 {
log.Debugw("error: transaction with negative value")
p.writeTrash(fTrash, txIn, "negative value", "")
return txpool.ErrNegativeValue
}

// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
log.Debugw("error: transaction gasFeeCap extremely large")
p.writeTrash(fTrash, txIn, "extremely large gasFeeCap", "")
return core.ErrFeeCapVeryHigh
}

if tx.GasTipCap().BitLen() > 256 {
log.Debugw("error: gasTipCap extremely large")
p.writeTrash(fTrash, txIn, "extremely large gasTipCap", "")
return core.ErrTipVeryHigh
}

// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
log.Debugw("error: transaction gasFeeCap lower than gasTipCap")
p.writeTrash(fTrash, txIn, "gasFeeCap lower than gasTipCap", "")
return core.ErrTipAboveFeeCap
}

// Ensure blob txs are correctly formed
if err := p.validateBlobTx(tx); err != nil {
log.Debugw("error: invalid blob transaction", "reason", err)
p.writeTrash(fTrash, txIn, "invalid blob transaction", "")
return err
}

// all good
return nil
}

// validateBlobTx ensures that a blob tx is capable of being consumed
// by our system. Namely, the blob tx should be in the "full" PooledTransactions
// network representation with the full sidecar available.
func (p *TxProcessor) validateBlobTx(tx *types.Transaction) error {
if tx.Type() != types.BlobTxType {
return nil
}

if tx.BlobTxSidecar() == nil {
return errBlobMissingSidecar
}

return nil
}

// getOutputCSVFiles returns two file handles - one for the transactions and one for source stats, if needed - and a boolean indicating whether the file was created
func (p *TxProcessor) getOutputCSVFiles(timestamp int64) (outFiles *OutFiles, isCreated bool, err error) {
func (p *TxProcessor) getOutputCSVFiles(timestamp int64) (outFiles OutFiles, isCreated bool, err error) {
// bucketTS := timestamp / secPerDay * secPerDay // down-round timestamp to start of bucket
sec := int64(bucketMinutes * 60)
bucketTS := timestamp / sec * sec // timestamp down-round to start of bucket
Expand All @@ -378,42 +425,42 @@ func (p *TxProcessor) getOutputCSVFiles(timestamp int64) (outFiles *OutFiles, is
dir := filepath.Join(p.outDir, t.Format(time.DateOnly), "transactions")
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

fn := filepath.Join(dir, p.getFilename("txs", bucketTS))
fTx, err := os.OpenFile(fn, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

// open sourcelog for writing
dir = filepath.Join(p.outDir, t.Format(time.DateOnly), "sourcelog")
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

fn = filepath.Join(dir, p.getFilename("src", bucketTS))
fSourcelog, err := os.OpenFile(fn, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

// open trash for writing
dir = filepath.Join(p.outDir, t.Format(time.DateOnly), "trash")
err = os.MkdirAll(dir, os.ModePerm)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

fn = filepath.Join(dir, p.getFilename("trash", bucketTS))
fTrash, err := os.OpenFile(fn, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return nil, false, err
return OutFiles{}, false, err
}

outFiles = &OutFiles{
outFiles = OutFiles{
FTxs: fTx,
FSourcelog: fSourcelog,
FTrash: fTrash,
Expand Down Expand Up @@ -511,18 +558,3 @@ func (p *TxProcessor) healthCheckCall() {
}
resp.Body.Close()
}

// validateBlobTx ensures that a blob tx is capable of being consumed
// by our system. Namely, the blob tx should be in the "full" PooledTransactions
// network representation with the full sidecar available.
func (p *TxProcessor) validateBlobTx(tx *types.Transaction) error {
if tx.Type() != types.BlobTxType {
return nil
}

if tx.BlobTxSidecar() == nil {
return ErrBlobMissingSidecar
}

return nil
}
Loading