Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions lib/dispatcher/balanced-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class BalancedPool extends PoolBase {
.map((p) => p[kUrl].origin)
}

[kGetDispatcher] () {
[kGetDispatcher] (peek = false) {
// We validate that pools is greater than 0,
// otherwise we would have to wait until an upstream
// is added, which might never happen.
Expand All @@ -174,6 +174,10 @@ class BalancedPool extends PoolBase {
return
}

if (peek) {
return dispatcher
}

const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true)

if (allClientsBusy) {
Expand All @@ -182,7 +186,18 @@ class BalancedPool extends PoolBase {

let counter = 0

let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain])
let maxWeightIndex = -1

for (let i = 0; i < this[kClients].length; i++) {
const pool = this[kClients][i]

if (
!pool[kNeedDrain] &&
(maxWeightIndex === -1 || pool[kWeight] > this[kClients][maxWeightIndex][kWeight])
) {
maxWeightIndex = i
}
}

while (counter++ < this[kClients].length) {
this[kIndex] = (this[kIndex] + 1) % this[kClients].length
Expand All @@ -199,7 +214,7 @@ class BalancedPool extends PoolBase {
this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]

if (this[kCurrentWeight] <= 0) {
this[kCurrentWeight] = this[kMaxWeightPerServer]
this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
}
}
if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
Expand Down
2 changes: 1 addition & 1 deletion lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class PoolBase extends DispatcherBase {
this[kQueued]++
} else if (!dispatcher.dispatch(opts, handler)) {
dispatcher[kNeedDrain] = true
this[kNeedDrain] = !this[kGetDispatcher]()
this[kNeedDrain] = !this[kGetDispatcher](true)
}

return !this[kNeedDrain]
Expand Down
6 changes: 5 additions & 1 deletion lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class Pool extends PoolBase {
})
}

[kGetDispatcher] () {
[kGetDispatcher] (peek = false) {
const clientTtlOption = this[kOptions].clientTtl
for (const client of this[kClients]) {
// check ttl of client and if it's stale, remove it from the pool
Expand All @@ -108,6 +108,10 @@ class Pool extends PoolBase {
}

if (!this[kConnections] || this[kClients].length < this[kConnections]) {
if (peek) {
return true
}

const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
return dispatcher
Expand Down
10 changes: 9 additions & 1 deletion lib/dispatcher/round-robin-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,16 @@ class RoundRobinPool extends PoolBase {
})
}

[kGetDispatcher] () {
[kGetDispatcher] (peek = false) {
const clientTtlOption = this[kOptions].clientTtl
const clientsLength = this[kClients].length

// If we have no clients yet, create one
if (clientsLength === 0) {
if (peek) {
return true
}

const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
return dispatcher
Expand Down Expand Up @@ -127,6 +131,10 @@ class RoundRobinPool extends PoolBase {

// All clients are busy, create a new one if we haven't reached the limit
if (!this[kConnections] || clientsLength < this[kConnections]) {
if (peek) {
return true
}

const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
return dispatcher
Expand Down
100 changes: 100 additions & 0 deletions test/node-test/balanced-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,106 @@ test('getUpstream returns undefined for closed/destroyed upstream', (t) => {
p.strictEqual(result, undefined)
})

function createReusableBackend (name) {
const sockets = new Set()
const server = createServer({ joinDuplicateHeaders: true }, (_req, res) => {
res.end(name)
})

server.on('connection', (socket) => {
sockets.add(socket)
socket.on('close', () => sockets.delete(socket))
})

let port = 0

return {
get url () {
return `http://127.0.0.1:${port}`
},
async start () {
if (server.listening) {
return
}

await new Promise((resolve, reject) => {
server.listen(port, '127.0.0.1', (err) => {
if (err) {
reject(err)
} else {
port = server.address().port
resolve()
}
})
})
},
async stop () {
if (!server.listening) {
return
}

for (const socket of sockets) {
socket.destroy()
}

await new Promise((resolve) => server.close(resolve))
}
}
}

test('weighted round robin recovers with two upstreams after both hit minimum weight', async (t) => {
const backends = [createReusableBackend('A'), createReusableBackend('B')]
await Promise.all(backends.map((backend) => backend.start()))

t.after(async () => Promise.all(backends.map((backend) => backend.stop())))

const pool = new BalancedPool(backends.map((backend) => backend.url), {
connections: 1,
pipelining: 0,
connectTimeout: 200,
keepAliveTimeout: 1000,
keepAliveTimeoutThreshold: 100
})

t.after(() => pool.destroy())

async function requestBackend () {
try {
const { body } = await pool.request({
method: 'GET',
path: '/',
headersTimeout: 500,
bodyTimeout: 500
})
return await body.text()
} catch {
return 'ERR'
}
}

for (let i = 0; i < 12; i++) {
await requestBackend()
}

await Promise.all(backends.map((backend) => backend.stop()))

for (let i = 0; i < 24; i++) {
await requestBackend()
}

await backends[0].start()

const recoveredPhaseResults = []
for (let i = 0; i < 24; i++) {
recoveredPhaseResults.push(await requestBackend())
}

assert.ok(
recoveredPhaseResults.includes('A'),
`expected at least one request to recover through upstream A, received: ${recoveredPhaseResults.join(', ')}`
)
})

class TestServer {
constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) {
this.config = {
Expand Down
Loading