You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
console.warn('Invalid number in ZMQ_IO_THREADS, using 1 IO thread.');
45
+
io_threads = 1;
46
+
}
47
+
}
48
+
49
+
ctx = newzmq.Context(io_threads);
50
+
process.on('exit', function() {
51
+
// ctx.close();
52
+
ctx = null;
53
+
});
54
+
55
+
returnctx;
56
+
};</code></pre></div><divclass="comment"><h2>Socket()</h2><divclass="description"><p>Create a new socket of the given <code>type</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>function Socket(type) {
57
+
this.type = type;
58
+
this._zmq = new zmq.Socket(defaultContext(), types[type]);
59
+
this._outgoing = [];
60
+
this._watcher = new IOWatcher;
61
+
this._watcher.callback = this._flush.bind(this);
62
+
this._watcher.set(this._fd, true, false);
63
+
this._watcher.start();
64
+
};</code></pre></div><divclass="comment"><h2>Socket.prototype.setsockopt()</h2><divclass="description"><p>Set <code>opt</code> to <code>val</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.setsockopt = function(opt, val){
};</code></pre></div><divclass="comment"><h2>Socket.prototype.connect()</h2><divclass="description"><p>Connect to <code>addr</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.connect = function(addr) {
91
+
this._zmq.connect(addr);
92
+
return this;
93
+
};</code></pre></div><divclass="comment"><h2>Socket.prototype.subscribe()</h2><divclass="description"><p>Subscribe with the given <code>filter</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.subscribe = function(filter) {
94
+
this._subscribe = filter;
95
+
return this;
96
+
};</code></pre></div><divclass="comment"><h2>Socket.prototype.unsubscribe()</h2><divclass="description"><p>Unsubscribe with the given <code>filter</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.unsubscribe = function(filter) {
97
+
this._unsubscribe = filter;
98
+
return this;
99
+
};</code></pre></div><divclass="comment"><h2>Socket.prototype.send()</h2><divclass="description"><p>Send the given <code>msg</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.send = function(msg, flags) {
100
+
// allow strings etc
101
+
if (!Buffer.isBuffer(msg)) {
102
+
msg = new Buffer(String(msg), 'utf8');
103
+
}
104
+
105
+
this._outgoing.push([msg, flags || 0]);
106
+
this._flush();
107
+
108
+
return this;
109
+
};
110
+
111
+
// The workhorse that does actual send and receive operations.
112
+
// This helper is called from `send` above, and in response to
113
+
// the watcher noticing the signaller fd is readable.
114
+
Socket.prototype._flush = function() {
115
+
var args;
116
+
117
+
// Don't allow recursive flush invocation as it can lead to stack
118
+
// exhaustion and write starvation
119
+
if (this._flushing) return;
120
+
121
+
this._flushing = true;
122
+
try {
123
+
while (true) {
124
+
var emitArgs
125
+
, flags = this._ioevents;
126
+
127
+
if (!this._outgoing.length) {
128
+
flags &= ~zmq.ZMQ_POLLOUT;
129
+
}
130
+
131
+
if (!flags) break;
132
+
133
+
if (flags & zmq.ZMQ_POLLIN) {
134
+
emitArgs = ['message'];
135
+
136
+
do {
137
+
emitArgs.push(new Buffer(this._zmq.recv()));
138
+
} while (this._receiveMore);
139
+
140
+
this.emit.apply(this, emitArgs);
141
+
if (this._zmq.state != zmq.STATE_READY) {
142
+
this._flushing = false;
143
+
return;
144
+
}
145
+
}
146
+
147
+
// We send as much as possible in one burst so that we don't
148
+
// starve sends if we receive more than one message for each
149
+
// one sent.
150
+
while (flags & zmq.ZMQ_POLLOUT && this._outgoing.length) {
151
+
args = this._outgoing.shift();
152
+
this._zmq.send(args[0], args[1]);
153
+
flags = this._ioevents;
154
+
}
155
+
}
156
+
} catch (e) {
157
+
this.emit('error', e);
158
+
}
159
+
160
+
this._flushing = false;
161
+
};</code></pre></div><divclass="comment"><h2>Socket.prototype.close()</h2><divclass="description"><p>Close the socket.</p></div><ahref="#" class="view-source">View source</a><pre><code>Socket.prototype.close = function() {
162
+
this._watcher.stop();
163
+
this._watcher = null;
164
+
this._zmq.close();
165
+
return this;
166
+
};</code></pre></div><divclass="comment"><h2>exports.socket()</h2><divclass="description"><p>Create a <code>type</code> socket with the given <code>options</code>.</p></div><ahref="#" class="view-source">View source</a><pre><code>exports.socket = function(type, options) {
167
+
var sock = new Socket(type);
168
+
for (var key in options) sock[key] = options[key];
0 commit comments