@@ -17,26 +17,24 @@ def sig_handler(signum, frame):
17
17
server .stop ()
18
18
19
19
20
- class Connection (threading .Thread ):
21
- cmd_list = []
22
- CHUNK_SIZE = 1024
20
+ class Connection :
21
+ cmd_list = ['connect' , 'ping' , 'pingd' , 'quit' , 'finish' ]
23
22
24
- def __init__ (self , conn , addr ):
23
+ def __init__ (self , conn , addr , srv ):
25
24
self .conn = conn
26
25
self .addr = addr
26
+ self .srv = srv
27
27
self .stop = threading .Event ()
28
- threading .Thread .__init__ (self )
29
-
30
- def terminate (self ):
31
- self .stop .set ()
28
+ self .thread = threading .Thread (target = self .run )
29
+ self .thread .start ()
32
30
33
31
def run (self ):
34
32
while not self .stop .wait (0 ):
35
33
try :
36
34
data = self .recv ()
37
35
if data and len (data .strip ()):
38
36
cmd , cmd_args = self .parse_data (data )
39
- if cmd in self .cmd_list and callable (getattr (self , cmd )):
37
+ if cmd in self .cmd_list and callable (getattr (self , cmd , None )):
40
38
getattr (self , cmd )(cmd_args )
41
39
else :
42
40
self .send ('No such command' )
@@ -49,31 +47,59 @@ def run(self):
49
47
else :
50
48
logger .info ("IO error: {}, {}" .format (e .errno , e .strerror ))
51
49
break
50
+ self .stop .set ()
52
51
self .conn .close ()
53
52
53
+ def terminate (self ):
54
+ self .stop .set ()
55
+
54
56
def send (self , data ):
55
57
logger .debug ("sending: {}" .format (repr (data )))
56
58
snd_data = data .encode ('utf-8' )
59
+ snd_length = int (len (snd_data )).to_bytes (4 , 'big' )
60
+ self .conn .sendall (snd_length )
57
61
self .conn .sendall (snd_data )
58
62
59
63
def recv (self ):
60
- rcv_data = self .conn .recv (self .CHUNK_SIZE )
64
+ rcv_length = int .from_bytes (self .conn .recv (4 ), 'big' )
65
+ rcv_data = self .conn .recv (rcv_length )
61
66
if not rcv_data :
62
67
logger .info ("Connection closed by foreign host." )
63
- self .terminate ()
68
+ self .stop . set ()
64
69
return False
65
70
data = rcv_data .decode ('utf-8' )
66
- logger .debug ("received: %s " .format (repr (data )))
71
+ logger .debug ("received: {} " .format (repr (data )))
67
72
return data
68
73
69
74
def parse_data (self , data ):
70
- data_list = data .split ()
75
+ data_list = data .split (' \n ' , 1 )
71
76
return data_list [0 ], data_list [1 :]
72
77
78
+ def connect (self , data ):
79
+ self .srv .multicast_message ('connected\n {}' .format (
80
+ ':' .join (map (str , self .addr ))
81
+ ))
82
+
83
+ def ping (self , data ):
84
+ self .send ('pong' )
85
+
86
+ def pingd (self , data ):
87
+ self .send ('pongd\n {}' .format (' ' .join (data )))
88
+
89
+ def quit (self , data ):
90
+ if data :
91
+ self .srv .multicast_message ('ackquit\n {}' .format (data ))
92
+ self .stop .set ()
93
+
94
+ def finish (self , data ):
95
+ self .stop .set ()
96
+ self .srv .terminate ()
97
+
73
98
74
99
class Server :
75
100
running = False
76
101
connections = []
102
+ connections_lock = threading .RLock ()
77
103
78
104
def __init__ (self , host = '' , port = 39999 ):
79
105
logger .info ("Init server at {}:{}" .format (host , port ))
@@ -96,20 +122,35 @@ def run(self):
96
122
while self .running :
97
123
try :
98
124
conn , addr = self .socket .accept ()
99
- except ( socket .timeout , InterruptedError ) :
125
+ except socket .timeout :
100
126
pass
127
+ except (KeyboardInterrupt , InterruptedError ):
128
+ self .stop ()
101
129
else :
102
- th_connection = Connection (conn , addr )
103
- th_connection .start ()
104
- self .connections .append (th_connection )
130
+ client_connection = Connection (conn , addr , self )
131
+ with self .connections_lock :
132
+ self .connections .append (client_connection )
133
+ self .stop ()
134
+
135
+ def terminate (self ):
136
+ self .running = False
105
137
106
138
def stop (self ):
107
139
self .running = False
140
+ self .multicast_message ('ackfinish' )
141
+ # self.ward_thread.join()
142
+ with self .connections_lock :
143
+ for conn in self .connections :
144
+ conn .terminate ()
145
+ conn .thread .join ()
146
+ del conn
108
147
self .socket .close ()
109
- for conn in self .connections :
110
- conn .terminate ()
111
- conn .join ()
112
- del conn
148
+
149
+ def multicast_message (self , msg ):
150
+ with self .connections_lock :
151
+ for conn in self .connections :
152
+ if not conn .stop .wait (0 ):
153
+ conn .send (msg )
113
154
114
155
115
156
if __name__ == '__main__' :
0 commit comments