Skip to content

Commit 7c43183

Browse files
committed
impl chain-ro
1 parent 7c40067 commit 7c43183

16 files changed

+1009
-126
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/build-dep/.*
22
/build
33
/tmp-clone
4+
/bin

Makefile

+9
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,12 @@ dist-clean:
5454

5555
gen-proxy:
5656
go run scripts/proxy-gen.go
57+
58+
build-ro: $(BUILD_DEPS)
59+
mkdir -p ./bin
60+
rm -f ./bin/chain-ro
61+
go build $(GOFLAGS) -o ./bin/chain-ro ./chain-ro/cmd
62+
go run github.com/GeertJohan/go.rice/rice append --exec ./bin/chain-ro -i ./build
63+
64+
.PHONY: lotus
65+
BINS+=lotus

api/proxy.go api/api.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,13 @@ import (
2525
"github.com/filecoin-project/lotus/node/modules/dtypes"
2626
)
2727

28-
var _ Proxy = api.FullNode(nil)
28+
var _ api.FullNode = combined(nil)
29+
30+
type combined interface {
31+
Proxy
32+
Local
33+
UnSupport
34+
}
2935

3036
// Proxy is a subset of api.FullNode.
3137
// Requests involved will be proxied to the choosen remote node

chain-ro/cmd/main.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
7+
logging "github.com/ipfs/go-log/v2"
8+
"github.com/urfave/cli/v2"
9+
"go.opencensus.io/trace"
10+
11+
"github.com/filecoin-project/lotus/lib/lotuslog"
12+
"github.com/filecoin-project/lotus/lib/tracing"
13+
)
14+
15+
const cliName = "chain-ro"
16+
17+
var log = logging.Logger(cliName)
18+
19+
func main() {
20+
lotuslog.SetupLogLevels()
21+
22+
local := []*cli.Command{}
23+
24+
jaeger := tracing.SetupJaegerTracing(cliName)
25+
defer func() {
26+
if jaeger != nil {
27+
jaeger.Flush()
28+
}
29+
}()
30+
31+
for _, cmd := range local {
32+
cmd := cmd
33+
originBefore := cmd.Before
34+
cmd.Before = func(cctx *cli.Context) error {
35+
trace.UnregisterExporter(jaeger)
36+
jaeger = tracing.SetupJaegerTracing(cliName + "/" + cmd.Name)
37+
38+
if originBefore != nil {
39+
return originBefore(cctx)
40+
}
41+
return nil
42+
}
43+
}
44+
45+
ctx, span := trace.StartSpan(context.Background(), "/cli")
46+
defer span.End()
47+
48+
app := &cli.App{
49+
Name: cliName,
50+
Usage: "read-only chain node for filecoin",
51+
EnableBashCompletion: true,
52+
Flags: []cli.Flag{},
53+
54+
Commands: local,
55+
}
56+
57+
app.Setup()
58+
app.Metadata["traceContext"] = ctx
59+
60+
if err := app.Run(os.Args); err != nil {
61+
log.Errorf("CLI error: %s", err)
62+
os.Exit(1)
63+
}
64+
}

chain-ro/cmd/rpc.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"net"
6+
"net/http"
7+
_ "net/http/pprof"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
12+
"github.com/dtynn/dix"
13+
"github.com/filecoin-project/go-jsonrpc"
14+
"github.com/filecoin-project/lotus/api"
15+
)
16+
17+
func serveRPC(ctx context.Context, listen string, full api.FullNode, stop dix.StopFunc, maxRequestSize int64) error {
18+
rpcOpts := []jsonrpc.ServerOption{}
19+
if maxRequestSize > 0 {
20+
rpcOpts = append(rpcOpts, jsonrpc.WithMaxRequestSize(maxRequestSize))
21+
}
22+
23+
rpcServer := jsonrpc.NewServer(rpcOpts...)
24+
rpcServer.Register("Filecoin", full)
25+
26+
http.Handle("/rpc/v0", rpcServer)
27+
28+
server := http.Server{
29+
Addr: listen,
30+
Handler: http.DefaultServeMux,
31+
BaseContext: func(net.Listener) context.Context {
32+
return ctx
33+
},
34+
}
35+
36+
sigCh := make(chan os.Signal, 2)
37+
38+
go func() {
39+
select {
40+
case <-ctx.Done():
41+
42+
case sig := <-sigCh:
43+
log.Infof("signal %s captured", sig)
44+
}
45+
46+
if err := server.Shutdown(context.Background()); err != nil {
47+
log.Warnf("shutdown http server: %s", err)
48+
}
49+
50+
if err := stop(context.Background()); err != nil {
51+
log.Warnf("call app stop func: %s", err)
52+
}
53+
54+
log.Sync()
55+
}()
56+
57+
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
58+
59+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
60+
return err
61+
}
62+
63+
log.Info("gracefull down")
64+
return nil
65+
}

chain-ro/cmd/run.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"github.com/filecoin-project/lotus/api"
7+
"github.com/urfave/cli/v2"
8+
9+
"github.com/dtynn/chain-co/chain-ro/service"
10+
"github.com/dtynn/chain-co/dep"
11+
)
12+
13+
var runCmd = &cli.Command{
14+
Name: "run",
15+
Usage: "start the chain-ro daemon",
16+
Flags: []cli.Flag{
17+
&cli.StringFlag{
18+
Name: "listen",
19+
Usage: "listen address for the service",
20+
Value: ":1234",
21+
},
22+
&cli.Int64Flag{
23+
Name: "max-req-size",
24+
Usage: "max request size",
25+
Value: 10 << 20,
26+
},
27+
&cli.StringSliceFlag{
28+
Name: "node",
29+
Usage: "node info",
30+
},
31+
},
32+
Action: func(cctx *cli.Context) error {
33+
appCtx, appCancel := context.WithCancel(cctx.Context)
34+
defer appCancel()
35+
36+
var full api.FullNode
37+
38+
stop, err := service.Build(
39+
appCtx,
40+
41+
dep.MetricsCtxOption(appCtx, cliName),
42+
43+
service.ParseNodeInfoList(cctx.StringSlice("node")),
44+
service.FullNode(&full),
45+
)
46+
47+
if err != nil {
48+
return nil
49+
}
50+
51+
defer stop(context.Background())
52+
53+
return serveRPC(
54+
appCtx,
55+
cctx.String("listen"),
56+
full,
57+
func(ctx context.Context) error {
58+
appCancel()
59+
stop(ctx)
60+
return nil
61+
},
62+
cctx.Int64("max-req-size"),
63+
)
64+
},
65+
}

chain-ro/service/dep.go

+166
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package service
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/dtynn/dix"
9+
"github.com/filecoin-project/lotus/api"
10+
"github.com/filecoin-project/lotus/chain/types"
11+
"go.uber.org/fx"
12+
13+
"github.com/dtynn/chain-co/co"
14+
"github.com/dtynn/chain-co/proxy"
15+
)
16+
17+
const ExtractFullNodeAPIKey dix.Invoke = 1
18+
19+
func Build(ctx context.Context, overrides ...dix.Option) (dix.StopFunc, error) {
20+
opts := []dix.Option{
21+
dix.Override(new(co.NodeOption), co.DefaultNodeOption),
22+
dix.Override(new(*co.Ctx), co.NewCtx),
23+
dix.Override(new(*co.Connector), co.NewConnector),
24+
dix.Override(new(*co.Coordinator), buildCoordinator),
25+
dix.Override(new(*co.Selector), co.NewSelector),
26+
dix.Override(new(*proxy.Proxy), buildProxyAPI),
27+
dix.Override(new(*proxy.Local), buildLocalAPI),
28+
dix.Override(new(*proxy.UnSupport), buildUnSupportAPI),
29+
}
30+
opts = append(opts, overrides...)
31+
return dix.New(ctx, opts...)
32+
}
33+
34+
func FullNode(full *api.FullNode) dix.Option {
35+
return dix.Override(ExtractFullNodeAPIKey, func(srv Service) error {
36+
*full = &srv
37+
return nil
38+
})
39+
}
40+
41+
func ParseNodeInfoList(raws []string) dix.Option {
42+
return dix.Override(new(co.NodeInfoList), func() (co.NodeInfoList, error) {
43+
list := make(co.NodeInfoList, 0, len(raws))
44+
for _, str := range raws {
45+
info := co.ParseNodeInfo(str)
46+
if _, err := info.DialArgs(); err != nil {
47+
return nil, fmt.Errorf("invalid node info: %s", str)
48+
}
49+
50+
list = append(list, info)
51+
}
52+
53+
return list, nil
54+
})
55+
}
56+
57+
func buildCoordinator(lc fx.Lifecycle, ctx *co.Ctx, connector *co.Connector, infos co.NodeInfoList, sel *co.Selector) (*co.Coordinator, error) {
58+
nodes := make([]*co.Node, 0, len(infos))
59+
allDone := false
60+
defer func() {
61+
if !allDone {
62+
for i := range nodes {
63+
nodes[i].Stop()
64+
}
65+
}
66+
}()
67+
68+
var head *types.TipSet
69+
weight := types.NewInt(0)
70+
71+
for i := range infos {
72+
info := infos[i]
73+
nlog := log.With("host", info.Host)
74+
75+
node, err := connector.Connect(info)
76+
if err != nil {
77+
nlog.Errorf("connect failed: %s", err)
78+
continue
79+
}
80+
81+
full := node.FullNode()
82+
h, w, err := getHeadCandidate(full)
83+
if err != nil {
84+
node.Stop()
85+
nlog.Errorf("failed to get head: %s", err)
86+
continue
87+
}
88+
89+
if head == nil || w.GreaterThan(weight) {
90+
head = h
91+
weight = w
92+
}
93+
94+
nodes = append(nodes, node)
95+
}
96+
97+
if len(nodes) == 0 {
98+
return nil, fmt.Errorf("no available node")
99+
}
100+
101+
coordinator, err := co.NewCoordinator(ctx, head, weight, sel)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
lc.Append(fx.Hook{
107+
OnStart: func(context.Context) error {
108+
go coordinator.Start()
109+
sel.ReplaceNodes(nodes, nil, false)
110+
return nil
111+
},
112+
OnStop: func(context.Context) error {
113+
coordinator.Stop()
114+
return nil
115+
},
116+
})
117+
118+
allDone = true
119+
return coordinator, nil
120+
}
121+
122+
func getHeadCandidate(full api.FullNode) (*types.TipSet, types.BigInt, error) {
123+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
124+
defer cancel()
125+
126+
head, err := full.ChainHead(ctx)
127+
if err != nil {
128+
return nil, types.BigInt{}, err
129+
}
130+
131+
weight, err := full.ChainTipSetWeight(ctx, head.Key())
132+
if err != nil {
133+
return nil, types.BigInt{}, err
134+
}
135+
136+
return head, weight, nil
137+
}
138+
139+
func buildProxyAPI(sel *co.Selector) *proxy.Proxy {
140+
return &proxy.Proxy{
141+
Select: func() (proxy.ProxyAPI, error) {
142+
node, err := sel.Select()
143+
if err != nil {
144+
return nil, err
145+
}
146+
147+
return node.FullNode(), nil
148+
},
149+
}
150+
}
151+
152+
func buildLocalAPI(lsrv LocalChainService) *proxy.Local {
153+
return &proxy.Local{
154+
Select: func() (proxy.LocalAPI, error) {
155+
return &lsrv, nil
156+
},
157+
}
158+
}
159+
160+
func buildUnSupportAPI() *proxy.UnSupport {
161+
return &proxy.UnSupport{
162+
Select: func() (proxy.UnSupportAPI, error) {
163+
return nil, fmt.Errorf("api not supported")
164+
},
165+
}
166+
}

0 commit comments

Comments
 (0)