|
35 | 35 | #include "server/transaction.h"
|
36 | 36 | #include "util/fibers/future.h"
|
37 | 37 |
|
| 38 | +ABSL_FLAG(bool, mget_prefetch_keys, true, "If true, MGET will prefetch keys before reading them"); |
| 39 | + |
| 40 | +ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys"); |
| 41 | + |
38 | 42 | namespace dfly {
|
39 | 43 |
|
40 | 44 | namespace {
|
@@ -547,23 +551,42 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
|
547 | 551 |
|
548 | 552 | absl::InlinedVector<Item, 32> items(keys.Size());
|
549 | 553 |
|
| 554 | + // First, fetch all iterators and count total size ahead |
| 555 | + size_t total_size = 0; |
| 556 | + unsigned index = 0; |
| 557 | + static bool mget_prefetch_keys = absl::GetFlag(FLAGS_mget_prefetch_keys); |
| 558 | + static bool mget_dedup_keys = absl::GetFlag(FLAGS_mget_dedup_keys); |
| 559 | + |
550 | 560 | // We can not make it thread-local because we may preempt during the Find loop due to
|
551 | 561 | // serialization with the bumpup calls.
|
552 | 562 |
|
553 | 563 | // TODO: consider separating BumpUps from finds because it becomes too complicated
|
554 | 564 | // to reason about.
|
555 | 565 | absl::flat_hash_map<string_view, unsigned> key_index;
|
| 566 | + if (mget_dedup_keys) { |
| 567 | + key_index.reserve(keys.Size()); |
| 568 | + } |
556 | 569 |
|
557 |
| - // First, fetch all iterators and count total size ahead |
558 |
| - size_t total_size = 0; |
559 |
| - unsigned index = 0; |
560 |
| - key_index.reserve(keys.Size()); |
| 570 | + PrimeTable& pt = db_slice.GetDBTable(t->GetDbIndex())->prime; |
| 571 | + |
| 572 | + constexpr unsigned kPrefetchLimit = 32; |
| 573 | + if (mget_prefetch_keys) { |
| 574 | + unsigned prefetched = 0; |
| 575 | + for (string_view key : keys) { |
| 576 | + pt.Prefetch(key); |
| 577 | + if (++prefetched >= kPrefetchLimit) { |
| 578 | + break; |
| 579 | + } |
| 580 | + } |
| 581 | + } |
561 | 582 |
|
562 | 583 | for (string_view key : keys) {
|
563 |
| - auto [it, inserted] = key_index.try_emplace(key, index); |
564 |
| - if (!inserted) { // duplicate -> point to the first occurrence. |
565 |
| - items[index++].source_index = it->second; |
566 |
| - continue; |
| 584 | + if (mget_dedup_keys) { |
| 585 | + auto [it, inserted] = key_index.try_emplace(key, index); |
| 586 | + if (!inserted) { // duplicate -> point to the first occurrence. |
| 587 | + items[index++].source_index = it->second; |
| 588 | + continue; |
| 589 | + } |
567 | 590 | }
|
568 | 591 |
|
569 | 592 | auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
|
@@ -618,8 +641,8 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
|
618 | 641 | }
|
619 | 642 | }
|
620 | 643 | }
|
621 |
| - key_index.clear(); |
622 | 644 |
|
| 645 | + key_index.clear(); |
623 | 646 | return response;
|
624 | 647 | }
|
625 | 648 |
|
|
0 commit comments