forked from spraints/fowsr
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfowsr-server.rb
260 lines (228 loc) · 6.4 KB
/
fowsr-server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
require "socket"
require "logger"
require "json"
#/ Usage: ruby fowsr-server.rb [--fowsr /path/to/fowsr] [--listen /path/to/sock]
# Run the server until a quit signal is received.
#
# Sets up signal handling and the listening socket, then repeatedly starts
# fowsr when it is due and services IO. Once the server reports that a quit
# signal arrived, the fowsr child (if any) is signalled and the method returns.
#
# options - the option Hash passed on to ServerInfo.new.
def main(options)
  server = ServerInfo.new(options)
  install_signal_handlers(server)
  listen(server)

  loop do
    # Reap/restart fowsr first, then block in the IO wait.
    respawn_fowsr(server)
    do_io(server)
    break if server.received_quit_signal?
  end

  kill_fowsr(server)
end
# Install signal handlers so a controlled exit does not dump a stack trace.
# * CHLD wakes the IO loop.
# * INT, TERM and QUIT are reported to the server as quit requests.
# * HUP, PIPE, USR1 and USR2 get an empty handler.
def install_signal_handlers(server)
  Signal.trap(:CHLD) { server.awake }

  %i[INT TERM QUIT].each do |name|
    Signal.trap(name) { server.receive_quit_signal(name) }
  end

  %i[HUP PIPE USR1 USR2].each do |name|
    Signal.trap(name) { }
  end
end
# Start listening on the configured socket.
#
# When a socket file already exists at the address it is probed with a
# throwaway connection. A refused connection means the file is left over from
# a previous run, so it is removed. A successful connection means something is
# still serving on it; the file is left alone and creating the UNIXServer
# below is expected to fail (Errno::EADDRINUSE).
#
# On success the new UNIXServer is stored in server.socket, the socket file is
# made accessible to every user (mode 0777), and the address is logged.
def listen(server)
  addr = server.socket_addr
  # File.socket? is already false for a path that does not exist.
  if File.socket?(addr)
    begin
      # Close the probe immediately so it does not stay open as a leaked
      # descriptor when the connection succeeds.
      UNIXSocket.new(addr).close
    rescue Errno::ECONNREFUSED
      # The socket is left over from a previous run.
      File.unlink(addr)
    end
  end
  server.socket = UNIXServer.new(addr)
  File.chmod(0777, addr)
  server.logger.info "Listening on #{File.expand_path(server.socket.path)}, fd = #{server.socket.to_i}"
end
# Wait up to five seconds for any IO activity and dispatch whatever is
# readable:
# * the listening socket: accept a new client
# * the fowsr pipe: read from fowsr
# * the wake-up pipe: drain it
# * a client socket: check that client
def do_io(server)
  dispatch = {}
  dispatch[server.socket] = lambda { accept_client(server) }
  dispatch[server.fowsr_reader] = lambda { consume_fowsr(server) }
  dispatch[server.awake_reader] = lambda { server.awake_reader.read_nonblock(10000) }
  server.clients.each do |client|
    dispatch[client] = lambda { check_client(server, client) }
  end

  server.logger.debug "Checking for input. #{server.clients.size} clients attached."

  readable, = IO.select(dispatch.keys, nil, nil, 5)
  # nil means the wait timed out with nothing to do.
  return unless readable

  readable.each do |io|
    callback = dispatch[io]
    callback.call if callback
  end
end
# Interpret the output of fowsr and send it along to all clients.
#
# Reads whatever is currently available on the fowsr pipe, converts it with
# `convert`, and writes the result to every attached client. Nothing is sent
# when `convert` returns nil.
#
# A client whose write fails (for example Errno::EPIPE after it disconnected)
# is logged, removed from server.clients and closed, so one dead client cannot
# take the server down or keep the remaining clients from getting the update.
def consume_fowsr(server)
  data = server.fowsr_reader.read_nonblock(10000)
  server.logger.info "Got #{data.size} bytes from fowsr."
  payload = convert(server, data)
  return unless payload

  # Iterate over a copy: failing clients are deleted from the live list.
  server.clients.dup.each do |client|
    begin
      client.write(payload)
    rescue SystemCallError, IOError => e
      server.logger.error "Dropping client: #{e.class.name}: #{e}"
      server.clients.delete(client)
      client.close unless client.closed?
    end
  end
end
# Maps the sensor suffix used in fowsr field names ("i" / "o") to the prefix
# used in the output keys.
Location = {"i" => "indoor", "o" => "outdoor"}.freeze

# Given fowsr data, build the format required for clients.
#
# server - not used by this method.
# data   - String of fowsr output, one reading per line.
#
# Conditions:
# Outdoor temp: 33F
# Outdoor humidity: 72%
# Indoor temp: 61F
# Indoor humidity: 52%
# pressure: 29.7inHg
#
# Input:
# DTime 25-11-2014 00:03:00
# ETime 1416898363
# RHi 52.0
# Ti 16.3
# RHo 72.0
# To 0.7
# RP 1004.3
# WS 0.0
# WG 0.0
# DIR 270.0
# Rtot 0.3
# state 00
#
# Output: (note this only includes data that's valid on my weather station;
# computed floats are emitted unrounded, the figures below are rounded for
# readability)
# {"time": 1416898363, "indoor_rh": 52.0, "indoor_c": 16.3, "indoor_f": 61.34, "outdoor_rh": 72.0, "outdoor_c": 0.7, "outdoor_f": 33.26, "pressure_mbar": 1004.3, "pressure_inhg": 29.657, "wind_dir": 270}
#
# Lines that match none of the known fields are ignored.
#
# Returns the JSON String, or nil when no line was recognised.
def convert(server, data)
  converted = {}
  data.each_line do |line|
    case line
    when /^ETime (\d+)/
      converted["time"] = Regexp.last_match(1).to_i
    when /^RH([io]) ([0-9.]+)/
      where = Location.fetch(Regexp.last_match(1))
      converted["#{where}_rh"] = Regexp.last_match(2).to_f
    when /^T([io]) (-?[0-9.]+)/
      where = Location.fetch(Regexp.last_match(1))
      celsius = Regexp.last_match(2).to_f
      converted["#{where}_c"] = celsius
      converted["#{where}_f"] = (celsius * 9 / 5) + 32
    when /^RP ([0-9.]+)/
      mbar = Regexp.last_match(1).to_f
      converted["pressure_mbar"] = mbar
      # Millibar to inches of mercury.
      converted["pressure_inhg"] = mbar * 0.0295299833
    when /^DIR ([0-9]+)/
      converted["wind_dir"] = Regexp.last_match(1).to_i
    end
  end
  converted.empty? ? nil : JSON.dump(converted)
end
# Try to accept a client connection to the socket.
#
# On success the new client is logged and appended to server.clients.
# A StandardError raised while accepting (for example when no connection is
# actually pending) is logged and swallowed so the IO loop keeps running.
# Exceptions outside StandardError, such as SystemExit or Interrupt, are not
# intercepted and propagate to the caller.
def accept_client(server)
  client = server.socket.accept_nonblock
  server.logger.info "Accepted client on fd = #{client.to_i}"
  server.clients << client
rescue StandardError => e
  server.logger.error "Could not accept: #{e.class.name}: #{e}"
end
# Handle "readable" on client sockets. Also, discard any data they send us.
#
# * Data that was read is simply dropped.
# * IO::WaitReadable (nothing to read after all) leaves the client attached.
# * EOFError or a system-level read error such as Errno::ECONNRESET means the
#   client is gone: it is logged, removed from server.clients and closed.
def check_client(server, client)
  client.read_nonblock(10000)
rescue IO::WaitReadable
  # Must come before SystemCallError: the EAGAIN flavour is both.
  nil
rescue EOFError, SystemCallError => e
  outcome = e.is_a?(EOFError) ? "closed." : "failed (#{e.class.name}: #{e})."
  server.logger.info "Client #{client.to_i} #{outcome}"
  server.clients.delete client
  client.close
end
# Reap a finished fowsr process and start a new one when it is due.
#
# If a fowsr pid is recorded and a non-blocking wait reports that it has
# exited, its status is logged, the pid is cleared and the next start is
# scheduled. A new fowsr is spawned only when no pid is recorded and the
# scheduled start time has passed; its standard output goes to the server's
# fowsr pipe.
def respawn_fowsr(server)
  running = server.fowsr_pid
  if running && Process.waitpid(running, Process::WNOHANG)
    server.logger.info "fowsr[#{server.fowsr_pid}] #{$?}"
    server.fowsr_pid = nil
    # The weather values only change every 60s.
    # Most of the time, fowsr runs in ~ 5s.
    # So we wait at least 55s between updates.
    server.next_start = Time.now + 55
  end

  return unless server.fowsr_pid.nil? && server.next_start < Time.now

  server.fowsr_pid = spawn(server.fowsr_path, "-c", :out => server.fowsr_writer)
  server.logger.info "fowsr[#{server.fowsr_pid}] spawned."
end
# Send QUIT to the recorded fowsr process. Does nothing when no pid is
# recorded.
def kill_fowsr(server)
  pid = server.fowsr_pid
  Process.kill(:QUIT, pid) if pid
end
# State shared by the steps of the server loop: the parsed options, the
# listening socket, the attached clients, the fowsr child and its pipe, and
# the wake-up pipe used to break out of IO.select.
class ServerInfo
  # The option Hash given to the constructor.
  attr_reader :options

  # The Socket (UNIXServer) where we listen for new connections.
  attr_accessor :socket

  # The pid of the most-recently spawned `fowsr` process.
  attr_accessor :fowsr_pid

  # The time to start another `fowsr` process.
  attr_accessor :next_start

  # options - Hash read through #fowsr_path (:fowsr) and #socket_addr (:sock).
  def initialize(options)
    @options = options
    # Already in the past, so the first fowsr can start right away.
    @next_start = Time.now - 1
  end

  # The path to the fowsr executable.
  def fowsr_path
    options.fetch(:fowsr)
  end

  # The address of the listening socket.
  def socket_addr
    options.fetch(:sock)
  end

  # The Logger, created on first use and writing to STDERR.
  def logger
    @logger ||= Logger.new(STDERR)
  end

  # The Sockets of clients.
  def clients
    @clients ||= []
  end

  # The pair of IO for fowsr, created on first use.
  def fowsr_pipe
    @fowsr_pipe ||= IO.pipe
  end

  # Read end of the fowsr pipe.
  def fowsr_reader
    fowsr_pipe.first
  end

  # Write end of the fowsr pipe.
  def fowsr_writer
    fowsr_pipe.last
  end

  # Has a quit_signal been received?
  def received_quit_signal?
    @quit ? true : false
  end

  # Time to exit.
  def receive_quit_signal(signal)
    @quit = true
    awake
  end

  # Time to pop out of IO.select.
  def awake
    awake_writer << "."
  end

  # The pair of IO for awakening, created on first use.
  def awake_pipe
    @awake_pipe ||= IO.pipe
  end

  # Read end of the wake-up pipe.
  def awake_reader
    awake_pipe.first
  end

  # Write end of the wake-up pipe.
  def awake_writer
    awake_pipe.last
  end
end
# Parse the command line into a frozen options Hash.
#
# argv - Array of argument Strings; it is consumed (shifted) while parsing.
#
# Recognised flags, each followed by a value:
#   --fowsr  PATH  stored under :fowsr (default: "fowsr" next to this file)
#   --listen PATH  stored under :sock  (default: "/var/run/fowsr.sock")
#
# An unknown argument, or a flag that is missing its value, prints the usage
# text (this file's lines that start with "#" followed by "/") and exits with
# status 1.
#
# Returns the frozen Hash with the keys :fowsr and :sock.
def parse_args(argv)
  options = {
    :fowsr => File.expand_path("fowsr", File.dirname(__FILE__)),
    :sock => "/var/run/fowsr.sock",
  }
  flags = {"--fowsr" => :fowsr, "--listen" => :sock}

  until argv.empty?
    key = flags[argv.shift]
    # Only consume a value for a flag we know about.
    value = key ? argv.shift : nil
    if value.nil?
      puts File.read(__FILE__).lines.grep(/^#\//).map { |line| line[3..-1] }
      exit 1
    end
    options[key] = value
  end

  options.freeze
end
# Script entry point: parse the command line, then run the server.
options = parse_args(ARGV)
main(options)