-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathjmri.py
198 lines (171 loc) · 5.7 KB
/
jmri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import json
import urllib.request
import urllib.parse
import logging
from enums import *
SIGNAL_ENUM_TO_JMRI_ASPECT = {
SIGNAL_CLEAR: 'Clear',
SIGNAL_ADVANCE_APPROACH: 'Advance Approach',
SIGNAL_APPROACH: 'Approach',
SIGNAL_APPROACH_CLEAR_SIXTY: 'Approach Clear Sixty',
SIGNAL_APPROACH_CLEAR_FIFTY: 'Approach Clear Fifty',
SIGNAL_APPROACH_DIVERGING: 'Approach Diverging',
SIGNAL_APPROACH_RESTRICTING: 'Approach Restricting',
SIGNAL_RESTRICTING: 'Restricting',
SIGNAL_DIVERGING_CLEAR: 'Diverging Clear',
SIGNAL_DIVERGING_CLEAR_LIMITED: 'Diverging Clear Limited',
SIGNAL_DIVERGING_ADVANCE_APPROACH: 'Diverging Advance Approach',
SIGNAL_DIVERGING_APPROACH: 'Diverging Approach',
SIGNAL_DIVERGING_RESTRICTING: 'Restricting (Diverging)',
SIGNAL_STOP: 'Stop',
}
# Yup: https://github.com/JMRI/JMRI/blob/master/java/src/jmri/SignalHead.java#L56
HEAD_ENUM_TO_JMRI_NUMBER = {
HEAD_GREEN: 16,
HEAD_FLASHING_GREEN: 32,
HEAD_YELLOW: 4,
HEAD_FLASHING_YELLOW: 8,
HEAD_RED: 1,
HEAD_FLASHING_RED: 2,
HEAD_DARK: 0,
}
class JMRI(object):
def __init__(self, jmri_server_address):
self._jmri_server_address = jmri_server_address
def _GetJsonData(self, url_path):
# JMRI JSON docs:
# http://jmri.sourceforge.net/help/en/html/web/JsonServlet.shtml
url = urllib.parse.urljoin(self._jmri_server_address, url_path)
logging.debug('Fetching JMRI JSON data from %s', url)
return json.load(urllib.request.urlopen(url))
def _PostToJMRI(self, url, json_data, create_on_404=False):
req = urllib.request.Request(url, json_data.encode(), {'Content-Type': 'application/json'})
try:
f = urllib.request.urlopen(req)
except Exception as err:
logging.error('JMRI POST failed: %s [%s]', err, url)
return
response = f.read()
f.close()
logging.debug('JMRI POST response: %s', response)
def GetCurrentTurnoutData(self):
"""Returns a dictionary of {turnout name} -> {turnout value}."""
turnout_data_json = self._GetJsonData('/json/turnouts')
turnout_states = {}
for turnout in turnout_data_json:
name = turnout['data']['name']
state = turnout['data']['state']
if state == 2:
turnout_state = TURNOUT_CLOSED
elif state == 4:
turnout_state = TURNOUT_THROWN
else:
#logging.debug(
# 'Turnout %s had unknown json state value %s', name, state)
turnout_state = TURNOUT_UNKNOWN
turnout_states[name] = turnout_state
logging.debug('Fetched data for %d turnouts', len(turnout_states))
return turnout_states
def GetCurrentSensorData(self):
"""Returns a dictionary of {sensor name} -> {sensor value}."""
sensor_data_json = self._GetJsonData('/json/sensors')
sensor_states = {}
logging.debug(sensor_data_json)
for sensor in sensor_data_json:
name = sensor['data']['name']
userName = sensor['data'].get('userName')
state = sensor['data']['state']
if state == 2:
sensor_state = SENSOR_ACTIVE
elif state == 4:
sensor_state = SENSOR_INACTIVE
else:
logging.debug(
'Sensor %s (%s) had unknown json state value %s', name, userName, state)
sensor_state = SENSOR_UNKNOWN
sensor_states[name] = sensor_state
if userName:
sensor_states[userName] = sensor_state
logging.debug('Fetched data for %d sensors', len(sensor_states))
return sensor_states
def GetMemoryVariables(self):
"""Returns {memory_name -> memory_value}."""
memory_data_json = self._GetJsonData('/json/memory')
memory_states = {}
for var in memory_data_json:
name = var['data']['name']
val = var['data']['value']
memory_states[name] = val
logging.debug('Fetched %d memory values', len(memory_states))
return memory_states
def SetTriLightSignalHeadAppearance(self, head_name, unused_address, appearance):
jmri_number = HEAD_ENUM_TO_JMRI_NUMBER.get(appearance, -1)
if jmri_number == -1:
raise RuntimeError('Appearance %s invalid' % appearance)
path = '/json/signalHead/' + head_name
url = urllib.parse.urljoin(self._jmri_server_address, path)
json_data = json.dumps({
"type": "signalHead",
"data": {
"name": head_name,
"state": jmri_number,
}
})
logging.debug('Posting signal head change to %s: %s', url, json_data)
self._PostToJMRI(url, json_data)
def SetSignalMastAspect(self, mast_name, unused_address, aspect):
"""Sets mast_name to an aspect.
mast_name matches a signal mast name in JMRI.
aspect is a SIGNAL_* enum value.
"""
json_state = SIGNAL_ENUM_TO_JMRI_ASPECT.get(aspect)
if not json_state:
raise RuntimeError('Aspect invalid')
path = '/json/signalMast/' + mast_name
url = urllib.parse.urljoin(self._jmri_server_address, path)
json_data = json.dumps({
"type": "signalMast",
"data": {
"name": mast_name,
"state": json_state,
}
})
logging.debug('Posting signal aspect change to %s: %s', url, json_data)
self._PostToJMRI(url, json_data)
def SetMemoryVar(self, var_name, value):
path = '/json/memory/' + var_name
url = urllib.parse.urljoin(self._jmri_server_address, path)
json_data = json.dumps({
"type": "memory",
"data": {
"value": value,
}
})
logging.info("Posting memory var to %s: %s", url, json_data)
self._PostToJMRI(url, json_data, create_on_404=False)
class FakeJMRI(JMRI):
def __init__(self):
super(FakeJMRI, self).__init__('fake_jmri_address')
def GetCurrentTurnoutData(self):
return {
'NT176': TURNOUT_CLOSED,
'NT225': TURNOUT_CLOSED,
'NT227': TURNOUT_CLOSED,
'NT325': TURNOUT_CLOSED,
}
def GetCurrentSensorData(self):
return {
'LS174': SENSOR_INACTIVE,
'LS176': SENSOR_INACTIVE,
'LS177': SENSOR_INACTIVE,
'LS204': SENSOR_INACTIVE,
}
def GetMemoryVariables(self):
return {
'IMBA174': '',
'IMSVL_DISPATCH_SIGNALING': '',
}
def _GetJsonData(self, url_path):
raise NotImplementedError
def _PostToJMRI(self, url, json_data):
return True