Skip to content

Commit 8df83eb

Browse files
authored
Improve lifecycle management (#416)
* Improve lifecycle management

1. Merge the client and the runner. The distinction was unclear and the client/runner/module tended to reach into each other. This change merges the client/runner and then separates the new "runner" from the module as much as possible.
2. Completely stop/discard the runner when rebootstrapping. The new logic carefully waits for all components to stop before moving on.
3. Simplify locking and make sure we take the locks where appropriate.
4. Merge bootstrap and re-configure logic. The dynamic manifest client no longer cares about _when_ a manifest should be applied; it simply gives it to the module (F3) and lets F3 use its normal bootstrap logic.

Finally, I've improved the tests to:

1. Always stop on exit (checking for errors).
2. Never fail from goroutines.
3. Correctly wait for manifest changes (previously, it would only wait for at least one node to change manifests).

NOTEs:

1. This removes the ability to reconfigure without rebootstrapping, but preserves the ability to _pause_ without rebootstrapping.
2. This causes bootstrap to start at the time the bootstrap epoch _should_ have happened instead of starting at the next non-null epoch. In practice, this should behave better as all nodes will start at the same time (and will look back 900 epochs anyway).

* Address feedback
* Test delayed bootstrap, and have the test chain "catch up" if it's too far behind.
* Fully wait for finality before bootstrapping
1 parent de5c871 commit 8df83eb

File tree

13 files changed

+911
-1003
lines changed

13 files changed

+911
-1003
lines changed

cmd/f3/manifest.go

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/libp2p/go-libp2p/core/crypto"
1515
"github.com/libp2p/go-libp2p/core/peer"
1616
"github.com/urfave/cli/v2"
17+
"golang.org/x/sync/errgroup"
1718
"golang.org/x/xerrors"
1819
)
1920

@@ -137,6 +138,8 @@ var manifestServeCmd = cli.Command{
137138
return xerrors.Errorf("initializing libp2p host: %w", err)
138139
}
139140

141+
defer func() { _ = host.Close() }()
142+
140143
// Connect to all bootstrap addresses once. This should be sufficient to build
141144
// the pubsub mesh, if not then we need to periodically re-connect and/or pull in
142145
// the Lotus bootstrapping, which includes DHT connectivity.
@@ -171,15 +174,15 @@ var manifestServeCmd = cli.Command{
171174
}
172175

173176
manifestPath := c.String("manifest")
174-
loadManifestAndVersion := func() (manifest.Manifest, manifest.Version, error) {
177+
loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) {
175178

176179
m, err := loadManifest(manifestPath)
177180
if err != nil {
178-
return manifest.Manifest{}, "", xerrors.Errorf("loading manifest: %w", err)
181+
return nil, "", xerrors.Errorf("loading manifest: %w", err)
179182
}
180183
version, err := m.Version()
181184
if err != nil {
182-
return manifest.Manifest{}, "", xerrors.Errorf("versioning manifest: %w", err)
185+
return nil, "", xerrors.Errorf("versioning manifest: %w", err)
183186
}
184187
return m, version, nil
185188
}
@@ -200,46 +203,49 @@ var manifestServeCmd = cli.Command{
200203
}
201204
_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion)
202205

203-
go func() {
204-
sender.Start(c.Context)
205-
}()
206-
defer func() {
207-
sender.Stop()
208-
_ = host.Close()
209-
}()
210-
211-
checkTicker := time.NewTicker(c.Duration("checkInterval"))
212-
for c.Context.Err() == nil {
213-
select {
214-
case <-c.Context.Done():
215-
return c.Context.Err()
216-
case <-checkTicker.C:
217-
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
218-
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
219-
} else if manifestVersion != nextManifestVersion {
220-
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
221-
sender.UpdateManifest(nextManifest)
222-
manifestVersion = nextManifestVersion
206+
checkInterval := c.Duration("checkInterval")
207+
208+
errgrp, ctx := errgroup.WithContext(c.Context)
209+
errgrp.Go(func() error { return sender.Run(ctx) })
210+
errgrp.Go(func() error {
211+
checkTicker := time.NewTicker(checkInterval)
212+
defer checkTicker.Stop()
213+
214+
for ctx.Err() == nil {
215+
select {
216+
case <-ctx.Done():
217+
return nil
218+
case <-checkTicker.C:
219+
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
220+
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
221+
} else if manifestVersion != nextManifestVersion {
222+
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
223+
sender.UpdateManifest(nextManifest)
224+
manifestVersion = nextManifestVersion
225+
}
223226
}
224227
}
225-
}
226-
return nil
228+
229+
return nil
230+
})
231+
232+
return errgrp.Wait()
227233
},
228234
}
229235

230-
func getManifest(c *cli.Context) (manifest.Manifest, error) {
236+
func getManifest(c *cli.Context) (*manifest.Manifest, error) {
231237
manifestPath := c.String("manifest")
232238
return loadManifest(manifestPath)
233239
}
234240

235-
func loadManifest(path string) (manifest.Manifest, error) {
241+
func loadManifest(path string) (*manifest.Manifest, error) {
236242
f, err := os.Open(path)
237243
if err != nil {
238-
return manifest.Manifest{}, xerrors.Errorf("opening %s to load manifest: %w", path, err)
244+
return nil, xerrors.Errorf("opening %s to load manifest: %w", path, err)
239245
}
240246
defer f.Close()
241247
var m manifest.Manifest
242248

243249
err = m.Unmarshal(f)
244-
return m, err
250+
return &m, err
245251
}

cmd/f3/run.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,20 +121,18 @@ var runCmd = cli.Command{
121121
return xerrors.Errorf("creating module: %w", err)
122122
}
123123

124-
mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
125124
go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend)
126125

127-
return module.Run(ctx)
126+
if err := module.Start(ctx); err != nil {
127+
return nil
128+
}
129+
<-ctx.Done()
130+
return module.Stop(context.Background())
128131
},
129132
}
130133

131134
func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
132-
for {
133-
select {
134-
case <-ctx.Done():
135-
return
136-
default:
137-
}
135+
for ctx.Err() == nil {
138136

139137
ch := make(chan *gpbft.MessageBuilder, 4)
140138
module.SubscribeForMessagesToSign(ch)

ec/powerdelta.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package ec
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/filecoin-project/go-f3/certs"
8+
"github.com/filecoin-project/go-f3/gpbft"
9+
)
10+
11+
func WithModifiedPower(backend Backend, delta []certs.PowerTableDelta) Backend {
12+
return &withModifiedPower{
13+
Backend: backend,
14+
delta: delta,
15+
}
16+
}
17+
18+
type withModifiedPower struct {
19+
Backend
20+
delta []certs.PowerTableDelta
21+
}
22+
23+
func (b *withModifiedPower) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
24+
pt, err := b.Backend.GetPowerTable(ctx, ts)
25+
if err != nil {
26+
return nil, fmt.Errorf("getting power table: %w", err)
27+
}
28+
return certs.ApplyPowerTableDiffs(pt, b.delta)
29+
}

0 commit comments

Comments
 (0)