Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add libp2p proxy #270

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ badgerStAskDb/
*.puml
market-client
venus-market
market-proxy
sequence_chart.md
.idea
.vscode
209 changes: 209 additions & 0 deletions cmd/market-proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package main

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"os"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"

network2 "github.com/filecoin-project/venus-market/v2/network"

"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/utils"

logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"

"github.com/filecoin-project/go-fil-markets/storagemarket"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-graphsync/network"

"github.com/filecoin-project/venus-market/v2/protocolproxy"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/filecoin-project/venus-market/v2/version"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/protocol"
)

var protocols = []protocol.ID{
//storage
storagemarket.AskProtocolID,
storagemarket.OldAskProtocolID,

storagemarket.DealProtocolID110,
storagemarket.DealProtocolID101,
storagemarket.DealProtocolID111,

storagemarket.OldDealStatusProtocolID,
storagemarket.DealStatusProtocolID,

//retrieval
retrievalmarket.QueryProtocolID,
retrievalmarket.OldQueryProtocolID,

network.ProtocolGraphsync_1_0_0,
network.ProtocolGraphsync_2_0_0,
datatransfer.ProtocolDataTransfer1_2,
}

var mainLog = logging.Logger("market-proxy")

func main() {
app := &cli.App{
Name: "venus-market-proxy",
Usage: "proxy multiple venus-market backends like nginx",
Version: version.UserVersion(),
EnableBashCompletion: true,
Commands: []*cli.Command{
{
Name: "run",
Usage: "start a libp2p proxy",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "specify listen address ",
Value: "/ip4/0.0.0.0/tcp/11023",
},
&cli.StringFlag{
Name: "peer-key",
Usage: "peer key for p2p identify, if not specify, will generate new one",
Value: "",
},
&cli.StringSliceFlag{
Name: "backends",
Usage: "a group of backends libp2p backends server",
Required: true,
},
},
Action: run,
},
{
Name: "new-peer-key",
Usage: "generate random peer key and corresponding private key ",
Flags: []cli.Flag{},
Action: genKey,
},
},
}

if err := app.Run(os.Args); err != nil {
log.Fatal(err)
}
}

func run(c *cli.Context) error {
ctx := c.Context
utils.SetupLogLevels()
cfg := config.ProxyServer{}
cfg.Libp2p.ListenAddresses = []string{c.String("listen")}
cfg.Libp2p.PrivateKey = c.String("peer-key")
cfg.Backends = c.StringSlice("backends")

var pkey crypto.PrivKey
var err error
if len(cfg.PrivateKey) > 0 {
privateKeyBytes, err := hex.DecodeString(cfg.PrivateKey)
if err != nil {
return err
}
pkey, err = crypto.UnmarshalPrivateKey(privateKeyBytes)
if err != nil {
return err
}
} else {
pkey, _, err = crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return err
}

privateKeyBytes, err := crypto.MarshalPrivateKey(pkey)
if err != nil {
return err
}

fmt.Println(hex.EncodeToString(privateKeyBytes))
}

opts := []libp2p.Option{
network2.MakeSmuxTransportOption(),
libp2p.DefaultTransports,
libp2p.ListenAddrStrings(cfg.ListenAddresses...),
libp2p.Identity(pkey),
libp2p.WithDialTimeout(time.Second * 5),
libp2p.DefaultPeerstore,
libp2p.DisableRelay(),
libp2p.Ping(true),

libp2p.UserAgent("venus-market-proxy" + version.UserVersion()),
}

h, err := libp2p.New(opts...)
if err != nil {
return err
}
addrInfo, err := peer.AddrInfoFromString(cfg.Backends[0])
if err != nil {
return err
}
proxyHost, err := protocolproxy.NewProtocolProxy(h, map[peer.ID][]protocol.ID{
addrInfo.ID: protocols,
})
if err != nil {
return err
}

defer proxyHost.Close()

proxyHost.Start(context.Background())

//try to connect backends
go func() {
timer := time.NewTicker(time.Minute)
defer timer.Stop()
for {
err = h.Connect(ctx, *addrInfo)
if err != nil {
mainLog.Errorf("connect to %s %v", addrInfo, err)
}
<-timer.C
}
}()

var libp2pAddrs []string
for _, peer := range h.Addrs() {
libp2pAddrs = append(libp2pAddrs, fmt.Sprintf("%s/p2p/%s\n", peer, h.ID()))
}

mainLog.Infof("start listen at %v", libp2pAddrs)
shutdownChan := make(chan struct{})
finishCh := utils.MonitorShutdown(shutdownChan)
<-finishCh
return nil
}

func genKey(c *cli.Context) error {
pkey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return err
}

privateKeyBytes, err := crypto.MarshalPrivateKey(pkey)
if err != nil {
return err
}
peerId, err := peer.IDFromPrivateKey(pkey)
if err != nil {
return err
}
fmt.Println("PeerId:", peerId)
fmt.Println("Pkey:", hex.EncodeToString(privateKeyBytes))
return nil
}
1 change: 1 addition & 0 deletions config/common.go
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ type Libp2p struct {
ProtectedPeers []string

PrivateKey string
Proxy string
}

type Common struct {
6 changes: 6 additions & 0 deletions config/proxy_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package config

type ProxyServer struct {
Libp2p
Backends []string
}
14 changes: 14 additions & 0 deletions network/host.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,10 @@ import (
"context"
"fmt"

"github.com/filecoin-project/venus-market/v2/config"

"github.com/filecoin-project/venus-market/v2/protocolproxy"

"github.com/filecoin-project/venus-market/v2/version"
"github.com/ipfs-force-community/metrics"
"github.com/libp2p/go-libp2p"
@@ -19,6 +23,8 @@ type P2PHostIn struct {
ID peer.ID
Peerstore peerstore.Peerstore

Cfg *config.Libp2p

Opts [][]libp2p.Option `group:"libp2p"`
}

@@ -45,6 +51,14 @@ func Host(mctx metrics.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (host.Host
return nil, err
}

if len(params.Cfg.Proxy) > 0 {
addrInfo, err := peer.AddrInfoFromString(params.Cfg.Proxy)
if err != nil {
return nil, err
}
h = protocolproxy.NewForwardingHost(h, *addrInfo)
}

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return h.Close()
4 changes: 2 additions & 2 deletions network/smux.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux"
)

func makeSmuxTransportOption() libp2p.Option {
func MakeSmuxTransportOption() libp2p.Option {
const yamuxID = "/yamux/1.0.0"

ymxtpt := *yamux.DefaultTransport
@@ -22,7 +22,7 @@ func makeSmuxTransportOption() libp2p.Option {

func SmuxTransport() func() (opts Libp2pOpts, err error) {
return func() (opts Libp2pOpts, err error) {
opts.Opts = append(opts.Opts, makeSmuxTransportOption())
opts.Opts = append(opts.Opts, MakeSmuxTransportOption())
return
}
}
1 change: 1 addition & 0 deletions protocolproxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Code inspired by boost project
29 changes: 29 additions & 0 deletions protocolproxy/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package protocolproxy

import (
"errors"
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

// ErrNotRegistered indicates a peer has not registered a given protocol but is
// trying to extend or terminate the registration
type ErrNotRegistered struct {
p peer.ID
protocolID protocol.ID
}

func (e ErrNotRegistered) Error() string {
return fmt.Sprintf("protocol %s is not registered to peer %s", e.protocolID, e.p)
}

// ErrNoInboundRequests is thrown by the load balancer when it receives and inbound request
var ErrNoInboundRequests = errors.New("inbound requests not accepted")

// ErrNoOutboundRequests is thrown by the service node when it receives and outbound request
var ErrNoOutboundRequests = errors.New("outbound requests not accepted")

// ErrInboundRequestsAreSingleProtocol is thrown by the service node when it receives and outbound request
var ErrInboundRequestsAreSingleProtocol = errors.New("inbound requests are single protocol")
Loading