Skip to content

Commit e6ab8ce

Browse files
committed
made raft more testable
moved main function to tokio instead axtic
1 parent de090f5 commit e6ab8ce

16 files changed

+352
-180
lines changed

cli/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ reqwest = { version = "0.11", features = ["json"] }
1010
tokio = { version = "1", features = ["full"] }
1111
clap = { version = "3.0", features = ["derive"] }
1212
uuid = { version = "0.8.2", features = ["serde", "v4"] }
13-
rand = "0.8.5"
13+
rand = "0.8.5"
14+
futures = "0.3.30"

cli/src/main.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use clap::{Parser, Subcommand};
2-
use rand::distributions::{Alphanumeric};
2+
use futures::stream::{self, StreamExt};
3+
use rand::distributions::Alphanumeric;
34
use rand::{thread_rng, Rng};
45
use std::convert::TryInto;
56
use std::{thread, time};
6-
7+
use tokio::io::AsyncRead;
8+
use tokio::sync::SemaphorePermit;
79
mod null_client;
810

911
#[derive(Parser)]
@@ -55,6 +57,13 @@ enum Commands {
5557
host: String,
5658
},
5759

60+
BenchDisk {
61+
#[clap(long, default_value = "localhost")]
62+
host: String,
63+
#[clap(long, default_value = "8001")]
64+
port: String,
65+
},
66+
5867
Compact {
5968
#[clap(long, default_value = "localhost")]
6069
host: String,
@@ -79,9 +88,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7988
let args = Cli::parse();
8089

8190
match &args.command {
82-
Commands::Put { key, value, host, port } => {
91+
Commands::Put {
92+
key,
93+
value,
94+
host,
95+
port,
96+
} => {
8397
println!("putting data {}", value);
84-
println!("connecting at: {}",format!("http://{}:{}/{}/{}", host, port, "v1/data", key));
98+
println!(
99+
"connecting at: {}",
100+
format!("http://{}:{}/{}/{}", host, port, "v1/data", key)
101+
);
85102
let client = reqwest::Client::new();
86103
let data = value.clone();
87104
let resp = client
@@ -111,7 +128,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
111128
Commands::Get { key, host, port } => {
112129
let then = time::Instant::now();
113130
println!("getting data for key {}", key);
114-
let resp = reqwest::get(format!("http://{}:{}/{}/{}\n", host,port, "v1/data", key))
131+
let resp = reqwest::get(format!("http://{}:{}/{}/{}\n", host, port, "v1/data", key))
115132
.await?
116133
.text()
117134
.await?;
@@ -142,11 +159,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
142159
benchmark(*records, *duration, host.to_string()).await;
143160
}
144161

162+
Commands::BenchDisk { host, port } => {
163+
println!("benchmarking disk");
164+
load_disks(1_000, 32, host.to_string(), port.to_string()).await;
165+
}
166+
145167
Commands::Compact { host, port } => {
146168
println!("Making a compation request!");
147169
let _resp = reqwest::get(format!(
148170
"http://{}:{}/{}\n",
149-
host,port, "v1/management/compact"
171+
host, port, "v1/management/compact"
150172
))
151173
.await?
152174
.text()
@@ -202,6 +224,23 @@ async fn benchmark(records: i32, duration: i32, host: String) -> Option<()> {
202224
return Some(());
203225
}
204226

227+
async fn load_disks(records: u32, threads: u32, host: String, port: String) {
228+
let client = null_client::NullClient::new(format!("http://{host}:{port}/v1/data").to_string());
229+
230+
let _ = futures::stream::iter(0..records)
231+
.map(|_| {
232+
let c = client.clone();
233+
tokio::spawn(async move {
234+
let _ = c.post(
235+
"testdata".to_string(),
236+
"This is the value of our record, it's awesome, not super short but kind of short."
237+
.to_string()).await;
238+
})
239+
})
240+
.buffered(threads as usize)
241+
.count()
242+
.await;
243+
}
205244
fn get_random_string(length: usize) -> String {
206245
let chars: Vec<u8> = rand::thread_rng()
207246
.sample_iter(&Alphanumeric)

cli/src/null_client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use reqwest::{Error, Response};
22

3+
#[derive(Debug, Clone)]
34
pub struct NullClient {
45
url: String,
56
client: reqwest::Client,

database/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ serde = { version = "1.0.130", features = ["derive"] }
2222
serde_json = "1.0.70"
2323
tempfile = "3.8.0"
2424
tokio = { version = "1.26.0", features = ["full"] }
25+
tokio-util = "0.7.11"
2526
tonic = { version = "0.10.2" }
2627

2728

database/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM rust:1.74.1 as builder
1+
FROM rust:1.79.0 as builder
22
#! /bin/bash
33
RUN apt update
44
RUN apt install -y protobuf-compiler

database/compose.yaml

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,34 @@
11
services:
22
node1:
33
build: .
4-
command: 'null-db --roster=node2:3001,node3:3002 --id=3000 --encoding=json'
4+
command: "null-db --roster=node1:3001,node2:3002,node3:3003 --id=node1:3001 --encoding=json"
55
environment:
66
- RUST_LOG=info
77
ports:
8-
- 3000:3000
8+
- 3001:3001
99
- 30001:8080
1010
volumes:
1111
- ./node1:/db
12-
develop:
13-
watch:
14-
- action: rebuild
15-
path: .
1612

1713
node2:
1814
build: .
19-
command: 'null-db --roster=node1:3000,node3:3002 --id=3001 --encoding=json'
15+
command: "null-db --roster=node1:3001,node2:3002,node3:3003 --id=node2:3002 --encoding=json"
2016
environment:
2117
- RUST_LOG=info
2218
ports:
23-
- 3001:3001
24-
- 30011:8080
19+
- 3002:3002
20+
- 30002:8080
2521
volumes:
2622
- ./node2:/db
27-
develop:
28-
watch:
29-
- action: rebuild
30-
path: .
3123

3224
node3:
3325
build: .
34-
command: 'null-db --roster=node2:3001,node1:3000 --id=3002 --encoding=json'
26+
command: "null-db --roster=node1:3001,node2:3002,node3:3003 --id=node3:3003 --encoding=json"
3527
environment:
3628
- RUST_LOG=info
3729
ports:
38-
- 3002:3002
39-
- 30021:8080
30+
- 3003:3003
31+
- 30003:8080
4032
volumes:
4133
- ./node3:/db
4234
develop:
43-
watch:
44-
- action: rebuild
45-
path: .

database/hack/reset.sh

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
# Reset the database
3+
# Usage: ./reset.sh
4+
# Author: Arun Prakash Jana
5+
# Date: 26th May 2020
6+
# Version: 1.0
7+
# Description: This script will reset the database
8+
9+
# remove all files from the node directories
10+
rm -rf node1/*
11+
rm -rf node2/*
12+
rm -rf node3/*

database/hack/run_3.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#! /bin/bash
2+
# Run 3 Null DB's with 3 different seeds
3+
4+
export RUST_LOG=info
5+
6+
#node 1
7+
cargo run -- --roster=localhost:3002,localhost:3003 --id=3001 --port=8001 --encoding=json --dir=node1/ &
8+
#node 2
9+
cargo run -- --roster=localhost:3001,localhost:3003 --id=3002 --port=8002 --encoding=json --dir=node2/ &
10+
#node 3
11+
cargo run -- --roster=localhost:3001,localhost:3002 --id=3003 --port=8003 --encoding=json --dir=node3/ &
12+
13+
wait

database/src/file_compactor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use super::file::record;
22
use super::utils;
33
use crate::index::generate_index_for_segment;
4-
use crate::nulldb::NullDB;
4+
use crate::nulldb::{DatabaseLog, NullDB};
55
use actix_web::web::Data;
66
use anyhow::anyhow;
77
use std::collections::HashMap;

database/src/main.rs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ use actix_web::{
77
use clap::Parser;
88
use errors::NullDbReadError;
99
use file_reader::EasyReader;
10-
use nulldb::{Config, NullDB};
10+
use nulldb::{Config, DatabaseLog, NullDB};
1111
use raft::grpcserver::RaftEvent;
12-
use std::path::PathBuf;
13-
use tokio::sync::mpsc::Sender;
12+
use std::{path::PathBuf, time::Duration};
13+
use tokio::{
14+
signal::unix::{signal, SignalKind},
15+
sync::mpsc::Sender,
16+
};
17+
use tokio_util::sync::CancellationToken;
1418

1519
mod errors;
1620
mod file;
@@ -25,20 +29,30 @@ mod utils;
2529
#[clap(author, version, about, long_about = None)]
2630
struct Args {
2731
#[clap(short, long)]
32+
#[arg(default_value = "false")]
2833
compaction: bool,
34+
2935
#[clap(short, long)]
3036
#[arg(default_value=get_work_dir().into_os_string())]
3137
dir: PathBuf,
38+
3239
#[clap(short, long)]
33-
roster: String,
40+
roster: Option<String>,
41+
3442
#[clap(short, long)]
43+
#[arg(default_value = "localhost:3000")]
3544
id: String,
45+
3646
#[clap(short, long)]
3747
#[arg(default_value = "html")]
3848
encoding: String,
49+
50+
#[clap(short, long)]
51+
#[arg(default_value = "8080")]
52+
port: u16,
3953
}
4054

41-
#[actix_web::main]
55+
#[tokio::main]
4256
async fn main() -> Result<(), std::io::Error> {
4357
env_logger::init();
4458
let Args {
@@ -47,24 +61,58 @@ async fn main() -> Result<(), std::io::Error> {
4761
roster,
4862
id,
4963
encoding,
64+
port,
5065
} = Args::parse();
5166

52-
let nodes = roster.split(",").map(Into::into).collect::<Vec<String>>();
67+
let nodes = parse_roster(roster);
5368

5469
let (sender, receiver) = tokio::sync::mpsc::channel(1000);
5570
let config = Config::new(dir, compaction, encoding);
5671
let raft_config = raft::config::RaftConfig {
5772
roster: nodes,
5873
candidate_id: id,
5974
};
75+
76+
let mut sigterm = signal(SignalKind::terminate()).unwrap();
77+
let mut sigquit = signal(SignalKind::quit()).unwrap();
78+
let mut sigint = signal(SignalKind::interrupt()).unwrap();
79+
let cancel = CancellationToken::new();
80+
let cancel_clone = cancel.clone();
81+
82+
let tx = sender.clone();
83+
let sender_ark = Data::new(sender);
84+
6085
let db_mutex = create_db(config).expect("could not start db");
6186
let mut raft = raft::RaftNode::new(raft_config, receiver, db_mutex);
62-
let tx = sender.clone();
87+
let db_thread = tokio::spawn(async move {
88+
let _ = raft.run(tx, cancel_clone).await;
89+
});
90+
println!("listening for signals");
6391
tokio::spawn(async move {
64-
let _ = raft.run(tx).await;
92+
tokio::select! {
93+
_ = sigterm.recv() => {
94+
println!("Received SIGTERM");
95+
cancel.cancel();
96+
db_thread.await.unwrap();
97+
}
98+
_ = sigint.recv() => {
99+
println!("Received SIGINT");
100+
cancel.cancel();
101+
db_thread.await.unwrap();
102+
}
103+
_ = sigquit.recv() => {
104+
println!("Received SIGQUIT");
105+
cancel.cancel();
106+
db_thread.await.unwrap();
107+
}
108+
_ = tokio::signal::ctrl_c() => {
109+
println!("Received ctrl_c");
110+
cancel.cancel();
111+
db_thread.await.unwrap();
112+
}
113+
}
65114
});
66115

67-
let sender_ark = Data::new(sender);
68116
println!("starting web server");
69117
HttpServer::new(move || {
70118
App::new()
@@ -75,9 +123,17 @@ async fn main() -> Result<(), std::io::Error> {
75123
.service(compact_data)
76124
.service(get_index)
77125
})
78-
.bind("0.0.0.0:8080")?
126+
.bind(format!("0.0.0.0:{port}"))?
79127
.run()
80-
.await
128+
.await;
129+
Ok(())
130+
}
131+
132+
fn parse_roster(roster: Option<String>) -> Option<Vec<String>> {
133+
match roster {
134+
Some(r) => Some(r.split(",").map(Into::into).collect::<Vec<String>>()),
135+
None => None,
136+
}
81137
}
82138

83139
fn get_work_dir() -> PathBuf {
@@ -116,7 +172,6 @@ pub async fn put_value_for_key(
116172
key: web::Path<String>,
117173
req_body: String,
118174
) -> impl Responder {
119-
println!("putting data {req_body}");
120175
let (tx, receiver) = tokio::sync::oneshot::channel();
121176
let event = RaftEvent::NewEntry {
122177
key: key.into_inner(),
@@ -185,7 +240,7 @@ mod tests {
185240
let tmp_dir = TempDir::new().expect("could not get temp dir");
186241
let _workdir = setup_base_data(tmp_dir.path(), cargo_path);
187242

188-
let config = Config::new(tmp_dir.into_path(), false);
243+
let config = Config::new(tmp_dir.into_path(), false, "html".to_string());
189244
let db = create_db(config).expect("could not start database");
190245

191246
let result = db.get_value_for_key("name").expect("should retrive value");

0 commit comments

Comments
 (0)