Skip to content

Commit 6c7f415

Browse files
authored
Merge pull request #45 from w3f/updates
Update dependencies and additional changes
2 parents 08767f4 + 1c9ac3e commit 6c7f415

File tree

10 files changed

+1334
-1048
lines changed

10 files changed

+1334
-1048
lines changed

Cargo.lock

+1,204-956
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+13-13
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@ name = "matrixbot"
1414
path = "src/main.rs"
1515

1616
[dependencies]
17-
log = "0.4.14"
18-
env_logger = "0.9.0"
17+
log = "0.4.17"
18+
env_logger = "0.10.0"
19+
tokio = "1.26.0"
1920
anyhow = "1.0.43"
20-
serde = "1.0.129"
21-
serde_json = "1.0.66"
22-
serde_yaml = "0.8.20"
21+
serde = "1.0.158"
22+
serde_json = "1.0.94"
23+
serde_yaml = "0.9.19"
2324
matrix-sdk = "0.3.0"
2425
ruma = "0.2.0"
25-
actix = "0.12.0"
26-
actix-web = "4.0.0-beta.9"
26+
actix = "0.13.0"
27+
actix-web = "4.3.1"
2728
url = "2.2.2"
28-
async-trait = "0.1.51"
29-
futures = "0.3.17"
30-
structopt = "0.3.23"
29+
async-trait = "0.1.67"
30+
futures = "0.3.27"
31+
structopt = "0.3.26"
3132
md5 = "0.7.0"
32-
mongodb = { version = "2.0.0-beta", features = ["bson-u2i"] }
33-
bson = "2.0.0-beta"
34-
tokio = "1.14.0"
33+
mongodb = "2.4.0"
34+
bson = "2.6.1"

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# https://github.com/LukeMathWalker/cargo-chef
22
# Leveraging the pre-built Docker images with
33
# cargo-chef and the Rust toolchain
4-
FROM lukemathwalker/cargo-chef:latest-rust-1.55.0 AS chef
4+
FROM lukemathwalker/cargo-chef:latest-rust-1.68.0 AS chef
55
WORKDIR app
66

77
FROM chef AS planner

charts/matrixbot-ack/Chart.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
description: Matrixbot Ack
22
name: matrixbot-ack
3-
version: v0.2.7
3+
version: v0.2.10
44
apiVersion: v2

charts/matrixbot-ack/values.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ environment: production
22

33
image:
44
repository: web3f/matrixbot-ack
5-
tag: v0.2.7
5+
tag: v0.2.10
66
pullPolicy: IfNotPresent
77

88
config:

src/database.rs

+32-11
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@ use crate::{unix_time, AlertId, Result};
44
// TODO: Can this be avoided somehow?
55
use bson::{doc, to_bson};
66
use futures::stream::StreamExt;
7+
use mongodb::IndexModel;
78
use mongodb::{
89
options::{FindOneAndUpdateOptions, ReplaceOptions, ReturnDocument},
910
Client, Database as MongoDb,
1011
};
1112
use std::collections::HashMap;
1213

13-
const PENDING: &'static str = "pending";
14-
const HISTORY: &'static str = "history";
15-
const ID_CURSOR: &'static str = "id_cursor";
14+
const PENDING: &str = "pending";
15+
const HISTORY: &str = "history";
16+
const ID_CURSOR: &str = "id_cursor";
1617

1718
#[derive(Debug, Clone, Serialize, Deserialize)]
1819
pub struct DatabaseConfig {
@@ -41,11 +42,31 @@ struct PendingAlertsEntry(HashMap<AlertId, Alert>);
4142

4243
impl Database {
4344
pub async fn new(config: DatabaseConfig) -> Result<Self> {
44-
Ok(Database {
45-
db: Client::with_uri_str(config.uri)
46-
.await?
47-
.database(&config.name),
48-
})
45+
let db = Client::with_uri_str(config.uri)
46+
.await?
47+
.database(&config.name);
48+
49+
// Create index for fields `id` and `last_notified`.
50+
let index_model = IndexModel::builder()
51+
.keys(doc! {
52+
"id": 1,
53+
"last_notified": 1,
54+
})
55+
.build();
56+
57+
db.collection::<AlertContext>(PENDING)
58+
.create_index(index_model, None)
59+
.await?;
60+
61+
Ok(Database { db })
62+
}
63+
/// Simply checks if a connection could be established to the database.
64+
pub async fn connectivity_check(&self) -> Result<()> {
65+
self.db
66+
.list_collection_names(None)
67+
.await
68+
.map_err(|err| anyhow!("Failed to connect to database: {:?}", err))
69+
.map(|_| ())
4970
}
5071
pub async fn insert_alerts(&self, alerts: &[AlertContext]) -> Result<()> {
5172
if alerts.is_empty() {
@@ -122,8 +143,8 @@ impl Database {
122143
history
123144
.insert_one(
124145
AlertAcknowledged {
125-
alert: alert,
126-
acked_by: acked_by,
146+
alert,
147+
acked_by,
127148
acked_timestamp: unix_time(),
128149
},
129150
None,
@@ -154,7 +175,7 @@ impl Database {
154175
let now = unix_time();
155176
doc! {
156177
"last_notified": {
157-
"$lt": now - escalation_window,
178+
"$lt": (now - escalation_window) as i64,
158179
}
159180
}
160181
} else {

src/lib.rs

+27-8
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ extern crate serde;
77
#[macro_use]
88
extern crate async_trait;
99

10-
use actix::clock::sleep;
1110
use actix::{prelude::*, SystemRegistry};
12-
use std::time::Duration;
1311
use structopt::StructOpt;
12+
use tokio::sync::mpsc::unbounded_channel;
1413

1514
mod database;
1615
mod matrix;
@@ -87,7 +86,7 @@ pub async fn run() -> Result<()> {
8786
"Opening config at {}",
8887
std::fs::canonicalize(&cli.config)?
8988
.to_str()
90-
.ok_or(anyhow!("Path to config is not valid unicode"))?
89+
.ok_or_else(|| anyhow!("Path to config is not valid unicode"))?
9190
);
9291

9392
let content = std::fs::read_to_string(&cli.config)?;
@@ -120,14 +119,21 @@ pub async fn run() -> Result<()> {
120119
let opt_db = if let Some(db_conf) = config.database {
121120
info!("Setting up database {:?}", db_conf);
122121
let db = database::Database::new(db_conf).await?;
122+
db.connectivity_check().await?;
123+
123124
Some(db)
124125
} else {
125126
warn!("Skipping database setup");
126127
None
127128
};
128129

130+
// Setup channels for shutdown signals. The Processor and the API server
131+
// task (below) hold the _sender_. Any message sent to it indicates a full shutdown
132+
// of the service, which is handled at the end of this function.
133+
let (tx, mut recv) = unbounded_channel();
134+
129135
info!("Adding message processor to system registry");
130-
let proc = processor::Processor::new(opt_db, escalation_window, should_escalate);
136+
let proc = processor::Processor::new(opt_db, escalation_window, should_escalate, tx.clone());
131137
SystemRegistry::set(proc.start());
132138

133139
info!("Initializing Matrix client");
@@ -137,9 +143,22 @@ pub async fn run() -> Result<()> {
137143
SystemRegistry::set(matrix.start());
138144

139145
info!("Starting API server");
140-
webhook::run_api_server(&config.listener).await?;
141-
142-
loop {
143-
sleep(Duration::from_secs(u64::MAX)).await;
146+
let tx_api = tx.clone();
147+
let server = webhook::run_api_server(&config.listener).await?;
148+
149+
// Run server in seperate task, send a shutdown signal in case of an error.
150+
tokio::spawn(async move {
151+
if let Err(err) = server.await {
152+
error!("Failed to run API server: {:?}", err);
153+
tx_api.send(()).unwrap();
154+
}
155+
});
156+
157+
// On shutdown signal, shutdown service.
158+
while let Some(_) = recv.recv().await {
159+
warn!("Shutting down service...");
160+
return Ok(());
144161
}
162+
163+
Ok(())
145164
}

src/matrix.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl MatrixClient {
4444
let url = Url::parse(&config.homeserver)?;
4545
let client = Client::new_with_config(url, client_config)?;
4646

47-
info!("Login with credentials");
47+
info!("Logging in with credentials...");
4848
client
4949
.login(
5050
&config.username,
@@ -79,7 +79,7 @@ impl MatrixClient {
7979
client
8080
.sync_token()
8181
.await
82-
.ok_or(anyhow!("Failed to acquire sync token"))?,
82+
.ok_or_else(|| anyhow!("Failed to acquire sync token"))?,
8383
);
8484

8585
// Sync in background.
@@ -137,7 +137,7 @@ impl Handler<NotifyAlert> for MatrixClient {
137137
return Ok(());
138138
}
139139

140-
let current_room_id = rooms.get(0).unwrap_or(rooms.last().unwrap());
140+
let current_room_id = rooms.get(0).unwrap_or_else(|| rooms.last().unwrap());
141141

142142
let mut msg = String::from("⚠️ Alert occurred!\n\n");
143143

@@ -180,11 +180,11 @@ impl Handler<Escalation> for MatrixClient {
180180
// Determine which rooms to send the alerts to.
181181
let current_room_id = rooms
182182
.get(notify.escalation_idx.saturating_sub(1))
183-
.unwrap_or(rooms.last().unwrap());
183+
.unwrap_or_else(|| rooms.last().unwrap());
184184

185185
let next_room_id = rooms
186186
.get(notify.escalation_idx)
187-
.unwrap_or(rooms.last().unwrap());
187+
.unwrap_or_else(|| rooms.last().unwrap());
188188

189189
let is_last = current_room_id == next_room_id;
190190

@@ -286,9 +286,11 @@ impl EventHandler for Listener {
286286
let cmd = match msg_body.trim() {
287287
"pending" => Command::Pending,
288288
"help" => Command::Help,
289-
txt @ _ => {
290-
if txt.to_lowercase().starts_with("ack") || txt.to_lowercase().starts_with("acknowledge") {
291-
let parts: Vec<&str> = txt.split(" ").collect();
289+
txt => {
290+
if txt.to_lowercase().starts_with("ack")
291+
|| txt.to_lowercase().starts_with("acknowledge")
292+
{
293+
let parts: Vec<&str> = txt.split(' ').collect();
292294
if parts.len() == 2 {
293295
if let Ok(id) = AlertId::from_str(parts[1]) {
294296
Command::Ack(id, event.sender.to_string())
@@ -316,7 +318,7 @@ impl EventHandler for Listener {
316318

317319
// Prepare action type.
318320
let action = UserAction {
319-
escalation_idx: escalation_idx,
321+
escalation_idx,
320322
command: cmd,
321323
};
322324

@@ -341,7 +343,7 @@ impl EventHandler for Listener {
341343

342344
match res(room, event.clone()).await {
343345
Ok(_) => {}
344-
Err(err) => error!("{:?}", err),
346+
Err(err) => error!("Error when trying to process Matrix message {:?}", err),
345347
}
346348
}
347349
}

0 commit comments

Comments
 (0)