Skip to content

Commit ca65a49

Browse files
fix(json_family): Fix memory tracking for JSON (#4777)
* fix(json_family): Fix memory tracking for JSON fixes #4725 Signed-off-by: Stepan Bagritsevich <[email protected]> * refactor: address comments Signed-off-by: Stepan Bagritsevich <[email protected]> * small fix Signed-off-by: Stepan Bagritsevich <[email protected]> --------- Signed-off-by: Stepan Bagritsevich <[email protected]>
1 parent ce5c44b commit ca65a49

File tree

3 files changed

+52
-7
lines changed

3 files changed

+52
-7
lines changed

src/core/compact_object.cc

+14-4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@ size_t QlMAllocSize(quicklist* ql, bool slow) {
5555
return node_size + ql->count * 16; // we account for each member 16 bytes.
5656
}
5757

58+
size_t UpdateSize(size_t size, int64_t update) {
59+
if (update >= 0) {
60+
return size + update;
61+
}
62+
63+
int64_t result = static_cast<int64_t>(size) + update;
64+
if (result < 0) {
65+
DCHECK(false) << "Can't decrease " << size << " from " << -update;
66+
LOG_EVERY_N(ERROR, 30) << "Can't decrease " << size << " from " << -update;
67+
}
68+
return result;
69+
}
70+
5871
inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
5972
switch (encoding) {
6073
case kEncodingStrMap2: {
@@ -927,10 +940,7 @@ void CompactObj::SetJson(JsonType&& j) {
927940
void CompactObj::SetJsonSize(int64_t size) {
928941
if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) {
929942
// JSON.SET or if mem hasn't changed from a JSON op then we just update.
930-
if (size < 0) {
931-
DCHECK(static_cast<int64_t>(u_.json_obj.cons.bytes_used) >= size);
932-
}
933-
u_.json_obj.cons.bytes_used += size;
943+
u_.json_obj.cons.bytes_used = UpdateSize(u_.json_obj.cons.bytes_used, size);
934944
}
935945
}
936946

src/server/json_family.cc

+6-3
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,8 @@ OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
339339

340340
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);
341341

342+
JsonMemTracker tracker;
343+
342344
std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
343345
if (!parsed_json) {
344346
VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";
@@ -354,7 +356,10 @@ OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
354356
} else {
355357
res.it->second.SetJson(std::move(*parsed_json));
356358
}
359+
360+
tracker.SetJsonSize(res.it->second, res.is_new);
357361
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
362+
358363
return std::move(res);
359364
}
360365

@@ -942,7 +947,6 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
942947
return 0;
943948
}
944949

945-
JsonMemTracker tracker;
946950
// FindMutable because we need to run the AutoUpdater at the end which will account
947951
// the deltas calculated from the MemoryTracker
948952
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
@@ -953,6 +957,7 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
953957
PrimeValue& pv = it_res->it->second;
954958
JsonType* json_val = pv.GetJson();
955959

960+
JsonMemTracker tracker;
956961
absl::Cleanup update_size_on_exit([tracker, &pv]() mutable { tracker.SetJsonSize(pv, false); });
957962

958963
if (json_path.HoldsJsonPath()) {
@@ -1329,10 +1334,8 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
13291334
}
13301335
}
13311336

1332-
JsonMemTracker mem_tracker;
13331337
auto st = SetJson(op_args, key, json_str);
13341338
RETURN_ON_BAD_STATUS(st);
1335-
mem_tracker.SetJsonSize(st->it->second, st->is_new);
13361339
return true;
13371340
}
13381341

tests/dragonfly/replication_test.py

+32
Original file line numberDiff line numberDiff line change
@@ -2951,3 +2951,35 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
29512951
await wait_for_replicas_state(*c_replicas)
29522952

29532953
await fill_task
2954+
2955+
2956+
async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
2957+
"""
2958+
This test reproduces a bug in the JSON memory tracking.
2959+
"""
2960+
master = df_factory.create(proactor_threads=2, serialization_max_chunk_size=1)
2961+
replicas = [df_factory.create(proactor_threads=2) for i in range(2)]
2962+
2963+
# Start instances and connect clients
2964+
df_factory.start_all([master] + replicas)
2965+
c_master = master.client()
2966+
c_replicas = [replica.client() for replica in replicas]
2967+
2968+
total = 100000
2969+
await c_master.execute_command(f"DEBUG POPULATE {total} tmp 1000 TYPE SET ELEMENTS 100")
2970+
2971+
thresehold = 25000
2972+
for i in range(thresehold):
2973+
rand = random.randint(1, 4)
2974+
await c_master.execute_command(f"EXPIRE tmp:{i} {rand} NX")
2975+
2976+
seeder = StaticSeeder(key_target=100_000)
2977+
fill_task = asyncio.create_task(seeder.run(master.client()))
2978+
2979+
for replica in c_replicas:
2980+
await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}")
2981+
2982+
async with async_timeout.timeout(240):
2983+
await wait_for_replicas_state(*c_replicas)
2984+
2985+
await fill_task

0 commit comments

Comments
 (0)