Skip to content

Commit 8ef1378

Browse files
committed
feat: add peers proxy
1 parent 36ec068 commit 8ef1378

File tree

8 files changed

+152
-46
lines changed

8 files changed

+152
-46
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ badgerStAskDb/
88
*.puml
99
market-client
1010
venus-market
11+
market-proxy
1112
sequence_chart.md
1213
.idea
1314
.vscode

cmd/market-proxy/main.go

+107-10
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,17 @@ import (
44
"context"
55
"crypto/rand"
66
"encoding/hex"
7+
"fmt"
78
"log"
89
"os"
10+
"time"
11+
12+
datatransfer "github.com/filecoin-project/go-data-transfer"
13+
14+
network2 "github.com/filecoin-project/venus-market/v2/network"
915

1016
"github.com/filecoin-project/venus-market/v2/config"
17+
"github.com/filecoin-project/venus-market/v2/utils"
1118

1219
logging "github.com/ipfs/go-log/v2"
1320
"github.com/urfave/cli/v2"
@@ -44,57 +51,104 @@ var protocols = []protocol.ID{
4451

4552
network.ProtocolGraphsync_1_0_0,
4653
network.ProtocolGraphsync_2_0_0,
54+
datatransfer.ProtocolDataTransfer1_2,
4755
}
4856

4957
var mainLog = logging.Logger("market-proxy")
5058

5159
func main() {
5260
app := &cli.App{
53-
Name: "venus-market",
54-
Usage: "venus-market",
61+
Name: "venus-market-proxy",
62+
Usage: "proxy multiple venus-market backends like nginx",
5563
Version: version.UserVersion(),
5664
EnableBashCompletion: true,
65+
Commands: []*cli.Command{
66+
{
67+
Name: "run",
68+
Usage: "start a libp2p proxy",
69+
Flags: []cli.Flag{
70+
&cli.StringFlag{
71+
Name: "listen",
72+
Usage: "specify listen address ",
73+
Value: "/ip4/0.0.0.0/tcp/11023",
74+
},
75+
&cli.StringFlag{
76+
Name: "peer-key",
77+
Usage: "peer key for p2p identify, if not specify, will generate new one",
78+
Value: "",
79+
},
80+
&cli.StringSliceFlag{
81+
Name: "backends",
82+
Usage: "a group of backends libp2p backends server",
83+
Required: true,
84+
},
85+
},
86+
Action: run,
87+
},
88+
{
89+
Name: "new-peer-key",
90+
Usage: "generate random peer key and corresponding private key ",
91+
Flags: []cli.Flag{},
92+
Action: genKey,
93+
},
94+
},
5795
}
5896

5997
if err := app.Run(os.Args); err != nil {
6098
log.Fatal(err)
6199
}
62100
}
63101

64-
type ProxyConfig struct {
65-
ProxyServer string
66-
}
102+
func run(c *cli.Context) error {
103+
ctx := c.Context
104+
utils.SetupLogLevels()
105+
cfg := config.ProxyServer{}
106+
cfg.Libp2p.ListenAddresses = []string{c.String("listen")}
107+
cfg.Libp2p.PrivateKey = c.String("peer-key")
108+
cfg.Backends = c.StringSlice("backends")
67109

68-
func run(cfg *config.ProxyServer) error {
69110
var pkey crypto.PrivKey
70111
var err error
71-
if len(cfg.PrivateKey) == 0 {
112+
if len(cfg.PrivateKey) > 0 {
72113
privateKeyBytes, err := hex.DecodeString(cfg.PrivateKey)
73114
if err != nil {
74115
return err
75116
}
76-
crypto.UnmarshalPrivateKey(privateKeyBytes)
117+
pkey, err = crypto.UnmarshalPrivateKey(privateKeyBytes)
118+
if err != nil {
119+
return err
120+
}
77121
} else {
78122
pkey, _, err = crypto.GenerateEd25519Key(rand.Reader)
79123
if err != nil {
80124
return err
81125
}
126+
127+
privateKeyBytes, err := crypto.MarshalPrivateKey(pkey)
128+
if err != nil {
129+
return err
130+
}
131+
132+
fmt.Println(hex.EncodeToString(privateKeyBytes))
82133
}
83134

84135
opts := []libp2p.Option{
136+
network2.MakeSmuxTransportOption(),
137+
libp2p.DefaultTransports,
85138
libp2p.ListenAddrStrings(cfg.ListenAddresses...),
86139
libp2p.Identity(pkey),
140+
libp2p.WithDialTimeout(time.Second * 5),
87141
libp2p.DefaultPeerstore,
88-
libp2p.NoListenAddrs,
142+
libp2p.DisableRelay(),
89143
libp2p.Ping(true),
144+
90145
libp2p.UserAgent("venus-market-proxy" + version.UserVersion()),
91146
}
92147

93148
h, err := libp2p.New(opts...)
94149
if err != nil {
95150
return err
96151
}
97-
98152
addrInfo, err := peer.AddrInfoFromString(cfg.Backends[0])
99153
if err != nil {
100154
return err
@@ -105,8 +159,51 @@ func run(cfg *config.ProxyServer) error {
105159
if err != nil {
106160
return err
107161
}
162+
108163
defer proxyHost.Close()
109164

110165
proxyHost.Start(context.Background())
166+
167+
//try to connect backends
168+
go func() {
169+
timer := time.NewTicker(time.Minute)
170+
defer timer.Stop()
171+
for {
172+
err = h.Connect(ctx, *addrInfo)
173+
if err != nil {
174+
mainLog.Errorf("connect to %s %v", addrInfo, err)
175+
}
176+
<-timer.C
177+
}
178+
}()
179+
180+
var libp2pAddrs []string
181+
for _, peer := range h.Addrs() {
182+
libp2pAddrs = append(libp2pAddrs, fmt.Sprintf("%s/p2p/%s\n", peer, h.ID()))
183+
}
184+
185+
mainLog.Infof("start listen at %v", libp2pAddrs)
186+
shutdownChan := make(chan struct{})
187+
finishCh := utils.MonitorShutdown(shutdownChan)
188+
<-finishCh
189+
return nil
190+
}
191+
192+
func genKey(c *cli.Context) error {
193+
pkey, _, err := crypto.GenerateEd25519Key(rand.Reader)
194+
if err != nil {
195+
return err
196+
}
197+
198+
privateKeyBytes, err := crypto.MarshalPrivateKey(pkey)
199+
if err != nil {
200+
return err
201+
}
202+
peerId, err := peer.IDFromPrivateKey(pkey)
203+
if err != nil {
204+
return err
205+
}
206+
fmt.Println("PeerId:", peerId)
207+
fmt.Println("Pkey:", hex.EncodeToString(privateKeyBytes))
111208
return nil
112209
}

network/host.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func Host(mctx metrics.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (host.Host
5151
return nil, err
5252
}
5353

54-
if len(params.Cfg.Proxy) == 0 {
54+
if len(params.Cfg.Proxy) > 0 {
5555
addrInfo, err := peer.AddrInfoFromString(params.Cfg.Proxy)
5656
if err != nil {
5757
return nil, err

network/smux.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux"
88
)
99

10-
func makeSmuxTransportOption() libp2p.Option {
10+
func MakeSmuxTransportOption() libp2p.Option {
1111
const yamuxID = "/yamux/1.0.0"
1212

1313
ymxtpt := *yamux.DefaultTransport
@@ -22,7 +22,7 @@ func makeSmuxTransportOption() libp2p.Option {
2222

2323
func SmuxTransport() func() (opts Libp2pOpts, err error) {
2424
return func() (opts Libp2pOpts, err error) {
25-
opts.Opts = append(opts.Opts, makeSmuxTransportOption())
25+
opts.Opts = append(opts.Opts, MakeSmuxTransportOption())
2626
return
2727
}
2828
}

protocolproxy/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Code inspired by boost project

protocolproxy/forwardinghost.go

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (fh *ForwardingHost) Close() error {
4747
func (fh *ForwardingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
4848
fh.Host.SetStreamHandler(pid, handler)
4949

50+
fmt.Println("protocol", pid)
5051
// Save the handler so it can be invoked from the forwarding protocol's handler
5152
// only set the handler if we are successful in registering the route
5253
fh.handlersLk.Lock()
@@ -141,6 +142,7 @@ func (fh *ForwardingHost) validateForwardingRequest(request *messages.Forwarding
141142
// Calls to "NewStream" open an outbound forwarding request to the proxy, that is then sent on
142143
// the the specified peer
143144
func (fh *ForwardingHost) NewStream(ctx context.Context, p peer.ID, protocols ...protocol.ID) (network.Stream, error) {
145+
log.Infow("ForwardingHost receive stream")
144146
// If there is a direct connection to the peer (or there was a connection
145147
// recently) open the stream over the direct connection
146148
if p != fh.proxy {

protocolproxy/protocolproxy.go

+36-33
Original file line numberDiff line numberDiff line change
@@ -192,39 +192,6 @@ func (pp *ProtocolProxy) processForwardingRequest(p peer.ID, remote peer.ID, pro
192192
return s, nil
193193
}
194194

195-
// pipe a stream through the PP
196-
func (pp *ProtocolProxy) bridgeStreams(s1, s2 network.Stream) {
197-
var wg sync.WaitGroup
198-
wg.Add(2)
199-
go func() {
200-
// pipe reads on s1 to writes on s2
201-
defer wg.Done()
202-
_, err := io.Copy(s2, s1)
203-
if err != nil {
204-
_ = s1.Reset()
205-
return
206-
}
207-
err = s2.CloseWrite()
208-
if err != nil {
209-
_ = s1.Reset()
210-
}
211-
}()
212-
go func() {
213-
// pipe reads on s2 to writes on s1
214-
defer wg.Done()
215-
_, err := io.Copy(s1, s2)
216-
if err != nil {
217-
_ = s2.Reset()
218-
return
219-
}
220-
err = s1.CloseWrite()
221-
if err != nil {
222-
_ = s2.Reset()
223-
}
224-
}()
225-
wg.Wait()
226-
}
227-
228195
func (pp *ProtocolProxy) handleIncoming(s network.Stream) {
229196
defer s.Close()
230197

@@ -262,3 +229,39 @@ func (pp *ProtocolProxy) handleIncoming(s network.Stream) {
262229

263230
pp.bridgeStreams(s, routedStream)
264231
}
232+
233+
// pipe a stream through the PP
234+
func (pp *ProtocolProxy) bridgeStreams(s1, s2 network.Stream) {
235+
defer func() {
236+
fmt.Println("exit stream")
237+
}()
238+
var wg sync.WaitGroup
239+
wg.Add(2)
240+
go func() {
241+
// pipe reads on s1 to writes on s2
242+
defer wg.Done()
243+
_, err := io.Copy(s2, s1)
244+
if err != nil {
245+
_ = s1.Reset()
246+
return
247+
}
248+
err = s2.CloseWrite()
249+
if err != nil {
250+
_ = s1.Reset()
251+
}
252+
}()
253+
go func() {
254+
// pipe reads on s2 to writes on s1
255+
defer wg.Done()
256+
_, err := io.Copy(s1, s2)
257+
if err != nil {
258+
_ = s2.Reset()
259+
return
260+
}
261+
err = s1.CloseWrite()
262+
if err != nil {
263+
_ = s2.Reset()
264+
}
265+
}()
266+
wg.Wait()
267+
}

storageprovider/stream.go

+2
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func (storageDealStream *StorageDealStream) HandleAskStream(s network.StorageAsk
7878
log.Errorf("unable to close err %v", err)
7979
}
8080
}()
81+
82+
log.Infof("receive ask stream")
8183
ar, err := s.ReadAskRequest()
8284
if err != nil {
8385
log.Errorf("failed to read AskRequest from incoming stream: %s", err)

0 commit comments

Comments
 (0)