Skip to content
This repository was archived by the owner on Jun 4, 2023. It is now read-only.

Commit f63abbe

Browse files
authored
Merge pull request containerd#146 from wllenyj/stream-1
Streaming support.
2 parents 5def440 + 98d128d commit f63abbe

25 files changed

+2824
-686
lines changed

compiler/src/codegen.rs

Lines changed: 193 additions & 122 deletions
Large diffs are not rendered by default.

deny.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ unlicensed = "deny"
7272
allow = [
7373
"MIT",
7474
"Apache-2.0",
75+
"Unicode-DFS-2016",
7576
#"Apache-2.0 WITH LLVM-exception",
7677
]
7778
# List of explictly disallowed licenses

example/Cargo.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ ttrpc = { path = "../", features = ["async"] }
2222
ctrlc = { version = "3.0", features = ["termination"] }
2323
tokio = { version = "1.0.1", features = ["signal", "time"] }
2424
async-trait = "0.1.42"
25+
rand = "0.8.5"
2526

2627

2728
[[example]]
@@ -40,5 +41,13 @@ path = "./async-server.rs"
4041
name = "async-client"
4142
path = "./async-client.rs"
4243

44+
[[example]]
45+
name = "async-stream-server"
46+
path = "./async-stream-server.rs"
47+
48+
[[example]]
49+
name = "async-stream-client"
50+
path = "./async-stream-client.rs"
51+
4352
[build-dependencies]
4453
ttrpc-codegen = { path = "../ttrpc-codegen"}

example/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ build:
88
cargo build --example client
99
cargo build --example async-server
1010
cargo build --example async-client
11+
cargo build --example async-stream-server
12+
cargo build --example async-stream-client
1113

1214
.PHONY: deps
1315
deps:

example/async-stream-client.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright 2022 Alibaba Cloud. All rights reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
mod protocols;
7+
mod utils;
8+
9+
use protocols::r#async::{empty, streaming, streaming_ttrpc};
10+
use ttrpc::context::{self, Context};
11+
use ttrpc::r#async::Client;
12+
13+
#[tokio::main(flavor = "current_thread")]
14+
async fn main() {
15+
simple_logging::log_to_stderr(log::LevelFilter::Info);
16+
17+
let c = Client::connect(utils::SOCK_ADDR).unwrap();
18+
let sc = streaming_ttrpc::StreamingClient::new(c);
19+
20+
let _now = std::time::Instant::now();
21+
22+
let sc1 = sc.clone();
23+
let t1 = tokio::spawn(echo_request(sc1));
24+
25+
let sc1 = sc.clone();
26+
let t2 = tokio::spawn(echo_stream(sc1));
27+
28+
let sc1 = sc.clone();
29+
let t3 = tokio::spawn(sum_stream(sc1));
30+
31+
let sc1 = sc.clone();
32+
let t4 = tokio::spawn(divide_stream(sc1));
33+
34+
let sc1 = sc.clone();
35+
let t5 = tokio::spawn(echo_null(sc1));
36+
37+
let t6 = tokio::spawn(echo_null_stream(sc));
38+
39+
let _ = tokio::join!(t1, t2, t3, t4, t5, t6);
40+
}
41+
42+
fn default_ctx() -> Context {
43+
let mut ctx = context::with_timeout(0);
44+
ctx.add("key-1".to_string(), "value-1-1".to_string());
45+
ctx.add("key-1".to_string(), "value-1-2".to_string());
46+
ctx.set("key-2".to_string(), vec!["value-2".to_string()]);
47+
48+
ctx
49+
}
50+
51+
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
52+
let echo1 = streaming::EchoPayload {
53+
seq: 1,
54+
msg: "Echo Me".to_string(),
55+
..Default::default()
56+
};
57+
let resp = cli.echo(default_ctx(), &echo1).await.unwrap();
58+
assert_eq!(resp.msg, echo1.msg);
59+
assert_eq!(resp.seq, echo1.seq + 1);
60+
}
61+
62+
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
63+
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();
64+
65+
let mut i = 0;
66+
while i < 100 {
67+
let echo = streaming::EchoPayload {
68+
seq: i as u32,
69+
msg: format!("{}: Echo in a stream", i),
70+
..Default::default()
71+
};
72+
stream.send(&echo).await.unwrap();
73+
let resp = stream.recv().await.unwrap();
74+
assert_eq!(resp.msg, echo.msg);
75+
assert_eq!(resp.seq, echo.seq + 1);
76+
77+
i += 2;
78+
}
79+
stream.close_send().await.unwrap();
80+
let ret = stream.recv().await;
81+
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
82+
}
83+
84+
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
85+
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();
86+
87+
let mut sum = streaming::Sum::new();
88+
stream.send(&streaming::Part::new()).await.unwrap();
89+
90+
sum.num += 1;
91+
let mut i = -99i32;
92+
while i <= 100 {
93+
let addi = streaming::Part {
94+
add: i,
95+
..Default::default()
96+
};
97+
stream.send(&addi).await.unwrap();
98+
sum.sum += i;
99+
sum.num += 1;
100+
101+
i += 1;
102+
}
103+
stream.send(&streaming::Part::new()).await.unwrap();
104+
sum.num += 1;
105+
106+
let ssum = stream.close_and_recv().await.unwrap();
107+
assert_eq!(ssum.sum, sum.sum);
108+
assert_eq!(ssum.num, sum.num);
109+
}
110+
111+
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
112+
let expected = streaming::Sum {
113+
sum: 392,
114+
num: 4,
115+
..Default::default()
116+
};
117+
let mut stream = cli.divide_stream(default_ctx(), &expected).await.unwrap();
118+
119+
let mut actual = streaming::Sum::new();
120+
121+
// NOTE: `for part in stream.recv().await.unwrap()` can't work.
122+
while let Some(part) = stream.recv().await.unwrap() {
123+
actual.sum += part.add;
124+
actual.num += 1;
125+
}
126+
assert_eq!(actual.sum, expected.sum);
127+
assert_eq!(actual.num, expected.num);
128+
}
129+
130+
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
131+
let mut stream = cli.echo_null(default_ctx()).await.unwrap();
132+
133+
for i in 0..100 {
134+
let echo = streaming::EchoPayload {
135+
seq: i as u32,
136+
msg: "non-empty empty".to_string(),
137+
..Default::default()
138+
};
139+
stream.send(&echo).await.unwrap();
140+
}
141+
let res = stream.close_and_recv().await.unwrap();
142+
assert_eq!(res, empty::Empty::new());
143+
}
144+
145+
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
146+
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();
147+
148+
let (tx, mut rx) = stream.split();
149+
150+
let task = tokio::spawn(async move {
151+
loop {
152+
let ret = rx.recv().await;
153+
if matches!(ret, Err(ttrpc::Error::Eof)) {
154+
break;
155+
}
156+
}
157+
});
158+
159+
for i in 0..100 {
160+
let echo = streaming::EchoPayload {
161+
seq: i as u32,
162+
msg: "non-empty empty".to_string(),
163+
..Default::default()
164+
};
165+
tx.send(&echo).await.unwrap();
166+
}
167+
168+
tx.close_send().await.unwrap();
169+
170+
tokio::time::timeout(tokio::time::Duration::from_secs(10), task)
171+
.await
172+
.unwrap()
173+
.unwrap();
174+
}

example/async-stream-server.rs

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2022 Alibaba Cloud. All rights reserved.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
6+
mod protocols;
7+
mod utils;
8+
9+
use std::sync::Arc;
10+
11+
use log::{info, LevelFilter};
12+
13+
use protocols::r#async::{empty, streaming, streaming_ttrpc};
14+
use ttrpc::asynchronous::Server;
15+
16+
use async_trait::async_trait;
17+
use tokio::signal::unix::{signal, SignalKind};
18+
use tokio::time::sleep;
19+
20+
struct StreamingService;
21+
22+
#[async_trait]
23+
impl streaming_ttrpc::Streaming for StreamingService {
24+
async fn echo(
25+
&self,
26+
_ctx: &::ttrpc::r#async::TtrpcContext,
27+
mut e: streaming::EchoPayload,
28+
) -> ::ttrpc::Result<streaming::EchoPayload> {
29+
e.seq += 1;
30+
Ok(e)
31+
}
32+
33+
async fn echo_stream(
34+
&self,
35+
_ctx: &::ttrpc::r#async::TtrpcContext,
36+
mut s: ::ttrpc::r#async::ServerStream<streaming::EchoPayload, streaming::EchoPayload>,
37+
) -> ::ttrpc::Result<()> {
38+
while let Some(mut e) = s.recv().await? {
39+
e.seq += 1;
40+
s.send(&e).await?;
41+
}
42+
43+
Ok(())
44+
}
45+
46+
async fn sum_stream(
47+
&self,
48+
_ctx: &::ttrpc::r#async::TtrpcContext,
49+
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::Part>,
50+
) -> ::ttrpc::Result<streaming::Sum> {
51+
let mut sum = streaming::Sum::new();
52+
while let Some(part) = s.recv().await? {
53+
sum.sum += part.add;
54+
sum.num += 1;
55+
}
56+
57+
Ok(sum)
58+
}
59+
60+
async fn divide_stream(
61+
&self,
62+
_ctx: &::ttrpc::r#async::TtrpcContext,
63+
sum: streaming::Sum,
64+
s: ::ttrpc::r#async::ServerStreamSender<streaming::Part>,
65+
) -> ::ttrpc::Result<()> {
66+
let mut parts = vec![streaming::Part::new(); sum.num as usize];
67+
68+
let mut total = 0i32;
69+
for i in 1..(sum.num - 2) {
70+
let add = (rand::random::<u32>() % 1000) as i32 - 500;
71+
parts[i as usize].add = add;
72+
total += add;
73+
}
74+
75+
parts[sum.num as usize - 2].add = sum.sum - total;
76+
77+
for part in parts {
78+
s.send(&part).await.unwrap();
79+
}
80+
81+
Ok(())
82+
}
83+
84+
async fn echo_null(
85+
&self,
86+
_ctx: &::ttrpc::r#async::TtrpcContext,
87+
mut s: ::ttrpc::r#async::ServerStreamReceiver<streaming::EchoPayload>,
88+
) -> ::ttrpc::Result<empty::Empty> {
89+
let mut seq = 0;
90+
while let Some(e) = s.recv().await? {
91+
assert_eq!(e.seq, seq);
92+
assert_eq!(e.msg.as_str(), "non-empty empty");
93+
seq += 1;
94+
}
95+
Ok(empty::Empty::new())
96+
}
97+
98+
async fn echo_null_stream(
99+
&self,
100+
_ctx: &::ttrpc::r#async::TtrpcContext,
101+
s: ::ttrpc::r#async::ServerStream<empty::Empty, streaming::EchoPayload>,
102+
) -> ::ttrpc::Result<()> {
103+
let msg = "non-empty empty".to_string();
104+
105+
let mut tasks = Vec::new();
106+
107+
let (tx, mut rx) = s.split();
108+
let mut seq = 0u32;
109+
while let Some(e) = rx.recv().await? {
110+
assert_eq!(e.seq, seq);
111+
assert_eq!(e.msg, msg);
112+
seq += 1;
113+
114+
for _i in 0..10 {
115+
let tx = tx.clone();
116+
tasks.push(tokio::spawn(
117+
async move { tx.send(&empty::Empty::new()).await },
118+
));
119+
}
120+
}
121+
122+
for t in tasks {
123+
t.await.unwrap().map_err(|e| {
124+
::ttrpc::Error::RpcStatus(::ttrpc::get_status(
125+
::ttrpc::Code::UNKNOWN,
126+
e.to_string(),
127+
))
128+
})?;
129+
}
130+
Ok(())
131+
}
132+
}
133+
134+
#[tokio::main(flavor = "current_thread")]
135+
async fn main() {
136+
simple_logging::log_to_stderr(LevelFilter::Info);
137+
138+
let s = Box::new(StreamingService {}) as Box<dyn streaming_ttrpc::Streaming + Send + Sync>;
139+
let s = Arc::new(s);
140+
let service = streaming_ttrpc::create_streaming(s);
141+
142+
utils::remove_if_sock_exist(utils::SOCK_ADDR).unwrap();
143+
144+
let mut server = Server::new()
145+
.bind(utils::SOCK_ADDR)
146+
.unwrap()
147+
.register_service(service);
148+
149+
let mut hangup = signal(SignalKind::hangup()).unwrap();
150+
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
151+
server.start().await.unwrap();
152+
153+
tokio::select! {
154+
_ = hangup.recv() => {
155+
// test stop_listen -> start
156+
info!("stop listen");
157+
server.stop_listen().await;
158+
info!("start listen");
159+
server.start().await.unwrap();
160+
161+
// hold some time for the new test connection.
162+
sleep(std::time::Duration::from_secs(100)).await;
163+
}
164+
_ = interrupt.recv() => {
165+
// test graceful shutdown
166+
info!("graceful shutdown");
167+
server.shutdown().await.unwrap();
168+
}
169+
};
170+
}

0 commit comments

Comments
 (0)