@@ -22,6 +22,7 @@ import (
22
22
crand "crypto/rand"
23
23
"crypto/sha256"
24
24
"fmt"
25
+ "math"
25
26
"runtime"
26
27
"sync"
27
28
"time"
@@ -30,6 +31,7 @@ import (
30
31
"github.com/ethereum/go-ethereum/crypto"
31
32
"github.com/ethereum/go-ethereum/log"
32
33
"github.com/ethereum/go-ethereum/p2p"
34
+ "github.com/ethereum/go-ethereum/rlp"
33
35
"github.com/ethereum/go-ethereum/rpc"
34
36
"github.com/syndtr/goleveldb/leveldb/errors"
35
37
"golang.org/x/crypto/pbkdf2"
@@ -74,6 +76,8 @@ type Whisper struct {
74
76
75
77
settings syncmap.Map // holds configuration settings that can be dynamically changed
76
78
79
+ reactionAllowance int // maximum time in seconds allowed to process the whisper-related messages
80
+
77
81
statsMu sync.Mutex // guard stats
78
82
stats Statistics // Statistics of whisper node
79
83
@@ -87,14 +91,15 @@ func New(cfg *Config) *Whisper {
87
91
}
88
92
89
93
whisper := & Whisper {
90
- privateKeys : make (map [string ]* ecdsa.PrivateKey ),
91
- symKeys : make (map [string ][]byte ),
92
- envelopes : make (map [common.Hash ]* Envelope ),
93
- expirations : make (map [uint32 ]* set.SetNonTS ),
94
- peers : make (map [* Peer ]struct {}),
95
- messageQueue : make (chan * Envelope , messageQueueLimit ),
96
- p2pMsgQueue : make (chan * Envelope , messageQueueLimit ),
97
- quit : make (chan struct {}),
94
+ privateKeys : make (map [string ]* ecdsa.PrivateKey ),
95
+ symKeys : make (map [string ][]byte ),
96
+ envelopes : make (map [common.Hash ]* Envelope ),
97
+ expirations : make (map [uint32 ]* set.SetNonTS ),
98
+ peers : make (map [* Peer ]struct {}),
99
+ messageQueue : make (chan * Envelope , messageQueueLimit ),
100
+ p2pMsgQueue : make (chan * Envelope , messageQueueLimit ),
101
+ quit : make (chan struct {}),
102
+ reactionAllowance : SynchAllowance ,
98
103
}
99
104
100
105
whisper .filters = NewFilters (whisper )
@@ -177,13 +182,50 @@ func (w *Whisper) SetMaxMessageSize(size uint32) error {
177
182
178
183
// SetMinimumPoW sets the minimal PoW required by this node
179
184
func (w * Whisper ) SetMinimumPoW (val float64 ) error {
180
- if val <= 0.0 {
185
+ if val < 0.0 {
181
186
return fmt .Errorf ("invalid PoW: %f" , val )
182
187
}
183
- w .settings .Store (minPowIdx , val )
188
+
189
+ w .notifyPeersAboutPowRequirementChange (val )
190
+
191
+ go func () {
192
+ // allow some time before all the peers have processed the notification
193
+ time .Sleep (time .Duration (w .reactionAllowance ) * time .Second )
194
+ w .settings .Store (minPowIdx , val )
195
+ }()
196
+
184
197
return nil
185
198
}
186
199
200
+ // SetMinimumPoW sets the minimal PoW in test environment
201
+ func (w * Whisper ) SetMinimumPowTest (val float64 ) {
202
+ w .notifyPeersAboutPowRequirementChange (val )
203
+ w .settings .Store (minPowIdx , val )
204
+ }
205
+
206
+ func (w * Whisper ) notifyPeersAboutPowRequirementChange (pow float64 ) {
207
+ arr := make ([]* Peer , len (w .peers ))
208
+ i := 0
209
+
210
+ w .peerMu .Lock ()
211
+ for p := range w .peers {
212
+ arr [i ] = p
213
+ i ++
214
+ }
215
+ w .peerMu .Unlock ()
216
+
217
+ for _ , p := range arr {
218
+ err := p .notifyAboutPowRequirementChange (pow )
219
+ if err != nil {
220
+ // allow one retry
221
+ err = p .notifyAboutPowRequirementChange (pow )
222
+ }
223
+ if err != nil {
224
+ log .Warn ("oversized message received" , "peer" , p .ID (), "error" , err )
225
+ }
226
+ }
227
+ }
228
+
187
229
// getPeer retrieves peer by ID
188
230
func (w * Whisper ) getPeer (peerID []byte ) (* Peer , error ) {
189
231
w .peerMu .Lock ()
@@ -233,7 +275,7 @@ func (w *Whisper) SendP2PMessage(peerID []byte, envelope *Envelope) error {
233
275
234
276
// SendP2PDirect sends a peer-to-peer message to a specific peer.
235
277
func (w * Whisper ) SendP2PDirect (peer * Peer , envelope * Envelope ) error {
236
- return p2p .Send (peer .ws , p2pCode , envelope )
278
+ return p2p .Send (peer .ws , p2pMessageCode , envelope )
237
279
}
238
280
239
281
// NewKeyPair generates a new cryptographic identity for the client, and injects
@@ -536,7 +578,22 @@ func (wh *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
536
578
if trouble {
537
579
return errors .New ("invalid envelope" )
538
580
}
539
- case p2pCode :
581
+ case powRequirementCode :
582
+ s := rlp .NewStream (packet .Payload , uint64 (packet .Size ))
583
+ i , err := s .Uint ()
584
+ if err != nil {
585
+ log .Warn ("failed to decode powRequirementCode message, peer will be disconnected" , "peer" , p .peer .ID (), "err" , err )
586
+ return errors .New ("invalid powRequirementCode message" )
587
+ }
588
+ f := math .Float64frombits (i )
589
+ if math .IsInf (f , 0 ) || math .IsNaN (f ) || f < 0.0 {
590
+ log .Warn ("invalid value in powRequirementCode message, peer will be disconnected" , "peer" , p .peer .ID (), "err" , err )
591
+ return errors .New ("invalid value in powRequirementCode message" )
592
+ }
593
+ p .powRequirement = f
594
+ case bloomFilterExCode :
595
+ // to be implemented
596
+ case p2pMessageCode :
540
597
// peer-to-peer message, sent directly to peer bypassing PoW checks, etc.
541
598
// this message is not supposed to be forwarded to other peers, and
542
599
// therefore might not satisfy the PoW, expiry and other requirements.
@@ -599,7 +656,10 @@ func (wh *Whisper) add(envelope *Envelope) (bool, error) {
599
656
600
657
if envelope .PoW () < wh .MinPow () {
601
658
log .Debug ("envelope with low PoW dropped" , "PoW" , envelope .PoW (), "hash" , envelope .Hash ().Hex ())
602
- return false , nil // drop envelope without error
659
+ return false , nil // drop envelope without error for now
660
+
661
+ // once the status message includes the PoW requirement, an error should be returned here:
662
+ //return false, fmt.Errorf("envelope with low PoW dropped: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
603
663
}
604
664
605
665
hash := envelope .Hash ()
0 commit comments