Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions fdbrpc/ReplicationPolicy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ bool PolicyAcross::validate(std::vector<LocalityEntry> const& solutionSet,
// Choose new servers from "least utilized" alsoServers and append the new servers to results
// fromserverse are the servers that have already been chosen and
// that should be excluded from being selected as replicas.
// FIXME: Simplify this function, such as removing unnecessary printf
// fromServers are the servers that must have;
// alsoServers are the servers you can choose.
bool PolicyAcross::selectReplicas(Reference<LocalitySet>& fromServers,
std::vector<LocalityEntry> const& alsoServers,
Expand All @@ -190,16 +188,17 @@ bool PolicyAcross::selectReplicas(Reference<LocalitySet>& fromServers,
int resultsInit = results.size();

// Clear the member variables
AttribValue firstUsedValue, secondUsedValue;
_usedValues.clear();
_newResults.clear();
_addedResults.resize(_arena, 0);

for (auto& alsoServer : alsoServers) {
auto value = fromServers->getValueViaGroupKey(alsoServer, groupIndexKey);
if (value.present()) {
auto lowerBound = std::lower_bound(_usedValues.begin(), _usedValues.end(), value.get());
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
//_selected is a set of processes that have the same indexKey and indexValue (value)
if ((firstUsedValue != value.get() && secondUsedValue != value.get()) &&
((lowerBound == _usedValues.end()) || (*lowerBound != value.get()))) {
// _selected is a set of processes that have the same indexKey and indexValue (value)
_selected = fromServers->restrict(indexKey, value.get());
if (_selected->size()) {
// Pass only the also array item which are valid for the value
Expand All @@ -213,7 +212,13 @@ bool PolicyAcross::selectReplicas(Reference<LocalitySet>& fromServers,
}
if (count >= _count)
break;
_usedValues.insert(lowerBound, value.get());
if (firstUsedValue._id == -1) {
firstUsedValue = AttribValue(value.get());
} else if (secondUsedValue._id == -1) {
secondUsedValue = AttribValue(value.get());
} else {
_usedValues.insert(lowerBound, value.get());
}
}
}
}
Expand Down Expand Up @@ -259,14 +264,21 @@ bool PolicyAcross::selectReplicas(Reference<LocalitySet>& fromServers,
auto value = fromServers->getValueViaGroupKey(entry, groupIndexKey);
if (value.present()) {
auto lowerBound = std::lower_bound(_usedValues.begin(), _usedValues.end(), value.get());
if ((lowerBound == _usedValues.end()) || (*lowerBound != value.get())) {
if ((firstUsedValue != value.get() && secondUsedValue != value.get()) &&
((lowerBound == _usedValues.end()) || (*lowerBound != value.get()))) {
_selected = fromServers->restrict(indexKey, value.get());
if (_selected->size()) {
if (_policy->selectReplicas(_selected, emptyEntryArray, results)) {
count++;
if (count >= _count)
break;
_usedValues.insert(lowerBound, value.get());
if (firstUsedValue._id == -1) {
firstUsedValue = AttribValue(value.get());
} else if (secondUsedValue._id == -1) {
secondUsedValue = AttribValue(value.get());
} else {
_usedValues.insert(lowerBound, value.get());
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions fdbrpc/include/fdbrpc/ReplicationTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ struct AttribValue {
bool operator<=(AttribValue const& source) const { return !(*this > source); }
bool operator>=(AttribValue const& source) const { return !(*this < source); }
};
namespace std {
template <>
struct hash<AttribValue> {
std::size_t operator()(const AttribValue& v) const noexcept { return static_cast<std::size_t>(v._id); }
// return std::hash<int>{}(v._id);
};
} // namespace std
struct LocalityEntry {
int _id;
explicit LocalityEntry() : _id(-1) {}
Expand Down