Skip to content

support res.removeListener('drain'), res.once('drain') #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: v2
Choose a base branch
from
118 changes: 98 additions & 20 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,39 @@ function compression (options) {
var stream

var _end = res.end
var _on = res.on
var _write = res.write

// proxy drain events from stream
var _addListener = interceptAddListener(res, function (type, listener) {
if (!listeners || type !== 'drain') {
// skip intercept
return false
} else if (stream) {
// add listener to stream instead
stream.on(type, listener)
} else {
// buffer listeners for future stream
listeners.push([type, listener])
}
})

interceptRemoveListener(res, function (type, listener) {
if (!listeners || type !== 'drain') {
// skip intercept
return false
} else if (stream) {
// remove listener from stream
stream.removeListener(type, listener)
} else {
// remove buffered listener
for (var i = listeners.length - 1; i >= 0; i--) {
if (listeners[i][0] === type && listeners[i][1] === listener) {
listeners.splice(i, 1)
}
}
}
})

// flush
res.flush = function flush (cb) {
if (stream) {
Expand Down Expand Up @@ -128,24 +158,9 @@ function compression (options) {
: stream.end()
}

res.on = function on (type, listener) {
if (!listeners || type !== 'drain') {
return _on.call(this, type, listener)
}

if (stream) {
return stream.on(type, listener)
}

// buffer listeners for future stream
listeners.push([type, listener])

return this
}

function nocompress (msg) {
debug('no compression: %s', msg)
addListeners(res, _on, listeners)
addListeners(res, _addListener, listeners)
listeners = null
}

Expand Down Expand Up @@ -231,7 +246,7 @@ function compression (options) {
_end.call(res)
})

_on.call(res, 'drain', function onResponseDrain () {
_addListener.call(res, 'drain', function onResponseDrain () {
stream.resume()
})

Expand All @@ -254,9 +269,9 @@ function compression (options) {
* @private
*/

function addListeners (stream, on, listeners) {
function addListeners (stream, addListener, listeners) {
for (var i = 0; i < listeners.length; i++) {
on.apply(stream, listeners[i])
addListener.apply(stream, listeners[i])
}
}

Expand All @@ -274,6 +289,69 @@ function chunkLength (chunk, encoding) {
: Buffer.byteLength(chunk, encoding)
}

/**
* Intercept add listener on event emitter.
* @private
*/

function interceptAddListener (ee, fn) {
var _addListener = ee.addListener
var _on = ee.on

if (_addListener) {
ee.addListener = function addListener (type, listener) {
return fn.call(this, type, listener) === false
? _addListener.call(this, type, listener)
: this
}
}

if (_on) {
ee.on = function on (type, listener) {
return fn.call(this, type, listener) === false
? _on.call(this, type, listener)
: this
}
}

return _addListener || _on || noop
}

/**
* Intercept remove listener on event emitter.
* @private
*/

function interceptRemoveListener (ee, fn) {
var _removeListener = ee.removeListener
var _off = ee.off

if (_removeListener) {
ee.removeListener = function removeListener (type, listener) {
return fn.call(this, type, listener) === false
? _removeListener.call(this, type, listener)
: this
}
}

if (_off) {
ee.off = function off (type, listener) {
return fn.call(this, type, listener) === false
? _off.call(this, type, listener)
: this
}
}

return _removeListener || _off || noop
}

/**
* Reusable no-op function.
* @private
*/

function noop () {}

/**
* Default filter function.
* @private
Expand Down
180 changes: 180 additions & 0 deletions test/compression.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,186 @@ describe('compression()', function () {
.expect(200, done)
})

describe('listeners', function () {
it('should support removeListener("drain") after on("drain"); stream present', function (done) {
// compression doesn't proxy listenerCount() to the compression stream, so
// instead watch for a MaxListenersExceededWarning
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain") after addListener("drain")', function (done) {
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support off("drain") after addListener("drain")', function (done) {
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
var len = bytes('40kb')
var buf = Buffer.alloc(len, '.')
res.write(buf)
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.addListener('drain', listener)
res.off('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); buffered', function (done) {
// Variant of above tests for scenario when the listener is buffered (stream
// is not yet present).
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
res.on('end', function () {})
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) {
// Variant of above test for scenario when the listener is buffered (stream
// is not yet present) and the same listener is added two or more times.
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
process.on('warning', onWarning)
var server = createServer({ threshold: 0 }, function (req, res) {
res.setHeader('Content-Type', 'text/plain')
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
var listener = function () {}
res.on('drain', listener)
res.on('drain', listener)
res.removeListener('drain', listener)
}
res.end()
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})

// https://github.com/expressjs/compression/issues/135
it('should not leak event listeners when res.unpipe()', function (done) {
var hasWarned = false
var onWarning = function () {
hasWarned = true
}
var server = createServer({ threshold: 0 }, function (req, res) {
var times = 0
var int = setInterval(function () {
var rs = require('fs').createReadStream('does not exist')
rs.on('error', function (e) {
rs.unpipe(res)
})
rs.pipe(res)
if (times++ > res.getMaxListeners()) {
clearInterval(int)
res.end('hello, world')
}
})
})

request(server)
.get('/')
.set('Accept-Encoding', 'gzip')
.expect(function () {
process.removeListener('warning', onWarning)
assert.ok(!hasWarned)
})
.expect(200, done)
})
})

describe('http2', function () {
it('should work with http2 server', function (done) {
var server = createHttp2Server({ threshold: 0 }, function (req, res) {
Expand Down