forked from JustinTulloss/zeromq.node
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.html
170 lines (153 loc) · 7.27 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
<!DOCTYPE html><html><head><title>node-zeromq</title><link rel="stylesheet" href="style.css"><script src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.3/jquery.min.js"></script><script src="highlight.js"></script><script src="menu.js"></script></head><body><h1>node-zeromq</h1><p><a href="http://www.zeromq.org/">ZeroMQ </a>bindings for Node.js
</p><div class="comment"><h2>types</h2><div class="description"><p>Map of socket types.</p></div><a href="#" class="view-source">View source</a><pre><code>var types = exports.types = {
pub: zmq.ZMQ_PUB
, sub: zmq.ZMQ_SUB
, req: zmq.ZMQ_REQ
, xreq: zmq.ZMQ_XREQ
, rep: zmq.ZMQ_REP
, xrep: zmq.ZMQ_XREP
, push: zmq.ZMQ_PUSH
, pull: zmq.ZMQ_PULL
, dealer: zmq.ZMQ_DEALER
, router: zmq.ZMQ_ROUTER
, pair: zmq.ZMQ_PAIR
};</code></pre></div><div class="comment"><h2>opts</h2><div class="description"><p>Map of socket options.</p></div><a href="#" class="view-source">View source</a><pre><code>var opts = exports.options = {
_fd: zmq.ZMQ_FD
, _ioevents: zmq.ZMQ_EVENTS
, _receiveMore: zmq.ZMQ_RCVMORE
, _subscribe: zmq.ZMQ_SUBSCRIBE
, _unsubscribe: zmq.ZMQ_UNSUBSCRIBE
, affinity: zmq.ZMQ_AFFINITY
, backlog: zmq.ZMQ_BACKLOG
, hwm: zmq.ZMQ_HWM
, identity: zmq.ZMQ_IDENTITY
, linger: zmq.ZMQ_LINGER
, mcast_loop: zmq.ZMQ_MCAST_LOOP
, rate: zmq.ZMQ_RATE
, rcvbuf: zmq.ZMQ_RCVBUF
, reconnect_ivl: zmq.ZMQ_RECONNECT_IVL
, recovery_ivl: zmq.ZMQ_RECOVERY_IVL
, sndbuf: zmq.ZMQ_SNDBUF
, swap: zmq.ZMQ_SWAP
};
// Context management happens here. We lazily initialize a default context,
// and use that everywhere. Also cleans up on exit.
var ctx;
function defaultContext() {
if (ctx) return ctx;
var io_threads = 1;
if (process.env.ZMQ_IO_THREADS) {
io_threads = parseInt(process.env.ZMQ_IO_THREADS, 10);
if (!io_threads || io_threads < 1) {
console.warn('Invalid number in ZMQ_IO_THREADS, using 1 IO thread.');
io_threads = 1;
}
}
ctx = new zmq.Context(io_threads);
process.on('exit', function() {
// ctx.close();
ctx = null;
});
return ctx;
};</code></pre></div><div class="comment"><h2>Socket()</h2><div class="description"><p>Create a new socket of the given <code>type</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>function Socket(type) {
this.type = type;
this._zmq = new zmq.Socket(defaultContext(), types[type]);
this._outgoing = [];
this._watcher = new IOWatcher;
this._watcher.callback = this._flush.bind(this);
this._watcher.set(this._fd, true, false);
this._watcher.start();
};</code></pre></div><div class="comment"><h2>Socket.prototype.setsockopt()</h2><div class="description"><p>Set <code>opt</code> to <code>val</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.setsockopt = function(opt, val){
this._zmq.setsockopt(opts[opt] || opt, val);
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.getsockopt()</h2><div class="description"><p>Get socket <code>opt</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.getsockopt = function(opt){
return this._zmq.getsockopt(opts[opt] || opt);
};</code></pre></div><div class="comment"><h2>Socket.prototype.bind()</h2><div class="description"><p>Async bind.</p>
<p>Emits the "bind" event.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.bind = function(addr, cb) {
var self = this;
self._watcher.stop();
self._zmq.bind(addr, function(err) {
self._watcher.start();
self.emit('bind');
cb && cb(err);
});
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.bindSync()</h2><div class="description"><p>Sync bind.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.bindSync = function(addr) {
this._watcher.stop();
try {
this._zmq.bindSync(addr);
} catch (e) {
this._watcher.start();
throw e;
}
this._watcher.start();
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.connect()</h2><div class="description"><p>Connect to <code>addr</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.connect = function(addr) {
this._zmq.connect(addr);
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.subscribe()</h2><div class="description"><p>Subscribe with the given <code>filter</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.subscribe = function(filter) {
this._subscribe = filter;
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.unsubscribe()</h2><div class="description"><p>Unsubscribe with the given <code>filter</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.unsubscribe = function(filter) {
this._unsubscribe = filter;
return this;
};</code></pre></div><div class="comment"><h2>Socket.prototype.send()</h2><div class="description"><p>Send the given <code>msg</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.send = function(msg, flags) {
// allow strings etc
if (!Buffer.isBuffer(msg)) {
msg = new Buffer(String(msg), 'utf8');
}
this._outgoing.push([msg, flags || 0]);
this._flush();
return this;
};
// The workhorse that does actual send and receive operations.
// This helper is called from `send` above, and in response to
// the watcher noticing the signaller fd is readable.
Socket.prototype._flush = function() {
var args;
// Don't allow recursive flush invocation as it can lead to stack
// exhaustion and write starvation
if (this._flushing) return;
this._flushing = true;
try {
while (true) {
var emitArgs
, flags = this._ioevents;
if (!this._outgoing.length) {
flags &= ~zmq.ZMQ_POLLOUT;
}
if (!flags) break;
if (flags & zmq.ZMQ_POLLIN) {
emitArgs = ['message'];
do {
emitArgs.push(new Buffer(this._zmq.recv()));
} while (this._receiveMore);
this.emit.apply(this, emitArgs);
if (this._zmq.state != zmq.STATE_READY) {
this._flushing = false;
return;
}
}
// We send as much as possible in one burst so that we don't
// starve sends if we receive more than one message for each
// one sent.
while (flags & zmq.ZMQ_POLLOUT && this._outgoing.length) {
args = this._outgoing.shift();
this._zmq.send(args[0], args[1]);
flags = this._ioevents;
}
}
} catch (e) {
this.emit('error', e);
}
this._flushing = false;
};</code></pre></div><div class="comment"><h2>Socket.prototype.close()</h2><div class="description"><p>Close the socket.</p></div><a href="#" class="view-source">View source</a><pre><code>Socket.prototype.close = function() {
this._watcher.stop();
this._watcher = null;
this._zmq.close();
return this;
};</code></pre></div><div class="comment"><h2>exports.socket()</h2><div class="description"><p>Create a <code>type</code> socket with the given <code>options</code>.</p></div><a href="#" class="view-source">View source</a><pre><code>exports.socket = function(type, options) {
var sock = new Socket(type);
for (var key in options) sock[key] = options[key];
return sock;
};</code></pre></div></body></html>