Skip to content

Commit

Permalink
Refactors Subscriptions to be Constant (#140)
Browse files Browse the repository at this point in the history
Constant loading of Subscriptions
  • Loading branch information
anthonychernyak authored Mar 5, 2020
1 parent c3cb970 commit ccc38b2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 79 deletions.
38 changes: 18 additions & 20 deletions chains/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,30 @@ import (

// Config encapsulates all necessary parameters in ethereum compatible forms
type Config struct {
id msg.ChainId // ChainID
chainID *big.Int // Ethereum chain ID
endpoint string // url for rpc endpoint
from string // address of key to use
subscriptions []string // list of events to subscribe to
keystore *keystore.Keystore // Location of keyfiles
receiver common.Address
emitter common.Address
gasLimit *big.Int
gasPrice *big.Int
id msg.ChainId // ChainID
chainID *big.Int // Ethereum chain ID
endpoint string // url for rpc endpoint
from string // address of key to use
keystore *keystore.Keystore // Location of keyfiles
receiver common.Address
emitter common.Address
gasLimit *big.Int
gasPrice *big.Int
}

// ParseChainConfig uses a core.ChainConfig to construct a corresponding Config
func ParseChainConfig(chainCfg *core.ChainConfig) (*Config, error) {

config := &Config{
id: chainCfg.Id,
endpoint: chainCfg.Endpoint,
from: chainCfg.From,
subscriptions: chainCfg.Subscriptions,
keystore: chainCfg.Keystore,
chainID: big.NewInt(1),
receiver: common.HexToAddress("0x0"),
emitter: common.HexToAddress("0x0"),
gasLimit: big.NewInt(6721975),
gasPrice: big.NewInt(20000000000),
id: chainCfg.Id,
endpoint: chainCfg.Endpoint,
from: chainCfg.From,
keystore: chainCfg.Keystore,
chainID: big.NewInt(1),
receiver: common.HexToAddress("0x0"),
emitter: common.HexToAddress("0x0"),
gasLimit: big.NewInt(6721975),
gasPrice: big.NewInt(20000000000),
}

if chainID, ok := chainCfg.Opts["chainID"]; ok {
Expand Down
4 changes: 2 additions & 2 deletions chains/ethereum/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ func (c *Connection) NetworkId() (*big.Int, error) {
}

// subscribeToEvent registers an rpc subscription for the event with the signature sig for contract at address
func (c *Connection) subscribeToEvent(query eth.FilterQuery) (*Subscription, error) {
func (c *Connection) subscribeToEvent(query eth.FilterQuery) (*ActiveSubscription, error) {
ch := make(chan ethtypes.Log)
sub, err := c.conn.SubscribeFilterLogs(c.ctx, query, ch)
if err != nil {
close(ch)
return nil, err
}
return &Subscription{
return &ActiveSubscription{
ch: ch,
sub: sub,
}, nil
Expand Down
11 changes: 11 additions & 0 deletions chains/ethereum/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ import (
ethcrypto "github.com/ethereum/go-ethereum/crypto"
)

const (
DepositAsset = "DepositAsset"
NftTransfer = "NftTransfer"
ErcTransfer = "ErcTransfer"

// For internal usage only
DepositAssetSignature = "DepositAsset(address,bytes32)"
NftTransferSignature = "NFTTransfer(uint256,uint256,address,address,uint256,bytes)"
ErcTransferSignature = "ERCTransfer(uint256,uint256,address,uint256,address)"
)

func (l *Listener) handleTransferEvent(eventI interface{}) msg.Message {
log15.Debug("Handling deposit proposal event")
event := eventI.(ethtypes.Log)
Expand Down
29 changes: 0 additions & 29 deletions chains/ethereum/helpers.go

This file was deleted.

52 changes: 32 additions & 20 deletions chains/ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package ethereum

import (
"fmt"
"math/big"

"github.com/ChainSafe/ChainBridgeV2/chains"
Expand All @@ -18,14 +17,19 @@ import (
var _ chains.Listener = &Listener{}

type Subscription struct {
signature string
handler chains.EvtHandlerFn
}

type ActiveSubscription struct {
ch <-chan ethtypes.Log
sub eth.Subscription
}

type Listener struct {
cfg Config
conn *Connection
subscriptions map[EventSig]*Subscription
subscriptions map[EventSig]*ActiveSubscription
router chains.Router
emitterContract EmitterContract // instance of bound emitter contract
}
Expand All @@ -34,7 +38,7 @@ func NewListener(conn *Connection, cfg *Config) *Listener {
return &Listener{
cfg: *cfg,
conn: conn,
subscriptions: make(map[EventSig]*Subscription),
subscriptions: make(map[EventSig]*ActiveSubscription),
}
}

Expand All @@ -46,24 +50,32 @@ func (l *Listener) SetRouter(r chains.Router) {
l.router = r
}

func (l *Listener) GetSubscriptions() []*Subscription {
return []*Subscription{
{
signature: ErcTransfer,
handler: l.handleTransferEvent,
},
{
signature: NftTransfer,
handler: l.handleTransferEvent,
},
{
signature: DepositAsset,
handler: l.handleTestDeposit,
},
}

}

// Start registers all subscriptions provided by the config
func (l *Listener) Start() error {
log15.Debug("Starting listener...", "chainID", l.cfg.id, "subs", l.cfg.subscriptions)
for _, subscription := range l.cfg.subscriptions {
sub := subscription
switch sub {
case ErcTransferSignature, NftTransferSignature:
err := l.RegisterEventHandler(sub, l.handleTransferEvent)
if err != nil {
log15.Error("failed to register event handler", "err", err)
}
case DepositAssetSignature:
err := l.RegisterEventHandler(sub, l.handleTestDeposit)
if err != nil {
log15.Error("failed to register event handler", "err", err)
}
default:
return fmt.Errorf("Unrecognized event: %s", sub)
log15.Debug("Starting listener...", "chainID", l.cfg.id)
subscriptions := l.GetSubscriptions()
for _, sub := range subscriptions {
err := l.RegisterEventHandler(sub.signature, sub.handler)
if err != nil {
log15.Error("failed to register event handler", "err", err)
}
}
return nil
Expand Down Expand Up @@ -100,7 +112,7 @@ func (l *Listener) RegisterEventHandler(subscription string, handler chains.EvtH

// watchEvent will call the handler for every occurrence of the corresponding event. It should be run in a separate
// goroutine to monitor the subscription channel.
func (l *Listener) watchEvent(eventSubscription *Subscription, handler func(interface{}) msg.Message) {
func (l *Listener) watchEvent(eventSubscription *ActiveSubscription, handler func(interface{}) msg.Message) {
for {
select {
case evt := <-eventSubscription.ch:
Expand Down
12 changes: 4 additions & 8 deletions cmd/chainbridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,16 @@ func run(ctx *cli.Context) error {
Id: chain.Id,
Endpoint: chain.Endpoint,
From: chain.From,
// TODO remove this in favour of OPTS when config PR lands
Subscriptions: ethereum.BuildEventSubscriptions([]string{"DepositAsset", "NftTransfer", "ErcTransfer"}),
Keystore: ks,
Opts: chain.Opts,
Keystore: ks,
Opts: chain.Opts,
})
} else if chain.Type == "substrate" {
chainconfig, err = ethereum.InitializeChain(&core.ChainConfig{
Id: chain.Id,
Endpoint: chain.Endpoint,
From: chain.From,
// TODO remove this in favour of OPTS when config PR lands
Subscriptions: ethereum.BuildEventSubscriptions([]string{"DepositAsset", "NftTransfer", "ErcTransfer"}),
Keystore: ks,
Opts: chain.Opts,
Keystore: ks,
Opts: chain.Opts,
})
} else {
return errors.New("Unrecognized Chain Type")
Expand Down

0 comments on commit ccc38b2

Please sign in to comment.