-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththread_work.py
executable file
·275 lines (238 loc) · 9.26 KB
/
thread_work.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python
# -*- encoding=utf8 -*-
from threading import Thread, Timer, Lock
from queue import Queue, Full, Empty
from time import sleep
from socket import socket, AF_INET, SOCK_STREAM
from ssh2.session import Session
from ssh2.exceptions import *
from ftplib import FTP
from tkinter import messagebox
class ThreadWork:
    """Run queued tasks for one remote FTP/SFTP connection on a worker thread.

    Tasks are callables that receive the live connection object as their first
    argument. They are queued with :meth:`add_task` and executed one at a time
    by a worker thread that is started from the constructor. The connection is
    opened on demand by :meth:`add_task` and closed again by :meth:`check_idle`
    once it is no longer used.

    NOTE(review): ``messagebox`` dialogs are raised from worker threads here;
    Tk is generally not thread-safe - confirm this is acceptable for the UI.
    """

    # Seconds between two idle checks of an open connection.
    IDLE_CHECK_SECONDS = 30

    def __init__(self, mode, host, port, name, password, enc, timeout=10,
                 descr="ThreadWork", max_size=10, ui=None):
        """
        ThreadWork constructor. Starts the worker thread immediately.

        :param mode: connection mode; ``"SFTP"`` selects SFTP, any other value FTP
        :type mode: str
        :param host: remote host name/ip
        :type host: str
        :param port: port for remote host
        :type port: int
        :param name: user name for remote host
        :type name: str
        :param password: password for remote user
        :type password: str
        :param enc: encoding for remote user (applied to FTP connections)
        :type enc: str
        :param timeout: login timeout in seconds. defaults to 10 seconds
        :type timeout: int
        :param descr: the name for the current thread object
        :type descr: str
        :param max_size: maximum default size for the queue
        :type max_size: int
        :param ui: parent UI object; when given, its ``progress`` widget is
            animated while a connection is being established
        :type ui: Tk
        """
        self.name = descr
        self.quitting = False   # set once by disconnect(quit=True); stops the worker
        self.end = False        # becomes True when the worker loop has exited
        self.lock = Lock()

        self._mode = mode
        self._host = host
        self._port = port
        self._name = name
        self._passwd = password
        self._enc = enc

        self.q = Queue(max_size)
        self._connection = None
        self._timeout = None    # pending idle-check Timer, if any
        self._timeout_seconds = timeout
        self._running = False   # True while a task is being executed
        self._abort = False     # True after a failed login; queued tasks are dropped
        self.fileDescriptor = None
        self.parent_ui = ui

        self._thread = Thread(target=self._do_work, daemon=False)
        self._thread.start()

    def isConnected(self):
        """
        Check if a connection object is currently held.

        :return: True when a connection exists, False otherwise
        :rtype: bool
        """
        return self._connection is not None

    def _start_idle_timer(self):
        """Arm a fresh timer that re-runs :meth:`check_idle` later."""
        timer = Timer(self.IDLE_CHECK_SECONDS, self.check_idle)
        # A pending idle check must never keep the interpreter alive on exit.
        timer.daemon = True
        self._timeout = timer
        timer.start()

    def check_idle(self, not_timeout=False):
        """Check if the connection is no longer used and disconnect.

        The check re-arms itself every ``IDLE_CHECK_SECONDS`` seconds while a
        connection is open.

        :param not_timeout: True when called directly (after a task), False
            when triggered by the idle timer
        :type not_timeout: bool
        """
        # Read once: disconnect() on another thread may clear the attribute.
        pending = self._timeout
        if self._connection is not None:
            if pending is None:
                self._start_idle_timer()
            elif not_timeout or self._running:
                # Still in use: push the idle deadline back.
                pending.cancel()
                self._start_idle_timer()
            else:
                # Timer fired and nothing is running: connection is idle.
                self.disconnect()
        elif not self._running and not not_timeout:
            self.disconnect()

    def add_task(self, func, args=None):
        """Add a task for the current connection and create a connection if necessary.

        A falsy *func* queues a ``(None, None)`` entry without opening a
        connection. If the queue is full a warning dialog is shown and the
        task is dropped.

        :param func: the function to be executed on this thread; it is called
            with the connection followed by *args*
        :type func: any
        :param args: arguments to be passed to the function object
        :type args: list of arguments
        """
        # NOTE(review): several calls made before the first login finishes each
        # start their own connect thread - confirm whether that is intended.
        if func and self._connection is None:
            Thread(target=self._connect, daemon=True).start()
        task = (func, args) if func else (None, None)
        try:
            self.q.put_nowait(task)
        except Full:
            messagebox.showwarning("Queue is full", "The queue is full. Try again later.")

    def _drain_queue(self):
        """Discard all queued tasks while keeping the queue's bookkeeping balanced."""
        while True:
            try:
                self.q.get_nowait()
            except Empty:
                return
            self.q.task_done()

    def _run_task(self, func, data):
        """Execute one task and always release ``fileDescriptor`` afterwards.

        Tasks with arguments report failures in a dialog; tasks without
        arguments only log them. Both disconnect on failure.
        """
        try:
            if data:
                try:
                    func(self._connection, *data)
                except SocketDisconnectError:
                    with self.lock:
                        print("disconnect error")
                        if not self.quitting:
                            messagebox.showerror("Connection Error", "Lost Connection.")
                        self.disconnect()
                except Exception as e:
                    with self.lock:
                        print("Unexpected Error:", type(e), str(e))
                        if not self.quitting:
                            # Fall back to the exception's class name when it
                            # carries no message, so the dialog is never blank.
                            messagebox.showerror("Unexpected Error", str(e) or type(e).__name__)
                        self.disconnect()
            else:
                # A (None, None) entry also ends up here; calling None raises
                # TypeError, which is handled like any other task failure.
                try:
                    func(self._connection)
                except Exception as e:
                    with self.lock:
                        print("exception ftp")
                        print(e)
                        if not self.quitting:
                            self.disconnect()
        finally:
            if self.fileDescriptor:
                self.fileDescriptor.close()
                self.fileDescriptor = None

    def _do_work(self):
        """Worker loop: wait for a connection and then execute each queued task."""
        while not self.quitting:
            try:
                func, data = self.q.get(timeout=0.1)
            except Empty:
                continue

            # Suspend until there is a connection, the login failed, or we quit.
            while self._connection is None and not self._abort and not self.quitting:
                sleep(0.3)

            if self.quitting:
                self._drain_queue()
                try:
                    self.q.task_done()
                except ValueError:
                    # Deliberate best-effort: the entry was already accounted for.
                    pass
                break

            if self._abort:
                # Login failed: drop the task that was waiting for it.
                self.q.task_done()
                continue

            self._running = True
            self._run_task(func, data)
            self.q.task_done()
            self._running = False
            if not self.quitting:
                self.check_idle(True)
        self.end = True

    @staticmethod
    def _close_quietly(resource):
        """Close *resource* if it exists, ignoring OS-level errors (best-effort cleanup)."""
        if resource is None:
            return
        try:
            resource.close()
        except OSError:
            pass

    def _connect_sftp(self):
        """Open an SFTP channel; on failure set ``_abort`` and show an error dialog."""
        sock = None
        established = False
        try:
            sock = socket(AF_INET, SOCK_STREAM)
            sock.settimeout(self._timeout_seconds)
            sock.connect((self._host, self._port))
            cli = Session()
            # Session timeout is in milliseconds; only applied during login.
            cli.set_timeout(int(self._timeout_seconds * 1000))
            cli.handshake(sock)
            cli.userauth_password(self._name, self._passwd)
            cli.set_timeout(0)
            self._connection = cli.sftp_init()
            established = True
        except Timeout:
            self._abort = True
            messagebox.showerror("Connection Error", "Connection timeout on login.")
        except AuthenticationError:
            self._abort = True
            messagebox.showerror("Authentication Error", "Wrong login credentials.")
        except Exception as e:
            print(type(e), e.args, str(e))
            self._abort = True
            messagebox.showerror("Connection Error", f"Could not establish a connection.\n{e}")
        finally:
            if not established:
                # Do not leak the socket of a failed login attempt.
                self._close_quietly(sock)

    def _connect_ftp(self):
        """Open an FTP connection; on failure set ``_abort`` and show an error dialog."""
        ftp = None
        established = False
        try:
            ftp = FTP()
            ftp.encoding = self._enc
            ftp.connect(self._host, self._port, self._timeout_seconds)
            ftp.login(self._name, self._passwd)
            self._connection = ftp
            established = True
        except Exception as e:
            self._abort = True
            messagebox.showerror("Connection Error", str(e))
        finally:
            if not established:
                # Do not leak the control socket of a failed login attempt.
                self._close_quietly(ftp)

    def _connect(self):
        """Create a connection for this thread.

        While connecting, the parent UI's progress widget (if any) is switched
        to indeterminate mode and restored afterwards.
        """
        self._abort = False
        size_bk = 0
        if self.parent_ui:
            # Remember the current maximum so it can be restored afterwards.
            size_bk += self.parent_ui.progress["maximum"]
            self.parent_ui.progress.configure(mode="indeterminate", maximum=100)
            self.parent_ui.progress.start()
        try:
            if self._mode == "SFTP":
                self._connect_sftp()
            else:  # FTP
                self._connect_ftp()
        finally:
            if self.parent_ui:
                self.parent_ui.progress.stop()
                self.parent_ui.progress.configure(value=0, mode="determinate", maximum=size_bk)

    def disconnect(self, quit=False):
        """Clear pending tasks and close the connection when necessary.

        Does nothing once the object is already quitting.

        :param quit: True to also stop the worker thread permanently
        :type quit: bool
        """
        if self.quitting:
            return
        self.quitting = quit

        # Read once: another thread may replace or clear the timer.
        pending = self._timeout
        if pending:
            pending.cancel()
            self._timeout = None

        self._drain_queue()

        connection = self._connection
        if connection:
            try:
                if self._mode == "SFTP":
                    connection.session.disconnect()
                else:
                    # don't wait for timeout, just close connection
                    connection.close()
            except Exception as e:
                print("quit execption")
                print(e)
            finally:
                self._connection = None

        if self.fileDescriptor:
            self.fileDescriptor.close()
            self.fileDescriptor = None
def singleShot(func, args=None):
    """Single shot thread to execute a single task.

    The task runs on a new non-daemon thread that is started immediately.

    :param func: function object to be executed
    :type func: function
    :param args: arguments to be passed to the given function
    :type args: list of arguments
    :return: the started thread, so callers may ``join()`` or inspect it
    :rtype: Thread
    """
    call_args = () if args is None else tuple(args)
    worker = Thread(target=func, args=call_args, daemon=False)
    worker.start()
    return worker
# Public API of this module: the worker class and the one-off thread helper.
__all__ = [
    "ThreadWork",
    "singleShot",
]