-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroker.py
91 lines (75 loc) · 3.11 KB
/
broker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from socket import *
from threading import Thread
import traceback
import os,sys
import shlex
MAX_BUF = 2048  # receive buffer size in bytes; same value handle_client passes to recv()
SERV_PORT = 50000  # TCP port the broker binds to in main()
topics = {}  # topic name -> list of subscriber sockets (filled by add_socket)
def shutdown_socket():
    """Tell every subscribed socket to shut down by sending it 'QUIT'.

    Walks every socket list stored in the module-level ``topics`` dict.
    Delivery is best effort: a socket whose send fails (for example because
    the peer already disconnected) is skipped, so the remaining subscribers
    are still notified.
    """
    quit_msg = 'QUIT'.encode('utf-8')
    # Iterate over snapshots: client threads may still be changing topics.
    for sockets in list(topics.values()):
        for s in list(sockets):
            try:
                s.send(quit_msg)
            except OSError:
                # Dead or already-closed connection; keep notifying the rest.
                continue
def delete_socket(conn_sock):
    """Remove ``conn_sock`` from the subscriber list of every topic.

    The lists stored in the module-level ``topics`` dict are modified in
    place.  A topic whose list becomes empty is kept.

    Args:
        conn_sock: the socket to unsubscribe everywhere.
    """
    # ``del s`` on the loop variable would only unbind the local name, so the
    # list itself has to be rewritten to actually drop the socket.
    for sockets in list(topics.values()):
        sockets[:] = [s for s in sockets if not (s == conn_sock)]
def add_socket(conn_sock, topic):
    """Register ``conn_sock`` as a subscriber of ``topic``.

    The topic entry is created in the module-level ``topics`` dict when it
    does not exist yet.  A socket that is already registered for the topic
    is not added again, so a repeated subscribe request does not make the
    client receive every published value more than once.

    Args:
        conn_sock: the subscriber's connection socket.
        topic: name of the topic to subscribe to.
    """
    subscribers = topics.setdefault(topic, [])
    if conn_sock not in subscribers:
        subscribers.append(conn_sock)
def handle_client(conn_sock, cli_sock_addr):
    """Serve one client connection until it quits, publishes or disconnects.

    Each received chunk is decoded as UTF-8 and split into words with shlex:

    * ``subscribe quit`` ends the session.
    * ``subscribe <word> <topic>`` registers this connection for <topic>.
    * ``publish <word> <topic> <value>`` sends <value> to every subscriber
      of <topic> and then ends the session.

    <word> is not interpreted here (apart from the ``quit`` check).  Requests
    that cannot be parsed or that lack the required words are ignored instead
    of crashing the handler thread.  However the session ends, the connection
    is removed from all topics and closed.

    NOTE(review): every recv() result is treated as exactly one complete
    command; there is no framing for split or coalesced messages.

    Args:
        conn_sock: connected socket for this client.
        cli_sock_addr: (host, port) pair of the client, used for log output.
    """
    host, port = cli_sock_addr[0], cli_sock_addr[1]
    try:
        while True:
            try:
                data = conn_sock.recv(MAX_BUF)
            except BlockingIOError:
                # Nothing to read yet on a non-blocking socket; try again.
                continue
            except OSError:
                # Connection reset or otherwise unusable.
                break
            if not data:
                # recv() returns b'' once the peer has closed the connection.
                break

            try:
                # UnicodeDecodeError is a ValueError, and shlex raises
                # ValueError on unbalanced quotes.
                message = shlex.split(data.decode('utf-8'))
            except ValueError:
                continue
            if not message:
                continue

            command = message[0]
            if command == 'subscribe':
                if len(message) > 1 and message[1] == 'quit':
                    print('%s:%s disconneted' % (host, port))
                    return
                if len(message) < 3:
                    continue  # topic missing
                print('%s:%s subscribe for topic %s' % (host, port, message[2]))
                add_socket(conn_sock, message[2])
            elif command == 'publish':
                if len(message) < 4:
                    continue  # topic or value missing
                payload = message[3].encode('utf-8')
                # Snapshot the list: other threads may (un)subscribe meanwhile.
                for s in list(topics.get(message[2], ())):
                    try:
                        s.send(payload)
                    except OSError:
                        # One dead subscriber must not block the others.
                        continue
                # A publisher is served exactly one publish request.
                print('%s:%s disconnected' % (host, port))
                return

        print('%s:%s disconnected' % (host, port))
    finally:
        # Runs on every exit path so no closed socket stays in topics.
        delete_socket(conn_sock)
        conn_sock.close()
def main():
    """Bind the welcome socket and hand every new connection to a thread.

    Uses the module-level ``welcome_sock``, binds it to 127.0.0.1 on
    ``SERV_PORT`` and then accepts connections in an endless loop, starting
    one ``handle_client`` thread per client.  The function never returns
    normally; the loop ends only when an exception (for example
    KeyboardInterrupt) propagates out of it.
    """
    serv_sock_addr = ('127.0.0.1', SERV_PORT)
    welcome_sock.bind(serv_sock_addr)
    welcome_sock.listen(5)  # backlog: up to 5 connections waiting for accept()
    print('Broker started at %s' % serv_sock_addr[0])
    while True:
        conn_sock, cli_sock_addr = welcome_sock.accept()
        ip, port = str(cli_sock_addr[0]), str(cli_sock_addr[1])
        print('New client connected from ' + ip + ':' + port)
        try:
            Thread(target=handle_client, args=(conn_sock, cli_sock_addr)).start()
        except Exception:
            print("Cannot start thread..")
            traceback.print_exc()
            # Nobody will ever serve this connection, so do not leak it.
            conn_sock.close()
if __name__ == '__main__':
    # main() reads this module-level name, so it must exist before the call.
    welcome_sock = socket(AF_INET, SOCK_STREAM)
    try:
        main()
    except KeyboardInterrupt:
        shutdown_socket()  # ask every subscriber to quit
        print('')
        print('Shut down socket')
        welcome_sock.close()
        print('Broker shutdown')
        # os._exit() ends the process without waiting for the client handler
        # threads, but it also skips the normal stdio flush, so flush first.
        sys.stdout.flush()
        os._exit(0)
    finally:
        # Reached when main() fails for any other reason (e.g. bind error);
        # closing an already-closed socket is harmless.
        welcome_sock.close()