Skip to content

Commit 714949b

Browse files
committed
Add support for nsq auth protocal.
1 parent 32a52d1 commit 714949b

File tree

6 files changed

+105
-17
lines changed

6 files changed

+105
-17
lines changed

HISTORY.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
History
44
-------
55

6-
0.1.5 (TBD)
6+
0.2.0 (TBD)
77
~~~~~~~~~~~~~~~~~~
88
* Warn on connection failure.
99
* Add extra requires for snappy.
10+
* Add support for nsq auth protocal.
1011

1112
0.1.4 (2014-07-24)
1213
~~~~~~~~~~~~~~~~~~

gnsq/errors.py

+15
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ class NSQMPubFailed(NSQErrorCode):
6969
"""E_MPUB_FAILED"""
7070

7171

72+
class NSQAuthDisabled(NSQErrorCode):
73+
"""E_AUTH_DISABLED"""
74+
75+
76+
class NSQAuthFailed(NSQErrorCode):
77+
"""E_AUTH_FAILED"""
78+
79+
80+
class NSQUnauthorized(NSQErrorCode):
81+
"""E_UNAUTHORIZED"""
82+
83+
7284
class NSQFinishFailed(NSQErrorCode):
7385
"""E_FIN_FAILED"""
7486
fatal = False
@@ -94,6 +106,9 @@ class NSQTouchFailed(NSQErrorCode):
94106
'E_PUB_FAILED': NSQPubFailed,
95107
'E_MPUB_FAILED': NSQMPubFailed,
96108
'E_FINISH_FAILED': NSQFinishFailed,
109+
'E_AUTH_DISABLED': NSQAuthDisabled,
110+
'E_AUTH_FAILED': NSQAuthFailed,
111+
'E_UNAUTHORIZED': NSQUnauthorized,
97112
'E_FIN_FAILED': NSQFinishFailed,
98113
'E_REQUEUE_FAILED': NSQRequeueFailed,
99114
'E_REQ_FAILED': NSQRequeueFailed,

gnsq/nsqd.py

+51-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class Nsqd(HTTPClient):
7171
less than 100 and the client will receive that percentage of the message
7272
traffic. (requires nsqd 0.2.25+)
7373
74+
:param auth_secret: a string passed when using nsq auth (requires
75+
nsqd 0.2.29+)
76+
7477
:param user_agent: a string identifying the agent for this client in the
7578
spirit of HTTP (default: ``<client_library_name>/<version>``) (requires
7679
nsqd 0.2.25+)
@@ -92,6 +95,7 @@ def __init__(
9295
deflate=False,
9396
deflate_level=6,
9497
sample_rate=0,
98+
auth_secret=None,
9599
user_agent=USERAGENT,
96100
):
97101
self.address = address
@@ -110,6 +114,7 @@ def __init__(
110114
self.deflate = deflate
111115
self.deflate_level = deflate_level
112116
self.sample_rate = sample_rate
117+
self.auth_secret = auth_secret
113118
self.user_agent = user_agent
114119

115120
self.state = INIT
@@ -173,6 +178,17 @@ def on_requeue(self):
173178
"""
174179
return blinker.Signal(doc='Emitted after the a message is requeued.')
175180

181+
@cached_property
182+
def on_auth(self):
183+
"""Emitted after the connection is successfully authenticated.
184+
185+
The signal sender is the connection and the parsed `response` is sent as
186+
arguments.
187+
"""
188+
return blinker.Signal(
189+
doc='Emitted after the connection is successfully authenticated.'
190+
)
191+
176192
@cached_property
177193
def on_close(self):
178194
"""Emitted after :meth:`close_stream`.
@@ -296,17 +312,28 @@ def listen(self):
296312
while self.is_connected:
297313
self.read_response()
298314

315+
def check_ok(self, expected='OK'):
316+
frame, data = self.read_response()
317+
if frame == nsq.FRAME_TYPE_ERROR:
318+
raise data
319+
320+
if frame != nsq.FRAME_TYPE_RESPONSE:
321+
raise errors.NSQException('expected response frame')
322+
323+
if data != expected:
324+
raise errors.NSQException('unexpected response %r' % data)
325+
299326
def upgrade_to_tls(self):
300327
self.stream.upgrade_to_tls(**self.tls_options)
301-
return self.read_response()
328+
self.check_ok()
302329

303330
def upgrade_to_snappy(self):
304331
self.stream.upgrade_to_snappy()
305-
return self.read_response()
332+
self.check_ok()
306333

307334
def upgrade_to_defalte(self):
308335
self.stream.upgrade_to_defalte(self.deflate_level)
309-
return self.read_response()
336+
self.check_ok()
310337

311338
def identify(self):
312339
"""Update client metadata on the server and negotiate features.
@@ -372,8 +399,29 @@ def identify(self):
372399
self.deflate_level = data.get('deflate_level', self.deflate_level)
373400
self.upgrade_to_defalte()
374401

402+
if self.auth_secret and data.get('auth_required'):
403+
self.auth()
404+
375405
return data
376406

407+
def auth(self):
408+
"""Send authorization secret to nsqd."""
409+
self.send(nsq.auth(self.auth_secret))
410+
frame, data = self.read_response()
411+
412+
if frame == nsq.FRAME_TYPE_ERROR:
413+
raise data
414+
415+
try:
416+
response = json.loads(data)
417+
except ValueError:
418+
self.close_stream()
419+
msg = 'failed to parse AUTH response JSON from nsqd: %r'
420+
raise errors.NSQException(msg % data)
421+
422+
self.on_auth.send(self, response=response)
423+
return response
424+
377425
def subscribe(self, topic, channel):
378426
"""Subscribe to a nsq `topic` and `channel`."""
379427
self.send(nsq.subscribe(topic, channel))

gnsq/protocol.py

+4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ def identify(data):
116116
return _command('IDENTIFY', json.dumps(data))
117117

118118

119+
def auth(secret):
120+
return _command('AUTH', secret)
121+
122+
119123
def subscribe(topic_name, channel_name):
120124
assert_valid_topic_name(topic_name)
121125
assert_valid_channel_name(channel_name)

gnsq/reader.py

+24
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,15 @@ def on_giving_up(self):
224224
"""
225225
return blinker.Signal(doc='Sent after a giving up on a message.')
226226

227+
@cached_property
228+
def on_auth(self):
229+
"""Emitted after a connection is successfully authenticated.
230+
231+
The signal sender is the reader and the `conn` and parsed `response` are
232+
sent as arguments.
233+
"""
234+
return blinker.Signal(doc='Emitted when a response is received.')
235+
227236
@cached_property
228237
def on_exception(self):
229238
"""Emitted when an exception is caught while handling a message.
@@ -536,6 +545,7 @@ def connect_to_nsqd(self, conn):
536545
conn.on_error.connect(self.handle_error)
537546
conn.on_finish.connect(self.handle_finish)
538547
conn.on_requeue.connect(self.handle_requeue)
548+
conn.on_auth.connect(self.handle_auth)
539549

540550
if self.max_concurrency:
541551
conn.on_message.connect(self.queue_message)
@@ -672,6 +682,20 @@ def handle_requeue(self, conn, message_id, timeout):
672682
self.handle_backoff()
673683
self.on_requeue.send(self, message_id=message_id, timeout=timeout)
674684

685+
def handle_auth(self, conn, response):
686+
metadata = []
687+
if response.get('identity'):
688+
metadata.append("Identity: %r" % response['identity'])
689+
690+
if response.get('permission_count'):
691+
metadata.append("Permissions: %d" % response['permission_count'])
692+
693+
if response.get('identity_url'):
694+
metadata.append(response['identity_url'])
695+
696+
self.logger.info('[%s] AUTH accepted %s' % (conn, ' '.join(metadata)))
697+
self.on_auth.send(self, conn=conn, response=response)
698+
675699
def handle_backoff(self):
676700
if self.state in (BACKOFF, CLOSED):
677701
return

tests/test_nsqd.py

+9-13
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def handle(socket, address):
324324

325325
conn.connect()
326326
resp = conn.auth()
327-
assert resp['identify'] == 'awesome'
327+
assert resp['identity'] == 'awesome'
328328

329329

330330
def test_identify_auth():
@@ -354,22 +354,18 @@ def handle(socket, address):
354354
auth_secret='secret'
355355
)
356356

357-
server.auth_was_called = False
358-
_auth = conn.auth
357+
@conn.on_auth.connect
358+
def assert_auth(conn, response):
359+
assert assert_auth.was_called is False
360+
assert_auth.was_called = True
361+
assert response['identity'] == 'awesome'
359362

360-
def auth():
361-
assert server.auth_was_called is False
362-
server.auth_was_called = True
363-
364-
resp = _auth()
365-
assert resp['identify'] == 'awesome'
366-
367-
conn.auth = auth
363+
assert_auth.was_called = False
368364
conn.connect()
369-
370365
resp = conn.identify()
366+
371367
assert resp['auth_required']
372-
assert server.auth_was_called
368+
assert assert_auth.was_called
373369

374370

375371
@pytest.mark.parametrize('tls,deflate,snappy', product((True, False), repeat=3))

0 commit comments

Comments
 (0)