forked from tino/pyFirmata
-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathmockup.py
145 lines (115 loc) · 3.95 KB
/
mockup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from collections import deque
import pyfirmata2
class MockupSerial(deque):
"""
A Mockup object for python's Serial. Functions as a fifo-stack. Push to
it with ``write``, read from it with ``read``.
"""
def __init__(self, port, baudrate, timeout=0.02):
self.port = port or 'somewhere'
def read(self, count=1):
if count > 1:
val = []
for i in range(count):
try:
val.append(self.popleft())
except IndexError:
break
else:
try:
val = self.popleft()
except IndexError:
val = bytearray()
val = [val] if not hasattr(val, '__iter__') else val
return bytearray(val)
def write(self, value):
"""
Appends bytes flat to the deque. So iterables will be unpacked.
"""
if hasattr(value, '__iter__'):
bytearray(value)
self.extend(value)
else:
bytearray([value])
self.append(value)
def close(self):
self.clear()
def inWaiting(self):
return len(self)
class MockupBoard(pyfirmata2.Board):
def __init__(self, port, layout, values_dict={}):
self.sp = MockupSerial(port, 57600)
self.setup_layout(layout)
self.values_dict = values_dict
self.id = 1
self.samplerThread = Iterator(self)
def reset_taken(self):
for key in self.taken['analog']:
self.taken['analog'][key] = False
for key in self.taken['digital']:
self.taken['digital'][key] = False
def update_values_dict(self):
for port in self.digital_ports:
port.values_dict = self.values_dict
port.update_values_dict()
for pin in self.analog:
pin.values_dict = self.values_dict
class MockupPort(pyfirmata2.Port):
def __init__(self, board, port_number):
self.board = board
self.port_number = port_number
self.reporting = False
self.pins = []
for i in range(8):
pin_nr = i + self.port_number * 8
self.pins.append(MockupPin(self.board, pin_nr, type=pyfirmata2.DIGITAL, port=self))
def update_values_dict(self):
for pin in self.pins:
pin.values_dict = self.values_dict
class MockupPin(pyfirmata2.Pin):
def __init__(self, *args, **kwargs):
self.values_dict = kwargs.get('values_dict', {})
try:
del kwargs['values_dict']
except KeyError:
pass
super(MockupPin, self).__init__(*args, **kwargs)
def read(self):
if self.value is None:
try:
type = self.port and 'd' or 'a'
return self.values_dict[type][self.pin_number]
except KeyError:
return None
else:
return self.value
def get_in_output(self):
if not self.port and not self.mode: # analog input
return 'i'
else:
return 'o'
def set_active(self, active):
self.is_active = active
def get_active(self):
return self.is_active
def write(self, value):
if self.mode == pyfirmata2.UNAVAILABLE:
raise IOError("Cannot read from pin {0}".format(self.pin_number))
if self.mode == pyfirmata2.INPUT:
raise IOError("{0} pin {1} is not an output"
.format(self.port and "Digital" or "Analog", self.get_pin_number()))
if not self.port:
raise AttributeError("AnalogPin instance has no attribute 'write'")
# if value != self.read():
self.value = value
class Iterator(object):
def __init__(self, *args, **kwargs):
self.running = False
def start(self):
pass
def stop(self):
pass
if __name__ == '__main__':
import doctest
doctest.testmod()
# TODO make these unittests as this doesn't work due to relative imports