Skip to content

Commit 9d6e776

Browse files
committed
Add API for reading message slices from a source.
This change will also support: - Generic statistics (esrlabs#31) - Reading messages from streams (esrlabs#34) Additionally it provides a cleanup of the crate's feature names and removes the dependecy to the legacy buf_redux create.
1 parent 25706ea commit 9d6e776

15 files changed

+565
-315
lines changed

CHANGELOG.md

+13-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.20.0] - 2025-02-25
11+
12+
### Added
13+
14+
- Support generic statistics from reading messages
15+
- Support parsing of DLT messages from streams
16+
17+
### Changed
18+
19+
- Removed buf_redux dependency
20+
- Cleanup feature names
21+
1022
## [0.19.2] - 2025-02-06
1123

1224
### Changed
@@ -36,7 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3648

3749
### Changed
3850

39-
- Add feature "serde-support", which adds to crate's types Serialize/Deserialize
51+
- Add feature "serialization", which adds to crate's types Serialize/Deserialize
4052

4153
## [0.17.0] - 2024-10-04
4254

Cargo.toml

+15-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dlt-core"
3-
version = "0.19.2"
3+
version = "0.20.0"
44
authors = ["esrlabs.com"]
55
edition = "2021"
66
description = """
@@ -10,7 +10,6 @@ license = "Apache-2.0"
1010
repository = "https://github.com/esrlabs/dlt-core"
1111

1212
[dependencies]
13-
buf_redux = { version = "0.8.4", optional = true }
1413
byteorder = "1.4"
1514
bytes = "1.0"
1615
log = "0.4"
@@ -21,25 +20,29 @@ rustc-hash = { version = "2.1", optional = true }
2120
serde = { version = "1.0", features = ["derive"], optional = true }
2221
serde_json = { version = "1.0", optional = true }
2322
thiserror = "1.0"
23+
futures = "0.3"
2424

2525
[features]
2626
default = []
27-
statistics = ["buf_redux", "rustc-hash"]
28-
fibex_parser = ["quick-xml"]
29-
debug_parser = []
30-
serde-support = ["serde", "serde_json"]
27+
statistics = ["rustc-hash"]
28+
fibex = ["quick-xml"]
29+
debug = []
30+
serialization = ["serde", "serde_json"]
31+
stream = []
3132

3233
[lints.rust]
3334
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }
3435

3536
[dev-dependencies]
36-
buf_redux = "0.8.4"
3737
criterion = { version = "0.4", features = ["html_reports"] }
3838
dirs = "4.0"
3939
env_logger = "0.10"
4040
pretty_assertions = "1.3"
4141
proptest = "1.6"
4242
proptest-derive = "0.5"
43+
tokio = { version = "1", features = ["full"] }
44+
tokio-stream = "0.1"
45+
tokio-util = "0.7"
4346

4447
[[bench]]
4548
name = "dlt_benchmarks"
@@ -48,3 +51,8 @@ harness = false
4851
[[example]]
4952
name = "file_parser"
5053
path = "examples/file_parser.rs"
54+
55+
[[example]]
56+
name = "dlt_statistics"
57+
path = "examples/dlt_statistics.rs"
58+
required-features = ["statistics"]

README.md

+16-47
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Add this to your `Cargo.toml`:
1818

1919
```toml
2020
[dependencies]
21-
dlt_core = "0.19"
21+
dlt_core = "0.20"
2222
```
2323

2424
This is an example of how to parse a message and serialize it back to a byte array.
@@ -69,68 +69,35 @@ The following example can be run with `cargo run --example file_parser --release
6969

7070
<!-- example start -->
7171
```rust
72-
use buf_redux::{policy::MinBuffered, BufReader};
73-
use dlt_core::parse::{dlt_message, DltParseError};
74-
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};
75-
76-
const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
77-
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
72+
use dlt_core::read::{read_message, DltMessageReader};
73+
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
7874

7975
fn main() {
8076
// collect input file details
8177
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
8278
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
8379
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
84-
// create a reader that maintains a minimum amount of bytes in it's buffer
85-
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
86-
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
8780
// now parse all file content
88-
let mut parsed = 0usize;
81+
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
82+
let mut message_count = 0usize;
8983
let start = Instant::now();
9084
loop {
91-
let consumed: usize = match reader.fill_buf() {
92-
Ok(content) => {
93-
if content.is_empty() {
94-
println!("empty content after {} parsed messages", parsed);
95-
break;
96-
}
97-
let available = content.len();
98-
99-
match dlt_message(content, None, true) {
100-
Ok((rest, _maybe_msg)) => {
101-
let consumed = available - rest.len();
102-
parsed += 1;
103-
consumed
104-
}
105-
Err(DltParseError::IncompleteParse { needed }) => {
106-
println!("parse incomplete, needed: {:?}", needed);
107-
return;
108-
}
109-
Err(DltParseError::ParsingHickup(reason)) => {
110-
println!("parse error: {}", reason);
111-
4 //skip 4 bytes
112-
}
113-
Err(DltParseError::Unrecoverable(cause)) => {
114-
println!("unrecoverable parse failure: {}", cause);
115-
return;
116-
}
117-
}
85+
match read_message(&mut dlt_reader, None).expect("read dlt message") {
86+
Some(_message) => {
87+
message_count += 1;
11888
}
119-
Err(e) => {
120-
println!("Error reading: {}", e);
121-
return;
89+
None => {
90+
break;
12291
}
12392
};
124-
reader.consume(consumed);
12593
}
126-
12794
// print some stats
12895
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
12996
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
13097
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
13198
println!(
13299
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
133-
parsed, duration_in_s, amount_per_second
100+
message_count, duration_in_s, amount_per_second
134101
);
135102
}
136103

@@ -150,11 +117,13 @@ Below is the revised and improved English version of the documentation:
150117

151118
* **`statistics`**: Enables the `statistics` module, which scans the source data and provides a summary of its contents. This gives you an overview of the number of messages and their content.
152119

153-
* **`fibex_parser`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.
120+
* **`fibex`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.
121+
122+
* **`debug`**: Adds additional log output for debugging purposes.
154123

155-
* **`debug_parser`**: Adds additional log output for debugging purposes.
124+
* **`serialization`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.
156125

157-
* **`serde-support`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.
126+
* **`stream`**: Provides API for parsing DLT messages from streams.
158127

159128
## Example users
160129

examples/dlt_statistics.rs

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use dlt_core::{
2+
parse::DltParseError,
3+
statistics::{collect_dlt_statistics, StatisticCollector},
4+
};
5+
use std::{env, fs, path::PathBuf, time::Instant};
6+
7+
pub struct MessageCounter {
8+
count: usize,
9+
}
10+
11+
impl StatisticCollector for MessageCounter {
12+
fn collect_message(&mut self, _message: &[u8]) -> Result<(), DltParseError> {
13+
self.count += 1;
14+
Ok(())
15+
}
16+
}
17+
18+
fn main() {
19+
// collect input file details
20+
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
21+
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
22+
// now parse all file content
23+
let mut collector = MessageCounter { count: 0 };
24+
let start = Instant::now();
25+
collect_dlt_statistics(&dlt_file_path, &mut collector).expect("collect dlt statistics");
26+
// print some stats
27+
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
28+
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
29+
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
30+
println!(
31+
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
32+
collector.count, duration_in_s, amount_per_second
33+
);
34+
}

examples/file_parser.rs

+10-43
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,31 @@
1-
use buf_redux::{policy::MinBuffered, BufReader};
2-
use dlt_core::parse::{dlt_message, DltParseError};
3-
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};
4-
5-
const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
6-
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
1+
use dlt_core::read::{read_message, DltMessageReader};
2+
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
73

84
fn main() {
95
// collect input file details
106
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
117
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
128
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
13-
// create a reader that maintains a minimum amount of bytes in it's buffer
14-
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
15-
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
169
// now parse all file content
17-
let mut parsed = 0usize;
10+
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
11+
let mut message_count = 0usize;
1812
let start = Instant::now();
1913
loop {
20-
let consumed: usize = match reader.fill_buf() {
21-
Ok(content) => {
22-
if content.is_empty() {
23-
println!("empty content after {} parsed messages", parsed);
24-
break;
25-
}
26-
let available = content.len();
27-
28-
match dlt_message(content, None, true) {
29-
Ok((rest, _maybe_msg)) => {
30-
let consumed = available - rest.len();
31-
parsed += 1;
32-
consumed
33-
}
34-
Err(DltParseError::IncompleteParse { needed }) => {
35-
println!("parse incomplete, needed: {:?}", needed);
36-
return;
37-
}
38-
Err(DltParseError::ParsingHickup(reason)) => {
39-
println!("parse error: {}", reason);
40-
4 //skip 4 bytes
41-
}
42-
Err(DltParseError::Unrecoverable(cause)) => {
43-
println!("unrecoverable parse failure: {}", cause);
44-
return;
45-
}
46-
}
14+
match read_message(&mut dlt_reader, None).expect("read dlt message") {
15+
Some(_message) => {
16+
message_count += 1;
4717
}
48-
Err(e) => {
49-
println!("Error reading: {}", e);
50-
return;
18+
None => {
19+
break;
5120
}
5221
};
53-
reader.consume(consumed);
5422
}
55-
5623
// print some stats
5724
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
5825
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
5926
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
6027
println!(
6128
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
62-
parsed, duration_in_s, amount_per_second
29+
message_count, duration_in_s, amount_per_second
6330
);
6431
}

0 commit comments

Comments
 (0)