Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct DbStats : public DbTableStats {
// Memory used by dictionaries.
size_t table_mem_usage = 0;

using DbTableStats::operator+=;
// We override additional DbStats fields explicitly in DbSlice::GetStats().
using DbTableStats::operator=;

DbStats& operator+=(const DbStats& o);
Expand Down
5 changes: 3 additions & 2 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"

using namespace std;

ABSL_DECLARE_FLAG(string, dir);
Expand Down Expand Up @@ -283,8 +284,8 @@ unsigned HufHist::MaxFreqCount() const {
return max_freq;
}

unsigned kMaxFreqPerShard = 1U << 20;
unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);
constexpr unsigned kMaxFreqPerShard = 1U << 20;
constexpr unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);

void DoComputeHist(CompactObjType type, EngineShard* shard, ConnectionContext* cntx,
HufHist* dest) {
Expand Down
123 changes: 120 additions & 3 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

#include "server/engine_shard.h"

#include <absl/strings/escaping.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

#include <memory>

#include "base/flags.h"
#include "core/huff_coder.h"
#include "core/page_usage_stats.h"
#include "io/proc_reader.h"

Expand Down Expand Up @@ -116,6 +118,86 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
}

// Stateful task that accumulates a histogram of byte values over the keys of a
// table and, once the traversal is complete, tries to build a Huffman table
// from that histogram. The traversal cursor and the histogram are members, so
// the work can be spread over several Run() calls.
class HuffmanCheckTask {
public:
// Starts with every histogram bucket set to zero.
HuffmanCheckTask() {
hist_.fill(0);
}

// Performs one increment of work against db_slice.
// Returns 4 while the traversal still has entries left to visit and -1 when
// there is nothing (more) to do; see the definition for the individual cases.
int32_t Run(DbSlice* db_slice);
Comment on lines +121 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My dream once was to turn every on idle task into a background fiber... 😞

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the same though? I clearly remember that OnIdle tasks run only when we do not have any CPU work to do and can be starved indefinitely. I thought that background fibers a bit more agressive. Also OnIdle tasks provide a way to throttle themselves and background fibers do not do that,, imho


private:
// Traversal position; Run() resumes from it and stores the updated value back.
PrimeTable::Cursor cursor_;

// Largest symbol value that is counted; the histogram has kMaxSymbol + 1 buckets.
static constexpr unsigned kMaxSymbol = 255;
array<unsigned, kMaxSymbol + 1> hist_; // histogram of symbols.
// Buffer passed to GetSlice() when reading a key; released by Run() after a
// key longer than 1024 bytes has been read.
string scratch_;
};

// Runs one increment of the Huffman check against table 0 of db_slice.
//
// Each call resumes the traversal of the prime table from cursor_ and adds the
// bytes of every non-inline key to hist_. When the cursor is exhausted the
// histogram is normalized, a Huffman table is built from it, and the result is
// written to the log.
//
// Returns:
//    4 - the traversal is not finished yet and the task wants to continue later.
//   -1 - nothing more to do: table 0 does not exist, no bytes were counted, or
//        the build attempt has completed (whether it succeeded or failed).
// NOTE(review): the return value is presumably interpreted by the on-idle task
// scheduler as "priority to reschedule with" / "stop" - confirm against the
// AddOnIdleTask contract.
int32_t HuffmanCheckTask::Run(DbSlice* db_slice) {
DbTable* db_table = db_slice->GetDBTable(0); // we currently support only default db.
if (!db_table)
return -1;

// incrementally aggregate frequency histogram.
auto& prime = db_table->prime;

// Upper bound on the number of Traverse() calls made by a single Run() invocation.
constexpr uint32_t kMaxTraverses = 512;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that you run it without flags or anything and it might freeze when iterating 512 of big strings

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A valid concern, even though I do not think keys are long for real workloads. I will add a protection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, didn't think about it being just keys, I assume that in the future we want to test values as well

uint32_t traverses_count = 0;
do {
cursor_ = prime.Traverse(cursor_, [&](PrimeIterator it) {
// Only keys for which IsInline() is false contribute to the histogram.
if (!it->first.IsInline()) {
string_view val = it->first.GetSlice(&scratch_);
for (unsigned char c : val) {
hist_[c]++;
}

// Protection against long keys: once a key longer than 1024 bytes has been
// counted, force the outer loop to stop after the current Traverse() call.
// The callback itself keeps being invoked for the remaining entries of
// that call.
// NOTE(review): val presumably may point into scratch_, so it must not be
// read after the swap below - confirm against GetSlice().
if (val.size() > 1024) {
traverses_count = kMaxTraverses; // return early.
string{}.swap(scratch_); // free memory.
}
}
});
traverses_count++;
} while (traverses_count < kMaxTraverses && cursor_);

// A cursor that still tests true means part of the table has not been visited.
if (cursor_)
return 4; // priority to continue later.

// Finished scanning the table, now normalize the table.
// 90% of 2^31: the cap for the sum of all histogram buckets after scaling.
constexpr unsigned kMaxFreqTotal = static_cast<unsigned>((1U << 31) * 0.9);
// NOTE(review): the accumulator type is that of the 0UL seed (unsigned long);
// confirm it is wide enough on every supported platform.
size_t total_freq = std::accumulate(hist_.begin(), hist_.end(), 0UL);
if (total_freq == 0)
return -1;

// to avoid overflow.
// scale is > 1 only when the raw total exceeds kMaxFreqTotal; dividing every
// bucket by it brings the sum back under the cap (before the zero -> 1 bump below).
double scale = total_freq > kMaxFreqTotal ? static_cast<double>(total_freq) / kMaxFreqTotal : 1.0;
for (unsigned i = 0; i <= kMaxSymbol; i++) {
hist_[i] = static_cast<unsigned>(hist_[i] / scale);
if (hist_[i] == 0) {
hist_[i] = 1; // Avoid zero frequency symbols.
}
}

// Build the huffman table. We currently output the table to logs and just increase
// the metric counter to signal that we built a table.

HuffmanEncoder huff_enc;
string error_msg;
// NOTE(review): kMaxSymbol (255) is passed rather than the bucket count (256);
// presumably the encoder API expects the maximum symbol value - confirm.
if (huff_enc.Build(hist_.data(), kMaxSymbol, &error_msg)) {
size_t compressed_size = huff_enc.EstimateCompressedSize(hist_.data(), kMaxSymbol);
// NOTE(review): total_freq is the sum taken before normalization, while
// compressed_size is estimated from the normalized histogram (scaled, zero
// buckets bumped to 1). The two differ whenever scale > 1 or any bucket was
// zero - confirm the logged ratio is what is intended.
LOG(INFO) << "Huffman table built, reducing character count from " << total_freq << " to "
<< compressed_size << ", compression ratio " << double(compressed_size) / total_freq;
string bintable = huff_enc.Export();
// The exported table is binary, so it is logged base64-encoded.
LOG(INFO) << "Huffman binary table: " << absl::Base64Escape(bintable);
// Counted on the shard that owns db_slice, only for successful builds.
db_slice->shard_owner()->stats().huffman_tables_built++;
} else {
LOG(WARNING) << "Huffman build failed: " << error_msg;
}

return -1; // task completed.
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
Expand All @@ -141,7 +223,7 @@ string EngineShard::TxQueueInfo::Format() const {
}

EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
static_assert(sizeof(Stats) == 96);
static_assert(sizeof(Stats) == 104);

#define ADD(x) x += o.x

Expand All @@ -157,6 +239,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
ADD(total_heartbeat_expired_bytes);
ADD(total_heartbeat_expired_calls);
ADD(total_migrated_keys);
ADD(huffman_tables_built);

#undef ADD
return *this;
Expand Down Expand Up @@ -347,6 +430,8 @@ void EngineShard::Shutdown() {

void EngineShard::StopPeriodicFiber() {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_id_);
ProactorBase::me()->RemoveOnIdleTask(huffman_check_task_id_);

fiber_heartbeat_periodic_done_.Notify();
if (fiber_heartbeat_periodic_.IsJoinable()) {
fiber_heartbeat_periodic_.Join();
Expand Down Expand Up @@ -620,7 +705,39 @@ void EngineShard::Heartbeat() {
}
return;
}
start = std::chrono::system_clock::now();

thread_local bool check_huffman = (shard_id_ == 0); // run it only on shard 0.
if (check_huffman) {
auto* ptr = db_slice.GetDBTable(0);
if (ptr) {
size_t key_usage = ptr->stats.memory_usage_by_type[OBJ_KEY];
size_t obj_usage = ptr->stats.obj_memory_usage;

#ifdef NDEBUG
#define MB_THRESHOLD (50 * 1024 * 1024)
#else
#define MB_THRESHOLD (5 * 1024 * 1024)
#endif

if (key_usage > MB_THRESHOLD && key_usage > obj_usage / 8) {
VLOG(1) << "Scheduling huffman check task, key usage: " << key_usage
<< ", obj usage: " << obj_usage;

check_huffman = false; // trigger only once.

// launch the task
huffman_check_task_id_ =
ProactorBase::me()->AddOnIdleTask([task = HuffmanCheckTask{}]() mutable {
if (!shard_ || !namespaces) {
return -1;
}

DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_->shard_id());
return task.Run(&db_slice);
});
}
}
}

if (!IsReplica()) { // Never run expiry/evictions on replica.
RetireExpiredAndEvict();
Expand Down Expand Up @@ -655,7 +772,7 @@ void EngineShard::RetireExpiredAndEvict() {
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher ttl_delete_target the more likely we have lots of expired items that need
// to be deleted.
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
ttl_delete_target = unsigned(kTtlDeleteLimit * double(deleted) / (double(traversed) + 10));
}

DbContext db_cntx;
Expand Down
7 changes: 5 additions & 2 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class EngineShard {
// cluster stats
uint64_t total_migrated_keys = 0;

// how many huffman tables were built successfully in the background
uint32_t huffman_tables_built = 0;

Stats& operator+=(const Stats&);
};

Expand Down Expand Up @@ -135,7 +138,7 @@ class EngineShard {
}

// Moving average counters.
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };
enum MovingCnt : uint8_t { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };

// Returns moving sum over the last 6 seconds.
uint32_t GetMovingSum6(MovingCnt type) const {
Expand Down Expand Up @@ -294,7 +297,7 @@ class EngineShard {
journal::Journal* journal_ = nullptr;
IntentLock shard_lock_;

uint32_t defrag_task_id_ = 0;
uint32_t defrag_task_id_ = UINT32_MAX, huffman_check_task_id_ = UINT32_MAX;
EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;
Expand Down
3 changes: 3 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,9 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
AppendMetricWithoutLabels("defrag_objects_moved", "Objects moved",
m.shard_stats.defrag_realloc_total, COUNTER, &resp->body());

AppendMetricWithoutLabels("huffman_tables_built", "Huffman tables built",
m.shard_stats.huffman_tables_built, MetricType::COUNTER, &resp->body());

// Tiered metrics
{
AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,
Expand Down
26 changes: 21 additions & 5 deletions tests/dragonfly/server_family_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import yaml
from prometheus_client.samples import Sample
from pymemcache import Client
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from . import dfly_args
from .instance import DflyInstance
Expand Down Expand Up @@ -247,9 +245,8 @@ async def test_latency_stats(async_client: aioredis.Redis):
latency_stats = await async_client.info("LATENCYSTATS")
# Only stats for the `config resetstat` command should remain in stats
assert (
len(latency_stats) == 1 and "latency_percentiles_usec_config" in latency_stats,
f"unexpected latency stats after reset: {latency_stats}",
)
len(latency_stats) == 1 and "latency_percentiles_usec_config" in latency_stats
), f"unexpected latency stats after reset: {latency_stats}"


@dfly_args({"latency_tracking": False})
Expand All @@ -260,6 +257,8 @@ async def test_latency_stats_disabled(async_client: aioredis.Redis):


async def test_metrics_sanity_check(df_server: DflyInstance):
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

def on_container_output(container: DockerContainer, fn: Callable):
for entry in container.get_logs():
Expand Down Expand Up @@ -320,3 +319,20 @@ def assert_no_error(entry: str):
on_container_output(
prometheus, lambda entry: logging.info(f"prometheus log: {entry}")
)


# Checks that the server eventually reports a non-zero
# "dragonfly_huffman_tables_built" metric after a large number of keys sharing
# a long common prefix has been inserted.
# The server is started with proactor_threads=2 and the test carries the
# opt_only marker.
@pytest.mark.opt_only
@dfly_args({"proactor_threads": "2"})
async def test_huffman_tables_built(df_server: DflyInstance):
async_client = df_server.client()
# Insert enough data to trigger background huffman table building
key_name = "keyfooobarrsoooooooooooooooooooooooooooooooooooooooooooooooo"
# 1,000,000 entries whose names start with key_name.
# NOTE(review): the trailing "14" is presumably the value size - confirm
# against the DEBUG POPULATE argument order.
await async_client.execute_command("DEBUG", "POPULATE", "1000000", key_name, "14")

# The table is built in the background, so the metric is polled rather than
# checked once.
# NOTE(review): times=500 is presumably the retry budget of assert_eventually - confirm.
@assert_eventually(times=500)
async def check_metrics():
metrics = await df_server.metrics()
m = metrics["dragonfly_huffman_tables_built"]
# The first sample of the metric must show at least one table built.
assert m.samples[0].value > 0

await check_metrics()
Loading