Skip to content

Commit 99403c9

Browse files
committed
feat: add PushTasksTruncated which only push a limited amount of tasks
1 parent 8e40be9 commit 99403c9

File tree

5 files changed

+42
-4
lines changed

5 files changed

+42
-4
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.19
44

55
require (
66
github.com/benbjohnson/clock v1.3.0
7-
github.com/ipfs/go-ipfs-pq v0.0.2
7+
github.com/ipfs/go-ipfs-pq v0.0.3
88
github.com/libp2p/go-libp2p v0.22.0
99
)
1010

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
88
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
99
github.com/ipfs/go-cid v0.2.0 h1:01JTiihFq9en9Vz0lc0VDWvZe/uBonGpzo4THP0vcQ0=
1010
github.com/ipfs/go-cid v0.2.0/go.mod h1:P+HXFDF4CVhaVayiEb4wkAy7zBHxBwsJyt0Y5U6MLro=
11-
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
12-
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
11+
github.com/ipfs/go-ipfs-pq v0.0.3 h1:YpoHVJB+jzK15mr/xsWC574tyDLkezVrDNeaalQBsTE=
12+
github.com/ipfs/go-ipfs-pq v0.0.3/go.mod h1:btNw5hsHBpRcSSgZtiNm/SLj5gYIZ18AKtv3kERkRb4=
1313
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
1414
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
1515
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=

peertaskqueue.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package peertaskqueue
22

33
import (
4+
"math"
45
"sync"
56

67
pq "github.com/ipfs/go-ipfs-pq"
@@ -204,6 +205,13 @@ func (ptq *PeerTaskQueue) PeerTopics(p peer.ID) *peertracker.PeerTrackerTopics {
204205

205206
// PushTasks adds a new group of tasks for the given peer to the queue
206207
func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
208+
ptq.PushTasksTruncated(math.MaxUint, to, tasks...)
209+
}
210+
211+
// PushTasksTruncated is like PushTasks but it will not grow that peers's queue beyond n.
212+
// When truncation happen we will keep older tasks in the queue to avoid some infinite
213+
// tasks rotations if we are continously receiving work faster than we process it.
214+
func (ptq *PeerTaskQueue) PushTasksTruncated(n uint, to peer.ID, tasks ...peertask.Task) {
207215
ptq.lock.Lock()
208216
defer ptq.lock.Unlock()
209217

@@ -219,7 +227,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
219227
ptq.callHooks(to, peerAdded)
220228
}
221229

222-
peerTracker.PushTasks(tasks...)
230+
peerTracker.PushTasksTruncated(n, tasks...)
223231
ptq.pQueue.Update(peerTracker.Index())
224232
}
225233

peertracker/peertracker.go

+23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package peertracker
22

33
import (
4+
"math"
5+
"math/bits"
46
"sync"
57

68
"github.com/benbjohnson/clock"
@@ -205,11 +207,32 @@ func (p *PeerTracker) SetIndex(i int) {
205207

206208
// PushTasks adds a group of tasks onto a peer's queue
207209
func (p *PeerTracker) PushTasks(tasks ...peertask.Task) {
210+
p.PushTasksTruncated(math.MaxUint, tasks...)
211+
}
212+
213+
// PushTasksTruncated is like PushTasks but it will never grow the queue more than n.
214+
// When truncation happen we will keep older tasks in the queue to avoid some infinite
215+
// tasks rotations if we are continously receiving work faster than we process it.
216+
func (p *PeerTracker) PushTasksTruncated(n uint, tasks ...peertask.Task) {
208217
now := clockInstance.Now()
209218

210219
p.activelk.Lock()
211220
defer p.activelk.Unlock()
212221

222+
l := p.taskQueue.Len()
223+
if l < 0 {
224+
panic("negative length")
225+
}
226+
227+
if wouldBe := uint(l + len(tasks)); wouldBe > n {
228+
available, o := bits.Sub(n, uint(l), 0)
229+
if o != 0 {
230+
// happen if you mix Truncated and Untrucated or varies n.
231+
available = 0
232+
}
233+
tasks = tasks[:available]
234+
}
235+
213236
for _, task := range tasks {
214237
// If the new task doesn't add any more information over what we
215238
// already have in the active queue, then we can skip the new task

peertracker/peertracker_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ func TestPushPop(t *testing.T) {
3535
},
3636
}
3737
tracker.PushTasks(tasks...)
38+
39+
tracker.PushTasksTruncated(1, peertask.Task{
40+
Topic: "2",
41+
Priority: 2,
42+
Work: 20,
43+
})
44+
3845
popped, _ := tracker.PopTasks(100)
3946
if len(popped) != 1 {
4047
t.Fatal("Expected 1 task")

0 commit comments

Comments
 (0)