Skip to content

Commit c4c3936

Browse files
committed
fix(hll_family): Don't use destination key in PFMERGE operation
Skip destination key to be used with PFMERGE. Fixed bug in hllMergeDense that writes to wrong register. Fixes #4750 Signed-off-by: mkaruza <[email protected]>
1 parent 51f400a commit c4c3936

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

src/redis/hyperloglog.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1371,7 +1371,7 @@ int64_t pfcountSingle(struct HllBufferPtr hll_ptr) {
13711371

13721372
/* Merge dense-encoded HLL */
13731373
static void hllMergeDense(uint8_t* max, struct HllBufferPtr to) {
1374-
uint8_t* registers = max + HLL_HDR_SIZE;
1374+
uint8_t* registers = max;
13751375
uint8_t val;
13761376
struct hllhdr* hll_hdr = (struct hllhdr*)to.hll;
13771377

src/server/hll_family.cc

+15-5
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,18 @@ OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
169169
}
170170
}
171171

172-
OpResult<vector<string>> ReadValues(const OpArgs& op_args, const ShardArgs& keys) {
172+
OpResult<vector<string>> ReadValues(const OpArgs& op_args, const ShardArgs& keys,
173+
bool skip_dest_key) {
173174
try {
174175
vector<string> values;
175-
for (string_view key : keys) {
176-
auto it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_STRING);
176+
ShardArgs::Iterator start_key = keys.begin(), end_key = keys.end();
177+
if (skip_dest_key) {
178+
// If destkey is only found on this shard we can return
179+
if (++start_key == end_key)
180+
return OpStatus::OK;
181+
}
182+
for (; start_key != end_key; ++start_key) {
183+
auto it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, *start_key, OBJ_STRING);
177184
if (it.ok()) {
178185
string hll;
179186
it.value()->second.GetString(&hll);
@@ -211,7 +218,7 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, const CommandContext& cmd_cntx)
211218
auto cb = [&](Transaction* t, EngineShard* shard) {
212219
ShardId sid = shard->shard_id();
213220
ShardArgs shard_args = t->GetShardArgs(shard->shard_id());
214-
auto result = ReadValues(t->GetOpArgs(shard), shard_args);
221+
auto result = ReadValues(t->GetOpArgs(shard), shard_args, false /* no dest key */);
215222
if (result.ok()) {
216223
hlls[sid] = std::move(result.value());
217224
}
@@ -246,12 +253,15 @@ void PFCount(CmdArgList args, const CommandContext& cmd_cntx) {
246253
OpResult<int> PFMergeInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
247254
vector<vector<string>> hlls;
248255
hlls.resize(shard_set->size());
256+
const string_view dest_key = ArgS(args, 0);
257+
const ShardId dest_shard = Shard(dest_key, shard_set->size());
249258

250259
atomic_bool success = true;
251260
auto cb = [&](Transaction* t, EngineShard* shard) {
252261
ShardId sid = shard->shard_id();
262+
const bool skip_dest_key = sid == dest_shard;
253263
ShardArgs shard_args = t->GetShardArgs(shard->shard_id());
254-
auto result = ReadValues(t->GetOpArgs(shard), shard_args);
264+
auto result = ReadValues(t->GetOpArgs(shard), shard_args, skip_dest_key);
255265
if (result.ok()) {
256266
hlls[sid] = std::move(result.value());
257267
} else {

src/server/hll_family_test.cc

+7-2
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,13 @@ TEST_F(HllFamilyTest, MergeToNew) {
166166
TEST_F(HllFamilyTest, MergeToExisting) {
167167
EXPECT_EQ(CheckedInt({"pfadd", "key1", "1", "2", "3"}), 1);
168168
EXPECT_EQ(CheckedInt({"pfadd", "key2", "4", "5"}), 1);
169-
EXPECT_EQ(Run({"pfmerge", "key2", "key1"}), "OK");
170-
EXPECT_EQ(CheckedInt({"pfcount", "key2"}), 5);
169+
EXPECT_EQ(Run({"pfmerge", "key3", "key2", "key1"}), "OK");
170+
EXPECT_EQ(CheckedInt({"pfcount", "key3"}), 5);
171+
EXPECT_EQ(Run({"pfmerge", "key3", "key3"}), "OK");
172+
EXPECT_EQ(CheckedInt({"pfcount", "key3"}), 5);
173+
EXPECT_EQ(CheckedInt({"pfadd", "key4", "4", "5", "6"}), 1);
174+
EXPECT_EQ(Run({"pfmerge", "key3", "key3", "key4"}), "OK");
175+
EXPECT_EQ(CheckedInt({"pfcount", "key3"}), 6);
171176
}
172177

173178
TEST_F(HllFamilyTest, MergeNonExisting) {

0 commit comments

Comments
 (0)