@@ -169,11 +169,18 @@ OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
169
169
}
170
170
}
171
171
172
- OpResult<vector<string>> ReadValues (const OpArgs& op_args, const ShardArgs& keys) {
172
+ OpResult<vector<string>> ReadValues (const OpArgs& op_args, const ShardArgs& keys,
173
+ bool skip_dest_key) {
173
174
try {
174
175
vector<string> values;
175
- for (string_view key : keys) {
176
- auto it = op_args.GetDbSlice ().FindReadOnly (op_args.db_cntx , key, OBJ_STRING);
176
+ ShardArgs::Iterator start_key = keys.begin (), end_key = keys.end ();
177
+ if (skip_dest_key) {
178
+ // If destkey is only found on this shard we can return
179
+ if (++start_key == end_key)
180
+ return OpStatus::OK;
181
+ }
182
+ for (; start_key != end_key; ++start_key) {
183
+ auto it = op_args.GetDbSlice ().FindReadOnly (op_args.db_cntx , *start_key, OBJ_STRING);
177
184
if (it.ok ()) {
178
185
string hll;
179
186
it.value ()->second .GetString (&hll);
@@ -211,7 +218,7 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, const CommandContext& cmd_cntx)
211
218
auto cb = [&](Transaction* t, EngineShard* shard) {
212
219
ShardId sid = shard->shard_id ();
213
220
ShardArgs shard_args = t->GetShardArgs (shard->shard_id ());
214
- auto result = ReadValues (t->GetOpArgs (shard), shard_args);
221
+ auto result = ReadValues (t->GetOpArgs (shard), shard_args, false /* no dest key */ );
215
222
if (result.ok ()) {
216
223
hlls[sid] = std::move (result.value ());
217
224
}
@@ -246,12 +253,15 @@ void PFCount(CmdArgList args, const CommandContext& cmd_cntx) {
246
253
OpResult<int > PFMergeInternal (CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
247
254
vector<vector<string>> hlls;
248
255
hlls.resize (shard_set->size ());
256
+ const string_view dest_key = ArgS (args, 0 );
257
+ const ShardId dest_shard = Shard (dest_key, shard_set->size ());
249
258
250
259
atomic_bool success = true ;
251
260
auto cb = [&](Transaction* t, EngineShard* shard) {
252
261
ShardId sid = shard->shard_id ();
262
+ const bool skip_dest_key = sid == dest_shard;
253
263
ShardArgs shard_args = t->GetShardArgs (shard->shard_id ());
254
- auto result = ReadValues (t->GetOpArgs (shard), shard_args);
264
+ auto result = ReadValues (t->GetOpArgs (shard), shard_args, skip_dest_key );
255
265
if (result.ok ()) {
256
266
hlls[sid] = std::move (result.value ());
257
267
} else {
@@ -275,16 +285,15 @@ OpResult<int> PFMergeInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder
275
285
int result = pfmerge (ptrs.data (), ptrs.size (), StringToHllPtr (hll));
276
286
277
287
auto set_cb = [&](Transaction* t, EngineShard* shard) {
278
- string_view key = ArgS (args, 0 );
279
288
const OpArgs& op_args = t->GetOpArgs (shard);
280
289
auto & db_slice = op_args.GetDbSlice ();
281
- auto op_res = db_slice.AddOrFind (t->GetDbContext (), key );
290
+ auto op_res = db_slice.AddOrFind (t->GetDbContext (), dest_key );
282
291
RETURN_ON_BAD_STATUS (op_res);
283
292
auto & res = *op_res;
284
293
res.it ->second .SetString (hll);
285
294
286
295
if (op_args.shard ->journal ()) {
287
- RecordJournal (op_args, " SET" , ArgSlice{key , hll});
296
+ RecordJournal (op_args, " SET" , ArgSlice{dest_key , hll});
288
297
}
289
298
290
299
return OpStatus::OK;
0 commit comments