Skip to content

Commit 36ec068

Browse files
committed
feat: add forwarding proxy server
1 parent c098232 commit 36ec068

14 files changed

+1414
-0
lines changed

cmd/market-proxy/main.go

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"log"
8+
"os"
9+
10+
"github.com/filecoin-project/venus-market/v2/config"
11+
12+
logging "github.com/ipfs/go-log/v2"
13+
"github.com/urfave/cli/v2"
14+
15+
"github.com/filecoin-project/go-fil-markets/storagemarket"
16+
17+
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
18+
"github.com/ipfs/go-graphsync/network"
19+
20+
"github.com/filecoin-project/venus-market/v2/protocolproxy"
21+
"github.com/libp2p/go-libp2p/core/peer"
22+
23+
"github.com/filecoin-project/venus-market/v2/version"
24+
"github.com/libp2p/go-libp2p"
25+
"github.com/libp2p/go-libp2p/core/crypto"
26+
"github.com/libp2p/go-libp2p/core/protocol"
27+
)
28+
29+
var protocols = []protocol.ID{
30+
//storage
31+
storagemarket.AskProtocolID,
32+
storagemarket.OldAskProtocolID,
33+
34+
storagemarket.DealProtocolID110,
35+
storagemarket.DealProtocolID101,
36+
storagemarket.DealProtocolID111,
37+
38+
storagemarket.OldDealStatusProtocolID,
39+
storagemarket.DealStatusProtocolID,
40+
41+
//retrieval
42+
retrievalmarket.QueryProtocolID,
43+
retrievalmarket.OldQueryProtocolID,
44+
45+
network.ProtocolGraphsync_1_0_0,
46+
network.ProtocolGraphsync_2_0_0,
47+
}
48+
49+
var mainLog = logging.Logger("market-proxy")
50+
51+
func main() {
52+
app := &cli.App{
53+
Name: "venus-market",
54+
Usage: "venus-market",
55+
Version: version.UserVersion(),
56+
EnableBashCompletion: true,
57+
}
58+
59+
if err := app.Run(os.Args); err != nil {
60+
log.Fatal(err)
61+
}
62+
}
63+
64+
type ProxyConfig struct {
65+
ProxyServer string
66+
}
67+
68+
func run(cfg *config.ProxyServer) error {
69+
var pkey crypto.PrivKey
70+
var err error
71+
if len(cfg.PrivateKey) == 0 {
72+
privateKeyBytes, err := hex.DecodeString(cfg.PrivateKey)
73+
if err != nil {
74+
return err
75+
}
76+
crypto.UnmarshalPrivateKey(privateKeyBytes)
77+
} else {
78+
pkey, _, err = crypto.GenerateEd25519Key(rand.Reader)
79+
if err != nil {
80+
return err
81+
}
82+
}
83+
84+
opts := []libp2p.Option{
85+
libp2p.ListenAddrStrings(cfg.ListenAddresses...),
86+
libp2p.Identity(pkey),
87+
libp2p.DefaultPeerstore,
88+
libp2p.NoListenAddrs,
89+
libp2p.Ping(true),
90+
libp2p.UserAgent("venus-market-proxy" + version.UserVersion()),
91+
}
92+
93+
h, err := libp2p.New(opts...)
94+
if err != nil {
95+
return err
96+
}
97+
98+
addrInfo, err := peer.AddrInfoFromString(cfg.Backends[0])
99+
if err != nil {
100+
return err
101+
}
102+
proxyHost, err := protocolproxy.NewProtocolProxy(h, map[peer.ID][]protocol.ID{
103+
addrInfo.ID: protocols,
104+
})
105+
if err != nil {
106+
return err
107+
}
108+
defer proxyHost.Close()
109+
110+
proxyHost.Start(context.Background())
111+
return nil
112+
}

config/common.go

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Libp2p struct {
3232
ProtectedPeers []string
3333

3434
PrivateKey string
35+
Proxy string
3536
}
3637

3738
type Common struct {

config/proxy_server.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package config
2+
3+
type ProxyServer struct {
4+
Libp2p
5+
Backends []string
6+
}

network/host.go

+14
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/filecoin-project/venus-market/v2/config"
8+
9+
"github.com/filecoin-project/venus-market/v2/protocolproxy"
10+
711
"github.com/filecoin-project/venus-market/v2/version"
812
"github.com/ipfs-force-community/metrics"
913
"github.com/libp2p/go-libp2p"
@@ -19,6 +23,8 @@ type P2PHostIn struct {
1923
ID peer.ID
2024
Peerstore peerstore.Peerstore
2125

26+
Cfg *config.Libp2p
27+
2228
Opts [][]libp2p.Option `group:"libp2p"`
2329
}
2430

@@ -45,6 +51,14 @@ func Host(mctx metrics.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (host.Host
4551
return nil, err
4652
}
4753

54+
if len(params.Cfg.Proxy) == 0 {
55+
addrInfo, err := peer.AddrInfoFromString(params.Cfg.Proxy)
56+
if err != nil {
57+
return nil, err
58+
}
59+
h = protocolproxy.NewForwardingHost(h, *addrInfo)
60+
}
61+
4862
lc.Append(fx.Hook{
4963
OnStop: func(ctx context.Context) error {
5064
return h.Close()

protocolproxy/errors.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package protocolproxy
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/libp2p/go-libp2p/core/peer"
8+
"github.com/libp2p/go-libp2p/core/protocol"
9+
)
10+
11+
// ErrNotRegistered indicates a peer has not registered a given protocol but is
12+
// trying to extend or terminate the registration
13+
type ErrNotRegistered struct {
14+
p peer.ID
15+
protocolID protocol.ID
16+
}
17+
18+
func (e ErrNotRegistered) Error() string {
19+
return fmt.Sprintf("protocol %s is not registered to peer %s", e.protocolID, e.p)
20+
}
21+
22+
// ErrNoInboundRequests is thrown by the load balancer when it receives and inbound request
23+
var ErrNoInboundRequests = errors.New("inbound requests not accepted")
24+
25+
// ErrNoOutboundRequests is thrown by the service node when it receives and outbound request
26+
var ErrNoOutboundRequests = errors.New("outbound requests not accepted")
27+
28+
// ErrInboundRequestsAreSingleProtocol is thrown by the service node when it receives and outbound request
29+
var ErrInboundRequestsAreSingleProtocol = errors.New("inbound requests are single protocol")

protocolproxy/forwardinghost.go

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package protocolproxy
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/filecoin-project/venus-market/v2/protocolproxy/messages"
9+
"github.com/libp2p/go-libp2p/core/host"
10+
"github.com/libp2p/go-libp2p/core/network"
11+
"github.com/libp2p/go-libp2p/core/peer"
12+
"github.com/libp2p/go-libp2p/core/protocol"
13+
)
14+
15+
// ForwardingHost is a host that behaves as a service node connected to a proxy
16+
// -- all traffic is routed through the proxy for each registered protocol
17+
type ForwardingHost struct {
18+
host.Host
19+
proxy peer.ID
20+
handlersLk sync.RWMutex
21+
handlers map[protocol.ID]network.StreamHandler
22+
}
23+
24+
// NewForwardingHost node constructs a service node connected to the given proxy on the passed
25+
// in host. A forwarding host behaves exactly like a host.Host but setting up new protocol handlers
26+
// registers routes on the proxy node.
27+
func NewForwardingHost(h host.Host, proxy peer.AddrInfo) host.Host {
28+
fh := &ForwardingHost{
29+
Host: h,
30+
proxy: proxy.ID,
31+
handlers: make(map[protocol.ID]network.StreamHandler),
32+
}
33+
fh.Host.SetStreamHandler(ForwardingProtocolID, fh.handleForwarding)
34+
return fh
35+
}
36+
37+
// Close shuts down a service node host's forwarding
38+
func (fh *ForwardingHost) Close() error {
39+
fh.Host.RemoveStreamHandler(ForwardingProtocolID)
40+
return fh.Host.Close()
41+
}
42+
43+
// SetStreamHandler interrupts the normal process of setting up stream handlers by also
44+
// registering a route on the connected protocol proxy. All traffic on the forwarding
45+
// protocol will go through the forwarding handshake with the proxy, then the native
46+
// handler will be called
47+
func (fh *ForwardingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
48+
fh.Host.SetStreamHandler(pid, handler)
49+
50+
// Save the handler so it can be invoked from the forwarding protocol's handler
51+
// only set the handler if we are successful in registering the route
52+
fh.handlersLk.Lock()
53+
fh.handlers[pid] = handler
54+
fh.handlersLk.Unlock()
55+
}
56+
57+
// these wrappings on the stream or conn make it SEEM like the request is coming
58+
// from the original peer, so that it's processed as if it were
59+
type wrappedStream struct {
60+
network.Stream
61+
protocol protocol.ID
62+
remote peer.ID
63+
}
64+
65+
type wrappedConn struct {
66+
network.Conn
67+
remote peer.ID
68+
}
69+
70+
func (ws *wrappedStream) Protocol() protocol.ID {
71+
return ws.protocol
72+
}
73+
74+
func (ws *wrappedStream) Conn() network.Conn {
75+
conn := ws.Stream.Conn()
76+
return &wrappedConn{conn, ws.remote}
77+
}
78+
79+
func (wc *wrappedConn) RemotePeer() peer.ID {
80+
return wc.remote
81+
}
82+
83+
// handle inbound forwarding requests
84+
func (fh *ForwardingHost) handleForwarding(s network.Stream) {
85+
// only accept requests from the proxy
86+
if s.Conn().RemotePeer() != fh.proxy {
87+
_ = s.Reset()
88+
return
89+
}
90+
91+
// read the forwarding request
92+
request, err := messages.ReadForwardingRequest(s)
93+
if err != nil {
94+
log.Warnf("reading forwarding request: %s", err)
95+
_ = s.Reset()
96+
return
97+
}
98+
99+
log.Debugw("received forwarding request for protocol", "protocols", request.Protocols, "remote", request.Remote)
100+
101+
// validate the request
102+
handler, responseErr := fh.validateForwardingRequest(request)
103+
104+
if responseErr != nil {
105+
log.Infof("rejected forwarding request: %s", responseErr)
106+
_ = s.Reset()
107+
return
108+
}
109+
110+
// forward to regular handler, which will close stream
111+
handler(&wrappedStream{s, request.Protocols[0], request.Remote})
112+
}
113+
114+
// validates a forwarding request is one we can accept
115+
func (fh *ForwardingHost) validateForwardingRequest(request *messages.ForwardingRequest) (network.StreamHandler, error) {
116+
fh.handlersLk.RLock()
117+
defer fh.handlersLk.RUnlock()
118+
119+
// only accept inbound requests
120+
if request.Kind != messages.ForwardingInbound {
121+
return nil, ErrNoOutboundRequests
122+
}
123+
124+
// only accept inbound requests for one protocol
125+
if len(request.Protocols) != 1 {
126+
return nil, ErrInboundRequestsAreSingleProtocol
127+
}
128+
129+
// check for a registered handler
130+
registeredHandler, ok := fh.handlers[request.Protocols[0]]
131+
132+
// don't accept inbound requests on protocols we didn't setup routing for
133+
if !ok {
134+
return nil, ErrNotRegistered{fh.ID(), request.Protocols[0]}
135+
}
136+
137+
// return the registered handler
138+
return registeredHandler, nil
139+
}
140+
141+
// Calls to "NewStream" open an outbound forwarding request to the proxy, that is then sent on
142+
// the the specified peer
143+
func (fh *ForwardingHost) NewStream(ctx context.Context, p peer.ID, protocols ...protocol.ID) (network.Stream, error) {
144+
// If there is a direct connection to the peer (or there was a connection
145+
// recently) open the stream over the direct connection
146+
if p != fh.proxy {
147+
connectedness := fh.Host.Network().Connectedness(p)
148+
if connectedness == network.Connected || connectedness == network.CanConnect {
149+
return fh.Host.NewStream(ctx, p, protocols...)
150+
}
151+
}
152+
153+
// open a forwarding stream
154+
routedStream, err := fh.Host.NewStream(ctx, fh.proxy, ForwardingProtocolID)
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// write an outbound forwarding request with the remote peer and protocol
160+
err = messages.WriteOutboundForwardingRequest(routedStream, p, protocols)
161+
if err != nil {
162+
routedStream.Close()
163+
return nil, err
164+
}
165+
166+
// read the response
167+
outbound, err := messages.ReadForwardingResponse(routedStream)
168+
// check for error writing the response
169+
if err != nil {
170+
routedStream.Close()
171+
return nil, err
172+
}
173+
// check the response was accepted
174+
if outbound.Code != messages.ResponseOk {
175+
routedStream.Close()
176+
return nil, fmt.Errorf("opening forwarded stream: %s", outbound.Message)
177+
}
178+
179+
// return a wrapped stream that appears like a normal stream with the original peer
180+
return &wrappedStream{routedStream, *outbound.ProtocolID, p}, nil
181+
}
182+
183+
// RemoveStreamHandler removes a stream handler by shutting down registered route with the original host
184+
func (fh *ForwardingHost) RemoveStreamHandler(pid protocol.ID) {
185+
// check if the handler exists
186+
fh.handlersLk.Lock()
187+
delete(fh.handlers, pid)
188+
fh.handlersLk.Unlock()
189+
}
190+
191+
// Connect for now does nothing
192+
func (fh *ForwardingHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
193+
// for now, this does nothing -- see discussion/improvements
194+
return nil
195+
}

0 commit comments

Comments
 (0)