Skip to content

Commit 10eaecf

Browse files
fix(json_family): Fix memory tracking for JSON
Fixes dragonflydb#4725.

Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent 753c25e commit 10eaecf

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

src/core/compact_object.cc

+24-4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,29 @@ size_t QlMAllocSize(quicklist* ql, bool slow) {
5555
return node_size + ql->count * 16; // we account for each member 16 bytes.
5656
}
5757

58+
// Applies a signed delta `update` to an unsigned byte count `size`.
// Returns the adjusted size; on detected overflow/underflow, reports via
// LOG(DFATAL) and returns 0 instead of producing a wrapped-around value.
size_t UpdateSize(size_t size, int64_t update) {
  if (update >= 0) {
    return size + update;
  }

  // update < 0: compute |update| without UB. Negating the signed value
  // directly is undefined behavior for INT64_MIN, so negate in unsigned
  // arithmetic (two's complement), which is well defined for every input,
  // including INT64_MIN whose magnitude is 2^63.
  const uint64_t unsigned_update = ~static_cast<uint64_t>(update) + 1;

  if (unsigned_update > std::numeric_limits<size_t>::max()) {
    // Only reachable on platforms where size_t is narrower than 64 bits.
    LOG(DFATAL) << "Overflow. Update size is too large. Size: " << size << ", update: " << update;
    return 0;
  }

  DCHECK(static_cast<uint64_t>(size) >= unsigned_update);
  if (static_cast<uint64_t>(size) < unsigned_update) {
    // The DCHECK above compiles out in release builds; without this guard
    // the subtraction below would wrap and report a huge bogus size.
    LOG(DFATAL) << "Underflow. Negative update exceeds size. Size: " << size
                << ", update: " << update;
    return 0;
  }
  return size - unsigned_update;
}
80+
5881
inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
5982
switch (encoding) {
6083
case kEncodingStrMap2: {
@@ -925,10 +948,7 @@ void CompactObj::SetJson(JsonType&& j) {
925948
// Adjusts the tracked heap usage of the stored JSON value by a signed delta.
// `size` may be negative (memory freed by a JSON op) or positive (allocated).
// No-op unless this object currently holds a JSON value with the jsoncons
// encoding — presumably other JSON encodings account memory elsewhere;
// TODO(review): confirm against the flat-encoding path.
void CompactObj::SetJsonSize(int64_t size) {
  if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) {
    // JSON.SET or if mem hasn't changed from a JSON op then we just update.
    // UpdateSize handles the signed/unsigned conversion and guards against
    // underflow when the delta is negative.
    u_.json_obj.cons.bytes_used = UpdateSize(u_.json_obj.cons.bytes_used, size);
  }
}
934954

src/server/json_family.cc

+4-2
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
339339

340340
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);
341341

342+
JsonMemTracker tracker;
342343
std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
343344
if (!parsed_json) {
344345
VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";
@@ -354,6 +355,9 @@ OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
354355
} else {
355356
res.it->second.SetJson(std::move(*parsed_json));
356357
}
358+
359+
tracker.SetJsonSize(res.it->second, res.is_new);
360+
357361
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
358362
return std::move(res);
359363
}
@@ -1329,10 +1333,8 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
13291333
}
13301334
}
13311335

1332-
JsonMemTracker mem_tracker;
13331336
auto st = SetJson(op_args, key, json_str);
13341337
RETURN_ON_BAD_STATUS(st);
1335-
mem_tracker.SetJsonSize(st->it->second, st->is_new);
13361338
return true;
13371339
}
13381340

tests/dragonfly/replication_test.py

+32
Original file line numberDiff line numberDiff line change
@@ -2951,3 +2951,35 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
29512951
await wait_for_replicas_state(*c_replicas)
29522952

29532953
await fill_task
2954+
2955+
2956+
async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
    """
    Regression test reproducing a bug in JSON memory tracking: populate a
    master, expire a chunk of keys while seeding continues, and verify that
    replicas reach a stable state under full-sync pressure.
    """
    # One master (tiny serialization chunks to stress big-value serialization)
    # and two replicas, all with small proactor pools.
    master = df_factory.create(proactor_threads=2, serialization_max_chunk_size=1)
    replicas = [df_factory.create(proactor_threads=2) for _ in range(2)]

    # Boot every instance and open one client per instance.
    df_factory.start_all([master] + replicas)
    c_master = master.client()
    c_replicas = [replica.client() for replica in replicas]

    total = 100000
    await c_master.execute_command(f"DEBUG POPULATE {total} tmp 1000 TYPE SET ELEMENTS 100")

    # Give a short random TTL to the first quarter of the populated keys so
    # they expire while replication is in progress.
    threshold = 25000
    for key_idx in range(threshold):
        ttl = random.randint(1, 4)
        await c_master.execute_command(f"EXPIRE tmp:{key_idx} {ttl} NX")

    # Keep writing in the background while the replicas attach.
    seeder = StaticSeeder(key_target=100_000)
    fill_task = asyncio.create_task(seeder.run(master.client()))

    for replica in c_replicas:
        await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}")

    # Both replicas must reach stable sync within the deadline.
    async with async_timeout.timeout(240):
        await wait_for_replicas_state(*c_replicas)

    await fill_task

0 commit comments

Comments
 (0)