Skip to content

chore: clean up dbslice #4769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ElementAccess {
EngineShard* shard_ = nullptr;
mutable DbSlice::AutoUpdater post_updater_;

void SetFields(EngineShard* shard, DbSlice::AddOrFindResult res);
void SetFields(EngineShard* shard, DbSlice::ItAndUpdater res);

public:
ElementAccess(string_view key, const OpArgs& args) : key_{key}, context_{args.db_cntx} {
Expand Down Expand Up @@ -298,7 +298,7 @@ std::optional<bool> ElementAccess::Exists(EngineShard* shard) {
return res.status() != OpStatus::KEY_NOTFOUND;
}

void ElementAccess::SetFields(EngineShard* shard, DbSlice::AddOrFindResult res) {
void ElementAccess::SetFields(EngineShard* shard, DbSlice::ItAndUpdater res) {
element_iter_ = res.it;
added_ = res.is_new;
shard_ = shard;
Expand Down
54 changes: 20 additions & 34 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,14 +479,6 @@ DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
fields_.orig_heap_size = fields.it->second.MallocUsed();
}

DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) {
it = o.it;
exp_it = o.exp_it;
is_new = false;
post_updater = std::move(o).post_updater;
return *this;
}

DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) {
return std::move(FindMutableInternal(cntx, key, std::nullopt).value());
}
Expand Down Expand Up @@ -626,12 +618,11 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsign
return res;
}

OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFind(const Context& cntx, string_view key) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFind(const Context& cntx, string_view key) {
return AddOrFindInternal(cntx, key);
}

OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cntx,
string_view key) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx, string_view key) {
DCHECK(IsDbValid(cntx.db_index));

DbTable& db = *db_arr_[cntx.db_index];
Expand All @@ -644,15 +635,15 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt

// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return DbSlice::AddOrFindResult{
return ItAndUpdater{
.it = it,
.exp_it = exp_it,
.is_new = false,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
.key = key})};
.key = key}),
.is_new = false};
} else {
res = OpStatus::KEY_NOTFOUND;
}
Expand Down Expand Up @@ -760,15 +751,14 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
db.slots_stats[sid].key_count += 1;
}

return DbSlice::AddOrFindResult{
.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.is_new = true,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
.key = key})};
return ItAndUpdater{.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
.key = key}),
.is_new = true};
}

void DbSlice::ActivateDb(DbIndex db_ind) {
Expand Down Expand Up @@ -1049,11 +1039,10 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it,
}
}

OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key,
PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key, PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) {
DCHECK(!obj.IsRef());

auto op_result = AddOrFind(cntx, key);
Expand Down Expand Up @@ -1084,8 +1073,8 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdateInternal(const Context& c
return op_result;
}

OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdate(const Context& cntx, string_view key,
PrimeValue obj, uint64_t expire_at_ms) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrUpdate(const Context& cntx, string_view key,
PrimeValue obj, uint64_t expire_at_ms) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
}

Expand Down Expand Up @@ -1545,6 +1534,7 @@ void DbSlice::SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events) {
}

void DbSlice::QueueInvalidationTrackingMessageAtomic(std::string_view key) {
FiberAtomicGuard guard;
auto it = client_tracking_map_.find(key);
if (it == client_tracking_map_.end()) {
return;
Expand Down Expand Up @@ -1651,10 +1641,6 @@ size_t DbSlice::StopSampleKeys(DbIndex db_ind) {
return count;
}

void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
return PerformDeletion(Iterator::FromPrime(del_it), table);
}

void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table) {
FiberAtomicGuard guard;
size_t table_before = table->table_memory();
Expand Down
30 changes: 8 additions & 22 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,7 @@ class DbSlice {
Iterator it;
ExpIterator exp_it;
AutoUpdater post_updater;

bool IsValid() const {
return !it.is_done();
}
bool is_new = false;
};

ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
Expand All @@ -304,20 +301,11 @@ class DbSlice {
OpResult<ConstIterator> FindReadOnly(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;

struct AddOrFindResult {
Iterator it;
ExpIterator exp_it;
bool is_new = false;
AutoUpdater post_updater;

AddOrFindResult& operator=(ItAndUpdater&& o);
};

OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> AddOrFind(const Context& cntx, std::string_view key);

// Same as AddOrSkip, but overwrites in case entry exists.
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);
OpResult<ItAndUpdater> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);

// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
Expand Down Expand Up @@ -552,9 +540,9 @@ class DbSlice {

bool DelEmptyPrimeValue(const Context& cntx, Iterator it);

OpResult<AddOrFindResult> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update);
OpResult<ItAndUpdater> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update);

void FlushSlotsFb(const cluster::SlotSet& slot_ids);
void FlushDbIndexes(const std::vector<DbIndex>& indexes);
Expand All @@ -568,9 +556,7 @@ class DbSlice {
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);

//
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);

// Queues invalidation message to the clients that are tracking the change to a key.
void QueueInvalidationTrackingMessageAtomic(std::string_view key);
Expand All @@ -590,7 +576,7 @@ class DbSlice {

PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;

OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> AddOrFindInternal(const Context& cntx, std::string_view key);

OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
Expand Down
14 changes: 6 additions & 8 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ class RdbRestoreValue : protected RdbLoaderBase {
rdb_version_ = rdb_version;
}

OpResult<DbSlice::AddOrFindResult> Add(string_view key, string_view payload,
const DbContext& cntx, const RestoreArgs& args,
DbSlice* db_slice);
OpResult<DbSlice::ItAndUpdater> Add(string_view key, string_view payload, const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice);

private:
std::optional<OpaqueObj> Parse(io::Source* source);
Expand Down Expand Up @@ -190,10 +189,9 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
return std::optional<OpaqueObj>(std::move(obj));
}

OpResult<DbSlice::AddOrFindResult> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx,
const RestoreArgs& args,
DbSlice* db_slice) {
OpResult<DbSlice::ItAndUpdater> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx, const RestoreArgs& args,
DbSlice* db_slice) {
InMemSource data_src(data);
PrimeValue pv;
bool first_parse = true;
Expand Down Expand Up @@ -715,7 +713,7 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
OpResult<vector<long>> OpFieldExpire(const OpArgs& op_args, string_view key, uint32_t ttl_sec,
CmdArgList values) {
auto& db_slice = op_args.GetDbSlice();
auto [it, expire_it, auto_updater] = db_slice.FindMutable(op_args.db_cntx, key);
auto [it, expire_it, auto_updater, is_new] = db_slice.FindMutable(op_args.db_cntx, key);

if (!IsValid(it) || (it->second.ObjType() != OBJ_SET && it->second.ObjType() != OBJ_HASH)) {
std::vector<long> res(values.size(), -2);
Expand Down
6 changes: 3 additions & 3 deletions src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ std::optional<JsonType> ShardJsonFromString(std::string_view input) {
return dfly::JsonFromString(input, CompactObj::memory_resource());
}

OpResult<DbSlice::AddOrFindResult> SetJson(const OpArgs& op_args, string_view key,
string_view json_str) {
OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
string_view json_str) {
auto& db_slice = op_args.GetDbSlice();

auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
Expand Down Expand Up @@ -1330,7 +1330,7 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
}

JsonMemTracker mem_tracker;
OpResult<DbSlice::AddOrFindResult> st = SetJson(op_args, key, json_str);
auto st = SetJson(op_args, key, json_str);
RETURN_ON_BAD_STATUS(st);
mem_tracker.SetJsonSize(st->it->second, st->is_new);
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, facade::ArgRange vals, bool journal_rewrite) {
EngineShard* es = op_args.shard;
DbSlice::AddOrFindResult res;
DbSlice::ItAndUpdater res;

if (skip_notexist) {
auto tmp_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_LIST);
Expand Down
2 changes: 1 addition & 1 deletion src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&

auto& db_slice = op_args.GetDbSlice();

DbSlice::AddOrFindResult add_res;
DbSlice::ItAndUpdater add_res;
if (opts.no_mkstream) {
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
RETURN_ON_BAD_STATUS(res_it);
Expand Down
Loading