Skip to content

Commit b4e1bee

Browse files
committed
Add native database abstraction
1 parent ad9555a commit b4e1bee

16 files changed

Lines changed: 1471 additions & 2282 deletions

Cargo.lock

Lines changed: 617 additions & 1079 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,13 @@ panic = "abort"
1818
codegen-units = 2
1919

2020
[dependencies]
21-
sqlx = { version = "0.9.0", default-features = false, features = [
22-
"runtime-tokio",
23-
"tls-rustls",
24-
"migrate",
25-
"sqlite",
26-
"postgres",
27-
"mysql",
28-
"chrono",
29-
"bigdecimal",
30-
"json",
31-
"uuid",
32-
] }
33-
sqlx-sqlserver = { version = "0.0.2", features = ["migrate"] }
34-
sqlx-odbc = { version = "0.0.1", features = ["runtime-tokio"] }
21+
tokio-postgres = { version = "0.7.17", features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
22+
mysql_async = { version = "0.37.0", default-features = false, features = ["default-rustls-ring", "chrono", "bigdecimal"] }
23+
tiberius = { version = "0.12.3", default-features = false, features = ["tds73", "tokio", "tokio-util", "rustls", "chrono", "bigdecimal"] }
24+
tokio-rusqlite = { version = "0.7.0", features = ["bundled", "functions", "load_extension", "collation", "chrono", "blob"] }
25+
odbc-api = { version = "28.0.0", default-features = false, features = ["odbc_version_3_80"] }
26+
deadpool = { version = "0.13.0", features = ["rt_tokio_1"] }
27+
uuid = "1"
3528
chrono = "0.4.23"
3629
actix-web = { version = "4", features = ["rustls-0_23", "cookies"] }
3730
percent-encoding = "2.2.0"
@@ -70,7 +63,7 @@ sha2 = "0.11"
7063
rustls-acme = "0.15"
7164
dotenvy = "0.15.7"
7265
csv-async = { version = "1.2.6", features = ["tokio"] }
73-
rustls = { version = "0.23" } # keep in sync with actix-web, awc, rustls-acme, and sqlx
66+
rustls = { version = "0.23" } # keep in sync with actix-web, awc, and rustls-acme
7467
rustls-native-certs = "0.8.1"
7568
awc = { version = "3", features = ["rustls-0_23-webpki-roots"] }
7669
clap = { version = "4.5.17", features = ["derive"] }
@@ -94,7 +87,7 @@ opentelemetry-semantic-conventions = { version = "0.32", features = ["semconv_ex
9487

9588
[features]
9689
default = []
97-
odbc-static = ["sqlx-odbc/vendored-unix-odbc"]
90+
odbc-static = ["odbc-api/vendored-unix-odbc"]
9891
lambda-web = ["dep:lambda-web", "odbc-static"]
9992

10093
[dev-dependencies]

src/filesystem.rs

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use crate::webserver::ErrorWithStatus;
22
use crate::webserver::database::SupportedDatabase;
33
use crate::webserver::{Database, StatusCodeResultExt, make_placeholder};
4+
use crate::webserver::database::{DbParam, driver::DbValue};
45
use crate::{AppState, TEMPLATES_DIR};
56
use anyhow::Context;
67
use chrono::{DateTime, Utc};
7-
use sqlx::any::{AnyStatement, AnyTypeInfo};
8-
use sqlx::postgres::types::PgTimeTz;
9-
use sqlx::{Executor, Postgres, Statement, Type};
108
use std::fmt::Write;
119
use std::io::ErrorKind;
1210
use std::path::{Component, Path, PathBuf};
@@ -241,9 +239,9 @@ async fn file_modified_since_local(path: &Path, since: DateTime<Utc>) -> tokio::
241239
}
242240

243241
pub struct DbFsQueries {
244-
was_modified: AnyStatement<'static>,
245-
read_file: AnyStatement<'static>,
246-
exists: AnyStatement<'static>,
242+
was_modified: String,
243+
read_file: String,
244+
exists: String,
247245
}
248246

249247
impl DbFsQueries {
@@ -277,44 +275,40 @@ impl DbFsQueries {
277275

278276
async fn check_table_available(db: &Database) -> anyhow::Result<()> {
279277
db.connection
280-
.execute("SELECT 1 FROM sqlpage_files WHERE 1 = 0")
278+
.acquire()
279+
.await
280+
.context("Unable to acquire database connection")?
281+
.execute_command("SELECT 1 FROM sqlpage_files WHERE 1 = 0", &[])
281282
.await
282-
.map(|_| ())
283283
.context("Unable to access sqlpage_files")?;
284284
Ok(())
285285
}
286286

287-
async fn make_was_modified_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
287+
async fn make_was_modified_query(db: &Database) -> anyhow::Result<String> {
288288
let was_modified_query = format!(
289289
"SELECT 1 from sqlpage_files WHERE last_modified >= {} AND path = {}",
290290
make_placeholder(db.info.kind, 1),
291291
make_placeholder(db.info.kind, 2)
292292
);
293-
let param_types: &[AnyTypeInfo; 2] = &[
294-
PgTimeTz::type_info().into(),
295-
<str as Type<Postgres>>::type_info().into(),
296-
];
297293
log::debug!("Preparing the database filesystem was_modified_query: {was_modified_query}");
298-
db.prepare_with(&was_modified_query, param_types).await
294+
Ok(was_modified_query)
299295
}
300296

301-
async fn make_read_file_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
297+
async fn make_read_file_query(db: &Database) -> anyhow::Result<String> {
302298
let read_file_query = format!(
303299
"SELECT contents from sqlpage_files WHERE path = {}",
304300
make_placeholder(db.info.kind, 1),
305301
);
306-
let param_types: &[AnyTypeInfo; 1] = &[<str as Type<Postgres>>::type_info().into()];
307302
log::debug!("Preparing the database filesystem read_file_query: {read_file_query}");
308-
db.prepare_with(&read_file_query, param_types).await
303+
Ok(read_file_query)
309304
}
310305

311-
async fn make_exists_query(db: &Database) -> anyhow::Result<AnyStatement<'static>> {
306+
async fn make_exists_query(db: &Database) -> anyhow::Result<String> {
312307
let exists_query = format!(
313308
"SELECT 1 from sqlpage_files WHERE path = {}",
314309
make_placeholder(db.info.kind, 1),
315310
);
316-
let param_types: &[AnyTypeInfo; 1] = &[<str as Type<Postgres>>::type_info().into()];
317-
db.prepare_with(&exists_query, param_types).await
311+
Ok(exists_query)
318312
}
319313

320314
async fn file_modified_since_in_db(
@@ -323,22 +317,22 @@ impl DbFsQueries {
323317
path: &Path,
324318
since: DateTime<Utc>,
325319
) -> anyhow::Result<bool> {
326-
let query = self
327-
.was_modified
328-
.query_as::<(i32,)>()
329-
.bind(since)
330-
.bind(path.display().to_string());
320+
let params = [
321+
DbParam::Timestamp(since),
322+
DbParam::Text(path.display().to_string()),
323+
];
331324
log::trace!(
332325
"Checking if file {} was modified since {} by executing query: \n\
333326
{}\n\
334327
with parameters: {:?}",
335328
path.display(),
336329
since,
337-
self.was_modified.sql(),
330+
self.was_modified,
338331
(since, path)
339332
);
340-
let was_modified_i32 = query
341-
.fetch_optional(&app_state.db.connection)
333+
let mut conn = app_state.db.connection.acquire().await?;
334+
let was_modified_i32 = conn
335+
.fetch_optional(&self.was_modified, &params)
342336
.await
343337
.with_context(|| {
344338
format!(
@@ -350,20 +344,25 @@ impl DbFsQueries {
350344
"DB File {} was modified result: {was_modified_i32:?}",
351345
path.display()
352346
);
353-
Ok(was_modified_i32 == Some((1,)))
347+
Ok(was_modified_i32.is_some())
354348
}
355349

356350
async fn read_file(&self, app_state: &AppState, path: &Path) -> anyhow::Result<Vec<u8>> {
357351
log::debug!("Reading file {} from the database", path.display());
358-
self.read_file
359-
.query_as::<(Vec<u8>,)>()
360-
.bind(path.display().to_string())
361-
.fetch_optional(&app_state.db.connection)
352+
let mut conn = app_state.db.connection.acquire().await?;
353+
conn.fetch_optional(
354+
&self.read_file,
355+
&[DbParam::Text(path.display().to_string())],
356+
)
362357
.await
363358
.map_err(anyhow::Error::from)
364-
.and_then(|modified| {
365-
if let Some((modified,)) = modified {
366-
Ok(modified)
359+
.and_then(|row| {
360+
if let Some(row) = row {
361+
match row.values.first() {
362+
Some(DbValue::Bytes(bytes)) => Ok(bytes.clone()),
363+
Some(DbValue::Text(text)) => Ok(text.as_bytes().to_vec()),
364+
_ => Ok(Vec::new()),
365+
}
367366
} else {
368367
Err(ErrorWithStatus {
369368
status: actix_web::http::StatusCode::NOT_FOUND,
@@ -375,19 +374,17 @@ impl DbFsQueries {
375374
}
376375

377376
async fn file_exists(&self, app_state: &AppState, path: &Path) -> anyhow::Result<bool> {
378-
let query = self
379-
.exists
380-
.query_as::<(i32,)>()
381-
.bind(path.display().to_string());
377+
let params = [DbParam::Text(path.display().to_string())];
382378
log::trace!(
383379
"Checking if file {} exists by executing query: \n\
384380
{}\n\
385381
with parameters: {:?}",
386382
path.display(),
387-
self.exists.sql(),
383+
self.exists,
388384
(path,)
389385
);
390-
let result = query.fetch_optional(&app_state.db.connection).await;
386+
let mut conn = app_state.db.connection.acquire().await?;
387+
let result = conn.fetch_optional(&self.exists, &params).await;
391388
log::debug!("DB File exists result: {result:?}");
392389
result.map(|result| result.is_some()).with_context(|| {
393390
format!(
@@ -401,7 +398,6 @@ impl DbFsQueries {
401398
#[actix_web::test]
402399
async fn test_sql_file_read_utf8() -> anyhow::Result<()> {
403400
use crate::app_config;
404-
use sqlx::Executor;
405401
let config = app_config::tests::test_config();
406402
let state = AppState::init(&config).await?;
407403

@@ -417,20 +413,24 @@ async fn test_sql_file_read_utf8() -> anyhow::Result<()> {
417413
let create_table_sql = DbFsQueries::get_create_table_sql(state.db.info.database_type);
418414
let db = &state.db;
419415
let conn = &db.connection;
420-
conn.execute("DROP TABLE IF EXISTS sqlpage_files").await?;
416+
let mut conn = db.connection.acquire().await?;
417+
conn.execute_command("DROP TABLE IF EXISTS sqlpage_files", &[]).await?;
421418
log::debug!("Creating table sqlpage_files: {create_table_sql}");
422-
conn.execute(create_table_sql).await?;
419+
conn.execute_command(create_table_sql, &[]).await?;
423420

424421
let dbms = db.info.kind;
425422
let insert_sql = format!(
426423
"INSERT INTO sqlpage_files(path, contents) VALUES ({}, {})",
427424
make_placeholder(dbms, 1),
428425
make_placeholder(dbms, 2)
429426
);
430-
sqlx::query(&insert_sql)
431-
.bind("unit test file.txt")
432-
.bind("Héllö world! 😀".as_bytes())
433-
.execute(conn)
427+
conn.execute_command(
428+
&insert_sql,
429+
&[
430+
DbParam::Text("unit test file.txt".into()),
431+
DbParam::Bytes("Héllö world! 😀".as_bytes().to_vec()),
432+
],
433+
)
434434
.await?;
435435

436436
let fs = FileSystem::init("/", db).await;

src/telemetry_metrics.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use opentelemetry::global;
22
use opentelemetry::metrics::{Histogram, ObservableGauge};
33
use opentelemetry_semantic_conventions::attribute as otel;
44
use opentelemetry_semantic_conventions::metric as otel_metric;
5-
use sqlx::AnyPool;
5+
use crate::webserver::database::DbPool;
66

77
pub struct TelemetryMetrics {
88
pub http_request_duration: Histogram<f64>,
@@ -41,7 +41,7 @@ impl Default for TelemetryMetrics {
4141

4242
impl TelemetryMetrics {
4343
#[must_use]
44-
pub fn new(pool: &AnyPool, db_system_name: &'static str) -> Self {
44+
pub fn new(pool: &DbPool, db_system_name: &'static str) -> Self {
4545
let meter = global::meter("sqlpage");
4646
let http_request_duration = meter
4747
.f64_histogram(otel_metric::HTTP_SERVER_REQUEST_DURATION)
@@ -60,9 +60,8 @@ impl TelemetryMetrics {
6060
.with_description("Number of connections in the database pool.")
6161
.with_callback(move |observer| {
6262
let size = pool_ref.size();
63-
let idle_u32 = u32::try_from(pool_ref.num_idle()).unwrap_or(u32::MAX);
64-
let used = i64::from(size.saturating_sub(idle_u32));
65-
let idle = i64::from(idle_u32);
63+
let idle = i64::from(pool_ref.num_idle());
64+
let used = i64::from(size);
6665
observer.observe(
6766
used,
6867
&[

0 commit comments

Comments
 (0)