// General
pub mod aggregate;
pub mod carbon;
pub mod config;
pub mod errors;
pub mod management;
pub mod peer;
pub mod raft;
pub mod async_udp;
pub mod sync_udp;
pub mod stats;
pub mod fast_task;
pub mod slow_task;
pub mod util;
pub mod cache;
use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{panic, process, thread};
use slog::{info, o, debug};
use futures::future::{pending, TryFutureExt};
use slog::{error, warn};
use tokio::runtime::Builder;
use tokio::time::interval_at;
use tokio1::runtime::current_thread::Runtime;
use tokio1::timer::Delay;
use bioyino_metric::metric::Metric;
use bioyino_metric::name::MetricName;
use once_cell::sync::Lazy;
use crate::carbon::carbon_timer;
use crate::config::{Command, Network, System};
use crate::errors::GeneralError;
use crate::management::MgmtClient;
use crate::peer::{NativeProtocolServer, NativeProtocolSnapshot};
use crate::raft::start_internal_raft;
pub use crate::stats::OwnStats;
use crate::fast_task::start_fast_threads;
use crate::slow_task::start_slow_threads;
use crate::sync_udp::start_sync_udp;
use crate::async_udp::start_async_udp;
use crate::util::{retry_with_backoff, setup_logging, try_resolve, Backoff};
use crate::management::ConsensusState;
use crate::config::ConsensusKind;
// The floating-point type used throughout the code; the `f32` feature switches it
// to f32 to use less memory at the price of precision
#[cfg(feature = "f32")]
pub type Float = f32;
#[cfg(not(feature = "f32"))]
pub type Float = f64;
// a type to store pre-aggregated data
pub type Cache = HashMap<MetricName, Metric<Float>>;
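// Process-wide state shared between threads: the consensus mode and the current
// leadership flag. Metrics are only sent to the backend while this node is the leader.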
pub static CONSENSUS_STATE: Lazy<Mutex<ConsensusState>> = Lazy::new(|| Mutex::new(ConsensusState::Disabled));
pub static IS_LEADER: AtomicBool = AtomicBool::new(false);
fn main() {
let (system, command) = System::load().expect("loading config");
let config = system.clone();
#[allow(clippy::unneeded_field_pattern)]
let System {
verbosity_console,
verbosity_syslog,
daemon,
network:
Network {
listen,
peer_listen,
mgmt_listen,
bufsize,
multimessage,
mm_packets,
mm_async,
mm_timeout,
buffer_flush_time,
..
},
raft,
metrics: _,
aggregation,
naming,
carbon,
n_threads,
a_threads,
stats_interval: s_interval,
start_as_leader,
stats_prefix,
consensus,
..
} = system;
if daemon && verbosity_syslog.is_off() {
eprintln!("syslog is disabled, while daemon mode is on, no logging will be performed");
}
// Since daemonizing closes all opened resources, including syslog, we must do it before everything else
if daemon {
let result = unsafe { libc::daemon(0, 0) };
if result < 0 {
println!("daemonize failed: {}", io::Error::last_os_error());
std::process::exit(1);
}
}
let rlog = setup_logging(daemon, verbosity_console, verbosity_syslog);
// this lets the root logger live as long as it is needed
let _guard = slog_scope::set_global_logger(rlog.clone());
slog_stdlog::init().unwrap();
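// The main multi-threaded tokio runtime; most long-lived tasks below are spawned onto it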
let runtime = Builder::new_multi_thread()
.thread_name("bioyino_async")
.worker_threads(a_threads)
.enable_all()
.build()
.expect("creating runtime for main thread");
if let Command::Query(command, dest) = command {
let dest = try_resolve(&dest);
let command = MgmtClient::new(rlog.clone(), dest, command);
runtime.block_on(command.run()).unwrap_or_else(|e| {
warn!(rlog,
"error sending command";
"dest"=>format!("{}", &dest),
"error"=> format!("{}", e),
)
});
return;
}
let config = Arc::new(config);
let log = rlog.new(o!("thread" => "main"));
// To avoid strange side effects, like blocking the whole process in an unknown state,
// we prefer to exit after a panic in any thread, so we set a panic hook that exits
// the process
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
process::exit(42);
}));
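// Fast threads parse incoming data into metrics; slow threads do the heavier
// counting and aggregation work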
let elog = log.clone();
let (fast_chans, fast_prio_chans) = start_fast_threads(log.clone(), config.clone())
.map_err(move |e| error!(elog, "starting parsing worker threads"; "error" => format!("{}", e)))
.expect("starting parsing worker thread");
let elog = log.clone();
let slow_chan = start_slow_threads(log.clone(), &config)
.map_err(move |e| error!(elog, "starting counting worker threads"; "error" => format!("{}", e)))
.expect("starting counting worker threads");
let own_stat_log = log.clone();
let stats_prefix = stats_prefix.trim_end_matches('.').to_string();
// Spawn the future gathering bioyino's own stats
info!(own_stat_log, "starting own stats counter");
let own_stats = OwnStats::new(
s_interval, stats_prefix, slow_chan.clone(),
fast_prio_chans.clone(), own_stat_log, carbon.clone(),
aggregation.clone(), naming.clone(),
);
runtime.spawn(own_stats.run());
let compat_log = rlog.new(o!("thread" => "compat"));
let snap_err_log = compat_log.clone();
// TODO: unfortunately, the old entities use timers and/or the older tokio spawn function,
// so they are incompatible between the old and new tokio runtimes, even with the `compat`
// adapter trait
//
// this thread can be eliminated after migrating the sender and receiver to the latest capnp
// and std futures
thread::Builder::new()
.name("bioyino_compat".into())
.spawn(move || {
use futures1::future::empty;
use futures1::Future as Future1;
let mut runtime = Runtime::new().expect("creating runtime for counting worker");
// Init leader state before starting backend
IS_LEADER.store(start_as_leader, Ordering::SeqCst);
let consensus_log = compat_log.clone();
match consensus {
ConsensusKind::Internal => {
let log = compat_log.clone();
let flog = compat_log.clone();
thread::Builder::new()
.name("bioyino_raft".into())
.spawn(move || {
let mut runtime = Runtime::new().expect("creating runtime for raft thread");
if start_as_leader {
warn!(
log,
"Starting as leader with enabled consensus. More that one leader is possible before consensus settle up."
);
}
let d = Delay::new(Instant::now() + Duration::from_millis(raft.start_delay));
let log = log.clone();
let delayed = d.map_err(|_| ()).and_then(move |_| {
let mut con_state = CONSENSUS_STATE.lock().unwrap();
*con_state = ConsensusState::Enabled;
info!(log, "starting internal consensus"; "initial_state"=>format!("{:?}", *con_state));
start_internal_raft(raft, consensus_log);
Ok(())
});
runtime.spawn(delayed);
runtime.block_on(empty::<(), ()>()).expect("raft thread failed");
info!(flog, "consensus thread stopped");
})
.expect("starting counting worker thread");
}
ConsensusKind::None => {
if !start_as_leader {
// starting as a non-leader in this mode can be useful for agent mode,
// so we don't disorient the user with warnings
info!(
compat_log,
"Starting as non-leader with disabled consensus. No metrics will be sent until leader is switched on by command"
);
}
}
}
runtime.block_on(empty::<(), ()>()).expect("compat thread failed");
})
.expect("starting compat thread");
// backoff settings that retry every second indefinitely, so the snapshot server is restarted as soon as possible
info!(log, "starting snapshot receiver");
let peer_server_bo = Backoff {
delay: 1,
delay_mul: 1f32,
delay_max: 1,
retries: ::std::usize::MAX,
};
let server_chan = slow_chan.clone();
let server_log = log.clone();
let peer_server = retry_with_backoff(peer_server_bo, move || {
let server_log = server_log.clone();
let peer_server = NativeProtocolServer::new(server_log.clone(), peer_listen, server_chan.clone());
peer_server.run().inspect_err(move |e| {
info!(server_log, "error running snapshot server"; "error"=>format!("{}", e));
})
});
runtime.spawn(peer_server);
info!(log, "starting snapshot sender");
let snapshot = NativeProtocolSnapshot::new(
&log,
config.clone(),
&fast_prio_chans,
slow_chan.clone(),
);
runtime.spawn(snapshot.run().map_err(move |e| {
s!(peer_errors);
info!(snap_err_log, "error running snapshot sender"; "error"=>format!("{}", e));
}));
info!(log, "starting management server");
let m_serv_log = rlog.clone();
let m_server = async move { hyper::Server::bind(&mgmt_listen).serve(management::MgmtServer(m_serv_log, mgmt_listen)).await };
runtime.spawn(m_server);
info!(log, "starting carbon backend");
let carbon_log = log.clone();
let carbon_t = carbon_timer(log.clone(), carbon, aggregation, naming, slow_chan.clone())
.map_err(move |e| error!(carbon_log, "running carbon thread"; "error" => format!("{}", e)));
runtime.spawn(carbon_t);
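// Choose the UDP input mode: multimessage mode reads packets in batches on dedicated
// synchronous threads, otherwise a fully async UDP server is started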
if multimessage {
let flush_flags = start_sync_udp(
log,
listen,
&fast_chans,
config.clone(),
n_threads,
bufsize,
mm_packets,
mm_async,
mm_timeout,
);
// spawn a flushing timer if required
if buffer_flush_time > 0 {
let flush_timer = async move {
let dur = Duration::from_millis(buffer_flush_time);
let mut timer = interval_at(tokio::time::Instant::now() + dur, dur);
loop {
timer.tick().await;
debug!(rlog, "buffer flush requested");
flush_flags.iter().for_each(|flag| { flag.swap(true, Ordering::SeqCst); });
}
};
runtime.spawn(flush_timer);
}
} else {
info!(log, "multimessage is disabled, starting in async UDP mode");
let flush_sender = start_async_udp(log, &fast_chans, config.clone());
if buffer_flush_time > 0 {
let flush_timer = async move {
let dur = Duration::from_millis(buffer_flush_time);
let mut timer = interval_at(tokio::time::Instant::now() + dur, dur);
loop {
timer.tick().await;
debug!(rlog, "buffer flush requested");
flush_sender.notify_waiters();
}
};
runtime.spawn(flush_timer);
}
};
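// Park the main thread forever; all the work happens in the spawned tasks and threads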
runtime.block_on(pending::<()>());
}