Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/client/address_group.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,25 @@ class AddressGroup extends Base {
}

async getConnection(req) {
const hasFilter = typeof this.options.balancerFilter === 'function';
// 增加一层filter判断,若正常则直接返回,否则走原 getConnection 逻辑
if (hasFilter) {
try {
const filterConnection = await this.getConnectionDefault(req, true);
if (filterConnection) return filterConnection;
} catch (error) {
// filter 地址都失败后,多走一次原 getConnection 逻辑
this.logger.info('[AddressGroup] filterConnection error', error);
}
}
return await this.getConnectionDefault(req);
}

async getConnectionDefault(req, needFilter) {
const meta = req.meta;
meta.connectionGroup = this.key;

const address = this._loadbalancer.select(req);
const address = this._loadbalancer.select(req, needFilter);
if (!address) return null;

const { connectionOpts, connectionClass } = this.options;
Expand Down
18 changes: 17 additions & 1 deletion lib/client/loadbalancer/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,23 @@ class LoadBalancer extends Base {
return this.addressGroup.logger;
}

select(request) {
select(request, needFilter) {
// 需要时才需要过滤
const hasFilter = needFilter && typeof this.addressGroup.options.balancerFilter === 'function';
this.inBalancerFilterFilter = false;
// 透出addressList,供外部进行一次优先筛选
if (hasFilter) {
const list = this.addressGroup.options.balancerFilter(
this.addressList
);
if (Array.isArray(list) && list.length > 0) {
this.inBalancerFilterFilter = true;
const address = this._doSelect(request, list);
this.inBalancerFilterFilter = false;
// 没有数据使用this.addressList重试
if (address) return address;
}
}
if (this.size === 0) return null;
if (this.size === 1) return this.addressList[0];

Expand Down
27 changes: 27 additions & 0 deletions lib/client/loadbalancer/weight_rr.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer {
}

_doSelect(request, addressList) {
// 存在balancerFilterFilter标识使用filter_doSelect
if (this.inBalancerFilterFilter) return this.filter_doSelect(request, addressList);
let address;
let count = this.size;
while (count--) {
Expand All @@ -37,6 +39,31 @@ class WeightRoundRobinLoadBalancer extends LoadBalancer {
// 直接返回兜底
return addressList[this._offset];
}

// 原逻辑this.size this.offset使用了this.addressList, 在这里新增一个使用内部addressList的方法
filter_doSelect(request, addressList) {
let address;
let count = addressList.length;
this.filter_offset = utility.random(addressList.length);
while (count--) {
address = this.filter_rr(request, addressList);
if (address) return address;
}
// 全失败
return null;
}

filter_rr(request, addressList) {
const address = addressList[this.filter_offset];
this._offset = (this.filter_offset + 1) % addressList.length;

const weight = this.getWeight(address);
if (weight === DEFAULT_WEIGHT) return address;
if (weight === 0) return null;

const randNum = utility.random(DEFAULT_WEIGHT);
return weight >= randNum ? address : null;
}
}

module.exports = WeightRoundRobinLoadBalancer;
84 changes: 84 additions & 0 deletions test/client/address_group.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,90 @@ describe('test/client/address_group.test.js', () => {
await utils.closeAll();
});

it('balancerFilter 优先匹配', async function() {
await Promise.all([
utils.startServer(13201),
utils.startServer(13202),
utils.startServer(13203),
utils.startServer(13204),
]);

const addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
balancerFilter: addressList => {
return addressList.filter(v => {
return v.host === '127.0.0.1:13202';
});
},
});
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13201', true),
urlparse('bolt://127.0.0.1:13202', true),
urlparse('bolt://127.0.0.1:13203', true),
urlparse('bolt://127.0.0.1:13204', true),
];
await addressGroup.ready();
let count = 3;
while (count--) {
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
// 优先匹配
assert(connection.url === 'bolt://127.0.0.1:13202');
}
// 匹配到的数据不可用时,会调用其他地址
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13201', true),
];
count = 3;
while (count--) {
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
assert(connection.url === 'bolt://127.0.0.1:13201');
}
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:13202', true),
];
addressGroup._weightMap.set('127.0.0.1:13202', 20);
addressGroup._maxIdleWindow = addressGroup._maxIdleWindow + Date.now();
const connection = await addressGroup.getConnection(req);
assert(connection && connection.isConnected);
assert(connection.url === 'bolt://127.0.0.1:13202');
addressGroup.close();
await connectionManager.closeAllConnections();
await utils.closeAll();
});

it('balancerFilter 优先匹配 异常', async function() {
await Promise.all([
utils.startServer(13201),
utils.startServer(13202),
]);

const addressGroup = new AddressGroup({
key: 'xxx',
logger,
connectionManager,
balancerFilter: addressList => {
return addressList.map(() => [ 1 ]);
},
});
// 使用错误地址
addressGroup.addressList = [
urlparse('bolt://127.0.0.1:132011', true),
urlparse('bolt://127.0.0.1:132022', true),
];
try {
await addressGroup.getConnection(req);
} catch (error) {
assert(error.code === 'ERR_SOCKET_BAD_PORT');
}
addressGroup.close();
await connectionManager.closeAllConnections();
await utils.closeAll();
});

describe('对于连不上地址的处理', () => {
const mod = 2;
const count = 10;
Expand Down