Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 21 additions & 36 deletions cln-rpc/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@ use std::{io, str};
use tokio_util::codec::{Decoder, Encoder};

pub use crate::jsonrpc::JsonRpc;
use crate::{
model::{Request},
notifications::Notification,
};
use crate::{model::Request, notifications::Notification};

/// A simple codec that parses messages separated by two successive
/// `\n` newlines.
#[derive(Default)]
pub struct MultiLineCodec {}

/// Find two consecutive newlines, i.e., an empty line, signalling the
/// end of one message and the start of the next message.
fn find_separator(buf: &mut BytesMut) -> Option<usize> {
buf.iter()
.zip(buf.iter().skip(1))
.position(|b| *b.0 == b'\n' && *b.1 == b'\n')
pub struct MultiLineCodec {
search_pos: usize,
}

fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
Expand All @@ -39,14 +30,24 @@ impl Decoder for MultiLineCodec {
type Item = String;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
if let Some(newline_offset) = find_separator(buf) {
let line = buf.split_to(newline_offset + 2);
let line = &line[..line.len() - 2];
let line = utf8(line)?;
Ok(Some(line.to_string()))
} else {
Ok(None)
let bytes = &buf[..];
let mut i = self.search_pos;

while i + 1 < bytes.len() {
if bytes[i] == b'\n' && bytes[i + 1] == b'\n' {
let line = buf.split_to(i + 2);
let line = &line[..line.len() - 2];

self.search_pos = 0;

return Ok(Some(utf8(line)?.to_owned()));
}
i += 1;
}

self.search_pos = bytes.len().saturating_sub(1);

Ok(None)
}
}

Expand Down Expand Up @@ -129,27 +130,11 @@ impl Decoder for JsonRpcCodec {

#[cfg(test)]
mod test {
use super::{find_separator, JsonCodec, MultiLineCodec};
use super::{JsonCodec, MultiLineCodec};
use bytes::{BufMut, BytesMut};
use serde_json::json;
use tokio_util::codec::{Decoder, Encoder};

#[test]
fn test_separator() {
struct Test(String, Option<usize>);
let tests = vec![
Test("".to_string(), None),
Test("}\n\n".to_string(), Some(1)),
Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)),
];

for t in tests.iter() {
let mut buf = BytesMut::new();
buf.put_slice(t.0.as_bytes());
assert_eq!(find_separator(&mut buf), t.1);
}
}

#[test]
fn test_ml_decoder() {
struct Test(String, Option<String>, String);
Expand Down
52 changes: 20 additions & 32 deletions plugins/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ use crate::messages::{Notification, Request};
/// A simple codec that parses messages separated by two successive
/// `\n` newlines.
#[derive(Default)]
pub struct MultiLineCodec {}

/// Find two consecutive newlines, i.e., an empty line, signalling the
/// end of one message and the start of the next message.
fn find_separator(buf: &mut BytesMut) -> Option<usize> {
buf.iter()
.zip(buf.iter().skip(1))
.position(|b| *b.0 == b'\n' && *b.1 == b'\n')
pub struct MultiLineCodec {
search_pos: usize,
}

fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
Expand All @@ -36,14 +30,24 @@ impl Decoder for MultiLineCodec {
type Item = String;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
if let Some(newline_offset) = find_separator(buf) {
let line = buf.split_to(newline_offset + 2);
let line = &line[..line.len() - 2];
let line = utf8(line)?;
Ok(Some(line.to_string()))
} else {
Ok(None)
let bytes = &buf[..];
let mut i = self.search_pos;

while i + 1 < bytes.len() {
if bytes[i] == b'\n' && bytes[i + 1] == b'\n' {
let line = buf.split_to(i + 2);
let line = &line[..line.len() - 2];

self.search_pos = 0;

return Ok(Some(utf8(line)?.to_owned()));
}
i += 1;
}

self.search_pos = bytes.len().saturating_sub(1);

Ok(None)
}
}

Expand Down Expand Up @@ -125,27 +129,11 @@ impl Decoder for JsonRpcCodec {

#[cfg(test)]
mod test {
use super::{find_separator, JsonCodec, MultiLineCodec};
use super::{JsonCodec, MultiLineCodec};
use bytes::{BufMut, BytesMut};
use serde_json::json;
use tokio_util::codec::{Decoder, Encoder};

#[test]
fn test_separator() {
struct Test(String, Option<usize>);
let tests = vec![
Test("".to_string(), None),
Test("}\n\n".to_string(), Some(1)),
Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)),
];

for t in tests.iter() {
let mut buf = BytesMut::new();
buf.put_slice(t.0.as_bytes());
assert_eq!(find_separator(&mut buf), t.1);
}
}

#[test]
fn test_ml_decoder() {
struct Test(String, Option<String>, String);
Expand Down
Loading