Skip to content

Commit cf39556

Browse files
authored
Merge pull request vert-x3#260 from EmadAlblueshi/custom-read-write-stream
ReadStream and WriteStream examples
2 parents 972c2c8 + 189fd48 commit cf39556

File tree

5 files changed

+381
-0
lines changed

5 files changed

+381
-0
lines changed

core-examples/README.adoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,3 +497,24 @@ vertx run script.groovy
497497
A simple example illustrating how to use the streaming `JsonParser` to parse a giant array of small objects.
498498

499499
link:src/main/java/io/vertx/example/core/jsonstreaming/JsonStreamingExample.java[Java verticle parsing a giant JSON array in a non blocking way]
500+
501+
== Custom ReadStream and WriteStream implementation
502+
503+
An example illustrating how to create your custom prefix length protocol to read and write objects in wire. The example
504+
uses link:src/main/java/io/vertx/example/core/net/stream/Batch.java[Batch] object,
505+
link:src/main/java/io/vertx/example/core/net/stream/BatchStream.java[ReadStream] and
506+
link:src/main/java/io/vertx/example/core/net/stream/BatchStream.java[WriteStream] implementation.
507+
508+
The protocol structure for link:src/main/java/io/vertx/example/core/net/stream/Batch.java[Batch] object is simple as
509+
illustrated below:
510+
511+
```
512+
Length : uInt32
513+
Type : byte ['O' for JsonObject | 'A' for JsonArray | 'B' for Buffer]
514+
Payload : Buffer
515+
```
516+
517+
The link:src/main/java/io/vertx/example/core/net/stream/Server.java[NetServer] will receive the objects and write them
518+
back to the
519+
link:src/main/java/io/vertx/example/core/net/stream/Client.java[NetClient].
520+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.vertx.example.core.net.stream;
2+
3+
import io.vertx.core.buffer.Buffer;
4+
import io.vertx.core.json.JsonArray;
5+
import io.vertx.core.json.JsonObject;
6+
7+
/*
8+
*
9+
* Batch protocol uses prefix length structure
10+
*
11+
* Message Length : int
12+
* Message Type : byte('O') for JsonObject | byte('A') for JsonArray | byte('B') for Buffer
13+
* Message Payload : byte[]
14+
*
15+
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
16+
*/
17+
18+
public class Batch {
19+
20+
private final char type;
21+
private final Buffer buffer;
22+
23+
public Batch(JsonObject jsonObject) {
24+
this.buffer = jsonObject.toBuffer();
25+
this.type = 'O';
26+
}
27+
28+
public Batch(JsonArray jsonArray) {
29+
this.buffer = jsonArray.toBuffer();
30+
this.type = 'A';
31+
}
32+
33+
public Batch(Buffer buffer) {
34+
this.buffer = buffer;
35+
this.type = 'B';
36+
}
37+
38+
public boolean isArray() {
39+
return type == 'A';
40+
}
41+
42+
public boolean isObject() {
43+
return type == 'O';
44+
}
45+
46+
public boolean isBuffer() {
47+
return type == 'B';
48+
}
49+
50+
public char getType() {
51+
return type;
52+
}
53+
54+
public Buffer getBuffer() {
55+
return isBuffer() ? buffer : null;
56+
}
57+
58+
public JsonObject getJsonObject() {
59+
return isObject() ? buffer.toJsonObject() : null;
60+
}
61+
62+
public JsonArray getJsonArray() {
63+
return isArray() ? buffer.toJsonArray() : null;
64+
}
65+
66+
public Buffer getRaw() {
67+
return buffer;
68+
}
69+
70+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package io.vertx.example.core.net.stream;
2+
3+
import io.vertx.core.Handler;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.parsetools.RecordParser;
6+
import io.vertx.core.streams.ReadStream;
7+
import io.vertx.core.streams.WriteStream;
8+
9+
import java.util.Objects;
10+
11+
/*
12+
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
13+
*/
14+
15+
public class BatchStream implements ReadStream<Batch>, WriteStream<Batch> {
16+
17+
private final RecordParser recordParser;
18+
private final WriteStream<Buffer> writeStream;
19+
private int size = -1;
20+
private Handler<Throwable> exceptionHandler;
21+
22+
public BatchStream(ReadStream<Buffer> rs, WriteStream<Buffer> ws) {
23+
Objects.requireNonNull(rs, "ReadStream");
24+
Objects.requireNonNull(ws, "WriteStream");
25+
recordParser = RecordParser.newFixed(4, rs);
26+
writeStream = ws;
27+
// Propagate exceptions to the current stream
28+
recordParser.exceptionHandler(throwable -> {
29+
if (exceptionHandler != null) {
30+
exceptionHandler.handle(throwable);
31+
}
32+
});
33+
writeStream.exceptionHandler(throwable -> {
34+
if (exceptionHandler != null) {
35+
exceptionHandler.handle(throwable);
36+
}
37+
});
38+
}
39+
40+
@Override
41+
public BatchStream exceptionHandler(Handler<Throwable> handler) {
42+
exceptionHandler = handler;
43+
return this;
44+
}
45+
46+
@Override
47+
public BatchStream write(Batch batch) {
48+
if (batch == null) {
49+
if (exceptionHandler != null) {
50+
exceptionHandler.handle(new NullPointerException());
51+
}
52+
} else {
53+
Buffer protocol = Buffer.buffer();
54+
protocol.appendInt(0);
55+
protocol.appendByte((byte) batch.getType());
56+
protocol.appendBuffer(batch.getRaw());
57+
protocol.setInt(0, protocol.length() - 4);
58+
writeStream.write(protocol);
59+
}
60+
return this;
61+
}
62+
63+
@Override
64+
public void end() {
65+
writeStream.end();
66+
}
67+
68+
@Override
69+
public BatchStream setWriteQueueMaxSize(int maxSize) {
70+
writeStream.setWriteQueueMaxSize(maxSize);
71+
return this;
72+
}
73+
74+
@Override
75+
public boolean writeQueueFull() {
76+
return writeStream.writeQueueFull();
77+
}
78+
79+
@Override
80+
public BatchStream drainHandler(Handler<Void> handler) {
81+
writeStream.drainHandler(handler);
82+
return this;
83+
}
84+
85+
@Override
86+
public BatchStream handler(Handler<Batch> handler) {
87+
if (handler == null) {
88+
recordParser.handler(null);
89+
recordParser.exceptionHandler(null);
90+
recordParser.endHandler(null);
91+
return this;
92+
}
93+
recordParser.handler(buffer -> {
94+
try {
95+
// Message size mode
96+
if (size == -1) {
97+
size = buffer.getInt(0);
98+
recordParser.fixedSizeMode(size);
99+
// Message body mode
100+
} else {
101+
size = -1;
102+
recordParser.fixedSizeMode(4);
103+
// Batch message type
104+
final char type = (char) buffer.getByte(0);
105+
// Batch message data (Buffer)
106+
final Buffer payload = buffer.getBuffer(1, buffer.length());
107+
switch (type) {
108+
// JsonObject
109+
case 'O': {
110+
handler.handle(new Batch(payload.toJsonObject()));
111+
break;
112+
}
113+
// JsonArray
114+
case 'A': {
115+
handler.handle(new Batch(payload.toJsonArray()));
116+
break;
117+
}
118+
// Buffer
119+
case 'B': {
120+
handler.handle(new Batch(payload));
121+
break;
122+
} // Invalid
123+
default: {
124+
if (exceptionHandler != null) {
125+
exceptionHandler.handle(new IllegalStateException("Invalid message " + type));
126+
}
127+
}
128+
}
129+
}
130+
} catch (Throwable throwable) {
131+
if (exceptionHandler != null) {
132+
exceptionHandler.handle(throwable);
133+
}
134+
}
135+
});
136+
return this;
137+
}
138+
139+
@Override
140+
public BatchStream pause() {
141+
recordParser.pause();
142+
return this;
143+
}
144+
145+
@Override
146+
public BatchStream resume() {
147+
recordParser.resume();
148+
return this;
149+
}
150+
151+
@Override
152+
public BatchStream endHandler(Handler<Void> endHandler) {
153+
recordParser.endHandler(endHandler);
154+
return this;
155+
}
156+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.vertx.example.core.net.stream;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.core.buffer.Buffer;
5+
import io.vertx.core.json.JsonArray;
6+
import io.vertx.core.json.JsonObject;
7+
import io.vertx.core.net.NetSocket;
8+
import io.vertx.example.util.Runner;
9+
10+
import java.time.Instant;
11+
import java.util.UUID;
12+
13+
/*
14+
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
15+
*/
16+
public class Client extends AbstractVerticle {
17+
18+
// Convenience method so you can run it in your IDE
19+
public static void main(String[] args) {
20+
Runner.runExample(Client.class);
21+
}
22+
23+
@Override
24+
public void start() throws Exception {
25+
vertx.createNetClient().connect(1234, "localhost", ar -> {
26+
if (ar.succeeded()) {
27+
28+
NetSocket socket = ar.result();
29+
30+
// Create batch stream for reading and writing
31+
BatchStream batchStream = new BatchStream(socket, socket);
32+
33+
// Pause reading data
34+
batchStream.pause();
35+
36+
// Register read stream handler
37+
batchStream.handler(batch -> {
38+
System.out.println("Client Received : " + batch.getRaw().toString());
39+
}).endHandler(v -> batchStream.end())
40+
.exceptionHandler(t -> {
41+
t.printStackTrace();
42+
batchStream.end();
43+
});
44+
45+
// Resume reading data
46+
batchStream.resume();
47+
48+
// JsonObject
49+
JsonObject jsonObject = new JsonObject()
50+
.put("id", UUID.randomUUID().toString())
51+
.put("name", "Vert.x")
52+
.put("timestamp", Instant.now());
53+
54+
// JsonArray
55+
JsonArray jsonArray = new JsonArray()
56+
.add(UUID.randomUUID().toString())
57+
.add("Vert.x")
58+
.add(Instant.now());
59+
60+
// Buffer
61+
Buffer buffer = Buffer.buffer("Vert.x is awesome!");
62+
63+
// Write to socket
64+
batchStream.write(new Batch(jsonObject));
65+
batchStream.write(new Batch(jsonArray));
66+
batchStream.write(new Batch(buffer));
67+
68+
} else {
69+
System.out.println("Failed to connect " + ar.cause());
70+
}
71+
});
72+
}
73+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.vertx.example.core.net.stream;
2+
3+
import io.vertx.core.AbstractVerticle;
4+
import io.vertx.example.util.Runner;
5+
6+
/*
7+
* @author <a href="mailto:[email protected]">Emad Alblueshi</a>
8+
*/
9+
public class Server extends AbstractVerticle {
10+
11+
// Convenience method so you can run it in your IDE
12+
public static void main(String[] args) {
13+
Runner.runExample(Server.class);
14+
}
15+
16+
@Override
17+
public void start() throws Exception {
18+
vertx.createNetServer().connectHandler(socket -> {
19+
20+
// Create batch stream for reading and writing
21+
BatchStream batchStream = new BatchStream(socket, socket);
22+
23+
// Pause reading data
24+
batchStream.pause();
25+
26+
// Register read stream handler
27+
batchStream.handler(batch -> {
28+
29+
// Print received batch object from the client
30+
System.out.println("Server Received : " + batch.getRaw().toString());
31+
32+
// Write back batch object to the client
33+
batchStream.write(batch);
34+
35+
// Check if write queue is full
36+
if (batchStream.writeQueueFull()) {
37+
38+
// Pause reading data
39+
batchStream.pause();
40+
41+
// Called once write queue is ready to accept more data
42+
batchStream.drainHandler(done -> {
43+
44+
// Resume reading data
45+
batchStream.resume();
46+
47+
});
48+
}
49+
}).endHandler(v -> batchStream.end())
50+
.exceptionHandler(t -> {
51+
t.printStackTrace();
52+
batchStream.end();
53+
});
54+
55+
// Resume reading data
56+
batchStream.resume();
57+
58+
}).listen(1234);
59+
System.out.println("Batch server is now listening to port : 1234");
60+
}
61+
}

0 commit comments

Comments
 (0)