Skip to content

Commit

Permalink
Entry write support local node region aware placement policy (#4063)
Browse files Browse the repository at this point in the history
* Entry write support local node region aware placement policy

* address comments
  • Loading branch information
hangc0276 authored Jan 5, 2024
1 parent f88f437 commit 031069c
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,20 @@ public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQu
remainingEnsembleBeforeIteration = remainingEnsemble;
int regionsToAllocate = numRemainingRegions;
int startRegionIndex = lastRegionIndex % numRegionsAvailable;
int localRegionIndex = -1;
if (myRegion != null && !UNKNOWN_REGION.equals(myRegion)) {
localRegionIndex = availableRegions.indexOf(myRegion);
}
String region = myRegion;
for (int i = 0; i < numRegionsAvailable; ++i) {
String region = availableRegions.get(startRegionIndex % numRegionsAvailable);
startRegionIndex++;
// select the local region first, and for the rest region select, use round-robin selection.
if (i > 0 || localRegionIndex == -1) {
if (startRegionIndex % numRegionsAvailable == localRegionIndex) {
startRegionIndex++;
}
region = availableRegions.get(startRegionIndex % numRegionsAvailable);
startRegionIndex++;
}
final Pair<Integer, Integer> currentAllocation = regionsWiseAllocation.get(region);
TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
if (!regionsReachedMaxAllocation.contains(region)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1930,4 +1930,119 @@ public void testNotifyRackChangeWithNewRegion() throws Exception {
assertEquals("region2", repp.address2Region.get(addr3.toBookieId()));
assertEquals("region3", repp.address2Region.get(addr4.toBookieId()));
}


@Test
public void testNewEnsemblePickLocalRegionBookies()
throws Exception {
repp.uninitalize();
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.10", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);

// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region2/r2");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region3/r3");
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/region4/r4");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/region5/r5");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/region1/r2");
StaticDNSResolver.addNodeToRack(addr9.getHostName(), "/region1/r2");


updateMyRack("/region1/r2");
repp = new RegionAwareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
addrs.add(addr5.toBookieId());
addrs.add(addr6.toBookieId());
addrs.add(addr7.toBookieId());
addrs.add(addr8.toBookieId());
addrs.add(addr9.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());

int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;

Set<BookieId> excludeBookies = new HashSet<>();

int bookie1Count = 0;
int bookie8Count = 0;
int bookie9Count = 0;
for (int i = 0; i < 100; ++i) {
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
List<BookieId> ensemble = ensembleResponse.getResult();
if (ensemble.contains(addr1.toBookieId())) {
bookie1Count++;
}
if (ensemble.contains(addr8.toBookieId())) {
bookie8Count++;
}
if (ensemble.contains(addr9.toBookieId())) {
bookie9Count++;
}

if (!ensemble.contains(addr8.toBookieId()) && !ensemble.contains(addr9.toBookieId())) {
fail("Failed to select bookie located on the same region and rack with bookie client");
}
if (ensemble.contains(addr2.toBookieId()) && ensemble.contains(addr3.toBookieId())) {
fail("addr2 and addr3 is same rack.");
}
}
LOG.info("Bookie1 Count: {}, Bookie8 Count: {}, Bookie9 Count: {}", bookie1Count, bookie8Count, bookie9Count);

//shutdown all the bookies located in the same region and rack with local node
// to test new ensemble should contain addr1
addrs.remove(addr8.toBookieId());
addrs.remove(addr9.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());
bookie1Count = 0;
bookie8Count = 0;
bookie9Count = 0;
for (int i = 0; i < 100; ++i) {
try {
EnsemblePlacementPolicy.PlacementResult<List<BookieId>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, excludeBookies);
List<BookieId> ensemble = ensembleResponse.getResult();
if (ensemble.contains(addr1.toBookieId())) {
bookie1Count++;
}
if (ensemble.contains(addr8.toBookieId())) {
bookie8Count++;
}
if (ensemble.contains(addr9.toBookieId())) {
bookie9Count++;
}
if (!ensemble.contains(addr1.toBookieId())) {
fail("Failed to select bookie located on the same region with bookie client");
}
if (ensemble.contains(addr8.toBookieId()) || ensemble.contains(addr9.toBookieId())) {
fail("Selected the shutdown bookies");
}
} catch (BKNotEnoughBookiesException e) {
fail("Failed to select the ensemble.");
}
}
LOG.info("Bookie1 Count: {}, Bookie8 Count: {}, Bookie9 Count: {}", bookie1Count, bookie8Count, bookie9Count);

}
}

0 comments on commit 031069c

Please sign in to comment.