Skip to content

Commit 988796c

Browse files
authored
Expose last parsed delimiter in MessageReader (#58)
Currently, `MessageReader::NextMessage` will advance the buffer past the gRPC delimiter. However, we need the delimiter if we wish to preserve the original message. Instead of having our extraction libraries reform the delimiter, just rely on `MessageReader` to expose the last parsed message.
1 parent 0b74abc commit 988796c

File tree

5 files changed

+52
-15
lines changed

5 files changed

+52
-15
lines changed

script/ci.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@
1717
################################################################################
1818

1919
bazel build //...
20-
bazel test //...
20+
bazel test //... --test_output=errors

src/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ cc_library(
214214

215215
cc_library(
216216
name = "transcoder_input_stream",
217-
srcs = [
217+
hdrs = [
218218
"include/grpc_transcoding/transcoder_input_stream.h",
219219
],
220220
includes = [

src/include/grpc_transcoding/message_reader.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,17 @@ namespace grpc {
2525

2626
namespace transcoding {
2727

28+
// The number of bytes in the delimiter for gRPC wire format's
29+
// `Length-Prefixed-Message`.
30+
constexpr size_t kGrpcDelimiterByteSize = 5;
31+
32+
// Return type that contains both the proto message and the preceding gRPC data
33+
// frame.
34+
struct MessageAndGrpcFrame {
35+
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream> message;
36+
unsigned char grpc_frame[kGrpcDelimiterByteSize];
37+
};
38+
2839
// MessageReader helps extract full messages from a ZeroCopyInputStream of
2940
// messages in gRPC wire format (http://www.grpc.io/docs/guides/wire.html). Each
3041
// message is returned in a ZeroCopyInputStream. MessageReader doesn't advance
@@ -70,8 +81,15 @@ class MessageReader {
7081
// of the original ZeroCopyInputStream and the MessageReader relies on
7182
// the caller to advance the stream to the next message before calling
7283
// NextMessage() again.
84+
// NOTE: the caller should check `Status()` is OK after calling this method.
7385
std::unique_ptr<::google::protobuf::io::ZeroCopyInputStream> NextMessage();
7486

87+
// An overload that also outputs the gRPC message delimiter for the parsed
88+
// message. The caller is free to take ownership of contents in `grpc_frame`.
89+
// NOTE: the caller must check the `message` is NOT nullptr and the `Status()`
90+
// is OK before consuming the `grpc_frame`.
91+
MessageAndGrpcFrame NextMessageAndGrpcFrame();
92+
7593
::google::protobuf::util::Status Status() const { return status_; }
7694

7795
// Returns true if the stream has ended (this is permanent); otherwise returns
@@ -88,6 +106,8 @@ class MessageReader {
88106
bool finished_;
89107
// Status
90108
::google::protobuf::util::Status status_;
109+
// Buffer to store the current delimiter value.
110+
unsigned char delimiter_[kGrpcDelimiterByteSize];
91111

92112
MessageReader(const MessageReader&) = delete;
93113
MessageReader& operator=(const MessageReader&) = delete;

src/message_reader.cc

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
8585

8686
// Check if we have the current message size. If not try to read it.
8787
if (!have_current_message_size_) {
88-
const size_t kDelimiterSize = 5;
89-
if (in_->BytesAvailable() < static_cast<pb::int64>(kDelimiterSize)) {
88+
if (in_->BytesAvailable()
89+
< static_cast<pb::int64>(kGrpcDelimiterByteSize)) {
9090
// We don't have 5 bytes available to read the length of the message.
9191
// Find out whether the stream is finished and return false.
9292
finished_ = in_->Finished();
@@ -98,21 +98,21 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
9898
return nullptr;
9999
}
100100

101-
// Try to read the delimiter
102-
unsigned char delimiter[kDelimiterSize] = {0};
103-
if (!ReadStream(in_, delimiter, sizeof(delimiter))) {
101+
// Try to read the delimiter.
102+
memset(delimiter_, 0, kGrpcDelimiterByteSize);
103+
if (!ReadStream(in_, delimiter_, kGrpcDelimiterByteSize)) {
104104
finished_ = true;
105105
return nullptr;
106106
}
107107

108-
if (delimiter[0] != 0) {
108+
if (delimiter_[0] != 0) {
109109
status_ = google::protobuf::util::Status(
110110
google::protobuf::util::error::INTERNAL,
111-
"Unsupported gRPC frame flag: " + std::to_string(delimiter[0]));
111+
"Unsupported gRPC frame flag: " + std::to_string(delimiter_[0]));
112112
return nullptr;
113113
}
114114

115-
current_message_size_ = DelimiterToSize(delimiter);
115+
current_message_size_ = DelimiterToSize(delimiter_);
116116
have_current_message_size_ = true;
117117
}
118118

@@ -137,6 +137,13 @@ std::unique_ptr<pbio::ZeroCopyInputStream> MessageReader::NextMessage() {
137137
new pbio::LimitingInputStream(in_, current_message_size_));
138138
}
139139

140+
MessageAndGrpcFrame MessageReader::NextMessageAndGrpcFrame() {
141+
MessageAndGrpcFrame out;
142+
out.message = NextMessage();
143+
memcpy(out.grpc_frame, delimiter_, kGrpcDelimiterByteSize);
144+
return out;
145+
}
146+
140147
} // namespace transcoding
141148

142149
} // namespace grpc

test/message_reader_test.cc

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct ExpectedAt {
4747
// The position in the input, after which this message is expected
4848
size_t at;
4949
std::string message;
50+
std::string delimiter;
5051
};
5152

5253
// MessageReaderTestRun tests a single MessageReader processing the input as
@@ -87,25 +88,33 @@ class MessageReaderTestRun {
8788
// While we still have expected messages before or at the current position
8889
// try to match.
8990
while (next_expected_ != std::end(expected_) &&
90-
next_expected_->at <= position_) {
91+
next_expected_->at <= position_) {
9192
// Must not be finished as we expect a message
9293
if (reader_->Finished()) {
9394
ADD_FAILURE() << "Finished unexpectedly" << std::endl;
9495
return false;
9596
}
9697
// Read the message
97-
auto stream = reader_->NextMessage();
98+
MessageAndGrpcFrame result = reader_->NextMessageAndGrpcFrame();
9899
EXPECT_TRUE(reader_->Status().ok());
99-
if (!stream) {
100+
if (!result.message) {
100101
ADD_FAILURE() << "No message available" << std::endl;
101102
return false;
102103
}
103104
// Match the message with the expected message
104-
auto message = ReadAllFromStream(stream.get());
105+
auto message = ReadAllFromStream(result.message.get());
105106
if (next_expected_->message != message) {
106107
EXPECT_EQ(next_expected_->message, message);
107108
return false;
108109
}
110+
// Match the delimiter.
111+
std::string delimiter_string =
112+
std::string(reinterpret_cast<const char*>(result.grpc_frame),
113+
kGrpcDelimiterByteSize);
114+
if (delimiter_string != next_expected_->delimiter) {
115+
EXPECT_EQ(delimiter_string, next_expected_->delimiter);
116+
return false;
117+
}
109118
// Move to the next expected message
110119
++next_expected_;
111120
}
@@ -150,7 +159,8 @@ class MessageReaderTestCase {
150159
input_ += message;
151160
// Remember that we should expect this message after input_.size() bytes
152161
// are processed.
153-
expected_.emplace_back(ExpectedAt{input_.size(), message});
162+
expected_.emplace_back(ExpectedAt{input_.size(), message,
163+
SizeToDelimiter(message.size())});
154164
}
155165
}
156166

0 commit comments

Comments
 (0)