Skip to content

Commit 74d5903

Browse files
andreasonny83GitHub Enterprise
authored and
GitHub Enterprise
committed
FRIDGE-2080 Add gql subscription (#435)
* FRIDGE-2080 test graphql-ws * FRIDGE-2080 Add GQL Subscriptions
1 parent 7c2587a commit 74d5903

File tree

11 files changed

+1723
-1019
lines changed

11 files changed

+1723
-1019
lines changed

lib/Worker.js

+87-11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import isObject from 'lodash/isObject';
33
import isString from 'lodash/isString';
44
import Configuration from './util/Configuration';
55
import EventBridgeSignaling from './signaling/EventBridgeSignaling';
6+
import GraphQLSignaling from './signaling/GraphqlSignaling';
67
import { EventEmitter } from 'events';
78
import Logger from './util/Logger';
89
import Request from './util/Request';
@@ -71,7 +72,7 @@ class Worker extends EventEmitter {
7172
* @param {WorkerOptions} [options]
7273
* @param {WorkerDeps} [deps]
7374
*/
74-
constructor(token, options = {}, deps = { Request, EventBridgeSignaling }) {
75+
constructor(token, options = {}, deps = { Request, EventBridgeSignaling, GraphQLSignaling }) {
7576
super();
7677

7778
// check the jwt token
@@ -88,6 +89,7 @@ class Worker extends EventEmitter {
8889
wsServer: (val) => isString(val),
8990
region: (val) => isString(val),
9091
enableVersionCheck: (val) => isBoolean(val),
92+
useGraphQL: (val) => isBoolean(val),
9193
};
9294

9395
validateOptions(options, types);
@@ -102,6 +104,11 @@ class Worker extends EventEmitter {
102104
* @type {boolean}
103105
*/
104106
this._closeExistingSessions = options.closeExistingSessions;
107+
/**
108+
* @private
109+
* @type {boolean}
110+
*/
111+
this._useGraphQL = options.useGraphQL || false;
105112
/**
106113
* @private
107114
* @type {string}
@@ -143,19 +150,34 @@ class Worker extends EventEmitter {
143150
this._connectRetry = 0;
144151
/**
145152
* @private
146-
* @type {EventBridgeSignaling}
153+
* @type {GraphQLSignaling}
147154
*/
148-
this._signaling = new deps.EventBridgeSignaling(this, {
149-
closeExistingSessions: options.closeExistingSessions,
150-
setWorkerOfflineIfDisconnected: options.setWorkerOfflineIfDisconnected,
151-
});
155+
if (this._useGraphQL) {
156+
this._gqlSignaling = new deps.GraphQLSignaling(this, {
157+
closeExistingSessions: options.closeExistingSessions,
158+
setWorkerOfflineIfDisconnected: options.setWorkerOfflineIfDisconnected,
159+
});
160+
} else {
161+
/**
162+
* @private
163+
* @type {EventBridgeSignaling}
164+
*/
165+
this._signaling = new deps.EventBridgeSignaling(this, {
166+
closeExistingSessions: options.closeExistingSessions,
167+
setWorkerOfflineIfDisconnected: options.setWorkerOfflineIfDisconnected,
168+
});
169+
}
152170
/**
153171
* @private
154172
* @type {RetryUtil}
155173
*/
156174
this.retryUtil = new RetryUtil();
157175

158-
this._subscribeToSignalingEvents();
176+
if (this._useGraphQL) {
177+
this._subscribeToGQLEvents();
178+
} else {
179+
this._subscribeToSignalingEvents();
180+
}
159181

160182
let eventHandler;
161183
if (options.eventHandlerClass) {
@@ -362,7 +384,11 @@ class Worker extends EventEmitter {
362384

363385
try {
364386
this._config.updateToken(newToken);
365-
this._signaling.updateToken(newToken);
387+
if (this._useGraphQL) {
388+
this._gqlSignaling.updateToken(newToken);
389+
} else {
390+
this._signaling.updateToken(newToken);
391+
}
366392
this.emit('tokenUpdated');
367393
} catch (err) {
368394
this.emit('error', err);
@@ -417,13 +443,53 @@ class Worker extends EventEmitter {
417443
});
418444
}
419445

446+
/**
447+
* @private
448+
*/
449+
_subscribeToGQLEvents() {
450+
this._log.info('[GQL] Subscribing to Signaling events .... ');
451+
452+
this._gqlSignaling.on('connected', () => {
453+
this._log.info('[GQL] Received Event: \'connected\' from Signaling layer. Pending initialization.', this.sid);
454+
});
455+
456+
this._gqlSignaling.on('disconnected', reason => {
457+
this._log.info('[GQL] Received Event: \'disconnected\' from Signaling layer for Worker %s. %s', this.sid, reason);
458+
this._unSubscribeFromTaskRouterEvents();
459+
this.emit('disconnected', reason);
460+
});
461+
462+
this._gqlSignaling.on('init', evt => {
463+
this._log.info('[GQL] Received Event: \'init\' from Signaling layer. Proceeding to initialize Worker %s.', evt.channel_id);
464+
this.sid = evt.channel_id;
465+
this.accountSid = evt.account_sid;
466+
this.workspaceSid = evt.workspace_sid;
467+
// Check if we need this: this._gqlSignaling.setLifetime(evt.token_lifetime);
468+
this._initialize();
469+
});
470+
471+
this._gqlSignaling.on('error', err => {
472+
this._log.info('[GQL] Received Event: \'error\' from Signaling layer for Worker %s.', this.sid);
473+
this.emit('error', err);
474+
});
475+
476+
this._gqlSignaling.on('tokenExpired', () => {
477+
this._log.info('[GQL] Received Event: \'tokenExpired\' for for Worker %s. Please update the token. Websocket will not reconnect automatically until token is updated.', this.sid);
478+
this.emit('tokenExpired');
479+
});
480+
}
481+
420482
/**
421483
* @private
422484
*/
423485
_subscribeToTaskRouterEvents() {
424486
this._log.info('Subscribing to TaskRouter events ... ');
425487
for (let [eventName, eventHandler] of Object.entries(this.taskRouterEventHandler.getTREventsToHandlerMapping())) {
426-
this._signaling.on(eventName, this.taskRouterEventHandler[eventHandler]);
488+
if (this._useGraphQL) {
489+
this._gqlSignaling.on(eventName, this.taskRouterEventHandler[eventHandler]);
490+
} else {
491+
this._signaling.on(eventName, this.taskRouterEventHandler[eventHandler]);
492+
}
427493
}
428494
}
429495

@@ -434,7 +500,11 @@ class Worker extends EventEmitter {
434500
_unSubscribeFromTaskRouterEvents() {
435501
this._log.info('Unsubscribing from TaskRouter events ... ');
436502
for (let [eventName, eventHandler] of Object.entries(this.taskRouterEventHandler.getTREventsToHandlerMapping())) {
437-
this._signaling.removeListener(eventName, this.taskRouterEventHandler[eventHandler]);
503+
if (this._useGraphQL) {
504+
this._gqlSignaling.removeListener(eventName, this.taskRouterEventHandler[eventHandler]);
505+
} else {
506+
this._signaling.removeListener(eventName, this.taskRouterEventHandler[eventHandler]);
507+
}
438508
}
439509
}
440510

@@ -634,7 +704,11 @@ class Worker extends EventEmitter {
634704
* @returns {void}
635705
*/
636706
disconnect() {
637-
this._signaling.disconnect();
707+
if (this._useGraphQL) {
708+
this._gqlSignaling.disconnect();
709+
} else {
710+
this._signaling.disconnect();
711+
}
638712
}
639713

640714
/**
@@ -723,12 +797,14 @@ export default Worker;
723797
* ['error', 'warn', 'info', 'debug', 'trace', 'silent']
724798
* @property {string} [region] - the realm for connections (ex. "stage-us1")
725799
* @property {boolean} [enableVersionCheck=false] - To avoid accidentally overwriting objects with outdated data
800+
* @property {boolean} [useGraphQL=false] - Subscribe to worker events using GraphQL subscriptions
726801
*/
727802

728803
/**
729804
* @typedef {Object} WorkerDeps
730805
* @property {Request} Request
731806
* @property {EventBridgeSignaling} EventBridgeSignaling
807+
* @property {GraphQLSignaling} [GraphQLSignaling=undefined]
732808
*/
733809

734810
/**

0 commit comments

Comments
 (0)