Skip to content

Commit 17e8a5c

Browse files
committed
Add SSE style delimiting for message streaming
1 parent 8f0d092 commit 17e8a5c

File tree

3 files changed

+113
-7
lines changed

3 files changed

+113
-7
lines changed

src/include/grpc_transcoding/response_to_json_translator.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ struct JsonResponseTranslateOptions {
7070
// If set to false, all streaming messages are treated as a JSON array and
7171
// separated by comma.
7272
bool stream_newline_delimited;
73+
74+
// If true, enforces Server-Sent Events (SSE) message framing (`data: <message>\n\n`)
75+
// and, `stream_newline_delimited` is ignored.
76+
// If false, message framing is determined by `stream_newline_delimited`.
77+
bool stream_sse_style_delimited;
7378
};
7479

7580
class ResponseToJsonTranslator : public MessageStream {
@@ -84,7 +89,7 @@ class ResponseToJsonTranslator : public MessageStream {
8489
::google::protobuf::util::TypeResolver* type_resolver,
8590
std::string type_url, bool streaming, TranscoderInputStream* in,
8691
const JsonResponseTranslateOptions& options = {
87-
::google::protobuf::util::JsonPrintOptions(), false});
92+
::google::protobuf::util::JsonPrintOptions(), false, false});
8893

8994
// MessageStream implementation
9095
bool NextMessage(std::string* message);

src/response_to_json_translator.cc

+32-6
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ bool ResponseToJsonTranslator::NextMessage(std::string* message) {
6565
return false;
6666
}
6767
} else if (streaming_ && reader_.Finished()) {
68-
if (!options_.stream_newline_delimited) {
69-
// This is a non-newline-delimited streaming call and the input is
70-
// finished. Return the final ']'
71-
// or "[]" in case this was an empty stream.
68+
if (!options_.stream_newline_delimited &&
69+
!options_.stream_sse_style_delimited) {
70+
// This is a non-newline-delimited and non-SSE-style-delimited streaming
71+
// call and the input is finished. Return the final ']' or "[]" in case
72+
// this was an empty stream.
7273
*message = first_ ? "[]" : "]";
7374
}
7475
finished_ = true;
@@ -95,14 +96,39 @@ bool WriteChar(::google::protobuf::io::ZeroCopyOutputStream* stream, char c) {
9596
return true;
9697
}
9798

99+
// A helper to write a string to a ZeroCopyOutputStream.
100+
bool WriteString(::google::protobuf::io::ZeroCopyOutputStream* stream, std::string str) {
101+
for (auto c : str) {
102+
if (!WriteChar(stream, c)) {
103+
return false;
104+
}
105+
}
106+
return true;
107+
}
108+
98109
} // namespace
99110

100111
bool ResponseToJsonTranslator::TranslateMessage(
101112
::google::protobuf::io::ZeroCopyInputStream* proto_in,
102113
std::string* json_out) {
103114
::google::protobuf::io::StringOutputStream json_stream(json_out);
104115

105-
if (streaming_ && !options_.stream_newline_delimited) {
116+
if (streaming_ && options_.stream_sse_style_delimited) {
117+
if (first_) {
118+
if (!WriteString(&json_stream, "data: ")) {
119+
status_ = absl::Status(absl::StatusCode::kInternal,
120+
"Failed to build the response message.");
121+
return false;
122+
}
123+
first_ = false;
124+
} else {
125+
if (!WriteString(&json_stream, "\n\ndata: ")) {
126+
status_ = absl::Status(absl::StatusCode::kInternal,
127+
"Failed to build the response message.");
128+
return false;
129+
}
130+
}
131+
} else if (streaming_ && !options_.stream_newline_delimited) {
106132
if (first_) {
107133
// This is a non-newline-delimited streaming call and this is the first
108134
// message, so prepend the
@@ -134,7 +160,7 @@ bool ResponseToJsonTranslator::TranslateMessage(
134160
}
135161

136162
// Append a newline delimiter after the message if needed.
137-
if (streaming_ && options_.stream_newline_delimited) {
163+
if (streaming_ && options_.stream_newline_delimited && !options_.stream_sse_style_delimited) {
138164
if (!WriteChar(&json_stream, '\n')) {
139165
status_ = absl::Status(absl::StatusCode::kInternal,
140166
"Failed to build the response message.");

test/response_to_json_translator_test.cc

+75
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,81 @@ TEST_F(ResponseToJsonTranslatorTest, StreamingNewlineDelimitedDirectTest) {
911911
EXPECT_FALSE(translator.NextMessage(&message));
912912
}
913913

914+
TEST_F(ResponseToJsonTranslatorTest, StreamingSSEStyleDelimitedDirectTest) {
915+
// Load the service config
916+
::google::api::Service service;
917+
ASSERT_TRUE(
918+
transcoding::testing::LoadService("bookstore_service.pb.txt", &service));
919+
920+
// Create a TypeHelper using the service config
921+
TypeHelper type_helper(service.types(), service.enums());
922+
923+
// Messages to test
924+
auto test_message1 =
925+
GenerateGrpcMessage<Shelf>(R"(name : "1" theme : "Fiction")");
926+
auto test_message2 =
927+
GenerateGrpcMessage<Shelf>(R"(name : "2" theme : "Fantasy")");
928+
auto test_message3 =
929+
GenerateGrpcMessage<Shelf>(R"(name : "3" theme : "Children")");
930+
auto test_message4 =
931+
GenerateGrpcMessage<Shelf>(R"(name : "4" theme : "Classics")");
932+
933+
TestZeroCopyInputStream input_stream;
934+
ResponseToJsonTranslator translator(
935+
type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream,
936+
{pbutil::JsonPrintOptions(), true, true});
937+
938+
std::string message;
939+
// There is nothing translated
940+
EXPECT_FALSE(translator.NextMessage(&message));
941+
942+
// Add test_message1 to the stream
943+
input_stream.AddChunk(test_message1);
944+
945+
// Now we should have the test_message1 translated
946+
EXPECT_TRUE(translator.NextMessage(&message));
947+
EXPECT_EQ("data: {\"name\":\"1\",\"theme\":\"Fiction\"}", message);
948+
949+
// No more messages, but not finished yet
950+
EXPECT_FALSE(translator.NextMessage(&message));
951+
EXPECT_FALSE(translator.Finished());
952+
953+
// Add the test_message2, test_message3 and part of test_message4
954+
input_stream.AddChunk(test_message2);
955+
input_stream.AddChunk(test_message3);
956+
input_stream.AddChunk(test_message4.substr(0, 10));
957+
958+
// Now we should have test_message2 & test_message3 translated
959+
EXPECT_TRUE(translator.NextMessage(&message));
960+
EXPECT_EQ("\n\ndata: {\"name\":\"2\",\"theme\":\"Fantasy\"}", message);
961+
962+
EXPECT_TRUE(translator.NextMessage(&message));
963+
EXPECT_EQ("\n\ndata: {\"name\":\"3\",\"theme\":\"Children\"}", message);
964+
965+
// No more messages, but not finished yet
966+
EXPECT_FALSE(translator.NextMessage(&message));
967+
EXPECT_FALSE(translator.Finished());
968+
969+
// Add the rest of test_message4
970+
input_stream.AddChunk(test_message4.substr(10));
971+
972+
// Now we should have the test_message4 translated
973+
EXPECT_TRUE(translator.NextMessage(&message));
974+
EXPECT_EQ("\n\ndata: {\"name\":\"4\",\"theme\":\"Classics\"}", message);
975+
976+
// No more messages, but not finished yet
977+
EXPECT_FALSE(translator.NextMessage(&message));
978+
EXPECT_FALSE(translator.Finished());
979+
980+
// Now finish the stream
981+
input_stream.Finish();
982+
983+
// All done!
984+
EXPECT_TRUE(translator.NextMessage(&message));
985+
EXPECT_TRUE(translator.Finished());
986+
EXPECT_FALSE(translator.NextMessage(&message));
987+
}
988+
914989
TEST_F(ResponseToJsonTranslatorTest, Streaming5KMessages) {
915990
// Load the service config
916991
::google::api::Service service;

0 commit comments

Comments
 (0)