Skip to content

Commit 0be063a

Browse files
committed
crates: improved json codec decoding performance
Changelog-None
1 parent 9627bf9 commit 0be063a

File tree

2 files changed

+41
-68
lines changed

2 files changed

+41
-68
lines changed

cln-rpc/src/codec.rs

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,13 @@ use std::{io, str};
1212
use tokio_util::codec::{Decoder, Encoder};
1313

1414
pub use crate::jsonrpc::JsonRpc;
15-
use crate::{
16-
model::{Request},
17-
notifications::Notification,
18-
};
15+
use crate::{model::Request, notifications::Notification};
1916

2017
/// A simple codec that parses messages separated by two successive
2118
/// `\n` newlines.
2219
#[derive(Default)]
23-
pub struct MultiLineCodec {}
24-
25-
/// Find two consecutive newlines, i.e., an empty line, signalling the
26-
/// end of one message and the start of the next message.
27-
fn find_separator(buf: &mut BytesMut) -> Option<usize> {
28-
buf.iter()
29-
.zip(buf.iter().skip(1))
30-
.position(|b| *b.0 == b'\n' && *b.1 == b'\n')
20+
pub struct MultiLineCodec {
21+
search_pos: usize,
3122
}
3223

3324
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
@@ -39,14 +30,24 @@ impl Decoder for MultiLineCodec {
3930
type Item = String;
4031
type Error = Error;
4132
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
42-
if let Some(newline_offset) = find_separator(buf) {
43-
let line = buf.split_to(newline_offset + 2);
44-
let line = &line[..line.len() - 2];
45-
let line = utf8(line)?;
46-
Ok(Some(line.to_string()))
47-
} else {
48-
Ok(None)
33+
let bytes = &buf[..];
34+
let mut i = self.search_pos;
35+
36+
while i + 1 < bytes.len() {
37+
if bytes[i] == b'\n' && bytes[i + 1] == b'\n' {
38+
let line = buf.split_to(i + 2);
39+
let line = &line[..line.len() - 2];
40+
41+
self.search_pos = 0;
42+
43+
return Ok(Some(utf8(line)?.to_owned()));
44+
}
45+
i += 1;
4946
}
47+
48+
self.search_pos = bytes.len().saturating_sub(1);
49+
50+
Ok(None)
5051
}
5152
}
5253

@@ -129,27 +130,11 @@ impl Decoder for JsonRpcCodec {
129130

130131
#[cfg(test)]
131132
mod test {
132-
use super::{find_separator, JsonCodec, MultiLineCodec};
133+
use super::{JsonCodec, MultiLineCodec};
133134
use bytes::{BufMut, BytesMut};
134135
use serde_json::json;
135136
use tokio_util::codec::{Decoder, Encoder};
136137

137-
#[test]
138-
fn test_separator() {
139-
struct Test(String, Option<usize>);
140-
let tests = vec![
141-
Test("".to_string(), None),
142-
Test("}\n\n".to_string(), Some(1)),
143-
Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)),
144-
];
145-
146-
for t in tests.iter() {
147-
let mut buf = BytesMut::new();
148-
buf.put_slice(t.0.as_bytes());
149-
assert_eq!(find_separator(&mut buf), t.1);
150-
}
151-
}
152-
153138
#[test]
154139
fn test_ml_decoder() {
155140
struct Test(String, Option<String>, String);

plugins/src/codec.rs

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,8 @@ use crate::messages::{Notification, Request};
1717
/// A simple codec that parses messages separated by two successive
1818
/// `\n` newlines.
1919
#[derive(Default)]
20-
pub struct MultiLineCodec {}
21-
22-
/// Find two consecutive newlines, i.e., an empty line, signalling the
23-
/// end of one message and the start of the next message.
24-
fn find_separator(buf: &mut BytesMut) -> Option<usize> {
25-
buf.iter()
26-
.zip(buf.iter().skip(1))
27-
.position(|b| *b.0 == b'\n' && *b.1 == b'\n')
20+
pub struct MultiLineCodec {
21+
search_pos: usize,
2822
}
2923

3024
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
@@ -36,14 +30,24 @@ impl Decoder for MultiLineCodec {
3630
type Item = String;
3731
type Error = Error;
3832
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
39-
if let Some(newline_offset) = find_separator(buf) {
40-
let line = buf.split_to(newline_offset + 2);
41-
let line = &line[..line.len() - 2];
42-
let line = utf8(line)?;
43-
Ok(Some(line.to_string()))
44-
} else {
45-
Ok(None)
33+
let bytes = &buf[..];
34+
let mut i = self.search_pos;
35+
36+
while i + 1 < bytes.len() {
37+
if bytes[i] == b'\n' && bytes[i + 1] == b'\n' {
38+
let line = buf.split_to(i + 2);
39+
let line = &line[..line.len() - 2];
40+
41+
self.search_pos = 0;
42+
43+
return Ok(Some(utf8(line)?.to_owned()));
44+
}
45+
i += 1;
4646
}
47+
48+
self.search_pos = bytes.len().saturating_sub(1);
49+
50+
Ok(None)
4751
}
4852
}
4953

@@ -125,27 +129,11 @@ impl Decoder for JsonRpcCodec {
125129

126130
#[cfg(test)]
127131
mod test {
128-
use super::{find_separator, JsonCodec, MultiLineCodec};
132+
use super::{JsonCodec, MultiLineCodec};
129133
use bytes::{BufMut, BytesMut};
130134
use serde_json::json;
131135
use tokio_util::codec::{Decoder, Encoder};
132136

133-
#[test]
134-
fn test_separator() {
135-
struct Test(String, Option<usize>);
136-
let tests = vec![
137-
Test("".to_string(), None),
138-
Test("}\n\n".to_string(), Some(1)),
139-
Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)),
140-
];
141-
142-
for t in tests.iter() {
143-
let mut buf = BytesMut::new();
144-
buf.put_slice(t.0.as_bytes());
145-
assert_eq!(find_separator(&mut buf), t.1);
146-
}
147-
}
148-
149137
#[test]
150138
fn test_ml_decoder() {
151139
struct Test(String, Option<String>, String);

0 commit comments

Comments
 (0)