35
35
import serial
36
36
import threading
37
37
import logging
38
+ from typing import Optional , Tuple , Union
38
39
39
40
# Third-party imports
40
41
import numpy as np
@@ -69,8 +70,22 @@ class MP285:
69
70
If a command returns data, the last byte returned is the task-completed indicator.
70
71
"""
71
72
72
- def __init__ (self , com_port , baud_rate , timeout = 0.25 ):
73
+ def __init__ (self , com_port : str , baud_rate : int , timeout = 0.25 ) -> None :
74
+ """Initialize the MP-285 stage.
75
+
76
+ Parameters
77
+ ----------
78
+ com_port : str
79
+ COM port of the MP-285 stage.
80
+ baud_rate : int
81
+ Baud rate of the MP-285 stage.
82
+ timeout : float
83
+ Timeout for the serial connection.
84
+ """
85
+
86
+ #: serial.Serial: Serial connection to the MP-285 stage
73
87
self .serial = serial .Serial ()
88
+
74
89
self .serial .port = com_port
75
90
self .serial .baudrate = baud_rate
76
91
self .serial .timeout = timeout
@@ -80,32 +95,72 @@ def __init__(self, com_port, baud_rate, timeout=0.25):
80
95
self .serial .xonxoff = False
81
96
self .serial .rtscts = True
82
97
98
+ #: int: Speed of the stage in microns/sec
83
99
self .speed = 1000 # None
100
+
101
+ #: str: Resolution of the stage. High or Low.
84
102
self .resolution = "high" # None
103
+
104
+ #: bool: Wait until the stage is done moving before returning
85
105
self .wait_until_done = True
86
106
107
+ #: float: Time to wait between checking if the stage is done moving
87
108
self .wait_time = 0.002
109
+
110
+ #: int: Number of times to check if the stage is done moving
88
111
self .n_waits = max (int (timeout / self .wait_time ), 1 )
89
112
90
113
# Thread blocking here to prevent calls to get_current_position()
91
114
# while move_to_specified_position is waiting for a response. Serial
92
115
# commands must complete or the MP-285A completely locks up and has
93
116
# to be power cycled.
117
+
118
+ #: threading.Event: Event to prevent writing to the serial port
94
119
self .safe_to_write = threading .Event ()
95
120
self .safe_to_write .set ()
121
+
122
+ #: threading.Lock: Lock to prevent writing to the serial port
96
123
self .write_done_flag = threading .Lock ()
124
+
125
+ #: bool: Flag to indicate if the stage is moving
97
126
self .is_moving = False
127
+
128
+ #: time.time: Time of the last write to the serial port
98
129
self .last_write_time = time .time ()
99
130
131
+ #: int: Number of commands to buffer
100
132
self .commands_num = 10
133
+
134
+ #: int: Index of the top command in the buffer
101
135
self .top_command_idx = 0
136
+
137
+ #: int: Index of the last command in the buffer
102
138
self .last_command_idx = 0
139
+
140
+ #: bool: Flag to indicate if the stage is interrupted
103
141
self .is_interrupted = False
142
+
143
+ #: bool: Flat to indicate of the stage is moving.
104
144
self .is_moving = False
145
+
146
+ #: list: Buffer to store the number of bytes to read for each command
105
147
self .commands_buffer = [1 ] * self .commands_num
106
148
149
+ def send_command (self , command : bytes , response_num = 1 ) -> int :
150
+ """Send a command to the MP-285 stage.
107
151
108
- def send_command (self , command , response_num = 1 ):
152
+ Parameters
153
+ ----------
154
+ command : bytes
155
+ Command to send to the MP-285 stage.
156
+ response_num : int
157
+ Number of bytes to read for the response.
158
+
159
+ Returns
160
+ -------
161
+ idx : int
162
+ Index of the command in the buffer.
163
+ """
109
164
self .safe_to_write .wait ()
110
165
self .safe_to_write .clear ()
111
166
self .write_done_flag .acquire ()
@@ -119,11 +174,23 @@ def send_command(self, command, response_num=1):
119
174
self .last_command_idx = (self .last_command_idx + 1 ) % self .commands_num
120
175
self .write_done_flag .release ()
121
176
return idx
122
-
123
- def read_response (self , idx ):
177
+
178
+ def read_response (self , idx : int ) -> Union [bytes , str , None ]:
179
+ """Read the response from the MP-285 stage.
180
+
181
+ Parameters
182
+ ----------
183
+ idx : int
184
+ Index of the command in the buffer.
185
+
186
+ Returns
187
+ -------
188
+ response : bytes, str, None
189
+ Response from the MP-285 stage.
190
+ """
124
191
if idx != self .top_command_idx :
125
192
return None
126
-
193
+
127
194
for _ in range (self .n_waits ):
128
195
if self .serial .in_waiting >= self .commands_buffer [self .top_command_idx ]:
129
196
r = self .serial .read (self .commands_buffer [self .top_command_idx ])
@@ -132,26 +199,38 @@ def read_response(self, idx):
132
199
self .safe_to_write .set ()
133
200
return r
134
201
time .sleep (self .wait_time )
135
-
136
- logger .error ("Haven't received any responses from MP285! Please check the stage device!" )
202
+
203
+ logger .error (
204
+ "Haven't received any responses from MP285! "
205
+ "Please check the stage device!"
206
+ )
137
207
self .top_command_idx = (self .top_command_idx + 1 ) % self .commands_num
138
208
self .safe_to_write .set ()
139
209
return ""
140
- # raise TimeoutError("Haven't received any responses from MP285! Please check the stage device!")
210
+ # raise TimeoutError("Haven't received any responses
211
+ # from MP285! Please check the stage device!")
212
+
213
+ def connect_to_serial (self ) -> None :
214
+ """Connect to the serial port of the MP-285 stage.
141
215
142
- def connect_to_serial (self ):
216
+ Raises
217
+ ------
218
+ serial.SerialException
219
+ If the serial connection fails.
220
+ """
143
221
try :
144
222
self .serial .open ()
145
223
except serial .SerialException as e :
146
224
print ("MP285 serial connection failed." )
147
225
logger .error (f"{ str (self )} , Could not open port { self .serial .port } " )
148
226
raise e
149
227
150
- def disconnect_from_serial (self ):
228
+ def disconnect_from_serial (self ) -> None :
229
+ """Disconnect from the serial port of the MP-285 stage."""
151
230
self .serial .close ()
152
231
153
232
@staticmethod
154
- def convert_microsteps_to_microns (microsteps ) :
233
+ def convert_microsteps_to_microns (microsteps : float ) -> float :
155
234
"""Converts microsteps to microns
156
235
157
236
Parameters
@@ -169,7 +248,7 @@ def convert_microsteps_to_microns(microsteps):
169
248
return microns
170
249
171
250
@staticmethod
172
- def convert_microns_to_microsteps (microns ) :
251
+ def convert_microns_to_microsteps (microns : float ) -> float :
173
252
"""Converts microsteps to microns.
174
253
175
254
Parameters
@@ -186,7 +265,9 @@ def convert_microns_to_microsteps(microns):
186
265
microsteps = np .divide (microns , 0.04 )
187
266
return microsteps
188
267
189
- def get_current_position (self ):
268
+ def get_current_position (
269
+ self ,
270
+ ) -> Tuple [Optional [float ], Optional [float ], Optional [float ]]:
190
271
"""Get the current stage position.
191
272
192
273
Gets the stage position. The data returned consists of 13 bytes:
@@ -234,20 +315,29 @@ def get_current_position(self):
234
315
235
316
# print(f"received: {position_information}")
236
317
self .is_interrupted = False
237
- l = self .commands_buffer [idx ]
318
+ l = self .commands_buffer [idx ] # noqa
238
319
if len (position_information ) < l :
239
320
return None , None , None
240
- xs = int .from_bytes (position_information [l - 13 :l - 9 ], byteorder = "little" , signed = True )
241
- ys = int .from_bytes (position_information [l - 9 :l - 5 ], byteorder = "little" , signed = True )
242
- zs = int .from_bytes (position_information [l - 5 :- 1 ], byteorder = "little" , signed = True )
321
+ xs = int .from_bytes (
322
+ position_information [l - 13 : l - 9 ], byteorder = "little" , signed = True
323
+ )
324
+ ys = int .from_bytes (
325
+ position_information [l - 9 : l - 5 ], byteorder = "little" , signed = True
326
+ )
327
+ zs = int .from_bytes (
328
+ position_information [l - 5 : - 1 ], byteorder = "little" , signed = True
329
+ )
330
+
243
331
# print(f"converted to microsteps: {xs} {ys} {zs}")
244
332
x_pos = self .convert_microsteps_to_microns (xs )
245
333
y_pos = self .convert_microsteps_to_microns (ys )
246
334
z_pos = self .convert_microsteps_to_microns (zs )
247
335
# print(f"converted to position: {x_pos} {y_pos} {z_pos}")
248
336
return x_pos , y_pos , z_pos
249
337
250
- def move_to_specified_position (self , x_pos , y_pos , z_pos ):
338
+ def move_to_specified_position (
339
+ self , x_pos : float , y_pos : float , z_pos : float
340
+ ) -> bool :
251
341
"""Move to Specified Position (‘m’) Command
252
342
253
343
This command instructs the controller to move all three axes to the position
@@ -297,7 +387,7 @@ def move_to_specified_position(self, x_pos, y_pos, z_pos):
297
387
298
388
return r == bytes .fromhex ("0d" )
299
389
300
- def set_resolution_and_velocity (self , speed , resolution ) :
390
+ def set_resolution_and_velocity (self , speed : int , resolution : str ) -> bool :
301
391
"""Sets the MP-285 stage speed and resolution.
302
392
303
393
This command instructs the controller to move all three axes to the position
@@ -325,7 +415,7 @@ def set_resolution_and_velocity(self, speed, resolution):
325
415
resolution_bit = 1
326
416
if speed > 1310 :
327
417
speed = 1310
328
- logger .error (f "Speed for the high-resolution mode is too fast." )
418
+ logger .error ("Speed for the high-resolution mode is too fast." )
329
419
raise UserWarning (
330
420
"High resolution mode of Sutter MP285 speed too "
331
421
"high. Setting to 1310 microns/sec."
@@ -334,13 +424,13 @@ def set_resolution_and_velocity(self, speed, resolution):
334
424
resolution_bit = 0
335
425
if speed > 3000 :
336
426
speed = 3000
337
- logger .error (f "Speed for the low-resolution mode is too fast." )
427
+ logger .error ("Speed for the low-resolution mode is too fast." )
338
428
raise UserWarning (
339
429
"Low resolution mode of Sutter MP285 speed too "
340
430
"high. Setting to 3000 microns/sec."
341
431
)
342
432
else :
343
- logger .error (f "MP-285 resolution must be 'high' or 'low'" )
433
+ logger .error ("MP-285 resolution must be 'high' or 'low'" )
344
434
raise UserWarning ("MP-285 resolution must be 'high' or 'low'" )
345
435
346
436
speed_and_res = int (resolution_bit * 32768 + speed )
@@ -367,7 +457,7 @@ def set_resolution_and_velocity(self, speed, resolution):
367
457
self .safe_to_write .set ()
368
458
return command_complete
369
459
370
- def interrupt_move (self ):
460
+ def interrupt_move (self ) -> Union [ bool , None ] :
371
461
"""Interrupt stage movement.
372
462
373
463
This command interrupts and stops a move in progress that originally
@@ -417,7 +507,7 @@ def interrupt_move(self):
417
507
self .is_interrupted = False
418
508
return False
419
509
420
- def set_absolute_mode (self ):
510
+ def set_absolute_mode (self ) -> bool :
421
511
"""Set MP285 to Absolute Position Mode.
422
512
423
513
This command sets the nature of the positional values specified with the Move
@@ -442,32 +532,7 @@ def set_absolute_mode(self):
442
532
time .sleep (self .wait_time )
443
533
return False
444
534
445
- # def set_relative_mode(self):
446
- # """Set MP285 to Relative Position Mode.
447
- #
448
- # This command sets the nature of the positional values specified with the Move
449
- # (‘m’) command as relative positions as measured from the current position
450
- # (absolute position returned by the Get Current Position (‘c’) command).
451
- # The command sequence consists of 2 bytes: Command byte, followed by the
452
- # terminator. Return data consists of 1 byte (task-complete indicator).
453
- #
454
- # Returns
455
- # -------
456
- # command_complete : bool
457
- # True if command was successful, False if not.
458
- # """
459
- # # print("calling set_relative_mode")
460
- # self.flush_buffers()
461
- # self.safe_write(bytes.fromhex("62") + bytes.fromhex("0d"))
462
- # response = self.serial.read(1)
463
- # if response == bytes.fromhex("0d"):
464
- # command_complete = True
465
- # else:
466
- # command_complete = False
467
- # self.safe_to_write.set()
468
- # return command_complete
469
-
470
- def refresh_display (self ):
535
+ def refresh_display (self ) -> bool :
471
536
"""Refresh the display on the MP-285 controller.
472
537
473
538
This command refreshes the VFD (Vacuum Fluorescent Display) of the controller.
@@ -490,7 +555,7 @@ def refresh_display(self):
490
555
491
556
return response == bytes .fromhex ("0d" )
492
557
493
- def reset_controller (self ):
558
+ def reset_controller (self ) -> bool :
494
559
"""Reset the MP-285 controller.
495
560
496
561
This command resets the controller. The command sequence consists of 2 bytes:
@@ -503,15 +568,15 @@ def reset_controller(self):
503
568
True if command was successful, False if not.
504
569
"""
505
570
idx = self .send_command (bytes .fromhex ("72" ) + bytes .fromhex ("0d" ))
506
-
571
+
507
572
response = self .read_response (idx )
508
573
if response == bytes .fromhex ("0d" ):
509
574
command_complete = True
510
575
else :
511
576
command_complete = False
512
577
return command_complete
513
578
514
- def get_controller_status (self ):
579
+ def get_controller_status (self ) -> bool :
515
580
"""Get the status of the MP-285 controller.
516
581
517
582
This command gets status information from the controller and returns it in
@@ -526,7 +591,9 @@ def get_controller_status(self):
526
591
"""
527
592
# print("calling get_controller_status")
528
593
# self.flush_buffers()
529
- idx = self .send_command (bytes .fromhex ("73" ) + bytes .fromhex ("0d" ), response_num = 33 )
594
+ idx = self .send_command (
595
+ bytes .fromhex ("73" ) + bytes .fromhex ("0d" ), response_num = 33
596
+ )
530
597
response = self .read_response (idx )
531
598
if len (response ) == 33 and response [- 1 ] == bytes .fromhex ("0d" ):
532
599
command_complete = True
@@ -536,6 +603,6 @@ def get_controller_status(self):
536
603
# not implemented yet. See page 74 of documentation.
537
604
return command_complete
538
605
539
- def close (self ):
606
+ def close (self ) -> None :
540
607
"""Close the serial connection to the stage"""
541
608
self .serial .close ()
0 commit comments