Skip to content

Commit 070a869

Browse files
committed
Add support for reading DLT messages.
This change includes: - new API to read DLT messages from a source - new API to read DLT messages from a stream (esrlabs#34) - new API to collect generic DLT statistics (esrlabs#31) - removing of any dependency to the buf_redux crate - consolidation of the crate's feature names - increase of the crate's version to 0.20.0
1 parent 25706ea commit 070a869

17 files changed

+863
-540
lines changed

CHANGELOG.md

+14-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.20.0] - 2025-02-25
11+
12+
### Added
13+
14+
- API to read DLT messages from a source
15+
- API to read DLT messages from a stream
16+
- API to collect generic DLT statistics
17+
18+
### Changed
19+
20+
- Removed buf_redux dependency
21+
- Cleanup feature names
22+
1023
## [0.19.2] - 2025-02-06
1124

1225
### Changed
@@ -36,7 +49,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3649

3750
### Changed
3851

39-
- Add feature "serde-support", which adds to crate's types Serialize/Deserialize
52+
- Add feature "serialization", which adds to crate's types Serialize/Deserialize
4053

4154
## [0.17.0] - 2024-10-04
4255

Cargo.toml

+20-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "dlt-core"
3-
version = "0.19.2"
3+
version = "0.20.0"
44
authors = ["esrlabs.com"]
55
edition = "2021"
66
description = """
@@ -10,9 +10,9 @@ license = "Apache-2.0"
1010
repository = "https://github.com/esrlabs/dlt-core"
1111

1212
[dependencies]
13-
buf_redux = { version = "0.8.4", optional = true }
1413
byteorder = "1.4"
1514
bytes = "1.0"
15+
futures = "0.3"
1616
log = "0.4"
1717
memchr = "2.4"
1818
nom = "7.1"
@@ -23,23 +23,26 @@ serde_json = { version = "1.0", optional = true }
2323
thiserror = "1.0"
2424

2525
[features]
26+
debug = []
2627
default = []
27-
statistics = ["buf_redux", "rustc-hash"]
28-
fibex_parser = ["quick-xml"]
29-
debug_parser = []
30-
serde-support = ["serde", "serde_json"]
28+
fibex = ["quick-xml"]
29+
serialization = ["serde", "serde_json"]
30+
statistics = ["rustc-hash"]
31+
stream = []
3132

3233
[lints.rust]
3334
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }
3435

3536
[dev-dependencies]
36-
buf_redux = "0.8.4"
3737
criterion = { version = "0.4", features = ["html_reports"] }
3838
dirs = "4.0"
3939
env_logger = "0.10"
4040
pretty_assertions = "1.3"
4141
proptest = "1.6"
4242
proptest-derive = "0.5"
43+
tokio = { version = "1", features = ["full"] }
44+
tokio-stream = "0.1"
45+
tokio-util = { version = "0.7", features = ["compat"] }
4346

4447
[[bench]]
4548
name = "dlt_benchmarks"
@@ -48,3 +51,13 @@ harness = false
4851
[[example]]
4952
name = "file_parser"
5053
path = "examples/file_parser.rs"
54+
55+
[[example]]
56+
name = "file_streamer"
57+
path = "examples/file_streamer.rs"
58+
required-features = ["stream"]
59+
60+
[[example]]
61+
name = "dlt_statistics"
62+
path = "examples/dlt_statistics.rs"
63+
required-features = ["statistics"]

README.md

+20-51
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Add this to your `Cargo.toml`:
1818

1919
```toml
2020
[dependencies]
21-
dlt_core = "0.19"
21+
dlt_core = "0.20"
2222
```
2323

2424
This is an example of how to parse a message and serialize it back to a byte array.
@@ -69,68 +69,35 @@ The following example can be run with `cargo run --example file_parser --release
6969

7070
<!-- example start -->
7171
```rust
72-
use buf_redux::{policy::MinBuffered, BufReader};
73-
use dlt_core::parse::{dlt_message, DltParseError};
74-
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};
75-
76-
const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
77-
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
72+
use dlt_core::read::{read_message, DltMessageReader};
73+
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
7874

7975
fn main() {
8076
// collect input file details
81-
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
82-
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
83-
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
84-
// create a reader that maintains a minimum amount of bytes in it's buffer
85-
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
86-
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
77+
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("no filename given"));
78+
let dlt_file = File::open(&dlt_file_path).expect("open input file");
79+
let dlt_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
8780
// now parse all file content
88-
let mut parsed = 0usize;
81+
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
82+
let mut message_count = 0usize;
8983
let start = Instant::now();
9084
loop {
91-
let consumed: usize = match reader.fill_buf() {
92-
Ok(content) => {
93-
if content.is_empty() {
94-
println!("empty content after {} parsed messages", parsed);
95-
break;
96-
}
97-
let available = content.len();
98-
99-
match dlt_message(content, None, true) {
100-
Ok((rest, _maybe_msg)) => {
101-
let consumed = available - rest.len();
102-
parsed += 1;
103-
consumed
104-
}
105-
Err(DltParseError::IncompleteParse { needed }) => {
106-
println!("parse incomplete, needed: {:?}", needed);
107-
return;
108-
}
109-
Err(DltParseError::ParsingHickup(reason)) => {
110-
println!("parse error: {}", reason);
111-
4 //skip 4 bytes
112-
}
113-
Err(DltParseError::Unrecoverable(cause)) => {
114-
println!("unrecoverable parse failure: {}", cause);
115-
return;
116-
}
117-
}
85+
match read_message(&mut dlt_reader, None).expect("read dlt message") {
86+
Some(_message) => {
87+
message_count += 1;
11888
}
119-
Err(e) => {
120-
println!("Error reading: {}", e);
121-
return;
89+
None => {
90+
break;
12291
}
12392
};
124-
reader.consume(consumed);
12593
}
126-
12794
// print some stats
12895
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
129-
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
96+
let file_size_in_mb = dlt_file_size as f64 / 1024.0 / 1024.0;
13097
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
13198
println!(
13299
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
133-
parsed, duration_in_s, amount_per_second
100+
message_count, duration_in_s, amount_per_second
134101
);
135102
}
136103

@@ -150,11 +117,13 @@ Below is the revised and improved English version of the documentation:
150117

151118
* **`statistics`**: Enables the `statistics` module, which scans the source data and provides a summary of its contents. This gives you an overview of the number of messages and their content.
152119

153-
* **`fibex_parser`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.
120+
* **`fibex`**: Enables the `fibex` module, which allows to parse configurations for non-verbose messages from a fibex model.
121+
122+
* **`debug`**: Adds additional log output for debugging purposes.
154123

155-
* **`debug_parser`**: Adds additional log output for debugging purposes.
124+
* **`serialization`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.
156125

157-
* **`serde-support`**: Adds `Serialize` and `Deserialize` implementations (via `serde`) to all public types. This feature is useful if you need to encode or decode these types for transmission or storage.
126+
* **`stream`**: Provides API for parsing DLT messages from streams.
158127

159128
## Example users
160129

examples/dlt_statistics.rs

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use dlt_core::{
2+
parse::DltParseError,
3+
read::DltMessageReader,
4+
statistics::{collect_statistics, Statistic, StatisticCollector},
5+
};
6+
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
7+
8+
pub struct MessageCounter {
9+
count: usize,
10+
}
11+
12+
impl StatisticCollector for MessageCounter {
13+
fn collect_statistic(&mut self, _statistic: Statistic) -> Result<(), DltParseError> {
14+
self.count += 1;
15+
Ok(())
16+
}
17+
}
18+
19+
fn main() {
20+
// collect input file details
21+
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("no filename given"));
22+
let dlt_file = File::open(&dlt_file_path).expect("open input file");
23+
let dlt_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
24+
// now scan all file content
25+
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
26+
let mut dlt_collector = MessageCounter { count: 0 };
27+
let start = Instant::now();
28+
collect_statistics(&mut dlt_reader, &mut dlt_collector).expect("collect dlt statistics");
29+
// print some stats
30+
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
31+
let file_size_in_mb = dlt_file_size as f64 / 1024.0 / 1024.0;
32+
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
33+
println!(
34+
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
35+
dlt_collector.count, duration_in_s, amount_per_second
36+
);
37+
}

examples/file_parser.rs

+14-47
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,31 @@
1-
use buf_redux::{policy::MinBuffered, BufReader};
2-
use dlt_core::parse::{dlt_message, DltParseError};
3-
use std::{env, fs, fs::File, io::BufRead, path::PathBuf, time::Instant};
4-
5-
const BIN_READER_CAPACITY: usize = 10 * 1024 * 1024;
6-
const BIN_MIN_BUFFER_SPACE: usize = 10 * 1024;
1+
use dlt_core::read::{read_message, DltMessageReader};
2+
use std::{env, fs, fs::File, path::PathBuf, time::Instant};
73

84
fn main() {
95
// collect input file details
10-
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("No filename given"));
11-
let dlt_file = File::open(&dlt_file_path).expect("could not open file");
12-
let source_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
13-
// create a reader that maintains a minimum amount of bytes in it's buffer
14-
let mut reader = BufReader::with_capacity(BIN_READER_CAPACITY, dlt_file)
15-
.set_policy(MinBuffered(BIN_MIN_BUFFER_SPACE));
6+
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("no filename given"));
7+
let dlt_file = File::open(&dlt_file_path).expect("open input file");
8+
let dlt_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
169
// now parse all file content
17-
let mut parsed = 0usize;
10+
let mut dlt_reader = DltMessageReader::new(dlt_file, true);
11+
let mut message_count = 0usize;
1812
let start = Instant::now();
1913
loop {
20-
let consumed: usize = match reader.fill_buf() {
21-
Ok(content) => {
22-
if content.is_empty() {
23-
println!("empty content after {} parsed messages", parsed);
24-
break;
25-
}
26-
let available = content.len();
27-
28-
match dlt_message(content, None, true) {
29-
Ok((rest, _maybe_msg)) => {
30-
let consumed = available - rest.len();
31-
parsed += 1;
32-
consumed
33-
}
34-
Err(DltParseError::IncompleteParse { needed }) => {
35-
println!("parse incomplete, needed: {:?}", needed);
36-
return;
37-
}
38-
Err(DltParseError::ParsingHickup(reason)) => {
39-
println!("parse error: {}", reason);
40-
4 //skip 4 bytes
41-
}
42-
Err(DltParseError::Unrecoverable(cause)) => {
43-
println!("unrecoverable parse failure: {}", cause);
44-
return;
45-
}
46-
}
14+
match read_message(&mut dlt_reader, None).expect("read dlt message") {
15+
Some(_message) => {
16+
message_count += 1;
4717
}
48-
Err(e) => {
49-
println!("Error reading: {}", e);
50-
return;
18+
None => {
19+
break;
5120
}
5221
};
53-
reader.consume(consumed);
5422
}
55-
5623
// print some stats
5724
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
58-
let file_size_in_mb = source_file_size as f64 / 1024.0 / 1024.0;
25+
let file_size_in_mb = dlt_file_size as f64 / 1024.0 / 1024.0;
5926
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
6027
println!(
6128
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
62-
parsed, duration_in_s, amount_per_second
29+
message_count, duration_in_s, amount_per_second
6330
);
6431
}

examples/file_streamer.rs

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use dlt_core::stream::{read_message, DltStreamReader};
2+
use std::{env, fs, path::PathBuf, time::Instant};
3+
use tokio::fs::File;
4+
use tokio_util::compat::TokioAsyncReadCompatExt;
5+
6+
#[tokio::main]
7+
async fn main() {
8+
// collect input file details
9+
let dlt_file_path = PathBuf::from(&env::args().nth(1).expect("no filename given"));
10+
let dlt_file = File::open(&dlt_file_path).await.expect("open input file");
11+
let dlt_file_size = fs::metadata(&dlt_file_path).expect("file size error").len();
12+
// now parse all file content
13+
let mut dlt_reader = DltStreamReader::new(dlt_file.compat(), true);
14+
let mut message_count = 0usize;
15+
let start = Instant::now();
16+
loop {
17+
match read_message(&mut dlt_reader, None)
18+
.await
19+
.expect("read dlt message")
20+
{
21+
Some(_message) => {
22+
message_count += 1;
23+
}
24+
None => {
25+
break;
26+
}
27+
};
28+
}
29+
// print some stats
30+
let duration_in_s = start.elapsed().as_millis() as f64 / 1000.0;
31+
let file_size_in_mb = dlt_file_size as f64 / 1024.0 / 1024.0;
32+
let amount_per_second: f64 = file_size_in_mb / duration_in_s;
33+
println!(
34+
"parsing {} messages took {:.3}s! ({:.3} MB/s)",
35+
message_count, duration_in_s, amount_per_second
36+
);
37+
}

0 commit comments

Comments
 (0)