-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMPL3115A2.py
217 lines (170 loc) Β· 8.55 KB
/
MPL3115A2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/local/bin/python3
#
# MPL3115A2 Driver for Raspberry Pi
#
# This code is heavily copied from the Adafruit CircuitPython code (https://github.com/adafruit/Adafruit_CircuitPython_MPL3115A2)
# and adapted for use with the smbus library. This was mostly done as an exercise for me, since re-typing code is a good way to learn it.
from smbus2 import SMBus
from NOAA import NOAA
from threading import Timer
import logging
import time
#pylint: disable=bad-whitespace
# Internal constants:

# I2C address used when the caller does not supply one.
_MPL3115A2_ADDRESS = 0x60

# --- Register addresses -------------------------------------------------
_MPL3115A2_REGISTER_STATUS = 0x00
_MPL3115A2_REGISTER_PRESSURE_MSB = 0x01
_MPL3115A2_REGISTER_PRESSURE_CSB = 0x02
_MPL3115A2_REGISTER_PRESSURE_LSB = 0x03
_MPL3115A2_REGISTER_TEMP_MSB = 0x04
_MPL3115A2_REGISTER_TEMP_LSB = 0x05
_MPL3115A2_REGISTER_DR_STATUS = 0x06
_MPL3115A2_OUT_P_DELTA_MSB = 0x07
_MPL3115A2_OUT_P_DELTA_CSB = 0x08
_MPL3115A2_OUT_P_DELTA_LSB = 0x09
_MPL3115A2_OUT_T_DELTA_MSB = 0x0A
_MPL3115A2_OUT_T_DELTA_LSB = 0x0B
_MPL3115A2_WHOAMI = 0x0C
# NOTE(review): not referenced anywhere in this file -- confirm the address
# against the datasheet before using it.
_MPL3115A2_REGISTER_STARTCONVERSION = 0x12
_MPL3115A2_PT_DATA_CFG = 0x13
_MPL3115A2_BAR_IN_MSB = 0x14
_MPL3115A2_BAR_IN_LSB = 0x15
_MPL3115A2_CTRL_REG1 = 0x26
_MPL3115A2_CTRL_REG2 = 0x27
_MPL3115A2_CTRL_REG3 = 0x28
_MPL3115A2_CTRL_REG4 = 0x29
_MPL3115A2_CTRL_REG5 = 0x2A

# --- Bit masks tested against the STATUS register -----------------------
_MPL3115A2_REGISTER_STATUS_TDR = 1 << 1
_MPL3115A2_REGISTER_STATUS_PDR = 1 << 2
_MPL3115A2_REGISTER_STATUS_PTDR = 1 << 3

# --- Bit flags written to PT_DATA_CFG -----------------------------------
_MPL3115A2_PT_DATA_CFG_TDEFE = 1 << 0
_MPL3115A2_PT_DATA_CFG_PDEFE = 1 << 1
_MPL3115A2_PT_DATA_CFG_DREM = 1 << 2

# --- Bit flags for CTRL_REG1 --------------------------------------------
_MPL3115A2_CTRL_REG1_SBYB = 1 << 0
_MPL3115A2_CTRL_REG1_OST = 1 << 1
_MPL3115A2_CTRL_REG1_RST = 1 << 2
_MPL3115A2_CTRL_REG1_RAW = 1 << 6
_MPL3115A2_CTRL_REG1_ALT = 1 << 7
# BAR is simply "ALT bit not set".
_MPL3115A2_CTRL_REG1_BAR = 0

# --- OSn selections for CTRL_REG1 (a 3-bit field stored in bits 3..5) ----
_MPL3115A2_CTRL_REG1_OS1 = 0 << 3
_MPL3115A2_CTRL_REG1_OS2 = 1 << 3
_MPL3115A2_CTRL_REG1_OS4 = 2 << 3
_MPL3115A2_CTRL_REG1_OS8 = 3 << 3
_MPL3115A2_CTRL_REG1_OS16 = 4 << 3
_MPL3115A2_CTRL_REG1_OS32 = 5 << 3
_MPL3115A2_CTRL_REG1_OS64 = 6 << 3
_MPL3115A2_CTRL_REG1_OS128 = 7 << 3
#pylint: enable=bad-whitespace
class MPL3115A2:
    """Driver for an MPL3115A2 altimeter/barometer chip reached over SMBus (I2C).

    The constructor resets the chip, selects altimeter mode with the OS128
    setting, optionally seeds the sea level pressure from NOAA, and enables
    the data-ready event flags.  Measurements are read through the
    ``temperature``, ``altitude`` and ``pressure`` properties.

    The instance can be used as a context manager; leaving the ``with`` block
    closes the underlying bus.
    """

    def __init__(self, i2cBusID, address=_MPL3115A2_ADDRESS, fetchPressure=True):
        """Open the bus, verify the chip identity and configure the chip.

        Args:
            i2cBusID: Bus identifier handed straight to ``SMBus``.
            address: I2C address of the chip.
            fetchPressure: If true, this library will send requests to
                geolocate the current IP and fetch weather data from the
                nearest station using the NOAA API, and use the result as the
                sea level reference pressure.

        Raises:
            RuntimeError: The device at ``address`` does not identify itself
                as an MPL3115A2 (WHOAMI register is not 0xC4).

        Errors raised by the bus itself propagate unchanged.  Whenever setup
        fails, the bus is closed again before the exception leaves.
        """
        logging.debug("π Hello, World!")
        # SMBus() raises if the bus cannot be opened; it never returns None,
        # so no explicit None check is needed.
        self.bus = SMBus(i2cBusID)
        self.address = address
        # Stays None when NOAA lookups are disabled.
        self.noaa = None
        # Cached copy of what was last written to CTRL_REG1.
        self._ctrlReg1 = 0
        try:
            self._configureChip(fetchPressure)
        except BaseException:
            # Do not leak the open bus handle when setup fails half way.
            self.bus.close()
            raise

    def _configureChip(self, fetchPressure):
        """Identify, reset and configure the chip (the body of ``__init__``)."""
        # Check that we can speak to the chip and that we're dealing with the right chip
        whoami = self.bus.read_byte_data(self.address, _MPL3115A2_WHOAMI)
        if whoami != 0xC4:
            raise RuntimeError("I2C device found at address %0x but it is not a MPL3115A2 chip (whoami returned %0x)" % (self.address, whoami))
        logging.debug("π Connected to MPL3115A2 chip")

        # Kick off a reset so that we know the configuration of the device.
        # This is the simplest approach and mirrors what the Adafruit library does.
        logging.debug("β‘οΈ Initiating a reset...")
        # Ignore write errors since the chip might reset in the middle of our message
        try:
            self.bus.write_byte_data(self.address, _MPL3115A2_CTRL_REG1, _MPL3115A2_CTRL_REG1_RST)
        except OSError:
            pass
        self._pollForReg1(_MPL3115A2_CTRL_REG1_RST)
        logging.debug("π« Chip has been reset")

        logging.debug("πΆ Configuring chip...")
        self._ctrlReg1 = _MPL3115A2_CTRL_REG1_ALT | _MPL3115A2_CTRL_REG1_OS128
        self._writeReg1()
        if fetchPressure:
            self.noaa = NOAA()
            self._updatePressure()
        self.bus.write_byte_data(
            self.address,
            _MPL3115A2_PT_DATA_CFG,
            _MPL3115A2_PT_DATA_CFG_TDEFE | _MPL3115A2_PT_DATA_CFG_PDEFE | _MPL3115A2_PT_DATA_CFG_DREM,
        )
        logging.debug("β Chip activated!")

    def __enter__(self):
        """Return the driver itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the bus; exceptions from the ``with`` body are not suppressed."""
        logging.info("π Shutting down. Goodbye!")
        self.bus.close()

    @staticmethod
    def _toSigned(value, bits):
        """Interpret the low ``bits`` bits of ``value`` as a two's complement integer."""
        signBit = 1 << (bits - 1)
        return (value & (signBit - 1)) - (value & signBit)

    def _writeReg1(self, requestData=False):
        """Write the cached CTRL_REG1 value to the chip.

        Args:
            requestData: If true the OST bit is set in the written value (used
                by the measurement properties to request a sensor read),
                otherwise the OST bit is cleared.
        """
        if requestData:
            self._ctrlReg1 |= _MPL3115A2_CTRL_REG1_OST
        else:
            self._ctrlReg1 &= ~_MPL3115A2_CTRL_REG1_OST
        logging.debug("βοΈ Setting control register to %x", self._ctrlReg1)
        self.bus.write_byte_data(self.address, _MPL3115A2_CTRL_REG1, self._ctrlReg1)

    def _pollForReg1(self, mask):
        """Block until every bit of ``mask`` reads back as 0 in CTRL_REG1.

        Polls every 10 ms; there is no timeout.
        """
        while (self.bus.read_byte_data(self.address, _MPL3115A2_CTRL_REG1) & mask) > 0:
            time.sleep(0.01)

    def _pollForStatus(self, mask):
        """Block until at least one bit of ``mask`` is set in the STATUS register.

        Polls every 10 ms; there is no timeout.
        """
        while (self.bus.read_byte_data(self.address, _MPL3115A2_REGISTER_STATUS) & mask) == 0:
            time.sleep(0.01)

    def _updatePressure(self):
        """Fetch the sea level pressure from NOAA and store it on the chip.

        This is best effort: any failure is logged and otherwise ignored so
        that the driver keeps working without network access.
        """
        if self.noaa is None:
            logging.debug("NOAA lookups are disabled; sea level pressure not updated")
            return
        try:
            logging.debug("π¦ Fetching pressure data from NOAA...")
            pressure = self.noaa.getCurrentSeaLevelPressure()
            logging.debug("βπ‘ Pressure fetched from NOAA is %dPa", pressure)
            self.seaLevelPressure = pressure
        except Exception as e:  # deliberate catch-all, see docstring
            logging.info("π¨ Error fetching pressure data from NOAA: {}".format(e))

    # ----
    # Temperature
    @property
    def temperature(self):
        """Temperature reading (degrees Celsius per the datasheet).

        If no unread temperature sample is pending, a measurement is requested
        first; the original code only waited and could block forever.
        """
        # Wait for any previous measurements to finish
        self._pollForReg1(_MPL3115A2_CTRL_REG1_OST)
        status = self.bus.read_byte_data(self.address, _MPL3115A2_REGISTER_STATUS)
        if (status & _MPL3115A2_REGISTER_STATUS_TDR) == 0:
            # Nothing waiting to be read: request a sensor read
            self._writeReg1(requestData=True)
        # Wait for TDR to be set
        self._pollForStatus(_MPL3115A2_REGISTER_STATUS_TDR)
        tempBytes = self.bus.read_i2c_block_data(self.address, _MPL3115A2_REGISTER_TEMP_MSB, 2)
        # The reading is a 12 bit two's complement value: the first byte is the
        # whole part and the top four bits of the second byte are sixteenths.
        # Sign-extend so that sub-zero temperatures come out negative.
        raw = self._toSigned((tempBytes[0] << 8) | tempBytes[1], 16)
        return (raw >> 4) / 16.0
    # ----

    # ----
    # Altitude
    @property
    def altitude(self):
        """Altitude in meters, relative to the configured sea level pressure.

        Switches the chip to altimeter mode if necessary, requests a
        measurement and blocks until it is available.
        """
        # Wait for any previous measurements to finish
        self._pollForReg1(_MPL3115A2_CTRL_REG1_OST)
        if (self._ctrlReg1 & _MPL3115A2_CTRL_REG1_ALT) != _MPL3115A2_CTRL_REG1_ALT:
            self._ctrlReg1 |= _MPL3115A2_CTRL_REG1_ALT
            self._writeReg1()
        # Request a sensor read
        self._writeReg1(requestData=True)
        # Wait for PDR to be set
        self._pollForStatus(_MPL3115A2_REGISTER_STATUS_PDR)
        altitudeBytes = self.bus.read_i2c_block_data(self.address, _MPL3115A2_REGISTER_PRESSURE_MSB, 3)
        logging.debug("π» Altitude data read from chip is %s", ",".join(hex(i) for i in altitudeBytes))
        # The reading is a 20 bit value. The first two bytes are the meters (in
        # 2's complement) and the third byte is the fractions of a meter,
        # stored in the top four bits. Sign-extend the whole value so that
        # negative altitudes are not reported as huge positive ones.
        raw = self._toSigned((altitudeBytes[0] << 16) | (altitudeBytes[1] << 8) | altitudeBytes[2], 24)
        return (raw >> 4) / 16.0

    @property
    def seaLevelPressure(self):
        """Sea level reference pressure in Pa, as currently stored on the chip."""
        pressureBytes = self.bus.read_i2c_block_data(self.address, _MPL3115A2_BAR_IN_MSB, 2)
        # The register pair holds the pressure in units of 2 Pa.
        return ((pressureBytes[0] << 8) | pressureBytes[1]) << 1

    @seaLevelPressure.setter
    def seaLevelPressure(self, pressure):
        """Store ``pressure`` (in Pa) as the sea level reference pressure.

        Raises:
            ValueError: ``pressure`` does not fit the 16 bit register pair
                (which holds the value in units of 2 Pa).
        """
        pascals = int(pressure)  # accept floats, e.g. from a weather service
        registerValue = pascals // 2
        if not 0 <= registerValue <= 0xFFFF:
            raise ValueError("Sea level pressure must be between 0 and 131071 Pa, got %r" % (pressure,))
        pressureMSB = (registerValue >> 8) & 0xFF
        pressureLSB = registerValue & 0xFF
        logging.debug("π‘ Writing pressure data of %dPa (%d, %0x|%0x)", pascals, registerValue, pressureMSB, pressureLSB)
        self.bus.write_i2c_block_data(self.address, _MPL3115A2_BAR_IN_MSB, [pressureMSB, pressureLSB])
    # ----

    # ----
    # Pressure
    @property
    def pressure(self):
        """Barometric pressure in Pa.

        Switches the chip to barometer mode if necessary, requests a
        measurement and blocks until it is available.
        """
        # Wait for any previous measurements to finish
        self._pollForReg1(_MPL3115A2_CTRL_REG1_OST)
        if (self._ctrlReg1 & _MPL3115A2_CTRL_REG1_ALT) == _MPL3115A2_CTRL_REG1_ALT:
            self._ctrlReg1 &= ~_MPL3115A2_CTRL_REG1_ALT
            self._writeReg1()
        # Request a sensor read
        self._writeReg1(requestData=True)
        # Wait for PDR to be set
        self._pollForStatus(_MPL3115A2_REGISTER_STATUS_PDR)
        pressureBytes = self.bus.read_i2c_block_data(self.address, _MPL3115A2_REGISTER_PRESSURE_MSB, 3)
        logging.debug("π¬ Pressure data read from chip is %s", ",".join(hex(i) for i in pressureBytes))
        # The reading is an unsigned 20 bit value stored in the top 20 bits of
        # the three bytes: 18 bits of whole Pascals followed by two bits of
        # quarter Pascals. Assemble it as one integer instead of per-byte
        # masks, which previously lost the two lowest whole-Pascal bits.
        raw = ((pressureBytes[0] << 16) | (pressureBytes[1] << 8) | pressureBytes[2]) >> 4
        return raw / 4.0