Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,27 @@ return q.addJob(job).then((savedJobs) => {

```

### Reconnection for the change feeds
If `reconnect` options are supplied in the connection during the queue creation, change-feeds will automatically attempt reconnection based on the `maxAttempts`.

````js
const Queue = require('rethinkdb-job-queue')

const cxnOptions = {
host: 'localhost',
port: 28015,
db : 'JobQueue', // The name of the database in RethinkDB

'reconnect': { // The reconnection options for change feeds
'pingInterval': 59, // in seconds, same as rethinkdbdash `pingInterval` option
'maxAttempts': 99999, // maximum no. of times reconnect should be attempted
'attemptDelay': 5000 // delay factor in milli-seconds between attempts
}

const q = new Queue(cxnOptions, { name: 'my-queue' })
````
Note: This reconnection is applicable only for change-feeds and does not facilitate reconnection for other parts of the package (such as initial db-assertion, job update db writes etc.).

## About the Owner

I, Grant Carthew, am a technologist, trainer, and Dad from Queensland, Australia. I work on code in a number of personal projects and when the need arises I build my own packages.
Expand Down
3 changes: 2 additions & 1 deletion src/db-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ module.exports = function dbDriver (cxn) {
cxn.port != null ||
is.string(cxn.db)) {
logger('cxn is an options object')
cxnCopy.silent = true
cxnCopy.pingInterval = cxnCopy.reconnect?.pingInterval || enums.options.reconnect.pingInterval;
cxnCopy.silent = cxnCopy.reconnect?.silent || enums.options.reconnect.silent;
cxnCopy.host = cxnCopy.host == null
? enums.options.host : cxnCopy.host
cxnCopy.port = cxnCopy.port == null
Expand Down
1 change: 1 addition & 0 deletions src/enums.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const options = Object.freeze({
host: 'localhost',
port: 28015,
db: 'rjqJobQueue',
reconnect: { silent: true, pingInterval: -1, maxAttempts: 0, attemptDelay: 5000 },
queryRunOptions: { readMode: 'majority' },
databaseInitDelay: 1000,
masterInterval: 310000, // 5 minutes and 10 seconds
Expand Down
60 changes: 50 additions & 10 deletions src/queue-db.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,53 @@ const dbAssert = require('./db-assert')
const dbReview = require('./db-review')
const queueChange = require('./queue-change')
const dbDriver = require('./db-driver')
const { ReqlDriverError, ReqlServerError } = require('rethinkdbdash/lib/error')

function _isConnectionError(err) {
// Credit: https://github.com/LearnersGuild/rethinkdb-changefeed-reconnect/blob/master/src/index.js#L82
// FIXME: I'm not terribly happy about this particular logic, but
// unfortunately, rethinkdbdash doesn't provide a consistent error type (or
// even message) when it's having trouble connecting to a changefeed,
// particularly if it is connecting via a rethinkdb proxy server.
return (err instanceof ReqlServerError) ||
(err instanceof ReqlDriverError) ||
(err.msg && err.msg.match(/Changefeed\saborted/)) ||
(err.msg && err.msg.match(/primary\sreplica.*not\savailable/));
}

function tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt) {
// if we are detaching (or detached), lets not crash the app with connection errors.
if (q._dbDetached) return error;
// if this is connection error, lets try reconnecting.
if (_isConnectionError(error)) {
// if no further attempts left, throw it as it is.
if (++nRetryAttempt > maxAttempts)
throw error;
// try reconnection after some linear delay.
logger(`connection error, retry after ${nRetryAttempt * attemptDelay / 1000} sec`);
return Promise.resolve().delay(nRetryAttempt * attemptDelay)
.then(() => monitorChangeFeed(q, { maxAttempts, attemptDelay }, nRetryAttempt));
}
throw error;
}

function monitorChangeFeed(q, {
maxAttempts = enums.options.reconnect.maxAttempts,
attemptDelay = enums.options.reconnect.attemptDelay } = {}, nRetryAttempt = 0) {

logger('monitorChangeFeed');

return q.r.db(q.db).table(q.name).changes().run(q.queryRunOptions).then(function (changeFeed) {
// we connected successfully, lets reset the counter
nRetryAttempt = 0;
// fetch each change and act on it
q._changeFeedCursor = changeFeed;
return q._changeFeedCursor.each(function (error, change) {
if (error) return tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt);
return queueChange(q, error, change);
});
}).catch(error => tryReconnect(q, error, maxAttempts, attemptDelay, nRetryAttempt));
}

module.exports.attach = function dbAttach (q, cxn) {
logger('attach')
Expand All @@ -23,16 +70,7 @@ module.exports.attach = function dbAttach (q, cxn) {
].join(':')
q._ready = dbAssert(q).then(() => {
if (q.changeFeed) {
return q.r.db(q.db)
.table(q.name)
.changes()
.run(q.queryRunOptions)
.then((changeFeed) => {
q._changeFeedCursor = changeFeed
return q._changeFeedCursor.each((err, change) => {
return queueChange(q, err, change)
})
})
return monitorChangeFeed(q, cxn.reconnect);
}
q._changeFeedCursor = false
return null
Expand All @@ -47,12 +85,14 @@ module.exports.attach = function dbAttach (q, cxn) {
q.emit(enums.status.ready, q.id)
return true
})
q._dbDetached = false;
return q._ready
}

module.exports.detach = function dbDetach (q) {
logger('detach')
return Promise.resolve().then(() => {
q._dbDetached = true;
if (q._changeFeedCursor) {
let feed = q._changeFeedCursor
q._changeFeedCursor = false
Expand Down