Skip to content

Commit 4b0a33a

Browse files
committed
chore: implement background self-identification of huffman encoding
The PR adds a background task that runs only once on db 0 and computes a huffman table, but only on datasets that fit the requirements - key memory usage is significant enough to do such analysis. At the end of the process a metric is increased to signal that the huffman table was computed and printed to logs. Signed-off-by: Roman Gershman <[email protected]>
1 parent 0386de0 commit 4b0a33a

File tree

7 files changed

+147
-13
lines changed

7 files changed

+147
-13
lines changed

src/server/db_slice.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ struct DbStats : public DbTableStats {
3939
// Memory used by dictionaries.
4040
size_t table_mem_usage = 0;
4141

42-
using DbTableStats::operator+=;
42+
// We override additional DbStats fields explicitly in DbSlice::GetStats().
4343
using DbTableStats::operator=;
4444

4545
DbStats& operator+=(const DbStats& o);

src/server/debugcmd.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ extern "C" {
4141
#include "server/server_state.h"
4242
#include "server/string_family.h"
4343
#include "server/transaction.h"
44+
4445
using namespace std;
4546

4647
ABSL_DECLARE_FLAG(string, dir);
@@ -283,8 +284,8 @@ unsigned HufHist::MaxFreqCount() const {
283284
return max_freq;
284285
}
285286

286-
unsigned kMaxFreqPerShard = 1U << 20;
287-
unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);
287+
constexpr unsigned kMaxFreqPerShard = 1U << 20;
288+
constexpr unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);
288289

289290
void DoComputeHist(CompactObjType type, EngineShard* shard, ConnectionContext* cntx,
290291
HufHist* dest) {

src/server/engine_shard.cc

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
#include "server/engine_shard.h"
66

7+
#include <absl/strings/escaping.h>
78
#include <absl/strings/match.h>
89
#include <absl/strings/str_cat.h>
910

1011
#include <memory>
1112

1213
#include "base/flags.h"
14+
#include "core/huff_coder.h"
1315
#include "core/page_usage_stats.h"
1416
#include "io/proc_reader.h"
1517

@@ -116,6 +118,75 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
116118
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
117119
}
118120

121+
class HuffmanCheckTask {
122+
public:
123+
int32_t Run(DbSlice* db_slice);
124+
125+
private:
126+
PrimeTable::Cursor cursor_;
127+
128+
static constexpr unsigned kMaxSymbol = 255;
129+
array<unsigned, kMaxSymbol + 1> hist_; // histogram of symbols.
130+
string scratch_;
131+
};
132+
133+
// Runs one step of the background huffman check on the default database (db 0).
//
// Each call traverses at most kMaxTraverses cursor steps of the prime table and adds the bytes
// of every non-inline key to hist_. While the cursor has not wrapped around, the function
// returns 4 so that it is invoked again later. Once the scan is complete, the histogram is
// normalized, a huffman table is built, logged (base64-encoded) and the
// `huffman_tables_built` shard counter is incremented.
//
// Returns:
//   4  - traversal is not finished yet, call again.
//   -1 - task completed (successfully or not), or db 0 does not exist.
int32_t HuffmanCheckTask::Run(DbSlice* db_slice) {
  DbTable* db_table = db_slice->GetDBTable(0);  // we currently support only default db.
  if (!db_table)
    return -1;

  // incrementally aggregate frequency histogram.
  auto& prime = db_table->prime;

  constexpr uint32_t kMaxTraverses = 512;
  uint32_t traverses_count = 0;
  do {
    cursor_ = prime.Traverse(cursor_, [&](PrimeIterator it) {
      if (!it->first.IsInline()) {
        string_view val = it->first.GetSlice(&scratch_);
        for (unsigned char c : val) {
          hist_[c]++;
        }
      }
    });
    traverses_count++;
  } while (traverses_count < kMaxTraverses && cursor_);

  if (cursor_)
    return 4;  // priority to continue later.

  // Finished scanning the table, now normalize the table.
  constexpr unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);
  const size_t total_freq = std::accumulate(hist_.begin(), hist_.end(), 0UL);

  // Nothing was sampled (e.g. only inline keys are left): a table built from synthetic
  // all-ones frequencies would be meaningless and the ratio below would divide by zero.
  if (total_freq == 0) {
    LOG(INFO) << "Huffman check skipped: no key bytes were sampled";
    return -1;  // task completed.
  }

  // to avoid overflow.
  const double scale =
      total_freq > kMaxFreqTotal ? static_cast<double>(total_freq) / kMaxFreqTotal : 1.0;
  for (unsigned i = 0; i <= kMaxSymbol; i++) {
    hist_[i] = static_cast<unsigned>(hist_[i] / scale);
    if (hist_[i] == 0) {
      hist_[i] = 1;  // Avoid zero frequency symbols.
    }
  }

  // Build the huffman table. We currently output the table to logs and just increase
  // the metric counter to signal that we built a table.

  HuffmanEncoder huff_enc;
  string error_msg;
  if (huff_enc.Build(hist_.data(), kMaxSymbol, &error_msg)) {
    const size_t compressed_size = huff_enc.EstimateCompressedSize(hist_.data(), kMaxSymbol);

    // The estimate was computed on the down-scaled histogram; scale it back so that it is
    // comparable with the unscaled total_freq. With scale == 1.0 this is a no-op.
    const double estimated_size = static_cast<double>(compressed_size) * scale;

    LOG(INFO) << "Huffman table built, reducing memory usage from " << total_freq << " to "
              << static_cast<size_t>(estimated_size) << " bytes, ratio "
              << estimated_size / total_freq;
    string bintable = huff_enc.Export();
    LOG(INFO) << "Huffman binary table: " << absl::Base64Escape(bintable);
    db_slice->shard_owner()->stats().huffman_tables_built++;
  } else {
    LOG(WARNING) << "Huffman build failed: " << error_msg;
  }

  return -1;  // task completed.
}
189+
119190
} // namespace
120191

121192
__thread EngineShard* EngineShard::shard_ = nullptr;
@@ -141,7 +212,7 @@ string EngineShard::TxQueueInfo::Format() const {
141212
}
142213

143214
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
144-
static_assert(sizeof(Stats) == 96);
215+
static_assert(sizeof(Stats) == 104);
145216

146217
#define ADD(x) x += o.x
147218

@@ -157,6 +228,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
157228
ADD(total_heartbeat_expired_bytes);
158229
ADD(total_heartbeat_expired_calls);
159230
ADD(total_migrated_keys);
231+
ADD(huffman_tables_built);
160232

161233
#undef ADD
162234
return *this;
@@ -347,6 +419,8 @@ void EngineShard::Shutdown() {
347419

348420
void EngineShard::StopPeriodicFiber() {
349421
ProactorBase::me()->RemoveOnIdleTask(defrag_task_id_);
422+
ProactorBase::me()->RemoveOnIdleTask(huffman_check_task_id_);
423+
350424
fiber_heartbeat_periodic_done_.Notify();
351425
if (fiber_heartbeat_periodic_.IsJoinable()) {
352426
fiber_heartbeat_periodic_.Join();
@@ -620,7 +694,43 @@ void EngineShard::Heartbeat() {
620694
}
621695
return;
622696
}
623-
start = std::chrono::system_clock::now();
697+
698+
thread_local bool check_huffman = (shard_id_ == 0); // run it only on shard 0.
699+
if (check_huffman) {
700+
auto* ptr = db_slice.GetDBTable(0);
701+
if (ptr) {
702+
size_t key_usage = ptr->stats.memory_usage_by_type[OBJ_KEY];
703+
size_t obj_usage = ptr->stats.obj_memory_usage;
704+
705+
#ifdef NDEBUG
706+
#define MB_THRESHOLD (50 * 1024 * 1024)
707+
#else
708+
#define MB_THRESHOLD (5 * 1024 * 1024)
709+
#endif
710+
711+
if (key_usage > MB_THRESHOLD && key_usage > obj_usage / 8) {
712+
VLOG(1) << "Scheduling huffman check task, key usage: " << key_usage
713+
<< ", obj usage: " << obj_usage;
714+
715+
check_huffman = false; // trigger only once.
716+
717+
// launch the task
718+
HuffmanCheckTask* task = new HuffmanCheckTask();
719+
huffman_check_task_id_ = ProactorBase::me()->AddOnIdleTask([task] {
720+
if (!shard_ || !namespaces) {
721+
delete task;
722+
return -1;
723+
}
724+
725+
DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_->shard_id());
726+
int32_t res = task->Run(&db_slice);
727+
if (res == -1)
728+
delete task;
729+
return res;
730+
});
731+
}
732+
}
733+
}
624734

625735
if (!IsReplica()) { // Never run expiry/evictions on replica.
626736
RetireExpiredAndEvict();
@@ -655,7 +765,7 @@ void EngineShard::RetireExpiredAndEvict() {
655765
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
656766
// The higher ttl_delete_target the more likely we have lots of expired items that need
657767
// to be deleted.
658-
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
768+
ttl_delete_target = unsigned(kTtlDeleteLimit * double(deleted) / (double(traversed) + 10));
659769
}
660770

661771
DbContext db_cntx;

src/server/engine_shard.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class EngineShard {
5353
// cluster stats
5454
uint64_t total_migrated_keys = 0;
5555

56+
// how many huffman tables were built successfully in the background
57+
uint32_t huffman_tables_built = 0;
58+
5659
Stats& operator+=(const Stats&);
5760
};
5861

@@ -135,7 +138,7 @@ class EngineShard {
135138
}
136139

137140
// Moving average counters.
138-
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };
141+
enum MovingCnt : uint8 { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };
139142

140143
// Returns moving sum over the last 6 seconds.
141144
uint32_t GetMovingSum6(MovingCnt type) const {
@@ -294,7 +297,7 @@ class EngineShard {
294297
journal::Journal* journal_ = nullptr;
295298
IntentLock shard_lock_;
296299

297-
uint32_t defrag_task_id_ = 0;
300+
uint32_t defrag_task_id_ = UINT32_MAX, huffman_check_task_id_ = UINT32_MAX;
298301
EvictionTaskState eviction_state_; // Used on eviction fiber
299302
util::fb2::Fiber fiber_heartbeat_periodic_;
300303
util::fb2::Done fiber_heartbeat_periodic_done_;

src/server/server_family.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1959,6 +1959,9 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
19591959
AppendMetricWithoutLabels("defrag_objects_moved", "Objects moved",
19601960
m.shard_stats.defrag_realloc_total, COUNTER, &resp->body());
19611961

1962+
AppendMetricWithoutLabels("huffman_tables_built", "Huffman tables built",
1963+
m.shard_stats.huffman_tables_built, MetricType::COUNTER, &resp->body());
1964+
19621965
// Tiered metrics
19631966
{
19641967
AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,

tests/dragonfly/connection_test.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,7 @@ async def measure(aw):
641641
await async_client.zadd("zset-1", mapping={str(i): i for i in range(50)})
642642
assert await measure(async_client.zrange("zset-1", 0, -1, withscores=True)) <= 2
643643

644+
port = async_client.connection_pool.connection_kwargs["port"]
644645
# Exec call
645646
e = async_client.pipeline(transaction=True)
646647
for _ in range(100):

tests/dragonfly/server_family_test.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import yaml
66
from prometheus_client.samples import Sample
77
from pymemcache import Client
8-
from testcontainers.core.container import DockerContainer
9-
from testcontainers.core.waiting_utils import wait_for_logs
108

119
from . import dfly_args
1210
from .instance import DflyInstance
@@ -247,9 +245,8 @@ async def test_latency_stats(async_client: aioredis.Redis):
247245
latency_stats = await async_client.info("LATENCYSTATS")
248246
# Only stats for the `config resetstat` command should remain in stats
249247
assert (
250-
len(latency_stats) == 1 and "latency_percentiles_usec_config" in latency_stats,
251-
f"unexpected latency stats after reset: {latency_stats}",
252-
)
248+
len(latency_stats) == 1 and "latency_percentiles_usec_config" in latency_stats
249+
), f"unexpected latency stats after reset: {latency_stats}"
253250

254251

255252
@dfly_args({"latency_tracking": False})
@@ -260,6 +257,8 @@ async def test_latency_stats_disabled(async_client: aioredis.Redis):
260257

261258

262259
async def test_metrics_sanity_check(df_server: DflyInstance):
260+
from testcontainers.core.container import DockerContainer
261+
from testcontainers.core.waiting_utils import wait_for_logs
263262

264263
def on_container_output(container: DockerContainer, fn: Callable):
265264
for entry in container.get_logs():
@@ -320,3 +319,20 @@ def assert_no_error(entry: str):
320319
on_container_output(
321320
prometheus, lambda entry: logging.info(f"prometheus log: {entry}")
322321
)
322+
323+
324+
@pytest.mark.opt_only
@dfly_args({"proactor_threads": "2"})
async def test_huffman_tables_built(df_server: DflyInstance):
    """Populate the server with many long keys and wait until the
    `dragonfly_huffman_tables_built` metric becomes positive.

    NOTE(review): presumably restricted to optimized builds via `opt_only` - confirm
    against the marker's definition.
    """
    client = df_server.client()

    # Insert enough data to trigger background huffman table building
    long_prefix = "keyfooobarrsoooooooooooooooooooooooooooooooooooooooooooooooo"
    await client.execute_command("DEBUG", "POPULATE", "1000000", long_prefix, "14")

    @assert_eventually(times=500)
    async def huffman_metric_is_positive():
        all_metrics = await df_server.metrics()
        first_sample = all_metrics["dragonfly_huffman_tables_built"].samples[0]
        assert first_sample.value > 0

    await huffman_metric_is_positive()

0 commit comments

Comments
 (0)