17
17
/* ============================================================================
18
18
The protocol for carrying packets on a character stream:
19
19
20
- The protocol consists of the XML text to be transported with a bare
20
+ The protocol consists of the actual bytes to be transported with a bare
21
21
minimum of framing information added:
22
22
23
23
An ASCII Escape (<ESC> == 0x1B) character marks the start of a
24
24
4-ASCII-character control word. These are defined:
25
25
26
- <ESC>PKT : marks the end of a packet.
26
+ <ESC>PKT : marks the beginning of a packet.
27
+ <ESC>END : marks the end of a packet.
27
28
<ESC>ESC : represents an <ESC> character in the packet
28
29
<ESC>NOP : no meaning
29
30
30
- Any other bytes after <ESC> is a protocol error. End of
31
- stream anywhere but after <ESC>PKT or beginning of stream is
32
- a protocol error.
31
+ Any other bytes after <ESC> is a protocol error.
32
+
33
+ A stream is all the data transmitted during a single socket
34
+ connection.
35
+
36
+ End of stream in the middle of a packet is a protocol error.
33
37
34
38
All bytes not part of a control word are literal bytes of a packet.
35
39
36
- You can create a pstream transport from any file descriptor from which
40
+ You can create a packet socket from any file descriptor from which
37
41
you can read and write a bidirectional character stream. Typically,
38
42
it's a TCP socket.
39
43
40
44
One use of the NOP control word is to validate that the connection
41
45
is still working. You might send one periodically to detect, for
42
- example, an unplugged TCP/IP network cable.
46
+ example, an unplugged TCP/IP network cable. It's probably better
47
+ to use the TCP keepalive facility for that.
43
48
============================================================================*/
44
49
45
50
#define _BSD_SOURCE // gets uint defined
@@ -174,11 +179,10 @@ packetSocket::packetSocket(int const sockFd) {
174
179
this ->sockFd = dupRc;
175
180
176
181
this ->inEscapeSeq = false ;
182
+ this ->inPacket = false ;
177
183
178
184
this ->escAccum .len = 0 ;
179
185
180
- this ->packetAccumP = packetPtr (new packet);
181
-
182
186
fcntl (this ->sockFd , F_SETFL, O_NONBLOCK);
183
187
184
188
this ->eof = false ;
@@ -225,12 +229,16 @@ writeFd(int const fd,
225
229
void
226
230
packetSocket::writeWait (packetPtr const & packetP) const {
227
231
228
- const unsigned char * const packetMarker (
232
+ const unsigned char * const packetStart (
229
233
reinterpret_cast <const unsigned char *>(ESC_STR " PKT" ));
234
+ const unsigned char * const packetEnd (
235
+ reinterpret_cast <const unsigned char *>(ESC_STR " END" ));
236
+
237
+ writeFd (this ->sockFd , packetStart, 4 );
230
238
231
239
writeFd (this ->sockFd , packetP->getBytes (), packetP->getLength ());
232
240
233
- writeFd (this ->sockFd , packetMarker , 4 );
241
+ writeFd (this ->sockFd , packetEnd , 4 );
234
242
}
235
243
236
244
@@ -245,24 +253,15 @@ libc_read(int const fd,
245
253
246
254
247
255
248
- void
249
- packetSocket::bufferFinishedPacket () {
250
- /* ----------------------------------------------------------------------------
251
- Assume the packet currently being accumulated (in *this->packetAccumP)
252
- is complete and move it to the packet buffer (this->readBuffer).
253
- -----------------------------------------------------------------------------*/
254
- this ->readBuffer .push (this ->packetAccumP );
255
-
256
- this ->packetAccumP = packetPtr (new packet);
257
- }
258
-
259
-
260
-
261
256
void
262
257
packetSocket::takeSomeEscapeSeq (const unsigned char * const buffer,
263
258
size_t const length,
264
259
size_t * const bytesTakenP) {
265
-
260
+ /* ----------------------------------------------------------------------------
261
+ Take and process some bytes from the incoming stream 'buffer',
262
+ which contains 'length' bytes, assuming they are within an escape
263
+ sequence.
264
+ -----------------------------------------------------------------------------*/
266
265
size_t bytesTaken;
267
266
268
267
bytesTaken = 0 ;
@@ -277,9 +276,20 @@ packetSocket::takeSomeEscapeSeq(const unsigned char * const buffer,
277
276
} else if (xmlrpc_memeq (this ->escAccum .bytes , " NOP" , 3 )) {
278
277
// Nothing to do
279
278
} else if (xmlrpc_memeq (this ->escAccum .bytes , " PKT" , 3 )) {
280
- bufferFinishedPacket ();
279
+ this ->packetAccumP = packetPtr (new packet);
280
+ this ->inPacket = true ;
281
+ } else if (xmlrpc_memeq (this ->escAccum .bytes , " END" , 3 )) {
282
+ if (this ->inPacket ) {
283
+ this ->readBuffer .push (this ->packetAccumP );
284
+ this ->inPacket = false ;
285
+ this ->packetAccumP = packetPtr ();
286
+ } else
287
+ throwf (" END control word received without preceding PKT" );
281
288
} else if (xmlrpc_memeq (this ->escAccum .bytes , " ESC" , 3 )) {
282
- this ->packetAccumP ->addData ((const unsigned char *)ESC_STR, 1 );
289
+ if (this ->inPacket )
290
+ this ->packetAccumP ->addData ((const unsigned char *)ESC_STR, 1 );
291
+ else
292
+ throwf (" ESC control work received outside of a packet" );
283
293
} else
284
294
throwf (" Invalid escape sequence 0x%02x%02x%02x read from "
285
295
" stream socket under packet socket" ,
@@ -311,14 +321,10 @@ packetSocket::takeSomePacket(const unsigned char * const buffer,
311
321
// packet accumulator.
312
322
this ->packetAccumP ->addData (buffer, escOffset);
313
323
314
- this ->inEscapeSeq = true ;
315
-
316
324
// Caller can pick up from here; we don't know nothin' 'bout
317
325
// no escape sequences.
318
326
319
- // +1 below reflects the fact that we're taking the ESC character
320
- // too.
321
- *bytesTakenP = escOffset + 1 ;
327
+ *bytesTakenP = escOffset;
322
328
} else {
323
329
// No complete packet yet and no substitution to do;
324
330
// just throw the whole thing into the accumulator.
@@ -338,14 +344,45 @@ packetSocket::verifyNothingAccumulated() {
338
344
throwf (" Streams socket closed in the middle of an "
339
345
" escape sequence" );
340
346
341
- if (this ->packetAccumP -> getLength () > 0 )
347
+ if (this ->inPacket )
342
348
throwf (" Stream socket closed in the middle of a packet "
343
- " (%u bytes of packet received; no PKT marker to mark "
349
+ " (%u bytes of packet received; no END marker to mark "
344
350
" end of packet)" , this ->packetAccumP ->getLength ());
345
351
}
346
352
347
353
348
354
355
+ void
356
+ packetSocket::processBytesRead (const unsigned char * const buffer,
357
+ size_t const bytesRead) {
358
+
359
+ uint cursor; // Cursor into buffer[]
360
+ cursor = 0 ;
361
+ while (cursor < bytesRead) {
362
+ size_t bytesTaken;
363
+
364
+ if (this ->inEscapeSeq )
365
+ this ->takeSomeEscapeSeq (&buffer[cursor],
366
+ bytesRead - cursor,
367
+ &bytesTaken);
368
+ else if (buffer[cursor] == ESC) {
369
+ this ->inEscapeSeq = true ;
370
+ bytesTaken = 1 ;
371
+ } else if (this ->inPacket )
372
+ this ->takeSomePacket (&buffer[cursor],
373
+ bytesRead - cursor,
374
+ &bytesTaken);
375
+ else
376
+ throwf (" Byte 0x%02x is not in a packet or escape sequence. "
377
+ " Sender is probably not using packet socket protocol" ,
378
+ buffer[cursor]);
379
+
380
+ cursor += bytesTaken;
381
+ }
382
+ }
383
+
384
+
385
+
349
386
void
350
387
packetSocket::readFromFile () {
351
388
/* ----------------------------------------------------------------------------
@@ -379,24 +416,9 @@ packetSocket::readFromFile() {
379
416
380
417
if (bytesRead == 0 ) {
381
418
this ->eof = true ;
382
- verifyNothingAccumulated ();
383
- } else {
384
- uint cursor; // Cursor into buffer[]
385
- cursor = 0 ;
386
- while (cursor < bytesRead) {
387
- size_t bytesTaken;
388
-
389
- if (this ->inEscapeSeq )
390
- this ->takeSomeEscapeSeq (&buffer[cursor],
391
- bytesRead - cursor,
392
- &bytesTaken);
393
- else
394
- this ->takeSomePacket (&buffer[cursor],
395
- bytesRead - cursor,
396
- &bytesTaken);
397
- cursor += bytesTaken;
398
- }
399
- }
419
+ this ->verifyNothingAccumulated ();
420
+ } else
421
+ this ->processBytesRead (buffer, bytesRead);
400
422
}
401
423
}
402
424
}
0 commit comments