Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SnifferSV1 to ITF #1580

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/integration-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ jobs:
- name: Roles Integration Tests
run: |
RUST_BACKTRACE=1 RUST_LOG=debug cargo test --manifest-path=test/integration-tests/Cargo.toml --verbose --test '*' -- --nocapture

- name: SV1 Integration Tests
run: |
RUST_BACKTRACE=1 RUST_LOG=debug cargo test --manifest-path=test/integration-tests/Cargo.toml --verbose --test 'sv1' --features sv1 -- --nocapture
13 changes: 1 addition & 12 deletions .github/workflows/mg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,19 @@ jobs:
- name: Run jds-receive-solution-while-processing-declared-job
run: sh ./test/message-generator/test/jds-receive-solution-while-processing-declared-job/jds-receive-solution-while-processing-declared-job.sh

sv1-test:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Run sv1-test
run: sh ./test/message-generator/test/sv1-test/sv1-test.sh

mg-aggregate-results:
name: "Aggregate MG Test Results"
runs-on: ubuntu-latest
if: always()
needs: [
jds-do-not-fail-on-wrong-tsdatasucc,
jds-receive-solution-while-processing-declared-job,
sv1-test
]
steps:
- name: Aggregate Results
run: |
if [ "${{ needs.jds-do-not-fail-on-wrong-tsdatasucc.result }}" != "success" ] ||
[ "${{ needs.jds-receive-solution-while-processing-declared-job.result }}" != "success" ] ||
[ "${{ needs.sv1-test.result }}" != "success" ]; then
[ "${{ needs.jds-receive-solution-while-processing-declared-job.result }}" != "success" ]; then
echo "One or more jobs failed."
exit 1
else
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jobs:
cargo test --manifest-path=roles/Cargo.toml
cargo test --manifest-path=utils/Cargo.toml
cargo test --manifest-path=roles/roles-utils/config-helpers/Cargo.toml
cargo test --manifest-path=roles/roles-utils/network-helpers/Cargo.toml sv1_connection::tests::test_sv1_connection --features sv1

- name: Property based testing
run: |
Expand Down
11 changes: 7 additions & 4 deletions roles/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion roles/roles-utils/network-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ tokio = { version = "1", features = ["full"] }
binary_sv2 = { path = "../../../protocols/v2/binary-sv2", optional = true }
codec_sv2 = { path = "../../../protocols/v2/codec-sv2", features=["noise_sv2"], optional = true }
const_sv2 = {path = "../../../protocols/v2/const-sv2"}
sv1_api = { path = "../../../protocols/v1/", optional = true }
tracing = { version = "0.1" }
futures = "0.3.28"
tokio-util = { version = "0.7.10", default-features = false, features = ["codec"], optional = true }
serde_json = { version = "1.0.138", default-features = false, optional = true }

[features]
default = ["async-channel", "binary_sv2", "codec_sv2"]
with_buffer_pool = ["codec_sv2/with_buffer_pool"]
sv1 = ["sv1_api", "tokio-util", "serde_json"]

[package.metadata.docs.rs]
features = ["with_buffer_pool"]
features = ["with_buffer_pool"]
2 changes: 2 additions & 0 deletions roles/roles-utils/network-helpers/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use binary_sv2::{Deserialize, GetSize, Serialize};
pub mod noise_connection;
pub mod plain_connection;
#[cfg(feature = "sv1")]
pub mod sv1_connection;

use async_channel::{Receiver, RecvError, SendError, Sender};
use codec_sv2::{Error as CodecError, HandShakeFrame, HandshakeRole, StandardEitherFrame};
Expand Down
171 changes: 171 additions & 0 deletions roles/roles-utils/network-helpers/src/sv1_connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use async_channel::{unbounded, Receiver, Sender};
use futures::{FutureExt, StreamExt};
use sv1_api::json_rpc;
use tokio::{
io::{AsyncWriteExt, BufReader},
net::TcpStream,
};
use tokio_util::codec::{FramedRead, LinesCodec};

/// Represents a connection between two roles communicating using SV1 protocol.
///
/// A single instance handles both directions of its TCP stream: use the sender
/// channel to write messages to the peer and the receiver channel to read
/// messages from it. Each role wraps its own end of the stream, so one instance
/// is needed on each side of the connection.
Comment on lines +12 to +14
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment feels a bit confusing

with sender I can send, with receiver I can receive... therefore this is bidirectional?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my understanding I think they say a single Channel instance only handles communication in one direction. But yes I agree that the statement could be clearer.

#[derive(Debug)]
pub struct ConnectionSV1 {
    /// Messages parsed from the peer's side of the TCP stream (incoming).
    receiver: Receiver<json_rpc::Message>,
    /// Messages to be serialized and written to the peer (outgoing).
    sender: Sender<json_rpc::Message>,
}

/// Maximum accepted length (64 KiB) of a single newline-delimited JSON line;
/// `LinesCodec` yields an error for longer lines instead of buffering unboundedly.
const MAX_LINE_LENGTH: usize = 2_usize.pow(16);

impl ConnectionSV1 {
    /// Create a new connection set up to communicate with the other side of the given stream.
    ///
    /// Two background tasks are spawned:
    /// - a reader task that frames the stream into newline-delimited lines, parses each line
    ///   as a `json_rpc::Message` and forwards it to the incoming channel;
    /// - a writer task that serializes messages from the outgoing channel and writes them to
    ///   the stream, each terminated by a newline.
    ///
    /// Both tasks stop on ctrl-c, when their channel is closed, or when the stream ends.
    pub async fn new(stream: TcpStream) -> Self {
        let (reader_stream, mut writer_stream) = stream.into_split();
        let (sender_incoming, receiver_incoming) = unbounded();
        let (sender_outgoing, receiver_outgoing) = unbounded::<json_rpc::Message>();

        // Reader task: stream -> incoming channel.
        let sender_incoming_clone = sender_incoming.clone();
        tokio::task::spawn(async move {
            let reader = BufReader::new(reader_stream);
            let mut messages =
                FramedRead::new(reader, LinesCodec::new_with_max_length(MAX_LINE_LENGTH));
            loop {
                tokio::select! {
                    res = messages.next().fuse() => {
                        match res {
                            Some(Ok(line)) => {
                                // The peer is untrusted: a malformed JSON line must not
                                // panic the task. Log and skip it instead of unwrapping.
                                match serde_json::from_str::<json_rpc::Message>(&line) {
                                    Ok(incoming) => {
                                        if sender_incoming_clone.send(incoming).await.is_err() {
                                            // All receivers dropped; nothing left to do.
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        tracing::error!("Failed to parse incoming SV1 message: {:?}", e);
                                    }
                                }
                            }
                            Some(Err(e)) => {
                                break tracing::error!("Error reading from stream: {:?}", e);
                            }
                            None => {
                                // EOF: the peer closed the stream. Stop the task; a
                                // finished FramedRead keeps yielding `None`, so looping
                                // here would busy-spin forever.
                                break;
                            }
                        }
                    },
                    _ = tokio::signal::ctrl_c().fuse() => {
                        break;
                    }
                };
            }
        });

        // Writer task: outgoing channel -> stream.
        tokio::task::spawn(async move {
            loop {
                tokio::select! {
                    res = receiver_outgoing.recv().fuse() => {
                        // `recv` fails once every sender is dropped: shut the task
                        // down cleanly instead of panicking on unwrap.
                        let Ok(to_send) = res else { break; };
                        let to_send = match serde_json::to_string(&to_send) {
                            Ok(string) => format!("{}\n", string),
                            Err(_e) => {
                                break;
                            }
                        };
                        // Write errors are best-effort here; the reader task will
                        // observe and report a broken stream.
                        let _ = writer_stream
                            .write_all(to_send.as_bytes())
                            .await;
                    },
                    _ = tokio::signal::ctrl_c().fuse() => {
                        break;
                    }
                };
            }
        });
        Self {
            receiver: receiver_incoming,
            sender: sender_outgoing,
        }
    }

    /// Send a message to the other side of the connection.
    ///
    /// Returns `true` if the message was queued for writing, `false` if the
    /// connection's writer task has shut down.
    pub async fn send(&self, msg: json_rpc::Message) -> bool {
        self.sender.send(msg).await.is_ok()
    }

    /// Receive a message from the other side of the connection.
    ///
    /// Returns `None` once the connection's reader task has shut down and the
    /// channel is drained.
    pub async fn receive(&self) -> Option<json_rpc::Message> {
        self.receiver.recv().await.ok()
    }

    /// Get a clone of the receiver channel (incoming messages).
    pub fn receiver(&self) -> Receiver<json_rpc::Message> {
        self.receiver.clone()
    }

    /// Get a clone of the sender channel (outgoing messages).
    pub fn sender(&self) -> Sender<json_rpc::Message> {
        self.sender.clone()
    }
}

#[cfg(test)]
mod tests {
    use tokio::net::TcpListener;

    use super::*;

    /// End-to-end round trip over a loopback TCP socket: a request flows
    /// downstream -> upstream, then a response flows upstream -> downstream.
    #[tokio::test]
    async fn test_sv1_connection() {
        // Bind an ephemeral port and connect both ends of the stream.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let client_stream = TcpStream::connect(addr).await.unwrap();
        let (server_stream, _) = listener.accept().await.unwrap();

        let upstream_connection = ConnectionSV1::new(server_stream).await;
        let downstream_connection = ConnectionSV1::new(client_stream).await;

        // Downstream sends a standard request.
        let request = json_rpc::Message::StandardRequest(json_rpc::StandardRequest {
            id: 1,
            method: "test".to_string(),
            params: serde_json::Value::Null,
        });
        assert!(downstream_connection.send(request).await);

        // Upstream must observe the identical request.
        match upstream_connection.receive().await.unwrap() {
            json_rpc::Message::StandardRequest(req) => {
                assert_eq!(req.id, 1);
                assert_eq!(req.method, "test".to_string());
                assert_eq!(req.params, serde_json::Value::Null);
            }
            _ => panic!("Unexpected message type"),
        }

        // Upstream answers with an ok response.
        let response = json_rpc::Message::OkResponse(json_rpc::Response {
            id: 1,
            result: serde_json::Value::String("response".to_string()),
            error: None,
        });
        assert!(upstream_connection.send(response).await);

        // Downstream must observe the identical response.
        match downstream_connection.receive().await.unwrap() {
            json_rpc::Message::OkResponse(resp) => {
                assert_eq!(resp.id, 1);
                assert_eq!(
                    resp.result,
                    serde_json::Value::String("response".to_string())
                );
            }
            _ => panic!("Unexpected message type"),
        }
    }
}
Loading
Loading