Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: proper memory accounting for objects loaded via streaming #4774

Merged
merged 1 commit into from
Mar 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ static_assert(kExpireSegmentSize == 23528);

void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) {
DCHECK_NE(db, nullptr);
if (size == 0)
return;

DbTableStats& stats = db->stats;
DCHECK_GE(static_cast<int64_t>(stats.obj_memory_usage) + size, 0)
<< "Can't decrease " << size << " from " << stats.obj_memory_usage;
Expand Down Expand Up @@ -500,6 +503,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
PreUpdateBlocking(cntx.db_index, it, key);
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
DCHECK_GE(db_arr_[cntx.db_index]->stats.obj_memory_usage, res->it->second.MallocUsed());

return {{it, exp_it,
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
Expand Down
12 changes: 8 additions & 4 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2544,11 +2544,15 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
item->val.rdb_type);
};

// The scope is important here, as we need to ensure that the object memory is properly
// accounted for.
DbSlice::ItAndUpdater append_res;

// If we're appending the item to an existing key, first load the
// object.
if (item->load_config.append) {
auto res = db_slice->FindMutable(db_cntx, item->key);
if (!IsValid(res.it)) {
append_res = db_slice->FindMutable(db_cntx, item->key);
if (!IsValid(append_res.it)) {
// If the item has expired we may not find the key. Note if the key
// is found, but expired since we started loading, we still append to
// avoid an inconsistent state where only part of the key is loaded.
Expand All @@ -2557,7 +2561,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
}
return;
}
pv_ptr = &res.it->second;
pv_ptr = &append_res.it->second;
}

if (ec_ = FromOpaque(item->val, item->load_config, pv_ptr); ec_) {
Expand Down Expand Up @@ -2598,7 +2602,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
return;
}

auto& res = *op_res;
DbSlice::ItAndUpdater& res = *op_res;
res.it->first.SetSticky(item->is_sticky);
if (item->has_mc_flags) {
res.it->second.SetFlag(true);
Expand Down
8 changes: 8 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ TEST_F(RdbTest, LoadHugeSet) {

ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
auto metrics = GetMetrics();
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 24'000'000u);
}

// Tests loading a huge hmap, where the map is loaded in multiple partial
Expand All @@ -602,6 +604,8 @@ TEST_F(RdbTest, LoadHugeHMap) {

ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
auto metrics = GetMetrics();
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 29'000'000u);
}

// Tests loading a huge zset, where the zset is loaded in multiple partial
Expand All @@ -622,6 +626,8 @@ TEST_F(RdbTest, LoadHugeZSet) {

ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
auto metrics = GetMetrics();
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 26'000'000u);
}

// Tests loading a huge list, where the list is loaded in multiple partial
Expand All @@ -642,6 +648,8 @@ TEST_F(RdbTest, LoadHugeList) {

ASSERT_EQ(100000, CheckedInt({"llen", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"llen", "test:1"}));
auto metrics = GetMetrics();
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 20'000'000u);
}

// Tests loading a huge stream, where the stream is loaded in multiple partial
Expand Down
Loading