
Commit 491ed3c

Fix repdao for dp instances to allow one instance per run
1 parent 470de55 commit 491ed3c

File tree

1 file changed (+155 -159)

integration/repdao_dp/main.go

Lines changed: 155 additions & 159 deletions
@@ -40,188 +40,184 @@ func main() {
     }
     defer repdaoMongo.Disconnect(ctx)

-    countryCollectionMap := map[string]string{
-        "US": "retrievalbot_1",
-        "DE": "retrievalbot_2",
-        "SG": "retrievalbot_3",
-    }
+    country := os.Getenv("RETRIEVER_COUNTRY")
+    collectionName := os.Getenv("REPDAO_MONGO_COLLECTION")

-    for country, collectionName := range countryCollectionMap {
-        repdao := repdaoMongo.Database(os.Getenv("REPDAO_MONGO_DATABASE")).Collection(collectionName)
+    repdao := repdaoMongo.Database(os.Getenv("REPDAO_MONGO_DATABASE")).Collection(collectionName)

-        // Find the last saved date
-        var lastStats DailyStats
-        err = repdao.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{"date", -1}})).Decode(&lastStats)
-        if err != nil && err != mongo.ErrNoDocuments {
-            panic(err)
-        }
+    // Find the last saved date
+    var lastStats DailyStats
+    err = repdao.FindOne(ctx, bson.D{}, options.FindOne().SetSort(bson.D{{"date", -1}})).Decode(&lastStats)
+    if err != nil && err != mongo.ErrNoDocuments {
+        panic(err)
+    }

-        var startDate time.Time
-        if err == mongo.ErrNoDocuments {
-            startDate = time.Time{}
-        } else {
-            startDate, err = time.Parse("2006-01-02", lastStats.Date)
-            if err != nil {
-                panic(err)
-            }
-            startDate = startDate.AddDate(0, 0, 1)
-        }
+    var startDate time.Time
+    if err == mongo.ErrNoDocuments {
+        startDate = time.Time{}
+    } else {
+        startDate, err = time.Parse("2006-01-02", lastStats.Date)
+        if err != nil {
+            panic(err)
+        }
+        startDate = startDate.AddDate(0, 0, 1)
+    }

-        // Get the current day part of yesterday
-        now := time.Now().UTC()
-        endDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
+    // Get the current day part of yesterday
+    now := time.Now().UTC()
+    endDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())

-        fmt.Printf("startDate: %s, endDate: %s\n", startDate, endDate)
-        if startDate.After(endDate) || startDate.Equal(endDate) {
-            fmt.Println("No new data to process")
-            return
-        }
+    fmt.Printf("startDate: %s, endDate: %s\n", startDate, endDate)
+    if startDate.After(endDate) || startDate.Equal(endDate) {
+        fmt.Println("No new data to process")
+        return
+    }

-        // Aggregate the results
-        matchStage := bson.D{{"$match", bson.D{
-            {"retriever.country", country},
-            {"created_at", bson.D{{"$gte", startDate}, {"$lt", endDate}}},
-            {"task.module", bson.D{{"$in", bson.A{"http", "bitswap", "graphsync"}}}},
-        }}}
+    // Aggregate the results
+    matchStage := bson.D{{"$match", bson.D{
+        {"retriever.country", country},
+        {"created_at", bson.D{{"$gte", startDate}, {"$lt", endDate}}},
+        {"task.module", bson.D{{"$in", bson.A{"http", "bitswap", "graphsync"}}}},
+    }}}

-        groupStage := bson.D{{"$group", bson.D{
-            {"_id", bson.D{
-                {"provider_id", "$task.provider.id"},
-                {"date", bson.D{{"$dateToString", bson.D{
-                    {"format", "%Y-%m-%d"},
-                    {"date", "$created_at"},
-                }}}},
-                {"module", "$task.module"},
-                {"success", "$result.success"},
-            }},
-            {"count", bson.D{{"$sum", 1}}},
-            {"ttfb_sum", bson.D{{"$sum", "$result.ttfb"}}},
-            {"speed_sum", bson.D{{"$sum", "$result.speed"}}},
-        }}}
+    groupStage := bson.D{{"$group", bson.D{
+        {"_id", bson.D{
+            {"provider_id", "$task.provider.id"},
+            {"date", bson.D{{"$dateToString", bson.D{
+                {"format", "%Y-%m-%d"},
+                {"date", "$created_at"},
+            }}}},
+            {"module", "$task.module"},
+            {"success", "$result.success"},
+        }},
+        {"count", bson.D{{"$sum", 1}}},
+        {"ttfb_sum", bson.D{{"$sum", "$result.ttfb"}}},
+        {"speed_sum", bson.D{{"$sum", "$result.speed"}}},
+    }}}

-        groupStage2 := bson.D{{"$group", bson.D{
-            {"_id", bson.D{
-                {"provider_id", "$_id.provider_id"},
-                {"date", "$_id.date"},
-            }},
-            {"http_retrievals", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.module", "http"}}},
-                    "$count",
-                    0,
-                }},
-            }}}},
-            {"http_retrieval_success", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$and", bson.A{
-                        bson.D{{"$eq", bson.A{"$_id.module", "http"}}},
-                        bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    }}},
-                    "$count",
-                    0,
-                }}},
-            }}},
-            {"bitswap_retrievals", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.module", "bitswap"}}},
-                    "$count",
-                    0,
-                }},
-            }}}},
-            {"bitswap_retrieval_success", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$and", bson.A{
-                        bson.D{{"$eq", bson.A{"$_id.module", "bitswap"}}},
-                        bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    }}},
-                    "$count",
-                    0,
-                }}},
-            }}},
-            {"graphsync_retrievals", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.module", "graphsync"}}},
-                    "$count",
-                    0,
-                }},
-            }}}},
-            {"graphsync_retrieval_success", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$and", bson.A{
-                        bson.D{{"$eq", bson.A{"$_id.module", "graphsync"}}},
-                        bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    }}},
-                    "$count",
-                    0,
-                }}},
-            }}},
-            {"ttfb_sum", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    "$ttfb_sum",
-                    0,
-                }},
-            }}}},
-            {"speed_sum", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    "$speed_sum",
-                    0,
-                }},
-            }}}},
-            {"success_count", bson.D{{"$sum", bson.D{
-                {"$cond", bson.A{
-                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
-                    "$count",
-                    0,
-                }},
-            }}}},
-        }}}
+    groupStage2 := bson.D{{"$group", bson.D{
+        {"_id", bson.D{
+            {"provider_id", "$_id.provider_id"},
+            {"date", "$_id.date"},
+        }},
+        {"http_retrievals", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.module", "http"}}},
+                "$count",
+                0,
+            }},
+        }}}},
+        {"http_retrieval_success", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$and", bson.A{
+                    bson.D{{"$eq", bson.A{"$_id.module", "http"}}},
+                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                }}},
+                "$count",
+                0,
+            }}},
+        }}},
+        {"bitswap_retrievals", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.module", "bitswap"}}},
+                "$count",
+                0,
+            }},
+        }}}},
+        {"bitswap_retrieval_success", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$and", bson.A{
+                    bson.D{{"$eq", bson.A{"$_id.module", "bitswap"}}},
+                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                }}},
+                "$count",
+                0,
+            }}},
+        }}},
+        {"graphsync_retrievals", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.module", "graphsync"}}},
+                "$count",
+                0,
+            }},
+        }}}},
+        {"graphsync_retrieval_success", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$and", bson.A{
+                    bson.D{{"$eq", bson.A{"$_id.module", "graphsync"}}},
+                    bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                }}},
+                "$count",
+                0,
+            }}},
+        }}},
+        {"ttfb_sum", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                "$ttfb_sum",
+                0,
+            }},
+        }}}},
+        {"speed_sum", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                "$speed_sum",
+                0,
+            }},
+        }}}},
+        {"success_count", bson.D{{"$sum", bson.D{
+            {"$cond", bson.A{
+                bson.D{{"$eq", bson.A{"$_id.success", true}}},
+                "$count",
+                0,
+            }},
+        }}}},
+    }}}

-        projectStage := bson.D{{"$project", bson.D{
-            {"provider_id", "$_id.provider_id"},
-            {"date", "$_id.date"},
-            {"http_retrievals", 1},
-            {"http_retrieval_success", 1},
-            {"bitswap_retrievals", 1},
-            {"bitswap_retrieval_success", 1},
-            {"graphsync_retrievals", 1},
-            {"graphsync_retrieval_success", 1},
-            {"avg_ttfb_ms", bson.D{{"$cond", bson.A{
-                bson.D{{"$eq", bson.A{"$success_count", 0}}},
-                0,
-                bson.D{{"$divide", bson.A{bson.D{{"$divide", bson.A{"$ttfb_sum", "$success_count"}}}, 1000000.0}}},
-            }}}},
-            {"avg_speed_bps", bson.D{{"$cond", bson.A{
-                bson.D{{"$eq", bson.A{"$success_count", 0}}},
-                0,
-                bson.D{{"$divide", bson.A{"$speed_sum", "$success_count"}}},
-            }}}},
-        }}}
+    projectStage := bson.D{{"$project", bson.D{
+        {"provider_id", "$_id.provider_id"},
+        {"date", "$_id.date"},
+        {"http_retrievals", 1},
+        {"http_retrieval_success", 1},
+        {"bitswap_retrievals", 1},
+        {"bitswap_retrieval_success", 1},
+        {"graphsync_retrievals", 1},
+        {"graphsync_retrieval_success", 1},
+        {"avg_ttfb_ms", bson.D{{"$cond", bson.A{
+            bson.D{{"$eq", bson.A{"$success_count", 0}}},
+            0,
+            bson.D{{"$divide", bson.A{bson.D{{"$divide", bson.A{"$ttfb_sum", "$success_count"}}}, 1000000.0}}},
+        }}}},
+        {"avg_speed_bps", bson.D{{"$cond", bson.A{
+            bson.D{{"$eq", bson.A{"$success_count", 0}}},
+            0,
+            bson.D{{"$divide", bson.A{"$speed_sum", "$success_count"}}},
+        }}}},
+    }}}

-        cursor, err := retbot.Aggregate(context.Background(), mongo.Pipeline{matchStage, groupStage, groupStage2, projectStage})
-        if err != nil {
-            panic(err)
-        }
-        defer cursor.Close(context.Background())
-        var stats []interface{}
+    cursor, err := retbot.Aggregate(context.Background(), mongo.Pipeline{matchStage, groupStage, groupStage2, projectStage})
+    if err != nil {
+        panic(err)
+    }
+    defer cursor.Close(context.Background())
+    var stats []interface{}

-        // Insert the aggregated results into the new collection
-        for cursor.Next(context.Background()) {
-            var dailyStats DailyStats
-            err := cursor.Decode(&dailyStats)
-            if err != nil {
-                panic(err)
-            }
-            stats = append(stats, dailyStats)
-            fmt.Printf("Got daily stats: %+v\n", dailyStats)
-        }
+    // Insert the aggregated results into the new collection
+    for cursor.Next(context.Background()) {
+        var dailyStats DailyStats
+        err := cursor.Decode(&dailyStats)
+        if err != nil {
+            panic(err)
+        }
+        stats = append(stats, dailyStats)
+        fmt.Printf("Got daily stats: %+v\n", dailyStats)
+    }

-        // Insert the aggregated results into the new collection
-        result, err := repdao.InsertMany(ctx, stats)
-        if err != nil {
-            panic(err)
-        }
-        fmt.Printf("Inserted %v documents into the new collection!\n", len(result.InsertedIDs))
-    }
+    // Insert the aggregated results into the new collection
+    result, err := repdao.InsertMany(ctx, stats)
+    if err != nil {
+        panic(err)
+    }
+    fmt.Printf("Inserted %v documents into the new collection!\n", len(result.InsertedIDs))
+
 }
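With the map and loop gone, each run handles a single country, and the pairing between RETRIEVER_COUNTRY and REPDAO_MONGO_COLLECTION is left to whoever configures the instance; the committed code reads both variables without checking that they are set. Below is a minimal sketch, not part of this commit, of how a per-run instance could validate that configuration before connecting. The mustGetenv helper and the example values are hypothetical; the pairings mirror the removed countryCollectionMap.

package main

import (
    "fmt"
    "os"
)

// mustGetenv returns the value of key, or panics if it is unset or empty,
// so a misconfigured instance fails immediately instead of querying with
// an empty country or collection name.
func mustGetenv(key string) string {
    v := os.Getenv(key)
    if v == "" {
        panic(fmt.Sprintf("required environment variable %s is not set", key))
    }
    return v
}

func main() {
    // One instance per run: each deployment sets its own pair, e.g.
    // RETRIEVER_COUNTRY=US with REPDAO_MONGO_COLLECTION=retrievalbot_1.
    country := mustGetenv("RETRIEVER_COUNTRY")
    collection := mustGetenv("REPDAO_MONGO_COLLECTION")
    fmt.Printf("processing country %s into collection %s\n", country, collection)
}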
