Skip to content

Commit 935df6c

Browse files
authored
WS pubsub example. (#126)
* WS pubsub example. * Fixing logs. * Updating examples to use explicit imports.
1 parent f8501f8 commit 935df6c

File tree

16 files changed

+249
-28
lines changed

16 files changed

+249
-28
lines changed

http/examples/http_async.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate jsonrpc_http_server;
44

55
use jsonrpc_core::*;
66
use jsonrpc_core::futures::Future;
7-
use jsonrpc_http_server::*;
7+
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin};
88

99
fn main() {
1010
let mut io = IoHandler::default();

http/examples/http_middleware.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate jsonrpc_http_server;
44

55
use jsonrpc_core::{IoHandler, Value};
66
use jsonrpc_core::futures::{self, Future};
7-
use jsonrpc_http_server::*;
7+
use jsonrpc_http_server::{hyper, ServerBuilder, DomainsValidation, AccessControlAllowOrigin, Response};
88

99
fn main() {
1010
let mut io = IoHandler::default();

http/examples/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ extern crate jsonrpc_core;
22
extern crate jsonrpc_http_server;
33

44
use jsonrpc_core::*;
5-
use jsonrpc_http_server::*;
5+
use jsonrpc_http_server::{ServerBuilder, DomainsValidation, AccessControlAllowOrigin};
66

77
fn main() {
88
let mut io = IoHandler::default();

minihttp/examples/http_async.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate jsonrpc_minihttp_server;
44

55
use jsonrpc_core::*;
66
use jsonrpc_core::futures::Future;
7-
use jsonrpc_minihttp_server::*;
7+
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation};
88

99
fn main() {
1010
let mut io = IoHandler::default();

minihttp/examples/http_meta.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ extern crate jsonrpc_minihttp_server;
33

44
use jsonrpc_core::*;
55
use jsonrpc_core::futures::Future;
6-
use jsonrpc_minihttp_server::*;
6+
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation, Req};
77

88
#[derive(Clone, Default)]
99
struct Meta(usize);

minihttp/examples/server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ extern crate jsonrpc_core;
22
extern crate jsonrpc_minihttp_server;
33

44
use jsonrpc_core::*;
5-
use jsonrpc_minihttp_server::*;
5+
use jsonrpc_minihttp_server::{cors, ServerBuilder, DomainsValidation};
66

77
fn main() {
88
let mut io = IoHandler::default();

pubsub/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ jsonrpc-core = { version = "7.0", path = "../core" }
1616

1717
[dev-dependencies]
1818
jsonrpc-tcp-server = { version = "7.0", path = "../tcp" }
19+
jsonrpc-ws-server = { version = "7.0", path = "../ws" }

pubsub/examples/pubsub.rs

+9-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use std::{time, thread};
66
use std::sync::Arc;
77

88
use jsonrpc_core::*;
9-
use jsonrpc_pubsub::*;
10-
use jsonrpc_tcp_server::*;
9+
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId};
10+
use jsonrpc_tcp_server::{ServerBuilder, RequestContext};
1111

1212
use jsonrpc_core::futures::Future;
1313

@@ -31,6 +31,13 @@ impl PubSubMetadata for Meta {
3131
}
3232
}
3333

34+
/// To test the server:
35+
///
36+
/// ```bash
37+
/// $ netcat localhost 3030 -
38+
/// {"id":1,"jsonrpc":"2.0","method":"hello_subscribe","params":[10]}
39+
///
40+
/// ```
3441
fn main() {
3542
let mut io = PubSubHandler::new(MetaIoHandler::default());
3643
io.add_method("say_hello", |_params: Params| {

pubsub/examples/pubsub_ws.rs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
extern crate jsonrpc_core;
2+
extern crate jsonrpc_pubsub;
3+
extern crate jsonrpc_ws_server;
4+
5+
use std::{time, thread};
6+
use std::sync::Arc;
7+
8+
use jsonrpc_core::*;
9+
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, Subscriber, SubscriptionId};
10+
use jsonrpc_ws_server::{ServerBuilder, RequestContext};
11+
12+
use jsonrpc_core::futures::Future;
13+
14+
#[derive(Clone)]
15+
struct Meta {
16+
session: Option<Arc<Session>>,
17+
}
18+
19+
impl Default for Meta {
20+
fn default() -> Self {
21+
Meta {
22+
session: None,
23+
}
24+
}
25+
}
26+
27+
impl Metadata for Meta {}
28+
impl PubSubMetadata for Meta {
29+
fn session(&self) -> Option<Arc<Session>> {
30+
self.session.clone()
31+
}
32+
}
33+
34+
/// Use following node.js code to test:
35+
///
36+
/// ```js
37+
/// const WebSocket = require('websocket').w3cwebsocket;
38+
///
39+
/// const ws = new WebSocket('ws://localhost:3030');
40+
/// ws.addEventListener('open', () => {
41+
/// console.log('Sending request');
42+
///
43+
/// ws.send(JSON.stringify({
44+
/// jsonrpc: "2.0",
45+
/// id: 1,
46+
/// method: "subscribe_hello",
47+
/// params: [],
48+
/// }));
49+
/// });
50+
///
51+
/// ws.addEventListener('message', (message) => {
52+
/// console.log('Received: ', message.data);
53+
/// });
54+
///
55+
/// console.log('Starting');
56+
/// ```
57+
fn main() {
58+
let mut io = PubSubHandler::new(MetaIoHandler::default());
59+
io.add_method("say_hello", |_params: Params| {
60+
Ok(Value::String("hello".to_string()))
61+
});
62+
63+
io.add_subscription(
64+
"hello",
65+
("subscribe_hello", |params: Params, _, subscriber: Subscriber| {
66+
if params != Params::None {
67+
subscriber.reject(Error {
68+
code: ErrorCode::ParseError,
69+
message: "Invalid parameters. Subscription rejected.".into(),
70+
data: None,
71+
}).unwrap();
72+
return;
73+
}
74+
75+
let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
76+
// or subscriber.reject(Error {} );
77+
// or drop(subscriber)
78+
thread::spawn(move || {
79+
loop {
80+
thread::sleep(time::Duration::from_millis(1000));
81+
match sink.notify(Params::Array(vec![Value::Number(10.into())])).wait() {
82+
Ok(_) => {},
83+
Err(_) => {
84+
println!("Subscription has ended, finishing.");
85+
break;
86+
}
87+
}
88+
}
89+
});
90+
}),
91+
("remove_hello", |_id: SubscriptionId| -> futures::BoxFuture<Value, Error> {
92+
println!("Closing subscription");
93+
futures::future::ok(Value::Bool(true)).boxed()
94+
}),
95+
);
96+
97+
let server = ServerBuilder::new(io)
98+
.session_meta_extractor(|context: &RequestContext| {
99+
Meta {
100+
session: Some(Arc::new(Session::new(context.sender()))),
101+
}
102+
})
103+
.start(&"127.0.0.1:3030".parse().unwrap())
104+
.expect("Unable to start RPC server");
105+
106+
let _ = server.wait();
107+
}

pubsub/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ mod handler;
1212
mod subscription;
1313
mod types;
1414

15-
1615
pub use self::handler::{PubSubHandler, SubscribeRpcMethod, UnsubscribeRpcMethod};
1716
pub use self::subscription::{Session, Sink, Subscriber, new_subscription};
1817
pub use self::types::{PubSubMetadata, SubscriptionId, TransportError, SinkResult};

pubsub/src/subscription.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl Drop for Session {
8383
#[derive(Debug, Clone)]
8484
pub struct Sink {
8585
notification: String,
86-
transport: TransportSender
86+
transport: TransportSender,
8787
}
8888

8989
impl Sink {

tcp/examples/tcp.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ extern crate jsonrpc_core;
22
extern crate jsonrpc_tcp_server;
33

44
use jsonrpc_core::*;
5-
use jsonrpc_tcp_server::*;
5+
use jsonrpc_tcp_server::ServerBuilder;
66

77
fn main() {
88
let mut io = IoHandler::default();

ws/examples/ws.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ extern crate jsonrpc_core;
22
extern crate jsonrpc_ws_server;
33

44
use jsonrpc_core::*;
5-
use jsonrpc_ws_server::*;
5+
use jsonrpc_ws_server::ServerBuilder;
66

77
fn main() {
88
let mut io = IoHandler::default();

ws/src/lib.rs

-1
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,3 @@ pub use self::server_builder::{ServerBuilder, Error};
2222
pub use self::server_utils::cors::Origin;
2323
pub use self::server_utils::hosts::{Host, DomainsValidation};
2424
pub use self::server_utils::tokio_core;
25-

ws/src/metadata.rs

+105-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,64 @@
11
use std::fmt;
2+
use std::sync::{atomic, Arc};
23

3-
use core;
4+
use core::{self, futures};
5+
use core::futures::sync::mpsc;
6+
use server_utils::tokio_core::reactor::Remote;
47
use ws;
58

69
use session;
710
use Origin;
811

12+
/// Output of WebSocket connection. Use this to send messages to the other endpoint.
13+
#[derive(Clone)]
14+
pub struct Sender {
15+
out: ws::Sender,
16+
active: Arc<atomic::AtomicBool>,
17+
}
18+
19+
impl Sender {
20+
/// Creates a new `Sender`.
21+
pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
22+
Sender {
23+
out: out,
24+
active: active,
25+
}
26+
}
27+
28+
fn check_active(&self) -> ws::Result<()> {
29+
if self.active.load(atomic::Ordering::SeqCst) {
30+
Ok(())
31+
} else {
32+
Err(ws::Error::new(ws::ErrorKind::Internal, "Attempting to send a message to closed connection."))
33+
}
34+
}
35+
36+
/// Sends a message over the connection.
37+
/// Will return error if the connection is not active any more.
38+
pub fn send<M>(&self, msg: M) -> ws::Result<()> where
39+
M: Into<ws::Message>
40+
{
41+
self.check_active()?;
42+
self.out.send(msg)
43+
}
44+
45+
/// Sends a message over the endpoints of all connections.
46+
/// Will return error if the connection is not active any more.
47+
pub fn broadcast<M>(&self, msg: M) -> ws::Result<()> where
48+
M: Into<ws::Message>
49+
{
50+
self.check_active()?;
51+
self.out.broadcast(msg)
52+
}
53+
54+
/// Sends a close code to the other endpoint.
55+
/// Will return error if the connection is not active any more.
56+
pub fn close(&self, code: ws::CloseCode) -> ws::Result<()> {
57+
self.check_active()?;
58+
self.out.close(code)
59+
}
60+
}
61+
962
/// Request context
1063
pub struct RequestContext {
1164
/// Session id
@@ -15,7 +68,20 @@ pub struct RequestContext {
1568
/// Requested protocols
1669
pub protocols: Vec<String>,
1770
/// Direct channel to send messages to a client.
18-
pub out: ws::Sender,
71+
pub out: Sender,
72+
/// Remote to underlying event loop.
73+
pub remote: Remote,
74+
}
75+
76+
impl RequestContext {
77+
/// Get this session as a `Sink` spawning a new future
78+
/// in the underlying event loop.
79+
pub fn sender(&self) -> mpsc::Sender<String> {
80+
let out = self.out.clone();
81+
let (sender, receiver) = mpsc::channel(1);
82+
self.remote.spawn(move |_| SenderFuture(out, receiver));
83+
sender
84+
}
1985
}
2086

2187
impl fmt::Debug for RequestContext {
@@ -36,7 +102,44 @@ pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
36102
}
37103
}
38104

105+
impl<M, F> MetaExtractor<M> for F where
106+
M: core::Metadata,
107+
F: Fn(&RequestContext) -> M + Send + Sync + 'static,
108+
{
109+
fn extract(&self, context: &RequestContext) -> M {
110+
(*self)(context)
111+
}
112+
}
113+
39114
/// Dummy metadata extractor
40115
#[derive(Clone)]
41116
pub struct NoopExtractor;
42117
impl<M: core::Metadata> MetaExtractor<M> for NoopExtractor {}
118+
119+
struct SenderFuture(Sender, mpsc::Receiver<String>);
120+
impl futures::Future for SenderFuture {
121+
type Item = ();
122+
type Error = ();
123+
124+
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
125+
use self::futures::Stream;
126+
127+
loop {
128+
let item = self.1.poll()?;
129+
match item {
130+
futures::Async::NotReady => {
131+
return Ok(futures::Async::NotReady);
132+
},
133+
futures::Async::Ready(None) => {
134+
return Ok(futures::Async::Ready(()));
135+
},
136+
futures::Async::Ready(Some(val)) => {
137+
if let Err(e) = self.0.send(val) {
138+
warn!("Error sending a subscription update: {:?}", e);
139+
return Ok(futures::Async::Ready(()));
140+
}
141+
},
142+
}
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)