diff --git a/README.md b/README.md index e878f2ed..0dcfa455 100644 --- a/README.md +++ b/README.md @@ -77,60 +77,31 @@ print(polo.returnTradeHistory('BTC_ETH')) You can also not use the 'helper' methods at all and use `poloniex.PoloniexBase` which only has `returnMarketHist` and `__call__` to make rest api calls. #### Websocket Usage: -To connect to the websocket api just create a child class of `PoloniexSocketed` like so: +To connect to the websocket api use the `PoloniexSocketed` class like so: ```python import poloniex import logging +from time import sleep -logging.basicConfig() - -class MySocket(poloniex.PoloniexSocketed): - - def on_heartbeat(self, msg): - """ - Triggers whenever we get a heartbeat message - """ - print(msg) - - def on_volume(self, msg): - """ - Triggers whenever we get a 24hvolume message - """ - print(msg) - - def on_ticker(self, msg): - """ - Triggers whenever we get a ticker message - """ - print(msg) - - def on_market(self, msg): - """ - Triggers whenever we get a market ('currencyPair') message - """ - print(msg) - - def on_account(self, msg): - """ - Triggers whenever we get an account message - """ - print(msg) - -sock = MySocket() # helps show what is going on -sock.logger.setLevel(logging.DEBUG) -# start the websocket thread and subscribe to '24hvolume' -sock.startws(subscribe=['24hvolume']) +logging.basicConfig() +poloniex.logger.setLevel(logging.DEBUG) + +def on_volume(data): + print(data) +# make instance +sock = poloniex.PoloniexSocketed() +# start the websocket thread and subscribe to '24hvolume' setting the callback to 'on_volume' +sock.startws(subscribe={'24hvolume': on_volume}) # give the socket some time to init -poloniex.sleep(5) -# this won't work: -#sock.subscribe('ticker') -# use channel id to un/sub -sock.subscribe('1002') -poloniex.sleep(1) -# unsub from ticker -sock.unsubscribe('1002') -poloniex.sleep(4) +sleep(5) +# use the channel name str or id int to subscribe/unsubscribe +sock.subscribe(chan='ticker', callback=print) +sleep(1) +# unsub from ticker using id (str name can be use as well) +sock.unsubscribe(1002) +sleep(4) +# stop websocket sock.stopws() ``` @@ -152,5 +123,12 @@ DEBUG:poloniex:Unsubscribed to ticker DEBUG:poloniex:Websocket Closed INFO:poloniex:Websocket thread stopped/joined ``` +You can also subscribe and start the websocket thread when creating an instance of `PoloniexSocketed` by using the `subscribe` and `start` args: +```python + +sock = poloniex.PoloniexSocketed(subscribe={'24hvolume': print}, start=True) + +``` + **More examples of how to use websocket push API can be found [here](https://github.com/s4w3d0ff/python-poloniex/tree/master/examples).** diff --git a/examples/websocket/dictTicker.py b/examples/websocket/dictTicker.py index 7deb98cd..6499d565 100644 --- a/examples/websocket/dictTicker.py +++ b/examples/websocket/dictTicker.py @@ -45,7 +45,7 @@ def on_ticker(self, data): polo = TickPolo() poloniex.logging.basicConfig() polo.logger.setLevel(poloniex.logging.DEBUG) - polo.startws(['ticker']) + polo.startws({'ticker': polo.on_ticker}) for i in range(3): pprint(polo.ticker('BTC_LTC')) poloniex.sleep(10) diff --git a/examples/websocket/stopLimit.py b/examples/websocket/stopLimit.py index 57fc3c36..505472ad 100644 --- a/examples/websocket/stopLimit.py +++ b/examples/websocket/stopLimit.py @@ -9,7 +9,7 @@ def __init__(self, *args, **kwargs): def on_ticker(self, msg): data = [float(dat) for dat in msg] # check stop orders - mkt = self.channels[str(int(data[0]))]['name'] + mkt = self._getChannelName(str(int(data[0]))) la = data[2] hb = data[3] for id in self.stopOrders: @@ -98,6 +98,6 @@ def callbk(id): callback=callbk, # remove or set 'test' to false to place real orders test=True) - test.startws(['ticker']) + test.startws({'ticker': test.on_ticker}) poloniex.sleep(120) test.stopws(3) diff --git a/poloniex/__init__.py b/poloniex/__init__.py index fbf1051a..116f601f 100644 --- a/poloniex/__init__.py +++ b/poloniex/__init__.py @@ -683,14 +683,8 @@ def toggleAutoRenew(self, orderNumber): class PoloniexSocketed(Poloniex): """ Child class of Poloniex with support for the websocket api """ - def __init__(self, *args, **kwargs): - subscribe = False - start = False - if 'subscribe' in kwargs: - subscribe = kwargs.pop('subscribe') - if 'startws' in kwargs: - start = kwargs.pop('startws') - super(PoloniexSocketed, self).__init__(*args, **kwargs) + def __init__(self, key=None, secret=None, subscribe={}, start=False, *args, **kwargs): + super(PoloniexSocketed, self).__init__(key, secret, *args, **kwargs) self.socket = WebSocketApp(url="wss://api2.poloniex.com/", on_open=self.on_open, on_message=self.on_message, @@ -699,61 +693,30 @@ def __init__(self, *args, **kwargs): self._t = None self._running = False self.channels = { - '1000': {'name': 'account', - 'sub': False, - 'callback': self.on_account}, - '1002': {'name': 'ticker', - 'sub': False, - 'callback': self.on_ticker}, - '1003': {'name': '24hvolume', - 'sub': False, - 'callback': self.on_volume}, - '1010': {'name': 'heartbeat', - 'sub': False, - 'callback': self.on_heartbeat}, + 'account': {'id': '1000'}, + 'ticker': {'id': '1002'}, + '24hvolume': {'id': '1003'}, + 'heartbeat': {'id': '1010', + 'callback': self.on_heartbeat}, } # add each market to channels list by id - # (wish there was a cleaner way of doing this...) tick = self.returnTicker() for market in tick: - self.channels[str(tick[market]['id'])] = { - 'name': market, - 'sub': False, - 'callback': self.on_market - } + self.channels[market] = {'id': str(tick[market]['id'])} + # handle init subscribes if subscribe: - for chan in self.channels: - if self.channels[chan]['name'] in subscribe or chan in subscribe: - self.channels[chan]['sub'] = True + self.setSubscribes(**subscribe) if start: self.startws() - - def _handle_sub(self, message): - """ Handles websocket un/subscribe messages """ - chan = str(message[0]) - # skip heartbeats - if not chan == '1010': - # Subscribed - if message[1] == 1: - # update self.channels[chan]['sub'] flag - self.channels[chan]['sub'] = True - self.logger.debug('Subscribed to %s', self.channels[chan]['name']) - # return False so no callback trigger - return False - # Unsubscribed - if message[1] == 0: - # update self.channels[chan]['sub'] flag - self.channels[chan]['sub'] = False - self.logger.debug('Unsubscribed to %s', self.channels[chan]['name']) - # return False so no callback trigger - return False - # return chan name - return chan + def _getChannelName(self, id): + return next( + (chan for chan in self.channels if self.channels[chan]['id'] == id), + False) def on_open(self, *ws): for chan in self.channels: - if self.channels[chan]['sub']: + if 'sub' in self.channels[chan] and self.channels[chan]['sub']: self.subscribe(chan) def on_message(self, data): @@ -766,12 +729,23 @@ def on_message(self, data): # catch errors if 'error' in message: return self.logger.error(message['error']) + chan = self._getChannelName(str(message[0])) # handle sub/unsub - chan = self._handle_sub(message) - if chan: + # skip heartbeats + if not chan == 'heartbeat': + # Subscribed + if message[1] == 1: + self.logger.debug('Subscribed to %s', chan) + # return False so no callback trigger + return False + # Unsubscribed + if message[1] == 0: + self.logger.debug('Unsubscribed to %s', chan) + # return False so no callback trigger + return False + if 'callback' in self.channels[chan]: # activate chan callback - # heartbeats are funky - if not chan == '1010': + if not chan in ['account', 'heartbeat']: message = message[2] self.socket._callback(self.channels[chan]['callback'], message) @@ -779,32 +753,39 @@ def on_error(self, error): self.logger.error(error) def on_close(self, *args): - self.logger.debug('Websocket Closed') + self.logger.info('Websocket Closed') - def on_ticker(self, args): - self.logger.debug(args) - - def on_account(self, args): - self.logger.debug(args) - - def on_market(self, args): + def on_heartbeat(self, args): self.logger.debug(args) - def on_volume(self, args): - self.logger.debug(args) + def setCallback(self, chan, callback): + """ Sets the callback function for """ + if isinstance(chan, int): + chan = self._getChannelName(str(chan)) + self.channels[chan]['callback'] = callback - def on_heartbeat(self, args): - self.logger.debug(args) + def setSubscribes(self, **subs): + for sub in subs: + if not sub in self.channels: + self.logger.warning('Invalid channel: %s', sub) + else: + self.channels[sub]['sub'] = True + self.channels[sub]['callback'] = subs[sub] - def subscribe(self, chan): + def subscribe(self, chan, callback=None): """ Sends the 'subscribe' command for """ + if isinstance(chan, int): + chan = self._getChannelName(str(chan)) # account chan? - if chan in ['1000', 1000]: + if chan == 'account': # sending commands to 'account' requires a key, secret and nonce if not self.key or not self.secret: raise PoloniexError( "self.key and self.secret needed for 'account' channel" ) + self.channels[chan]['sub'] = True + if callback: + self.channels[chan]['callback'] = callback payload = {'nonce': self.nonce} payload_encoded = _urlencode(payload) sign = _new( @@ -814,22 +795,29 @@ def subscribe(self, chan): self.socket.send(_dumps({ 'command': 'subscribe', - 'channel': chan, + 'channel': self.channels[chan]['id'], 'sign': sign.hexdigest(), 'key': self.key, 'payload': payload_encoded})) else: - self.socket.send(_dumps({'command': 'subscribe', 'channel': chan})) + self.channels[chan]['sub'] = True + if callback: + self.channels[chan]['callback'] = callback + self.socket.send(_dumps({'command': 'subscribe', + 'channel': self.channels[chan]['id']})) def unsubscribe(self, chan): """ Sends the 'unsubscribe' command for """ + if isinstance(chan, int): + chan = self._getChannelName(str(chan)) # account chan? - if chan in ['1000', 1000]: + if chan == 'account': # sending commands to 'account' requires a key, secret and nonce if not self.key or not self.secret: raise PoloniexError( "self.key and self.secret needed for 'account' channel" ) + self.channels[chan]['sub'] = False payload = {'nonce': self.nonce} payload_encoded = _urlencode(payload) sign = _new( @@ -839,39 +827,35 @@ def unsubscribe(self, chan): self.socket.send(_dumps({ 'command': 'unsubscribe', - 'channel': chan, + 'channel': self.channels[chan]['id'], 'sign': sign.hexdigest(), 'key': self.key, 'payload': payload_encoded})) else: - self.socket.send(_dumps({'command': 'unsubscribe', 'channel': chan})) + self.channels[chan]['sub'] = False + self.socket.send(_dumps({'command': 'unsubscribe', + 'channel': self.channels[chan]['id']})) - def setCallback(self, chan, callback): - """ Sets the callback function for """ - self.channels[chan]['callback'] = callback - - def startws(self, subscribe=[]): + def startws(self, subscribe=False): """ Run the websocket in a thread, use 'subscribe' arg to subscribe - to a channel on start + to channels on start """ self._t = Thread(target=self.socket.run_forever) self._t.daemon = True self._running = True # set subscribes - for chan in self.channels: - if self.channels[chan]['name'] in subscribe or chan in subscribe: - self.channels[chan]['sub'] = True + if subscribe: + self.setSubscribes(**subscribe) self._t.start() self.logger.info('Websocket thread started') - - def stopws(self, wait=0): + def stopws(self, wait=1): """ Stop/join the websocket thread """ self._running = False # unsubscribe from subs for chan in self.channels: - if self.channels[chan]['sub'] == True: + if 'sub' in self.channels[chan] and self.channels[chan]['sub']: self.unsubscribe(chan) sleep(wait) try: