@@ -118,11 +118,11 @@ func (ms *MetricSync) Run(ctx context.Context) error {
118
118
})
119
119
120
120
g .Go (func () error {
121
- return ms .db . UpsertStreamed ( ctx , upsertPodMetrics , database . WithStatement (ms .podMetricUpsertStmt (), 5 ))
121
+ return database . NewUpsert ( ms .db ). WithStatement (ms .podMetricUpsertStmt (), 5 ). Stream ( ctx , upsertPodMetrics )
122
122
})
123
123
124
124
g .Go (func () error {
125
- return ms .db . UpsertStreamed ( ctx , upsertContainerMetrics , database . WithStatement (ms .containerMetricUpsertStmt (), 6 ))
125
+ return database . NewUpsert ( ms .db ). WithStatement (ms .containerMetricUpsertStmt (), 6 ). Stream ( ctx , upsertContainerMetrics )
126
126
})
127
127
128
128
return g .Wait ()
@@ -157,11 +157,11 @@ func (ms *MetricSync) Clean(ctx context.Context, deleteChannel <-chan contracts.
157
157
})
158
158
159
159
g .Go (func () error {
160
- return ms .db . DeleteStreamed ( ctx , & schema.PodMetric {}, deletesPod , database . ByColumn ( "reference_id" ) )
160
+ return database . NewDelete ( ms .db ). ByColumn ( "reference_id" ). Stream ( ctx , & schema.PodMetric {}, deletesPod )
161
161
})
162
162
163
163
g .Go (func () error {
164
- return ms .db . DeleteStreamed ( ctx , & schema.ContainerMetric {}, deletesContainer , database . ByColumn ( "pod_reference_id" ) )
164
+ return database . NewDelete ( ms .db ). ByColumn ( "pod_reference_id" ). Stream ( ctx , & schema.ContainerMetric {}, deletesContainer )
165
165
})
166
166
167
167
return g .Wait ()
@@ -229,7 +229,7 @@ func (nms *NodeMetricSync) Run(ctx context.Context) error {
229
229
})
230
230
231
231
g .Go (func () error {
232
- return nms .db . UpsertStreamed ( ctx , upsertNodeMetrics , database . WithStatement (nms .nodeMetricUpsertStmt (), 5 ))
232
+ return database . NewUpsert ( nms .db ). WithStatement (nms .nodeMetricUpsertStmt (), 5 ). Stream ( ctx , upsertNodeMetrics )
233
233
})
234
234
235
235
return g .Wait ()
@@ -261,7 +261,7 @@ func (nms *NodeMetricSync) Clean(ctx context.Context, deleteChannel <-chan contr
261
261
})
262
262
263
263
g .Go (func () error {
264
- return nms .db . DeleteStreamed ( ctx , & schema.NodeMetric {}, deletes , database . ByColumn ( "node_id" ) )
264
+ return database . NewDelete ( nms .db ). ByColumn ( "node_id" ). Stream ( ctx , & schema.NodeMetric {}, deletes )
265
265
})
266
266
267
267
return g .Wait ()
0 commit comments