From c9f98013fd417b8aee7106aa2c8eb8c9470b9e03 Mon Sep 17 00:00:00 2001 From: rajendrant Date: Sun, 6 Jan 2019 04:02:31 -0800 Subject: [PATCH] Optimize key based indexing and then scoring --- benchmark/benchmark-large.coffee | 14 +++ fuzzaldrin.coffee | 23 ++--- src/ConcurrentQueue.h | 71 ++++++++++++++ src/common.h | 13 ++- src/filter.cc | 153 ++++++++++++++++++++++++------- src/fuzzaldrin.cc | 44 +++++++-- src/fuzzaldrin.h | 2 +- 7 files changed, 264 insertions(+), 56 deletions(-) create mode 100644 src/ConcurrentQueue.h diff --git a/benchmark/benchmark-large.coffee b/benchmark/benchmark-large.coffee index 5e14c83e..90782d76 100644 --- a/benchmark/benchmark-large.coffee +++ b/benchmark/benchmark-large.coffee @@ -40,6 +40,20 @@ for query in three_letter_tests console.timeEnd('ThreeLetter#fuzzaldrin-plus-fast') console.log("======") +console.time('TwoLetter#fuzzaldrin-plus-fast-filter') +for query in two_letter_tests + FuzzaldrinPlusFast.filter lines, query, maxResults: 10 +console.timeEnd('TwoLetter#fuzzaldrin-plus-fast-filter') +console.log("======") + +dict = [] +for e in lines + dict.push {key:e, val:e} +console.time('TwoLetter#Keybased#Filter') +for query in two_letter_tests + FuzzaldrinPlusFast.filter dict, query, maxResults: 10, key: 'key' +console.timeEnd('TwoLetter#Keybased#Filter') +console.log("======") # An exmaple run below # npm run benchmarklarge diff --git a/fuzzaldrin.coffee b/fuzzaldrin.coffee index f46399e7..24aac724 100644 --- a/fuzzaldrin.coffee +++ b/fuzzaldrin.coffee @@ -17,21 +17,12 @@ class FuzzaldrinPlusFast @obj = new binding.Fuzzaldrin() setCandidates: (candidates, options = {}) -> - @item_to_val = null - if options.key? - @item_to_val = {} - newcandidates = [] - for c in candidates - newcandidates.push c[options.key] - @item_to_val[c[options.key]] = c - candidates = newcandidates @obj.setCandidates(candidates) filter: (query, options = {}) -> options = parseOptions(options) - filtered = @obj.filter query, options.maxResults, + @obj.filter query, options.maxResults, options.usePathScoring, options.useExtensionBonus - return if @item_to_val? then filtered.map((item) => @item_to_val[item]) else filtered module.exports = @@ -39,9 +30,15 @@ module.exports = new FuzzaldrinPlusFast() filter: (candidates, query, options = {}) -> - obj = new FuzzaldrinPlusFast() - obj.setCandidates(candidates, options) - obj.filter(query, options) + if options.key? + options = parseOptions(options) + filtered = binding.filterWithCandidates query, options.maxResults, + options.usePathScoring, options.useExtensionBonus, candidates, options.key + return filtered.map((item) => candidates[item]) + else + obj = new FuzzaldrinPlusFast() + obj.setCandidates(candidates, options) + obj.filter(query, options) score: (candidate, query, options = {}) -> options = parseOptions(options) diff --git a/src/ConcurrentQueue.h b/src/ConcurrentQueue.h new file mode 100644 index 00000000..9aeed28c --- /dev/null +++ b/src/ConcurrentQueue.h @@ -0,0 +1,71 @@ +// +// Copyright (c) 2013 Juan Palacios juan.palacios.puyana@gmail.com +// Subject to the BSD 2-Clause License +// - see < http://opensource.org/licenses/BSD-2-Clause> +// +// Source: +// https://github.com/juanchopanza/cppblog/blob/master/Concurrency/Queue/Queue.h + +#ifndef CONCURRENT_QUEUE_H__ +#define CONCURRENT_QUEUE_H__ + +#include +#include +#include +#include + +template +class ConcurrentQueue +{ + public: + + bool empty() { + std::unique_lock mlock(mutex_); + bool res = queue_.empty(); + mlock.unlock(); + return res; + } + + T pop() + { + std::unique_lock mlock(mutex_); + while (queue_.empty()) + { + cond_.wait(mlock); + } + auto val = queue_.front(); + queue_.pop(); + return val; + } + + void pop(T& item) + { + std::unique_lock mlock(mutex_); + while (queue_.empty()) + { + cond_.wait(mlock); + } + item = queue_.front(); + queue_.pop(); + } + + void push(const T& item) + { + std::unique_lock mlock(mutex_); + queue_.push(item); + mlock.unlock(); + cond_.notify_one(); + } + ConcurrentQueue() = default; + ConcurrentQueue(const ConcurrentQueue&) = delete; // disable copying + ConcurrentQueue& operator=(const ConcurrentQueue&) = delete; // disable assignment + // move constructor + ConcurrentQueue(ConcurrentQueue && a) : queue_(std::move(a.queue_)) {} + + private: + std::queue queue_; + std::mutex mutex_; + std::condition_variable cond_; +}; + +#endif // CONCURRENT_QUEUE_H__ diff --git a/src/common.h b/src/common.h index 232b59b6..00a24821 100644 --- a/src/common.h +++ b/src/common.h @@ -7,6 +7,8 @@ #include #include +#include + using namespace std; @@ -30,8 +32,15 @@ typedef string Element; typedef string Candidate; #endif +typedef struct CandidateIndex { + size_t thread_id; + size_t index; + CandidateIndex(size_t thread_id, size_t index) : thread_id(thread_id), index(index) {} +} CandidateIndex; + typedef std::vector Candidates; typedef float Score; +typedef std::vector CandidateIndexes; struct Options; @@ -76,4 +85,6 @@ extern Score path_scorer_score(const Candidate &string, const Element &query, co extern int countDir(const Candidate &path, int end, char pathSeparator); extern Candidate getExtension(const Candidate &str); -extern Candidates filter(const Candidates &candidates, const Element &query, const Options &options); +extern CandidateIndexes filter(const vector &candidates, const Element &query, const Options &options); + +Napi::Value filter_with_candidates(Napi::Env env, const Napi::Array &candidates, const std::string &key, const std::string &query, const Options &options); diff --git a/src/filter.cc b/src/filter.cc index c35ba773..0abd7770 100644 --- a/src/filter.cc +++ b/src/filter.cc @@ -2,13 +2,16 @@ #include #include #include +#include + +#include "ConcurrentQueue.h" namespace { struct CandidateScore { Score score; - Candidate candidate; - CandidateScore(Score score, Candidate candidate) : score(score), candidate(candidate) {} + CandidateIndex index; + CandidateScore(Score score, size_t thread_id, size_t index) : score(score), index(thread_id, index) {} bool operator<(const CandidateScore& other) const { return score > other.score; @@ -17,29 +20,54 @@ struct CandidateScore { typedef std::priority_queue CandidateScorePriorityQueue; -void thread_worker_filter(const Candidates &candidates, - size_t start, size_t end, - const Element &query, const Options &options, - size_t max_results, - CandidateScorePriorityQueue &results) { - if (start >= end || end >candidates.size()) - return; - for (size_t i = start; i < end; i++) { +struct ThreadState { + ConcurrentQueue input; + CandidateScorePriorityQueue results; + ThreadState() = default; +}; + +void filter_internal(const Candidates &candidates, + size_t thread_id, + size_t start_index, + const Element &query, const Options &options, + size_t max_results, + CandidateScorePriorityQueue &results) { + for (size_t i=0; i0) { - results.emplace(score, candidate); + results.emplace(score, thread_id, start_index+i); if (results.size() > max_results) results.pop(); } } } -Candidates sort_priority_queue(CandidateScorePriorityQueue &candidates) { +void thread_worker_filter(ThreadState &thread_state, size_t thread_id, + const Candidates *initial_candidates, + const Element &query, const Options &options, + size_t max_results) { + size_t start_index = 0; + if (initial_candidates) { + filter_internal(*initial_candidates, thread_id, 0, query, options, max_results, + thread_state.results); + start_index += initial_candidates->size(); + } + while (true) { + Candidates candidates; + thread_state.input.pop(candidates); + if(candidates.empty()) break; + filter_internal(candidates, thread_id, start_index, query, options, max_results, + thread_state.results); + start_index += candidates.size(); + } +} + +CandidateIndexes sort_priority_queue(CandidateScorePriorityQueue &candidates) { vector sorted; - Candidates ret; + CandidateIndexes ret; sorted.reserve(candidates.size()); ret.reserve(candidates.size()); while(!candidates.empty()) { @@ -48,53 +76,112 @@ Candidates sort_priority_queue(CandidateScorePriorityQueue &candidates) { } std::sort(sorted.begin(), sorted.end()); for(const auto& item : sorted) { - ret.push_back(item.candidate); + ret.push_back(item.index); } return ret; } -} +} // namespace -Candidates filter(const Candidates &candidates, const Element &query, const Options &options) { +CandidateIndexes filter(const vector &candidates, const Element &query, const Options &options) { CandidateScorePriorityQueue top_k; size_t max_results = options.max_results; - if (!max_results || max_results >= candidates.size()) - max_results = candidates.size(); + if (!max_results) + max_results = std::numeric_limits::max(); - if (candidates.size() < 10000) { - thread_worker_filter(candidates, 0, candidates.size(), query, - options, max_results, top_k); + if (candidates.size()==1) { + filter_internal(candidates[0], 0, 0, query, options, max_results, top_k); return sort_priority_queue(top_k); } // Split the dataset and pass down to multiple threads. + const size_t max_threads = candidates.size(); + vector threads; + vector thread_state(max_threads); + for (size_t i = 0; i < max_threads; i++) { + threads.emplace_back( + thread_worker_filter, ref(thread_state[i]), i, + &candidates[i], + ref(query), ref(options), max_results); + } + // Push an empty vector for the threads to terminate. + for (size_t i = 0; i < max_threads; i++) { + Candidates t; + thread_state[i].input.push(t); + } + // Wait for threads to complete and merge the restuls. + for (size_t i = 0; i < max_threads; i++) { + threads[i].join(); + auto &results = thread_state[i].results; + while(!results.empty()) { + top_k.emplace(results.top()); + results.pop(); + if (top_k.size() > max_results) + top_k.pop(); + } + } + return sort_priority_queue(top_k); +} + +Napi::Value filter_with_candidates(Napi::Env env, const Napi::Array &candidates, + const std::string &key, const std::string &query, const Options &options) { + CandidateScorePriorityQueue top_k; + size_t max_results = options.max_results; + if (!max_results) + max_results = std::numeric_limits::max(); + + Napi::Array res = Napi::Array::New(env); const size_t max_threads = 8; vector threads; - vector thread_results(max_threads); + vector thread_state(max_threads); + vector chunks; + vector initial_candidates(max_threads); size_t cur_start = 0; for (size_t i = 0; i < max_threads; i++) { - size_t chunk_size = candidates.size() / max_threads; + size_t chunk_size = candidates.Length() / max_threads; // Distribute remainder among the chunks. - if (i < candidates.size() % max_threads) { + if (i < candidates.Length() % max_threads) { chunk_size++; } + for(size_t j=0; j<1000 && j max_results) top_k.pop(); } } - return sort_priority_queue(top_k); + auto indexes = sort_priority_queue(top_k); + for(size_t i=0; i 0) + ind += chunks[indexes[i].thread_id-1]; + res[i] = Napi::Number::New(env, ind); + } + return res; } diff --git a/src/fuzzaldrin.cc b/src/fuzzaldrin.cc index fd036849..7d4c6f0d 100644 --- a/src/fuzzaldrin.cc +++ b/src/fuzzaldrin.cc @@ -7,7 +7,7 @@ Napi::Value Fuzzaldrin::Filter(const Napi::CallbackInfo& info) { if (info.Length() != 4 || !info[0].IsString() || !info[1].IsNumber() || !info[2].IsBoolean() || !info[3].IsBoolean()) { Napi::TypeError::New(info.Env(), "Invalid arguments").ThrowAsJavaScriptException(); - return res; + return Napi::Boolean(); } std::string query = info[0].As(); size_t maxResults = info[1].As().Uint32Value(); @@ -17,7 +17,8 @@ Napi::Value Fuzzaldrin::Filter(const Napi::CallbackInfo& info) { const auto matches = filter(candidates_, query, options); for(uint32_t i=0; i(); candidates_.clear(); - candidates_.reserve(candidates.Length()); - for(uint32_t i=0; i(); + size_t maxResults = info[1].As().Uint32Value(); + bool usePathScoring = info[2].As(); + bool useExtensionBonus = info[3].As(); + Options options(query, maxResults, usePathScoring, useExtensionBonus); + return filter_with_candidates(info.Env(), info[4].As(), info[5].As(), query, options); +} + Napi::Object Fuzzaldrin::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); @@ -62,6 +89,7 @@ Napi::Object Fuzzaldrin::Init(Napi::Env env, Napi::Object exports) { exports.Set("Fuzzaldrin", func); exports.Set("score", Napi::Function::New(env, score)); + exports.Set("filterWithCandidates", Napi::Function::New(env, filterWithCandidates)); return exports; } diff --git a/src/fuzzaldrin.h b/src/fuzzaldrin.h index ef207c6f..b4f8323b 100644 --- a/src/fuzzaldrin.h +++ b/src/fuzzaldrin.h @@ -16,7 +16,7 @@ class Fuzzaldrin : public Napi::ObjectWrap { Napi::Value SetCandidates(const Napi::CallbackInfo& info); private: - Candidates candidates_; + vector candidates_; }; #endif // FUZZALDRIN_H