
Commit bb5537d

committed Dec 27, 2023
Basic modifications..
1 parent 7b8f2d4 commit bb5537d

11 files changed, +557 −70 lines changed
 

‎Cargo.lock

+264 −2 (generated lock file; diff not rendered)

‎Cargo.toml

+1

@@ -9,6 +9,7 @@ edition = "2021"
 axum = { version = "0.7.2", features = ["http2", "multipart", "http1"] }
 axum-core = "0.4.1"
 bytes = "1.5.0"
+config = "0.13.4"
 dashmap = "5.5.3"
 futures = "0.3.29"
 futures-stream = "0.0.0"

‎settings.toml

+4

@@ -0,0 +1,4 @@
+
+
+data = "./data"
+file_size = 1_000_0000
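For reference, 1_000_0000 parses as 10,000,000. A minimal sketch (not part of the commit) of reading the two keys back with the same config 0.13 calls that src/config.rs uses below; the binding names are illustrative:

use config::Config;

fn main() -> Result<(), config::ConfigError> {
    // Same source the committed LoadConfig builds from.
    let settings = Config::builder()
        .add_source(config::File::with_name("./settings.toml"))
        .build()?;

    let data_dir = settings.get_string("data")?;     // "./data"
    let file_size = settings.get_int("file_size")?;  // 10_000_000

    println!("data = {data_dir}, file_size = {file_size}");
    Ok(())
}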

‎src/authorization.rs

+36

@@ -0,0 +1,36 @@
+use axum::http::{HeaderMap, HeaderValue, header::*};
+
+use crate::errors::{FragmentError, HeaderErrors};
+
+
+// Ensure that the fields every upload request is expected to carry are present.
+pub async fn ensure_headers(headers: &HeaderMap) -> Result<(), FragmentError> {
+
+    let fields = [
+        ACCESS_CONTROL_ALLOW_ORIGIN,
+        CONTENT_LENGTH,
+        ACCEPT_ENCODING];
+
+    for assumed_header in fields.iter() {
+        if headers.get(assumed_header).is_none() {
+            // Bail out on the first missing field; `.into()` relies on the
+            // HeaderErrors -> error conversion chain defined in errors.rs.
+            return Err(HeaderErrors::HeaderFieldMissing(assumed_header.as_str().to_owned().into()).into());
+        }
+    }
+
+    Ok(())
+}
+
+
+// return the value of the associated field in the headermap
+pub async fn extract_header_fields(headers: &HeaderMap, header_field: &HeaderName) -> Result<HeaderValue, FragmentError> {
+
+    if let Some(val) = headers.get(header_field) {
+        Ok(val.to_owned())
+    } else {
+        Err(HeaderErrors::HeaderFieldMissing(header_field.as_str().to_owned().into()).into())
+    }
+}
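A short usage sketch (not in the commit; the helper name is hypothetical) showing how a handler can pull a single field through extract_header_fields:

use axum::http::{HeaderMap, header::CONTENT_LENGTH};

use crate::authorization::extract_header_fields;
use crate::errors::FragmentError;

// Hypothetical helper: read Content-Length as a number, defaulting to 0.
async fn content_length_of(headers: &HeaderMap) -> Result<usize, FragmentError> {
    let value = extract_header_fields(headers, &CONTENT_LENGTH).await?;
    Ok(value.to_str().unwrap_or("0").parse().unwrap_or(0))
}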

‎src/config.rs

+46

@@ -0,0 +1,46 @@
+use std::path::PathBuf;
+
+use config::Config;
+
+
+#[derive(Debug)]
+pub struct LoadConfig {
+    path: PathBuf,
+    config_data: Config
+}
+
+impl Default for LoadConfig {
+    fn default() -> Self {
+
+        let path_name = "./settings.toml";
+        let path = PathBuf::from(path_name);
+
+        let config = Config::builder()
+            .add_source(config::File::with_name(path_name))
+            .build()
+            .expect("builder unable to parse the config file");
+
+        Self {
+            path,
+            config_data: config
+        }
+    }
+}
+
+impl LoadConfig {
+    // `pub` so that utils::load_config can construct it.
+    pub fn new(source: impl AsRef<str>) -> Self {
+
+        let path_name = source.as_ref();
+        let path = PathBuf::from(path_name);
+
+        let config = Config::builder()
+            .add_source(config::File::with_name(path_name))
+            .build()
+            .expect("builder unable to parse the config file");
+
+        Self {
+            path,
+            config_data: config
+        }
+    }
+}
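Default and new differ only in the hard-coded path; if desired, the Default impl could delegate to new instead of repeating the builder logic (a sketch, replacing the impl above):

impl Default for LoadConfig {
    fn default() -> Self {
        // Same behaviour as the committed impl, without the duplication.
        Self::new("./settings.toml")
    }
}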

‎src/constants.rs

+5

@@ -0,0 +1,5 @@
+
+
+
+
+pub const FILE_SIZE_LIMIT: u64 = 78 * 1024 * 1024 * 1024; // 78 GiB

‎src/errors.rs

+53 −1

@@ -1,4 +1,4 @@
-use std::{panic::Location, backtrace::Backtrace};
+use std::{panic::Location, backtrace::Backtrace, borrow::Cow};
 
 use axum::{http::{StatusCode, HeaderMap, Response, self, HeaderValue},response::IntoResponse, body::Body};
 use thiserror::Error;

@@ -18,6 +18,24 @@ use std::convert::From;
 // }
 // }
 
+pub trait OptionExt<V, E, T>
+{
+    fn to_result(self, method: T) -> Result<V, E>;
+}
+
+impl<V, E, T> OptionExt<V, E, T> for Option<V>
+where
+    E: Into<ErrorStates>,
+    T: FnOnce() -> E
+{
+    // ok_or_else-style adapter: Some passes through, None becomes the supplied error.
+    fn to_result(self, method: T) -> Result<V, E>
+    {
+        match self {
+            Some(val) => Ok(val),
+            None => Err(method()),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct ErrorReport {
     reason: ErrorStates,

@@ -125,6 +143,11 @@ pub enum ErrorStates {
     InternalError(#[from] axum::Error),
 
 
+    #[http(code = 404, message = "Missing Header Field")]
+    #[error("Missing header field")]
+    RequestError(#[from] HeaderErrors<'static>),
+
+
     // #[http(code = 500, message = "server went into undesired mode")]
     // #[error("internal socket Error")]
     // SocketError(#[from] ),

@@ -138,3 +161,32 @@ pub enum ErrorStates {
 
 
 }
+
+
+#[derive(Debug, thiserror::Error, HttpError)]
+pub enum HeaderErrors<'a> {
+
+    #[error("Missing header field: {0:?}")]
+    HeaderFieldMissing(Cow<'a, str>),
+
+    #[error("Field mismatch: {0:?}")]
+    FieldMismatch(Cow<'a, str>),
+
+    #[error("Invalid field input: {0:?}")]
+    InvalidField(Cow<'a, str>),
+
+    #[error("Invalid field input: {0:?}")]
+    HeaderUnwrapError(#[from] http::header::ToStrError),
+
+    // #[error("Invalid field input: {0:?}")]
+    // InvalidField(Cow<'a, str>)
+}
+
+
+// impl<'a, T> std::convert::From<T> for HeaderErrors<'a>
+// where T: std::error::Error {
+
+//     fn from(val: T) -> Self {
+//         val.into()
+//     }
+// }
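A usage sketch for the new extension trait (not in the commit; it assumes the ok_or_else-style to_result above, and the helper name is illustrative):

use std::borrow::Cow;

use axum::http::{HeaderMap, HeaderValue};

use crate::errors::{HeaderErrors, OptionExt};

fn file_name_header(headers: &HeaderMap) -> Result<&HeaderValue, HeaderErrors<'static>> {
    // Some(value) passes through; None becomes the supplied HeaderErrors value.
    headers
        .get("FileName")
        .to_result(|| HeaderErrors::HeaderFieldMissing(Cow::Borrowed("FileName")))
}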

‎src/file.rs

+16 −9

@@ -1,4 +1,4 @@
-use std::{path::PathBuf, usize};
+use std::{path::PathBuf, usize, borrow::Cow};
 use serde::{Serialize, Deserialize};
 use tokio::sync::oneshot;
 use uuid::Uuid;

@@ -8,33 +8,40 @@ use uuid::Uuid;
 
 #[derive(Debug, Serialize, Deserialize )]
 pub struct FileObject {
-    path: PathBuf,
+    pub path: PathBuf,
     state: UploadState,
     file_size: usize,
     name: String,
     uuid: Uuid,
     // hash: [u8; 256],
-    // sender: oneshot::Sender<usize>
 }
 
 impl FileObject {
 
-    pub fn new(path: impl Into<PathBuf>, size: usize, ) -> Self {
+    pub fn new(path: impl Into<PathBuf>, size: usize, name: impl ToString) -> Self {
         Self {
             path: path.into(),
             state: UploadState::UnInit,
             file_size: size,
-            name: "".to_string(),
+            name: name.to_string(),
             uuid: Uuid::new_v4(),
             // hash: [0_u8; 256]
         }
     }
 
-    pub fn update_state(&mut self, state: UploadState) {
+    #[inline(always)]
+    pub fn set_state(&mut self, state: UploadState) {
         self.state = state;
     }
 
+    #[inline(always)]
+    pub fn get_uuid(&self) -> Cow<'_, Uuid> {
+        Cow::Borrowed(&self.uuid)
+    }
+
+    pub fn output_file_path(&self) -> PathBuf {
+        // Join by reference so the name is not moved out of &self.
+        self.path.join(&self.name)
+    }
 
 }

@@ -43,10 +50,10 @@ impl FileObject {
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum UploadState {
     UnInit,
-    Failed,
     Broken(usize),
     Resume(usize),
     Complete,
+    Failed,
 }
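A construction sketch (not in the commit; names and values are illustrative) showing how the new constructor and helpers fit together:

use crate::file::{FileObject, UploadState};

fn example() {
    // path, expected size in bytes, and the client-supplied file name
    let mut file = FileObject::new("./data", 10_000_000, "upload.bin");

    let target = file.output_file_path(); // "./data/upload.bin"
    file.set_state(UploadState::Complete);

    println!("{:?} -> {:?}", file.get_uuid(), target);
}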

‎src/handlers.rs

+111 −53

@@ -1,89 +1,147 @@
-use std::{sync::Arc, path::PathBuf, str::FromStr};
+use std::{sync::Arc, path::PathBuf, str::FromStr, borrow::Cow};
 
 use dashmap::DashMap;
-use axum::{http::{Request, HeaderValue}, body::Body, response::IntoResponse, Extension};
-use tokio::task::JoinHandle;
-use uuid::{Uuid, uuid};
+use axum::{http::{Request, HeaderValue, HeaderMap, header::*}, body::Body, response::IntoResponse, Extension};
+use uuid::Uuid;
 use futures::stream::StreamExt;
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWriteExt;
+use crate::{errors::{OptionExt, ErrorStates, HeaderErrors}, authorization::extract_header_fields};
 
 use crate::{file::FileObject, utils::generate_file, FragmentError};
 
 
 // Task handle for keeping track of all the spawned instance on the runtime
-pub type JobHandle<E> = Arc<DashMap<Uuid, JoinHandle<Result<FileObject, E>>>>;
+pub type JobHandle<E> = Arc<DashMap<Uuid, Result<FileObject, E>>>;
 
-pub async fn init_upload_process(
-    mut ext: Extension<JobHandle<FragmentError>>,
-    mut req: Request<Body>,
-) -> Result<impl IntoResponse, FragmentError> {
-
-    let (header, body) = req.into_parts();
-
-    let uuid = uuid::Uuid::new_v4();
-    let mut path = PathBuf::from("./data/");
-
-    //******************************** */
-    let file_name = header.headers.get("FileName").unwrap().to_str().unwrap().to_string();
-    // let file_size = header.headers
-    //     .get("FileSize")
-    //     .unwrap()
-    //     .as_ref()
-    //     .parse::<usize>()
-    //     .unwrap();
-
-    let file_size = 1000000;
-
-    let mut file_obj = FileObject::new(path.clone(), file_size);
-
-    println!("task called");
-
-    // let handle = tokio::task::spawn(async move {
-    let mut stream = body.into_data_stream();
-    //=========================================================
-    // let mut file_name = PathBuf::from("xxxxx-yyyyy.txt");
-    let mut buf_size = 1_000_0000; //allocated buffer_size
-    //=========================================================
-    let mut buf_writer = generate_file(path.join(PathBuf::from_str(file_name.as_str()).unwrap()), buf_size).await?;
-
-    let mut _chunk_counter = 0;
-    let mut _byte_counter = 0;
-
-    println!("we entered the stream");
-
-    while let Some(chunk) = stream.next().await {
-        let bytes = chunk?;
-        // println!("chunk accepted: {} len: {}", _chunk_counter, bytes.len());
-        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
-        _byte_counter += buf_writer.write(&bytes).await?;
-        _chunk_counter += 1;
-    }
-
-    println!("stream closed: bytes rem");
-
-    // drop(stream);
-
-    let _ = buf_writer.shutdown().await?;
-
-    // println!("stream closed");
-    // Ok(file_obj)
-    // });
-
-    // let _ = ext.entry(uuid).or_insert(handle); //return error if stream is already present
-
-    println!("task exited");
-
-    Ok(uuid.as_hyphenated().to_string())
-}
+/*
+
+*/
+// Stream the request body into the on-disk file described by `file`.
+async fn streamer_uploader(
+    body: Body,
+    file: FileObject,
+) -> Result<(), FragmentError> {
+
+    let mut stream = body.into_data_stream();
+
+    let buf_size = 1_000_0000; //REMOVE: allocated buffer_size
+
+    // Assumed completion: the commit leaves `let file_path = file.` unfinished;
+    // the output_file_path() helper added in src/file.rs is the likely intent.
+    let file_path = file.output_file_path();
+
+    let mut buf_writer = generate_file(file_path, buf_size).await?;
+
+    let mut _chunk_counter = 0;
+    let mut _byte_counter = 0;
+
+    println!("we entered the stream");
+
+    while let Some(chunk) = stream.next().await {
+        let bytes = chunk?;
+
+        tokio::time::sleep(std::time::Duration::from_millis(1)).await;
+        _byte_counter += buf_writer.write(&bytes).await?;
+        _chunk_counter += 1;
+    }
+
+    drop(stream);
+
+    let _ = buf_writer.shutdown().await?;
+
+    Ok(())
+}
+
+
+// Validate a header value of the incoming request.
+fn validate_headers(header: &HeaderValue) -> Result<(), HeaderErrors<'static>> {
+
+    if header.is_empty() {
+        return Err(HeaderErrors::HeaderFieldMissing(Cow::Borrowed("FileName")));
+    }
+
+    let str_header = header.to_str()?;
+
+    if str_header.chars().all(|e| !e.is_ascii() || !e.is_alphanumeric() || e.is_ascii_hexdigit()) {
+        return Err(HeaderErrors::InvalidField(Cow::Owned(str_header.to_owned())));
+    }
+
+    Ok(())
+}
+
+
+pub async fn init_upload_process(
+    Extension(ext): Extension<JobHandle<FragmentError>>,
+    req: Request<Body>,
+) -> Result<impl IntoResponse, FragmentError> {
+
+    let (parts, body) = req.into_parts();
+    let headers = parts.headers;
+
+    //=========================================================
+    let headers_names = [
+        "FileName",
+        "Content-Length",
+    ];
+
+    // Build the HeaderNames first so the extraction futures can borrow them.
+    let header_fields: Vec<HeaderName> = headers_names
+        .iter()
+        .map(|field_name| HeaderName::from_str(field_name).expect("Invalid header name"))
+        .collect();
+
+    let mut extracted_headers = futures::future::join_all(
+        header_fields
+            .iter()
+            .map(|field| extract_header_fields(&headers, field)))
+        .await
+        .into_iter();
+
+    //=========================================================
+    let (path, file_size, file_name) = {
+        // Results come back in the same order as `headers_names`.
+        let file_name = extracted_headers.next().expect("FileName was queried")?;
+        let file_size = extracted_headers.next().expect("Content-Length was queried")?;
+
+        validate_headers(&file_name)?;
+        validate_headers(&file_size)?;
+
+        let path = "./data"; // TODO: change path to const.
+
+        let file_size = file_size.to_str()
+            .map_err(|e| HeaderErrors::HeaderUnwrapError(e))?
+            .parse::<usize>()
+            .unwrap();
+
+        let file_name = file_name.to_str()
+            .map_err(|e| HeaderErrors::HeaderUnwrapError(e))?
+            .to_owned();
+
+        (path, file_size, file_name)
+    };
+
+    let file_obj = FileObject::new(path, file_size, file_name);
+    // Keep the uuid around so it can be reported back, as the old handler did.
+    let uuid = file_obj.get_uuid().into_owned();
+    //=========================================================
+
+    let _ = streamer_uploader(body, file_obj).await?;
+
+    // TODO: record the upload in the shared JobHandle map once streamer_uploader
+    // hands back a result to store.
+    // let _ = ext.entry(uuid).or_insert(handle); //return error if stream is already present
+
+    // println!("task exited");
+
+    Ok(uuid.as_hyphenated().to_string())
+}
+
+
+// Resume the interrupted upload process
+pub async fn resume_upload(
+    Extension(_ext): Extension<JobHandle<FragmentError>>,
+    _req: Request<Body>
+) -> Result<impl IntoResponse, FragmentError> {
+    // TODO: look up the interrupted upload and continue from the last written byte.
+    Ok(())
+}
+
+// Handler for acquiring the status of an in-progress, discarded or cancelled upload process
 pub async fn task_progress(
     mut ext: Extension<JobHandle<FragmentError>>,
     mut req: Request<Body>,
-) -> impl IntoResponse
-// where E: Into<FragmentError>
+) -> Result<impl IntoResponse, FragmentError>
 {
 
     let (mut headers, body) = req.into_parts();
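The bookkeeping on the DashMap-backed JobHandle is still commented out above. A sketch (not in the commit) of what recording a finished upload could look like; it assumes streamer_uploader eventually hands the FileObject back instead of consuming it, and the function name is hypothetical:

use std::sync::Arc;

use dashmap::DashMap;
use uuid::Uuid;

use crate::{file::FileObject, FragmentError};

fn record_upload(
    jobs: &Arc<DashMap<Uuid, Result<FileObject, FragmentError>>>,
    file: FileObject,
) {
    let uuid = file.get_uuid().into_owned();
    // entry().or_insert() keeps the first value if the uuid is already tracked,
    // mirroring the commented-out call in init_upload_process.
    jobs.entry(uuid).or_insert(Ok(file));
}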

‎src/main.rs

+13 −3

@@ -9,9 +9,10 @@ mod file;
 mod errors;
 mod utils;
 mod handlers;
+mod authorization;
+mod config;
 
-#[tokio::main]
-async fn main() -> Result<(), FragmentError> {
+async fn tokio_main() -> Result<(), FragmentError> {
 
     let addr = [0_u8; 4].into();
     let port = 2053;

@@ -38,5 +39,14 @@ async fn main() -> Result<(), FragmentError> {
     Ok(())
 }
 
+fn main() {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .global_queue_interval(40)
+        .build()
+        .unwrap();
+
+    let tokio_main_process = tokio_main();
+
+    runtime.block_on(tokio_main_process);
+}

‎src/utils.rs

+8 −2

@@ -6,7 +6,8 @@ use tokio::fs::File;
 use tokio::io::BufWriter;
 
 use crate::FragmentError;
-use crate::handlers::{JobHandle, init_upload_process, task_progress};
+use crate::config::LoadConfig;
+use crate::handlers::{JobHandle, init_upload_process, task_progress, resume_upload};
 
 
 pub async fn create_router(

@@ -16,7 +17,8 @@
     let router = Router::new()
         .route("/upload_file", get(init_upload_process))
         .route("/status", get(task_progress))
-        .layer(Extension(ext));
+        .route("/resume_upload", get(resume_upload))
+        .layer(Extension(ext));
 
     Ok(router)
 }

@@ -31,4 +33,8 @@ pub async fn generate_file(path: impl AsRef<Path>, size: usize) -> Result<BufWri
     let file = File::create(path).await?;
     let buffered_file = BufWriter::with_capacity(size, file);
     Ok(buffered_file)
+}
+
+pub async fn load_config(path: impl AsRef<str>) -> Result<LoadConfig, FragmentError> {
+    // Assumed completion: the commit has a bare `Ok()` here; constructing the
+    // loader requires LoadConfig::new to be public (see src/config.rs).
+    Ok(LoadConfig::new(path))
 }

0 commit comments
