Skip to content

Commit 9738195

Browse files
committed
feat: data utils
implements utilities for inferring schemas and converting CSVs and JSONs to parquet files
1 parent d55b031 commit 9738195

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4509
-71
lines changed

Cargo.lock

Lines changed: 793 additions & 58 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[workspace]
22
resolver = "2"
3-
members = ["archiver", "config", "runner", "template"]
3+
members = ["archiver", "config", "runner", "template", "data-utils"]
44

55
[package]
66
name = "aqora"
@@ -29,6 +29,8 @@ aqora-archiver = { path = "archiver", features = [
2929
aqora-config = { path = "config" }
3030
aqora-runner = { path = "runner", features = ["clap"] }
3131
aqora-template = { path = "template" }
32+
aqora-data-utils = { path = "data-utils" }
33+
3234
async-tempfile = "0.6"
3335
axum = "0.7"
3436
base32 = "0.5"
@@ -71,10 +73,7 @@ sentry = { version = "0.32", default-features = false, features = [
7173
"tracing",
7274
] }
7375
serde = { version = "1.0", features = ["derive"] }
74-
serde_json = { version = "1.0", features = [
75-
"preserve_order",
76-
"arbitrary_precision",
77-
] }
76+
serde_json = { version = "1.0", features = ["preserve_order"] }
7877
supports-color = "3.0"
7978
tempfile = "3.9"
8079
thiserror = "1.0"
@@ -106,10 +105,17 @@ graphql-ws-client = { version = "0.11", features = [
106105
] }
107106
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }
108107
qrcode = { version = "0.14.1", default-features = false }
108+
regex = "1.11.1"
109+
comfy-table = "7.1.4"
110+
ron = { version = "0.10", features = ["indexmap"] }
111+
parquet = { version = "54.3", features = ["snap"] }
109112

110113
[build-dependencies]
111114
toml_edit = "0.22"
112115
sentry-types = "0.32"
113116

114117
[dev-dependencies]
115118
pretty_assertions = "1.4.0"
119+
120+
[patch.crates-io]
121+
serde_arrow = { git = "https://github.com/aqora-io/serde_arrow.git", branch = "feat-allow-to-strings" }

data-utils/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pkg

data-utils/Cargo.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
[package]
2+
name = "aqora-data-utils"
3+
version = "0.13.0"
4+
edition = "2021"
5+
publish = false
6+
7+
[lib]
8+
crate-type = ["cdylib", "rlib"]
9+
10+
[features]
11+
default = ["all-formats"]
12+
default-wasm = ["wasm", "console_error_panic_hook", "all-formats"]
13+
all-formats = ["json", "csv"]
14+
csv = ["dep:csv", "csv-core"]
15+
json = ["serde_json"]
16+
wasm = ["wasm-bindgen", "wasm-bindgen-futures", "web-sys", "js-sys"]
17+
fs = ["tokio/fs"]
18+
19+
[dependencies]
20+
wasm-bindgen = { version = "0.2", optional = true }
21+
wasm-bindgen-futures = { version = "0.4", optional = true, features = [
22+
"futures-core-03-stream",
23+
] }
24+
js-sys = { version = "0.3", optional = true }
25+
web-sys = { version = "0.3", optional = true, features = [
26+
"console",
27+
"Blob",
28+
"ReadableStream",
29+
] }
30+
console_error_panic_hook = { version = "0.1", optional = true }
31+
wasm-bindgen-test = { version = "0.3", optional = true }
32+
33+
arrow = { version = "54", default-features = false }
34+
serde_arrow = { version = "0.13", features = ["arrow-54"] }
35+
parquet = { version = "54", default-features = false, features = [
36+
"arrow",
37+
"async",
38+
] }
39+
thiserror = "2"
40+
tokio = { version = "1", default-features = false, features = [
41+
"io-util",
42+
"rt",
43+
"sync",
44+
"macros",
45+
] }
46+
futures = { version = "0.3", default-features = false, features = ["std"] }
47+
serde_json = { version = "1.0", optional = true, features = ["preserve_order"] }
48+
csv = { version = "1.3", optional = true }
49+
csv-core = { version = "0.1", optional = true }
50+
bytes = "1.10"
51+
serde = "1.0"
52+
pin-project-lite = "0.2"
53+
regex = "1.11"
54+
indexmap = { version = "2.9", features = ["serde"] }
55+
ron = { version = "0.10.1", features = ["indexmap"] }
56+
chrono = "0.4"

data-utils/Makefile

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
BUILDFLAGS?=
2+
RUSTFLAGS?=
3+
4+
ifeq ($(RELEASE),)
5+
BUILDFLAGS+= --dev
6+
else
7+
BUILDFLAGS+= --release
8+
RUSTFLAGS+= -C opt-level=s
9+
endif
10+
11+
BROWSER?=chrome
12+
13+
.PHONY: wasm
14+
wasm:
15+
RUSTFLAGS="${RUSTFLAGS}" wasm-pack build --target web --out-name index ${BUILDFLAGS} --features default-wasm
16+
17+
.PHONY: watch
18+
watch:
19+
cargo watch -s "make wasm"
20+
21+
.PHONY: test
22+
test:
23+
wasm-pack test --${BROWSER} --features "default-wasm wasm-bindgen-test"
24+
25+
.PHONY: clean
26+
clean:
27+
rm -rf pkg

data-utils/src/csv/mod.rs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
pub mod reader;
2+
pub use reader::{CsvProcessor, CsvReadStream};
3+
4+
use csv_core::{Reader, ReaderBuilder, Terminator};
5+
use futures::prelude::*;
6+
use regex::Regex;
7+
use tokio::io::{self, AsyncRead, AsyncSeek, AsyncSeekExt};
8+
9+
use crate::value::{DateParseOptions, Value, ValueExt};
10+
use crate::Format;
11+
12+
#[derive(Debug, Clone)]
13+
pub struct CsvFormat {
14+
pub has_headers: bool,
15+
pub delimiter: u8,
16+
/// None is CLRF
17+
pub terminator: Option<u8>,
18+
pub quote: u8,
19+
pub escape: Option<u8>,
20+
pub comment: Option<u8>,
21+
pub null_regex: Option<Regex>,
22+
pub true_regex: Option<Regex>,
23+
pub false_regex: Option<Regex>,
24+
pub date_parse: DateParseOptions,
25+
}
26+
27+
impl CsvFormat {
28+
#[inline]
29+
pub(crate) fn check_null(&self, s: &str) -> bool {
30+
match self.null_regex.as_ref() {
31+
Some(r) => r.is_match(s),
32+
None => s.is_empty(),
33+
}
34+
}
35+
#[inline]
36+
pub(crate) fn check_true(&self, s: &str) -> bool {
37+
match self.null_regex.as_ref() {
38+
Some(r) => r.is_match(s),
39+
None => matches!(s, "true" | "True" | "TRUE"),
40+
}
41+
}
42+
#[inline]
43+
pub(crate) fn check_false(&self, s: &str) -> bool {
44+
match self.null_regex.as_ref() {
45+
Some(r) => r.is_match(s),
46+
None => matches!(s, "false" | "False" | "FALSE"),
47+
}
48+
}
49+
}
50+
51+
impl Default for CsvFormat {
52+
fn default() -> Self {
53+
Self {
54+
has_headers: false,
55+
terminator: None,
56+
delimiter: b',',
57+
quote: b'"',
58+
escape: None,
59+
comment: None,
60+
null_regex: None,
61+
true_regex: None,
62+
false_regex: None,
63+
date_parse: DateParseOptions::default(),
64+
}
65+
}
66+
}
67+
68+
impl From<&CsvFormat> for Reader {
69+
fn from(value: &CsvFormat) -> Self {
70+
let mut reader = ReaderBuilder::default();
71+
reader
72+
.delimiter(value.delimiter)
73+
.quote(value.quote)
74+
.escape(value.escape)
75+
.comment(value.comment)
76+
.terminator(
77+
value
78+
.terminator
79+
.map(Terminator::Any)
80+
.unwrap_or(Terminator::CRLF),
81+
);
82+
reader.build()
83+
}
84+
}
85+
86+
impl From<CsvFormat> for Format {
87+
fn from(value: CsvFormat) -> Self {
88+
Self::Csv(value)
89+
}
90+
}
91+
92+
pub async fn read<'a, R>(
93+
mut reader: R,
94+
options: CsvFormat,
95+
) -> io::Result<impl Stream<Item = io::Result<Value>> + 'a>
96+
where
97+
R: AsyncRead + AsyncSeek + Unpin + 'a,
98+
{
99+
let headers = if options.has_headers {
100+
let headers =
101+
CsvReadStream::<_, Vec<String>>::new(&mut reader, CsvProcessor::new(options.clone()))
102+
.try_next()
103+
.await?;
104+
reader.rewind().await?;
105+
headers
106+
} else {
107+
None
108+
};
109+
let has_headers = headers.is_some();
110+
let mut stream =
111+
CsvReadStream::<_, Vec<Value>>::new(reader, CsvProcessor::new(options.clone()))
112+
.map(move |seq| Ok(Value::Seq(seq?).with_headers(headers.clone())?))
113+
.map_ok(move |v| {
114+
v.map_values(|v| match v {
115+
Value::String(s) => {
116+
if options.check_null(&s) {
117+
Value::Unit
118+
} else if options.check_true(&s) {
119+
Value::Bool(true)
120+
} else if options.check_false(&s) {
121+
Value::Bool(false)
122+
} else {
123+
Value::String(options.date_parse.normalize(s))
124+
}
125+
}
126+
_ => v,
127+
})
128+
})
129+
.boxed_local();
130+
if has_headers {
131+
stream = stream.skip(1).boxed_local();
132+
}
133+
Ok(stream)
134+
}
135+
136+
#[cfg(test)]
mod test {
    use super::*;
    use crate::{format::AsyncFileReader, infer, FormatReader};

    /// Loads `tests/data/csv/{path}.csv` into memory and wraps it in a
    /// [`FormatReader`] configured with `format`.
    pub fn load_csv(path: &str, format: CsvFormat) -> FormatReader<impl AsyncFileReader> {
        let bytes = std::fs::read(format!("tests/data/csv/{path}.csv")).unwrap();
        FormatReader::new(std::io::Cursor::new(bytes), format.into())
    }

    /// Inference options shared by every test: numbers are coerced.
    fn infer_options() -> infer::Options {
        infer::Options::new().coerce_numbers(true)
    }

    /// Infers and prints the schema of a file with a header record.
    #[tokio::test]
    async fn basic_example() {
        let format = CsvFormat {
            has_headers: true,
            ..Default::default()
        };
        println!(
            "basic_example: {:#?}",
            load_csv("example", format)
                .infer_schema(infer_options(), None)
                .await
                .unwrap()
        );
    }

    /// Infers and prints the schema of a file without a header record.
    #[tokio::test]
    async fn example_no_headers() {
        let format = CsvFormat {
            has_headers: false,
            ..Default::default()
        };
        println!(
            "example_no_headers: {:#?}",
            load_csv("example_no_headers", format)
                .infer_schema(infer_options(), None)
                .await
                .unwrap()
        );
    }

    /// Infers and prints the schema of the `null_test` fixture.
    #[tokio::test]
    async fn null_test() {
        let format = CsvFormat {
            has_headers: true,
            ..Default::default()
        };
        println!(
            "null_test: {:#?}",
            load_csv("null_test", format)
                .infer_schema(infer_options(), None)
                .await
                .unwrap()
        );
    }
}

0 commit comments

Comments
 (0)