Skip to content

Commit fb6ba7d

Browse files
authored
Fix: retrieval worker should pull from the earliest task (data-preservation-programs#9)
1 parent 491ed3c commit fb6ba7d

File tree

9 files changed, with 384 additions (+384) and 193 deletions (-193).

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,3 +27,4 @@
2727
/repdao
2828
/repdao_dp
2929
/vendor
30+
/spcoverage

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ linters:
5151
- gochecknoglobals
5252
- funlen
5353
- maintidx
54+
- depguard
5455

5556
linters-settings:
5657
revive:

Makefile

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ build:
99
go build -o filplus_integration ./integration/filplus
1010
go build -o repdao ./integration/repdao
1111
go build -o repdao_dp ./integration/repdao_dp
12+
go build -o spcoverage ./integration/spcoverage
1213

1314
lint:
1415
gofmt -s -w .

go.mod

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,13 @@ require (
2525
github.com/multiformats/go-multiaddr v0.9.0
2626
github.com/multiformats/go-multistream v0.4.1
2727
github.com/pkg/errors v0.9.1
28-
github.com/stretchr/testify v1.8.2
29-
github.com/urfave/cli/v2 v2.24.4
28+
github.com/rjNemo/underscore v0.6.1
29+
github.com/stretchr/testify v1.8.4
30+
github.com/urfave/cli/v2 v2.25.7
3031
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa
3132
github.com/ybbus/jsonrpc/v3 v3.1.4
3233
go.mongodb.org/mongo-driver v1.11.3
33-
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
34+
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
3435
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
3536
)
3637

go.sum

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -669,6 +669,8 @@ github.com/quic-go/webtransport-go v0.5.2/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2Gk
669669
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
670670
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
671671
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
672+
github.com/rjNemo/underscore v0.6.1 h1:9IK9TWIDToFFWwoYZpD8cEKgmJ2N73bP/UIZJYEY+wU=
673+
github.com/rjNemo/underscore v0.6.1/go.mod h1:PwVP2XGRgIpWUkPbb8huhJ9xNWk+0xv9gM8uRpj4r0k=
672674
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
673675
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
674676
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
@@ -730,8 +732,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
730732
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
731733
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
732734
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
733-
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
734-
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
735+
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
736+
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
735737
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
736738
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
737739
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
@@ -745,8 +747,8 @@ github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxW
745747
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
746748
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
747749
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
748-
github.com/urfave/cli/v2 v2.24.4 h1:0gyJJEBYtCV87zI/x2nZCPyDxD51K6xM8SkwjHFCNEU=
749-
github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
750+
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
751+
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
750752
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
751753
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
752754
github.com/warpfork/go-testmark v0.11.0 h1:J6LnV8KpceDvo7spaNU4+DauH2n1x+6RaO2rJrmpQ9U=
@@ -867,8 +869,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
867869
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
868870
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
869871
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
870-
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
871-
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
872+
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc=
873+
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
872874
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
873875
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
874876
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=

integration/filplus/main.go

Lines changed: 3 additions & 183 deletions
Original file line number | Diff line number | Diff line change
@@ -2,22 +2,17 @@ package main
22

33
import (
44
"context"
5-
"github.com/data-preservation-programs/RetrievalBot/pkg/convert"
5+
"github.com/data-preservation-programs/RetrievalBot/integration/filplus/util"
66
"github.com/data-preservation-programs/RetrievalBot/pkg/env"
77
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
8-
"github.com/data-preservation-programs/RetrievalBot/pkg/requesterror"
98
"github.com/data-preservation-programs/RetrievalBot/pkg/resolver"
109
"github.com/data-preservation-programs/RetrievalBot/pkg/task"
11-
"github.com/ipfs/go-cid"
1210
logging "github.com/ipfs/go-log/v2"
1311
_ "github.com/joho/godotenv/autoload"
14-
"github.com/libp2p/go-libp2p/core/peer"
1512
"github.com/pkg/errors"
1613
"go.mongodb.org/mongo-driver/bson"
1714
"go.mongodb.org/mongo-driver/mongo"
1815
"go.mongodb.org/mongo-driver/mongo/options"
19-
"golang.org/x/exp/slices"
20-
"strconv"
2116
"time"
2217
)
2318

@@ -87,7 +82,7 @@ func NewFilPlusIntegration() *FilPlusIntegration {
8782
}
8883

8984
// Check public IP address
90-
ipInfo, err := resolver.GetPublicIPInfo(context.TODO(), "", "")
85+
ipInfo, err := resolver.GetPublicIPInfo(ctx, "", "")
9186
if err != nil {
9287
panic(err)
9388
}
@@ -107,76 +102,6 @@ func NewFilPlusIntegration() *FilPlusIntegration {
107102
}
108103
}
109104

110-
var moduleMetadataMap = map[task.ModuleName]map[string]string{
111-
task.GraphSync: {
112-
"assume_label": "true",
113-
"retrieve_type": "root_block",
114-
},
115-
task.Bitswap: {
116-
"assume_label": "true",
117-
"retrieve_type": "root_block",
118-
},
119-
task.HTTP: {
120-
"retrieve_type": "piece",
121-
"retrieve_size": "1048576",
122-
},
123-
}
124-
125-
func (f *FilPlusIntegration) addErrorResults(results []interface{}, document model.DealState,
126-
providerInfo resolver.MinerInfo, location resolver.IPInfo,
127-
errorCode task.ErrorCode, errorMessage string) []interface{} {
128-
for module, metadata := range moduleMetadataMap {
129-
newMetadata := make(map[string]string)
130-
for k, v := range metadata {
131-
newMetadata[k] = v
132-
}
133-
newMetadata["deal_id"] = strconv.Itoa(int(document.DealID))
134-
newMetadata["client"] = document.Client
135-
results = append(results, task.Result{
136-
Task: task.Task{
137-
Requester: f.requester,
138-
Module: module,
139-
Metadata: newMetadata,
140-
Provider: task.Provider{
141-
ID: document.Provider,
142-
PeerID: providerInfo.PeerId,
143-
Multiaddrs: convert.MultiaddrsBytesToStringArraySkippingError(providerInfo.Multiaddrs),
144-
City: location.City,
145-
Region: location.Region,
146-
Country: location.Country,
147-
Continent: location.Continent,
148-
},
149-
Content: task.Content{
150-
CID: document.Label,
151-
},
152-
CreatedAt: time.Now().UTC(),
153-
Timeout: env.GetDuration(env.FilplusIntegrationTaskTimeout, 15*time.Second)},
154-
Retriever: task.Retriever{
155-
PublicIP: f.ipInfo.IP,
156-
City: f.ipInfo.City,
157-
Region: f.ipInfo.Region,
158-
Country: f.ipInfo.Country,
159-
Continent: f.ipInfo.Continent,
160-
ASN: f.ipInfo.ASN,
161-
ISP: f.ipInfo.ISP,
162-
Latitude: f.ipInfo.Latitude,
163-
Longitude: f.ipInfo.Longitude,
164-
},
165-
Result: task.RetrievalResult{
166-
Success: false,
167-
ErrorCode: errorCode,
168-
ErrorMessage: errorMessage,
169-
TTFB: 0,
170-
Speed: 0,
171-
Duration: 0,
172-
Downloaded: 0,
173-
},
174-
CreatedAt: time.Now().UTC(),
175-
})
176-
}
177-
return results
178-
}
179-
180105
func (f *FilPlusIntegration) RunOnce(ctx context.Context) error {
181106
logger.Info("start running filplus integration")
182107

@@ -220,112 +145,7 @@ func (f *FilPlusIntegration) RunOnce(ctx context.Context) error {
220145
}
221146

222147
documents = RandomObjects(documents, len(documents)/2, f.randConst)
223-
tasks := make([]interface{}, 0)
224-
results := make([]interface{}, 0)
225-
// Insert the documents into task queue
226-
for _, document := range documents {
227-
// If the label is a correct CID, assume it is the payload CID and try GraphSync and Bitswap retrieval
228-
labelCID, err := cid.Decode(document.Label)
229-
if err != nil {
230-
logger.With("label", document.Label, "deal_id", document.DealID).
231-
Debug("failed to decode label as CID")
232-
continue
233-
}
234-
235-
isPayloadCID := true
236-
// Skip graphsync and bitswap if the cid is not decodable, i.e. it is a pieceCID
237-
if !slices.Contains([]uint64{cid.Raw, cid.DagCBOR, cid.DagProtobuf, cid.DagJSON, cid.DagJOSE},
238-
labelCID.Prefix().Codec) {
239-
logger.With("provider", document.Provider, "deal_id", document.DealID,
240-
"label", document.Label, "codec", labelCID.Prefix().Codec).
241-
Info("Skip Bitswap and Graphsync because the Label is likely not a payload CID")
242-
isPayloadCID = false
243-
}
244-
245-
providerInfo, err := f.providerResolver.ResolveProvider(ctx, document.Provider)
246-
if err != nil {
247-
logger.With("provider", document.Provider, "deal_id", document.DealID).
248-
Error("failed to resolve provider")
249-
continue
250-
}
251-
252-
location, err := f.locationResolver.ResolveMultiaddrsBytes(ctx, providerInfo.Multiaddrs)
253-
if err != nil {
254-
if errors.As(err, &requesterror.BogonIPError{}) ||
255-
errors.As(err, &requesterror.InvalidIPError{}) ||
256-
errors.As(err, &requesterror.HostLookupError{}) ||
257-
errors.As(err, &requesterror.NoValidMultiAddrError{}) {
258-
results = f.addErrorResults(results, document, providerInfo, location,
259-
task.NoValidMultiAddrs, err.Error())
260-
} else {
261-
logger.With("provider", document.Provider, "deal_id", document.DealID, "err", err).
262-
Error("failed to resolve provider location")
263-
}
264-
continue
265-
}
266-
267-
_, err = peer.Decode(providerInfo.PeerId)
268-
if err != nil {
269-
logger.With("provider", document.Provider, "deal_id", document.DealID, "peerID", providerInfo.PeerId,
270-
"err", err).
271-
Info("failed to decode peerID")
272-
results = f.addErrorResults(results, document, providerInfo, location,
273-
task.InvalidPeerID, err.Error())
274-
continue
275-
}
276-
277-
if isPayloadCID {
278-
for _, module := range []task.ModuleName{task.GraphSync, task.Bitswap} {
279-
tasks = append(tasks, task.Task{
280-
Requester: f.requester,
281-
Module: module,
282-
Metadata: map[string]string{
283-
"deal_id": strconv.Itoa(int(document.DealID)),
284-
"client": document.Client,
285-
"assume_label": "true",
286-
"retrieve_type": "root_block"},
287-
Provider: task.Provider{
288-
ID: document.Provider,
289-
PeerID: providerInfo.PeerId,
290-
Multiaddrs: convert.MultiaddrsBytesToStringArraySkippingError(providerInfo.Multiaddrs),
291-
City: location.City,
292-
Region: location.Region,
293-
Country: location.Country,
294-
Continent: location.Continent,
295-
},
296-
Content: task.Content{
297-
CID: document.Label,
298-
},
299-
CreatedAt: time.Now().UTC(),
300-
Timeout: env.GetDuration(env.FilplusIntegrationTaskTimeout, 15*time.Second),
301-
})
302-
}
303-
}
304-
305-
tasks = append(tasks, task.Task{
306-
Requester: f.requester,
307-
Module: task.HTTP,
308-
Metadata: map[string]string{
309-
"deal_id": strconv.Itoa(int(document.DealID)),
310-
"client": document.Client,
311-
"retrieve_type": "piece",
312-
"retrieve_size": "1048576"},
313-
Provider: task.Provider{
314-
ID: document.Provider,
315-
PeerID: providerInfo.PeerId,
316-
Multiaddrs: convert.MultiaddrsBytesToStringArraySkippingError(providerInfo.Multiaddrs),
317-
City: location.City,
318-
Region: location.Region,
319-
Country: location.Country,
320-
Continent: location.Continent,
321-
},
322-
Content: task.Content{
323-
CID: document.PieceCID,
324-
},
325-
CreatedAt: time.Now().UTC(),
326-
Timeout: env.GetDuration(env.FilplusIntegrationTaskTimeout, 15*time.Second),
327-
})
328-
}
148+
tasks, results := util.AddTasks(ctx, f.requester, f.ipInfo, documents, f.locationResolver, f.providerResolver)
329149

330150
if len(tasks) > 0 {
331151
_, err = f.taskCollection.InsertMany(ctx, tasks)

0 commit comments

Comments
 (0)