Skip to content

Commit 9be8293

Browse files
authored
Merge pull request #264 from DavidVujic/libuv-assert
Example code: use one client, use only when connected
2 parents cf28a26 + 04c651d commit 9be8293

8 files changed

+103
-72
lines changed

examples/addlistener.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const { isClientConnected } = require('./wrapper.js');
12
const notifier = require('./notifier.js');
23
const logger = require('./logger.js');
34

@@ -18,10 +19,13 @@ async function listen(client, path) {
1819
const watchFunc = watcher.bind(null, client, listen);
1920

2021
try {
22+
if (!isClientConnected()) {
23+
throw new Error('listen: client is not connected');
24+
}
2125
const children = await client.w_get_children(path, watchFunc);
2226
emit(client, path, children);
2327
} catch (error) {
24-
logger.error(error);
28+
logger.error('listen', error.message);
2529
}
2630
}
2731

examples/addtask.js

+3-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const { constants, createClient } = require('./wrapper.js');
1+
const { constants } = require('./wrapper.js');
22
const notifier = require('./notifier.js');
33
const { createNode, persistentNode } = require('./createnode.js');
44

@@ -8,15 +8,8 @@ async function createTask(client, data) {
88
notifier.emit('addTask', message);
99
}
1010

11-
async function addTask(data) {
12-
const client = createClient();
13-
14-
client.on('connect', () => {
15-
notifier.emit('connect', `addTask: session established, id=${client.client_id}`);
16-
17-
createTask(client, data);
18-
});
19-
client.init({});
11+
async function addTask(client, data) {
12+
createTask(client, data);
2013
}
2114

2215
module.exports = {

examples/createnode.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
const { isClientConnected } = require('./wrapper.js');
2+
13
const persistentNode = 0;
24

35
/**
@@ -9,10 +11,14 @@ const persistentNode = 0;
911
*/
1012
async function createNode(client, path, flags, data = '') {
1113
try {
14+
if (!isClientConnected()) {
15+
throw new Error('createNode: client is not connected');
16+
}
17+
1218
const createdPath = await client.create(path, data, flags);
1319
return `(created: ${createdPath})`;
1420
} catch (error) {
15-
return `${path} already exists`;
21+
return `${path} Error: ${error.message}`;
1622
}
1723
}
1824

examples/createworker.js

+8-12
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,27 @@
1-
const { constants, createClient } = require('./wrapper.js');
1+
const { constants, isClientConnected } = require('./wrapper.js');
22
const notifier = require('./notifier.js');
33
const logger = require('./logger.js');
44

55
function emit(client, path) {
66
logger.log(`(${path}) client id: ${client.client_id}`);
7-
notifier.emit('createWorker', client);
7+
notifier.emit('createWorker');
88
}
99

1010
async function createWorkerPath(client, path) {
1111
try {
12+
if (!isClientConnected()) {
13+
throw new Error('createWorkerPath: client is not connected');
14+
}
1215
// eslint-disable-next-line no-bitwise
1316
const createdPath = await client.create(path, '', constants.ZOO_EPHEMERAL | constants.ZOO_SEQUENCE);
1417
emit(client, createdPath);
1518
} catch (error) {
16-
logger.error(error);
19+
logger.error('createWorkerPath', error.message);
1720
}
1821
}
1922

20-
async function createWorker() {
21-
const client = createClient();
22-
23-
client.on('connect', () => {
24-
notifier.emit('connect', `createWorker: session established, id=${client.client_id}`);
25-
createWorkerPath(client, '/workers/worker-');
26-
});
27-
28-
client.init({});
23+
async function createWorker(client) {
24+
createWorkerPath(client, '/workers/worker-');
2925
}
3026

3127
module.exports = {

examples/electleader.js

+13-13
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
const { constants, createClient } = require('./wrapper.js');
1+
const { constants, isClientConnected } = require('./wrapper.js');
22
const notifier = require('./notifier.js');
33
const logger = require('./logger.js');
44

55
function emit(client, path) {
6-
logger.log(`(${path}) ${client.client_id}`);
7-
notifier.emit('leader', client);
6+
logger.log(`Elect leader: (${path}) ${client.client_id}`);
7+
notifier.emit('leader');
88
}
99

1010
function onData(client, path, rc, error, stat, data) {
@@ -27,33 +27,33 @@ async function checkMaster(client, path, retryFunc) {
2727
const watchFunc = watcher.bind(null, client, path, checkMaster, retryFunc);
2828

2929
try {
30+
if (!isClientConnected()) {
31+
throw new Error('is not connected');
32+
}
3033
const res = await client.w_get(path, watchFunc);
3134
onData(client, path, res.rc, res.error, res.stat, res.data);
3235
} catch (error) {
33-
logger.error(error);
36+
logger.error('checkMaster:', error.message);
3437
}
3538
}
3639

3740
async function runForLeader(client, path) {
3841
const clientId = client.client_id;
3942

4043
try {
44+
if (!isClientConnected()) {
45+
throw new Error('is not connected');
46+
}
4147
await client.create(path, `${clientId}`, constants.ZOO_EPHEMERAL);
4248
emit(client, path);
4349
} catch (error) {
50+
logger.error('runForLeader:', error.message);
4451
await checkMaster(client, path, runForLeader);
4552
}
4653
}
4754

48-
async function electLeader(path) {
49-
const client = createClient();
50-
51-
client.on('connect', () => {
52-
notifier.emit('connect', `electLeader: session established, id=${client.client_id}`);
53-
runForLeader(client, path);
54-
});
55-
56-
client.init({});
55+
async function electLeader(client, path) {
56+
runForLeader(client, path);
5757
}
5858

5959
module.exports = {

examples/index.js

+15-11
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
const { getClient } = require('./wrapper.js');
12
const { createNodes } = require('./setup.js');
23
const { electLeader } = require('./electleader.js');
34
const { createWorker } = require('./createworker.js');
@@ -10,7 +11,6 @@ const notifier = require('./notifier.js');
1011
notifier.on('connect', (message) => logger.log('connect', message));
1112
notifier.on('createNode', (message) => logger.log('createNode', message));
1213
notifier.on('addTask', (message) => logger.log('addTask', message));
13-
notifier.on('close', (message) => logger.log('close', message));
1414

1515
notifier.on('onChildren', (children) => {
1616
children.forEach((child) => {
@@ -19,21 +19,25 @@ notifier.on('onChildren', (children) => {
1919
});
2020

2121
async function init() {
22-
await createNodes(['/workers', '/assign', '/tasks', '/status']);
22+
const client = getClient();
2323

24-
notifier.on('leader', async (master) => {
25-
await listen(master, '/workers');
26-
await listen(master, '/assign');
24+
client.on('connect', async () => {
25+
await createNodes(client, ['/workers', '/assign', '/tasks', '/status']);
2726

28-
notifier.on('createWorker', async (worker) => {
29-
await listen(worker, '/tasks');
27+
notifier.on('leader', async () => {
28+
await listen(client, '/workers');
29+
await listen(client, '/assign');
30+
31+
notifier.on('createWorker', async () => {
32+
await listen(client, '/tasks');
33+
});
34+
35+
await createWorker(client);
36+
await addTask(client, 'hello world');
3037
});
3138

32-
await createWorker();
33-
await addTask('hello world');
39+
await electLeader(client, '/master');
3440
});
35-
36-
await electLeader('/master');
3741
}
3842

3943
init().catch(logger.error);

examples/setup.js

+2-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
const { createClient } = require('./wrapper.js');
21
const notifier = require('./notifier.js');
32
const { createNode, persistentNode } = require('./createnode.js');
43

@@ -14,20 +13,8 @@ async function createAllNodes(client, paths) {
1413
});
1514
}
1615

17-
async function createNodes(paths) {
18-
const client = createClient();
19-
20-
client.on('close', () => {
21-
notifier.emit('close', `session closed, id=${client.client_id}`);
22-
});
23-
24-
client.on('connect', () => {
25-
notifier.emit('connect', `session established, id=${client.client_id}`);
26-
27-
createAllNodes(client, paths);
28-
});
29-
30-
client.init({});
16+
async function createNodes(client, paths) {
17+
createAllNodes(client, paths);
3118
}
3219

3320
module.exports = {

examples/wrapper.js

+50-9
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,65 @@
11
const { constants, Promise: ZooKeeper } = require('../lib/index');
2+
const logger = require('./logger.js');
23

34
const host = process.argv[2] || '127.0.0.1:2181';
45

6+
let client;
7+
let isConnected = false;
8+
59
/**
610
* @param timeoutMs {number}
711
* @returns {ZooKeeper}
812
*/
913
function createClient(timeoutMs = 5000) {
10-
const config = {
11-
connect: host,
12-
timeout: timeoutMs,
13-
debug_level: constants.ZOO_LOG_LEVEL_WARN,
14-
host_order_deterministic: false,
15-
};
16-
17-
return new ZooKeeper(config);
14+
if (!client) {
15+
isConnected = false;
16+
logger.log('creating a client.');
17+
18+
const config = {
19+
connect: host,
20+
timeout: timeoutMs,
21+
debug_level: constants.ZOO_LOG_LEVEL_WARN,
22+
host_order_deterministic: false,
23+
};
24+
25+
client = new ZooKeeper(config);
26+
27+
client.on('close', () => {
28+
isConnected = false;
29+
logger.log('close', `session closed, id=${client.client_id}`);
30+
client = null;
31+
});
32+
33+
client.on('connecting', () => {
34+
isConnected = false;
35+
logger.log('connecting', `session connecting, id=${client.client_id}`);
36+
});
37+
38+
client.on('connect', () => {
39+
isConnected = true;
40+
logger.log('connect', `session connect, id=${client.client_id}`);
41+
});
42+
43+
setTimeout(() => {
44+
client.init({});
45+
}, 1000);
46+
}
47+
48+
return client;
49+
}
50+
51+
/** @returns {ZooKeeper} */
52+
function getClient() {
53+
return createClient();
54+
}
55+
56+
function isClientConnected() {
57+
return isConnected;
1858
}
1959

2060
module.exports = {
2161
constants,
22-
createClient,
2362
ZooKeeper,
63+
getClient,
64+
isClientConnected,
2465
};

0 commit comments

Comments
 (0)