-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadaradioInterface.py
290 lines (245 loc) · 10.5 KB
/
adaradioInterface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# MIT License - Copyright (c) 2024 Mark Qvist / unsigned.io
# This example illustrates creating a custom interface
# definition, that can be loaded and used by Reticulum at
# runtime. Any number of custom interfaces can be created
# and loaded. To use the interface place it in the folder
# ~/.reticulum/interfaces, and add an interface entry to
# your Reticulum configuration file similar to this:
# [[adaradio Custom Interface]]
# type = ExampleInterface
# enabled = no
# mode = gateway
# port = /dev/ttyUSB0
# speed = 115200
# databits = 8
# parity = none
# stopbits = 1
from time import sleep
import sys
import threading
import time
import busio
from digitalio import DigitalInOut, Direction, Pull
import board
# Import RFM9x
import adafruit_rfm9x
# This HDLC helper class is used by the interface
# to delimit and packetize data over the physical
# medium - in this case a serial connection.
class HDLC():
# This example interface packetizes data using
# simplified HDLC framing, similar to PPP
FLAG = 0x7E
ESC = 0x7D
ESC_MASK = 0x20
@staticmethod
def escape(data):
data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK]))
data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK]))
return data
# Let's define our custom interface class. It must
# be a sub-class of the RNS "Interface" class.
class adaradioInterface(Interface):
# All interface classes must define a default
# IFAC size, used in IFAC setup when the user
# has not specified a custom IFAC size. This
# option is specified in bytes.
DEFAULT_IFAC_SIZE = 8
# The following properties are local to this
# particular interface implementation.
owner = None
port = None
speed = None
databits = None
parity = None
stopbits = None
serial = None
# All Reticulum interfaces must have an __init__
# method that takes 2 positional arguments:
# The owner RNS Transport instance, and a dict
# of configuration values.
def __init__(self, owner, configuration):
# The following lines demonstrate handling
# potential dependencies required for the
# interface to function correctly.
# import importlib
# if importlib.util.find_spec('busio') != None:
# Import Blinka Libraries
# else:
# RNS.log("Using this interface requires the adafruit_rfm9x to be installed.", RNS.LOG_CRITICAL)
# RNS.log("You can find this at https://github.com/chengtripp/Adafruit_CircuitPython_RFM9x", RNS.LOG_CRITICAL)
# RNS.panic()
# We start out by initialising the super-class
super().__init__()
# To make sure the configuration data is in the
# correct format, we parse it through the following
# method on the generic Interface class. This step
# is required to ensure compatibility on all the
# platforms that Reticulum supports.
ifconf = Interface.get_config_obj(configuration)
# Read the interface name from the configuration
# and set it on our interface instance.
name = ifconf["name"]
self.name = name
# All interfaces must supply a hardware MTU value
# to the RNS Transport instance. This value should
# be the maximum data packet payload size that the
# underlying medium is capable of handling in all
# cases without any segmentation.
self.HW_MTU = 256 #I've changed this
self.owner = owner
# We initially set the "online" property to false,
# since the interface has not actually been fully
# initialised and connected yet.
self.online = False
# In this case, we can also set the indicated bit-
# rate of the interface to the serial port speed.
self.bitrate = 9600
# Configure internal properties on the interface
# according to the supplied configuration.
#self.port = port
self.timeout = 100
# Since all required parameters are now configured,
# we will try opening the serial port.
# Configure LoRa Radio
self.open_port()
# If opening the port succeeded, run any post-open
# configuration required.
#if self.serial.is_open:
self.configure_device()
#else:
# raise IOError("Could not open serial port")
# Open the serial port with supplied configuration
# parameters and store a reference to the open port.
def open_port(self):
try:
CS = DigitalInOut(board.CE1)
RESET = DigitalInOut(board.D25)
self.spi = busio.SPI(board.SCK, MOSI=board.MOSI, MISO=board.MISO)
except Exception as e:
RNS.log("Could not setup radio for interface "+str(self), RNS.LOG_ERROR)
raise e
RNS.log("Configuring Radio", RNS.LOG_VERBOSE)
self.rfm9x = adafruit_rfm9x.RFM9x(self.spi, CS, RESET, 867.2)
self.rfm9x.tx_power = 7
self.rfm9x.signal_bandwidth = 125000
self.rfm9x.coding_rate = 5
self.rfm9x.spreading_factor = 8
self.rfm9x.enable_crc = False
self.rfm9x.preamble_length = 18
# The only thing required after opening the port
# is to wait a small amount of time for the
# hardware to initialise and then start a thread
# that reads any incoming data from the device.
def configure_device(self):
sleep(0.5)
thread = threading.Thread(target=self.read_loop)
thread.daemon = True
thread.start()
self.online = True
RNS.log("Radio Online", RNS.LOG_VERBOSE)
# This method will be called from our read-loop
# whenever a full packet has been received over
# the underlying medium.
def process_incoming(self, data):
# Update our received bytes counter
self.rxb += len(data)
# Might need some parsing
data = data[1:]
# And send the data packet to the Transport
# instance for processing.
self.owner.inbound(data, self)
# The running Reticulum Transport instance will
# call this method on the interface whenever the
# interface must transmit a packet.
def process_outgoing(self,data):
if self.online:
# First, escape and packetize the data
# according to HDLC framing.
data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG])
# Then write the framed data to the port
written = self.serial.write(data)
# Update the transmitted bytes counter
# and ensure that all data was written
self.txb += len(data)
if written != len(data):
raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data)))
# This read loop runs in a thread and continously
# receives bytes from the underlying serial port.
# When a full packet has been received, it will
# be sent to the process_incoming methed, which
# will in turn pass it to the Transport instance.
def read_loop(self):
# try:
#in_frame = False
#escape = False
#data_buffer = b""
#last_read_ms = int(time.time()*1000)
while True:
self.online = True
data_buffer = self.rfm9x.receive_raw()
if data_buffer != None:
RNS.log("data buffer: {}".format(data_buffer), RNS.LOG_VERBOSE)
self.process_incoming(data_buffer)
time.sleep(0.05)
# if self.serial.in_waiting:
# byte = ord(self.serial.read(1))
# last_read_ms = int(time.time()*1000)
# if (in_frame and byte == HDLC.FLAG):
# in_frame = False
# self.process_incoming(data_buffer)
# elif (byte == HDLC.FLAG):
# in_frame = True
# data_buffer = b""
# elif (in_frame and len(data_buffer) < self.HW_MTU):
# if (byte == HDLC.ESC):
# escape = True
# else:
# if (escape):
# if (byte == HDLC.FLAG ^ HDLC.ESC_MASK):
# byte = HDLC.FLAG
# if (byte == HDLC.ESC ^ HDLC.ESC_MASK):
# byte = HDLC.ESC
# escape = False
# data_buffer = data_buffer+bytes([byte])
#else:
# time_since_last = int(time.time()*1000) - last_read_ms
# if len(data_buffer) > 0 and time_since_last > self.timeout:
# data_buffer = b""
# in_frame = False
# escape = False
# sleep(0.08)
# except Exception as e:
# self.online = False
# RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR)
# RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR)
# if RNS.Reticulum.panic_on_interface_error:
# RNS.panic()
# RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR)
self.online = False
self.serial.close()
self.reconnect_port()
# This method handles serial port disconnects.
def reconnect_port(self):
#while not self.online:
# try:
# time.sleep(5)
# RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_VERBOSE)
# self.open_port()
# if self.serial.is_open:
# self.configure_device()
# except Exception as e:
# RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.log("Reconnected serial port for "+str(self))
# Signal to Reticulum that this interface should
# not perform any ingress limiting.
def should_ingress_limit(self):
return False
# We must provide a string representation of this
# interface, that is used whenever the interface
# is printed in logs or external programs.
def __str__(self):
return "adaradioInterface["+self.name+"]"
# Finally, register the defined interface class as the
# target class for Reticulum to use as an interface
interface_class = adaradioInterface