-
Notifications
You must be signed in to change notification settings - Fork 753
/
Copy pathV1Decoder.cs
178 lines (142 loc) · 6.65 KB
/
V1Decoder.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2015 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. The 0MQ framing protocol is based on the size-prefixed
// paradigm, which qualifies it to be parsed by this class.
// On the other hand, XML-based transports (like XMPP or SOAP) don't allow
// for knowing the size of data to read in advance and should use different
// decoding algorithms.
//
// This class implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions.
namespace NetMQ.Core.Transports
{
/// <summary>
/// Decoder for size-prefixed frames, driven as a four-state machine.
/// As read by this class, a frame on the wire is:
/// a 1-byte length, or the marker 0xff followed by an 8-byte length;
/// then a 1-byte flags field; then the message body.
/// The length counts the flags byte, so the body is (length - 1) bytes long.
/// </summary>
internal class V1Decoder : DecoderBase
{
    // State identifiers passed to NextStep and dispatched on in Next().
    // Each one names the piece of the frame that has just been read in full.
    private const int OneByteSizeReadyState = 0;
    private const int EightByteSizeReadyState = 1;
    private const int FlagsReadyState = 2;
    private const int MessageReadyState = 3;

    /// <summary>
    /// Largest message-body size, in bytes, accepted from an 8-byte length prefix.
    /// Larger frames are rejected before the body size is narrowed to an int.
    /// </summary>
    /// <remarks>
    /// NOTE(review): 0x7FFFFFC7 looks like the runtime's maximum array length - confirm.
    /// </remarks>
    private const ulong MaxBodySize = 0x7FFFFFC7;

    /// <summary>
    /// 8-byte scratch buffer that receives the length prefix and the flags byte.
    /// </summary>
    private readonly ByteArraySegment m_tmpbuf;

    /// <summary>
    /// The message currently being assembled; handed to the sink by <see cref="PushMsg"/>.
    /// </summary>
    private Msg m_inProgress;

    /// <summary>
    /// The maximum message-size. If this is -1 then there is no maximum.
    /// </summary>
    private readonly long m_maxMessageSize;

    /// <summary>
    /// Create a new V1Decoder with the given buffer-size, maximum-message-size and Endian-ness.
    /// </summary>
    /// <param name="bufsize">the buffer-size to give the contained buffer</param>
    /// <param name="maxMessageSize">the maximum message size. -1 indicates no limit.</param>
    /// <param name="endian">the Endianness to specify for it - either Big or Little</param>
    public V1Decoder(int bufsize, long maxMessageSize, Endianness endian)
        : base(bufsize, endian)
    {
        m_maxMessageSize = maxMessageSize;
        m_tmpbuf = new ByteArraySegment(new byte[8]);

        // At the beginning, read one byte and go to the one-byte-size-ready state.
        NextStep(m_tmpbuf, 1, OneByteSizeReadyState);

        m_inProgress = new Msg();
        m_inProgress.InitEmpty();
    }

    /// <summary>
    /// Dispatch to the handler for the current <c>State</c>.
    /// </summary>
    /// <returns>
    /// The handler's result, or <see cref="DecodeResult.Error"/> when the state is not one
    /// of the four states declared by this class.
    /// </returns>
    protected override DecodeResult Next()
    {
        switch (State)
        {
            case OneByteSizeReadyState:
                return OneByteSizeReady();
            case EightByteSizeReadyState:
                return EightByteSizeReady();
            case FlagsReadyState:
                return FlagsReady();
            case MessageReadyState:
                return MessageReady();
            default:
                return DecodeResult.Error;
        }
    }

    /// <summary>
    /// Handle the first length byte: either switch to reading the 8-byte length,
    /// or validate the short length, allocate the body and go on to read the flags.
    /// </summary>
    /// <returns>
    /// <see cref="DecodeResult.Error"/> if the length is zero or the body would exceed the
    /// configured maximum; otherwise <see cref="DecodeResult.Processing"/>.
    /// </returns>
    private DecodeResult OneByteSizeReady()
    {
        m_tmpbuf.Reset();

        // First byte of size is read. If it is 0xff, an 8-byte size follows.
        // Otherwise allocate the buffer for message data and read the
        // message data into it.
        byte first = m_tmpbuf[0];

        if (first == 0xff)
        {
            NextStep(m_tmpbuf, 8, EightByteSizeReadyState);
            return DecodeResult.Processing;
        }

        // There has to be at least one byte (the flags) in the message.
        if (first == 0)
        {
            return DecodeResult.Error;
        }

        // The length includes the flags byte, so the body is one byte shorter.
        int bodySize = first - 1;

        // Message size must not exceed the maximum allowed size (negative means no limit).
        if (m_maxMessageSize >= 0 && bodySize > m_maxMessageSize)
        {
            return DecodeResult.Error;
        }

        // m_inProgress is already initialised at this point, so in theory it should be
        // closed before being re-initialised; however, it is a 0-byte message and thus
        // can be treated as uninitialised.
        m_inProgress.InitPool(bodySize);

        NextStep(m_tmpbuf, 1, FlagsReadyState);
        return DecodeResult.Processing;
    }

    /// <summary>
    /// Handle the 8-byte length: validate it, allocate the body and go on to read the flags.
    /// </summary>
    /// <returns>
    /// <see cref="DecodeResult.Error"/> if the length is zero, the body would exceed the
    /// configured maximum, or the body would exceed <see cref="MaxBodySize"/>;
    /// otherwise <see cref="DecodeResult.Processing"/>.
    /// </returns>
    private DecodeResult EightByteSizeReady()
    {
        m_tmpbuf.Reset();

        // 8-byte payload length is read. Allocate the buffer
        // for message body and read the message data into it.
        ulong payloadLength = m_tmpbuf.GetUnsignedLong(Endian, 0);

        // There has to be at least one byte (the flags) in the message.
        // This check also guarantees the subtraction below cannot wrap around.
        if (payloadLength == 0)
        {
            return DecodeResult.Error;
        }

        // The length includes the flags byte, so the body is one byte shorter.
        ulong bodyLength = payloadLength - 1;

        // Message size must not exceed the maximum allowed size (negative means no limit).
        if (m_maxMessageSize >= 0 && bodyLength > (ulong)m_maxMessageSize)
        {
            return DecodeResult.Error;
        }

        // Message size must stay within the supported range, which also makes
        // the narrowing cast to int below safe.
        if (bodyLength > MaxBodySize)
        {
            return DecodeResult.Error;
        }

        int msgSize = (int)bodyLength;

        // m_inProgress is already initialised at this point, so in theory it should be
        // closed before being re-initialised; however, it is a 0-byte message and thus
        // can be treated as uninitialised.
        m_inProgress.InitPool(msgSize);

        NextStep(m_tmpbuf, 1, FlagsReadyState);
        return DecodeResult.Processing;
    }

    /// <summary>
    /// Handle the flags byte: copy the More flag onto the message and start reading the body.
    /// </summary>
    /// <returns>Always <see cref="DecodeResult.Processing"/>.</returns>
    private DecodeResult FlagsReady()
    {
        m_tmpbuf.Reset();

        // Store the flags from the wire into the message structure.
        // Only the More bit is kept; every other bit of the wire byte is masked off.
        int first = m_tmpbuf[0];
        m_inProgress.SetFlags((MsgFlags)first & MsgFlags.More);

        // The body buffer is expected to be present before it is wrapped for reading.
        Assumes.NotNull(m_inProgress.UnsafeData);

        // Read the body straight into the message's own buffer.
        NextStep(
            new ByteArraySegment(m_inProgress.UnsafeData, m_inProgress.UnsafeOffset),
            m_inProgress.Size,
            MessageReadyState);

        return DecodeResult.Processing;
    }

    /// <summary>
    /// Handle a fully-read body: re-arm the decoder for the next frame's first length byte.
    /// </summary>
    /// <returns>Always <see cref="DecodeResult.MessageReady"/>.</returns>
    private DecodeResult MessageReady()
    {
        m_tmpbuf.Reset();

        NextStep(m_tmpbuf, 1, OneByteSizeReadyState);
        return DecodeResult.MessageReady;
    }

    /// <summary>
    /// Pass the in-progress message, by reference, to <paramref name="sink"/>.
    /// </summary>
    /// <param name="sink">the delegate that receives the message</param>
    /// <returns>The result returned by <paramref name="sink"/>.</returns>
    public override PushMsgResult PushMsg(ProcessMsgDelegate sink) => sink(ref m_inProgress);
}
}