Skip to content

Commit d44f0f4

Browse files
committed
Merge branch 'release/0.1.2'
2 parents a16ecdc + ab9d894 commit d44f0f4

File tree

6 files changed

+61
-11
lines changed

6 files changed

+61
-11
lines changed

HISTORY.rst

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
History
44
-------
55

6+
0.1.2 (2014-07-08)
7+
~~~~~~~~~~~~~~~~~~
8+
9+
* Flush delfate buffer for each message.
10+
611
0.1.1 (2014-07-07)
712
~~~~~~~~~~~~~~~~~~
813

gnsq/reader.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -262,17 +262,16 @@ def close(self):
262262
if not self.is_running:
263263
return
264264

265-
self.logger.debug('closing')
266265
self.state = CLOSED
267266

267+
self.logger.debug('closing %d worker(s)' % len(self.workers))
268268
for worker in self.workers:
269269
worker.kill(block=False)
270270

271+
self.logger.debug('closing %d connection(s)' % len(self.conns))
271272
for conn in self.conns:
272273
conn.close_stream()
273274

274-
self.logger.debug('workers: %r' % self.workers)
275-
276275
def join(self, timeout=None, raise_error=False):
277276
"""Block until all connections have closed and workers stopped."""
278277
gevent.joinall(self.workers, timeout, raise_error)
@@ -548,8 +547,6 @@ def _listen(self, conn):
548547
try:
549548
conn.listen()
550549
except NSQException as error:
551-
if self.state == CLOSED:
552-
return
553550
self.logger.warning('[%s] connection lost (%r)' % (conn, error))
554551

555552
self.handle_connection_failure(conn)
@@ -576,6 +573,9 @@ def handle_connection_failure(self, conn):
576573
self.conn_workers.pop(conn, None)
577574
conn.close_stream()
578575

576+
if self.state == CLOSED:
577+
return
578+
579579
if conn.ready_count:
580580
self.need_ready_redistributed = True
581581

gnsq/stream/defalte.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,12 @@ def __init__(self, socket, level):
1212
super(DefalteSocket, self).__init__(socket)
1313

1414
def compress(self, data):
15-
return self._compressor.compress(data)
15+
data = self._compressor.compress(data)
16+
return data + self._compressor.flush(zlib.Z_SYNC_FLUSH)
1617

1718
def decompress(self, data):
1819
return self._decompressor.decompress(data)
20+
21+
def close(self):
22+
self._socket.write(self._compressor.flush(zlib.Z_FINISH))
23+
self._socket.close()

gnsq/version.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
# -*- coding: utf-8 -*-
22
# also update in setup.py
3-
__version__ = '0.1.1'
3+
__version__ = '0.1.2'

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
setup(
1515
name='gnsq',
16-
version='0.1.1',
16+
version='0.1.2',
1717
description='A gevent based NSQ driver for Python.',
1818
long_description=readme + '\n\n' + history,
1919
author='Trevor Olson',

tests/test_nsqd.py

+43-3
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,17 @@ def test_tls():
327327
assert frame == nsq.FRAME_TYPE_RESPONSE
328328
assert data == 'OK'
329329

330-
conn.close()
330+
conn.subscribe('topic', 'channel')
331+
frame, data = conn.read_response()
332+
assert frame == nsq.FRAME_TYPE_RESPONSE
333+
assert data == 'OK'
334+
335+
conn.ready(1)
336+
frame, data = conn.read_response()
337+
assert frame == nsq.FRAME_TYPE_MESSAGE
338+
assert data.body == 'sup'
339+
340+
conn.close_stream()
331341

332342

333343
@pytest.mark.slow
@@ -350,7 +360,22 @@ def test_deflate():
350360
assert frame == nsq.FRAME_TYPE_RESPONSE
351361
assert data == 'OK'
352362

353-
conn.close()
363+
conn.publish('topic', 'sup')
364+
frame, data = conn.read_response()
365+
assert frame == nsq.FRAME_TYPE_RESPONSE
366+
assert data == 'OK'
367+
368+
conn.subscribe('topic', 'channel')
369+
frame, data = conn.read_response()
370+
assert frame == nsq.FRAME_TYPE_RESPONSE
371+
assert data == 'OK'
372+
373+
conn.ready(1)
374+
frame, data = conn.read_response()
375+
assert frame == nsq.FRAME_TYPE_MESSAGE
376+
assert data.body == 'sup'
377+
378+
conn.close_stream()
354379

355380

356381
@pytest.mark.slow
@@ -373,7 +398,22 @@ def test_snappy():
373398
assert frame == nsq.FRAME_TYPE_RESPONSE
374399
assert data == 'OK'
375400

376-
conn.close()
401+
conn.publish('topic', 'sup')
402+
frame, data = conn.read_response()
403+
assert frame == nsq.FRAME_TYPE_RESPONSE
404+
assert data == 'OK'
405+
406+
conn.subscribe('topic', 'channel')
407+
frame, data = conn.read_response()
408+
assert frame == nsq.FRAME_TYPE_RESPONSE
409+
assert data == 'OK'
410+
411+
conn.ready(1)
412+
frame, data = conn.read_response()
413+
assert frame == nsq.FRAME_TYPE_MESSAGE
414+
assert data.body == 'sup'
415+
416+
conn.close_stream()
377417

378418

379419
@pytest.mark.slow

0 commit comments

Comments
 (0)