Skip to content

Commit 1c4ae1c

Browse files
kerninsfprochazka
authored andcommitted
Fix for php-amqplib#215 and some other related improvements
- Replaced AMQPWriter chrByteSplit() & byteSplit() with more reliable and ~35% more effective packBigEndian(), which also supports negative numbers - Fixed php-amqplib#215 and some other issues on 64bit - Added support for signed longlong, some perf optimizations - Improved existing and added absent range checks, fixed write_long() with vals >php_int_max on 32bit - Fixed existing and added new tests, highly increased num of iters - Reformatted the code for better readability - Reverted int values to be encoded as long in write_array(), as longlong not supported in AMQP-0.8 - fixed exception msg in test - Fixed php-amqplib#216. Added wrapper/helper classes AMQPArray & AMQPTable, which may be directly passed to AMQPWriter::write_array() & write_table() - examples now use AMQPTable - Fixed 'b' type to be read as short-short-int, as per amqp spec. - Added few data types - made read_value() private, just like its counterpart write_value() - Took care of field types mess - http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3 - By default rabbitMQ set is used - reverted Decimal to use signed long as per http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3 - removed unused ArrayAccess iface from AMQPAbstractCollection - Incoming message headers as AMQPTable, easily retrievable in their original form via getNativeData() - Added examples for message headers - fixed test for 32bit systems - added full void (null) support - adapted to prev changes (message application_headers is now AMQPTable instance) - fixed assertEquals sematic to conform ($expected, $actual) - fixed setUp() was affecting global state causing consecutive tests to fail - Changed some methods to accept empty $arguments to make them friendly to downstream type-hinted wrappers like someMethod($foo, $bar, AMQPTable $args=null) {basicConsume(...., $args);} - fixed tests to pass on Travis's ancient phpunit version
1 parent e30f455 commit 1c4ae1c

25 files changed

+2123
-254
lines changed

PhpAmqpLib/Channel/AMQPChannel.php

+15-13
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public function close($reply_code = 0, $reply_text = '', $method_sig = array(0,
157157
{
158158
if ($this->is_open !== true || null === $this->connection) {
159159
$this->do_close();
160+
160161
return; // already closed
161162
}
162163
list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
@@ -193,6 +194,8 @@ protected function channel_close($args)
193194
/**
194195
* Confirm a channel close
195196
* Alias of AMQPChannel::do_close()
197+
*
198+
* @param AMQPReader $args
196199
*/
197200
protected function channel_close_ok($args)
198201
{
@@ -249,7 +252,7 @@ protected function channel_flow_ok($args)
249252
protected function x_open($out_of_band = '')
250253
{
251254
if ($this->is_open) {
252-
return NULL;
255+
return null;
253256
}
254257

255258
list($class_id, $method_id, $args) = $this->protocolWriter->channelOpen($out_of_band);
@@ -310,8 +313,8 @@ public function access_request(
310313
/**
311314
* Grants access to server resources
312315
*
313-
* @param $args
314-
* @return mixed
316+
* @param AMQPReader $args
317+
* @return string
315318
*/
316319
protected function access_request_ok($args)
317320
{
@@ -363,7 +366,7 @@ public function exchange_declare(
363366
$this->send_method_frame(array($class_id, $method_id), $args);
364367

365368
if ($nowait) {
366-
return NULL;
369+
return null;
367370
}
368371

369372
return $this->wait(array(
@@ -752,10 +755,9 @@ protected function basic_ack_from_server(AMQPReader $args)
752755

753756
if (false === isset($this->published_messages[$delivery_tag])) {
754757
throw new AMQPRuntimeException(sprintf(
755-
'Server ack\'ed unknown delivery_tag %s',
756-
$delivery_tag
757-
)
758-
);
758+
'Server ack\'ed unknown delivery_tag %s',
759+
$delivery_tag
760+
));
759761
}
760762

761763
$this->internal_ack_handler($delivery_tag, $multiple, $this->ack_handler);
@@ -774,10 +776,9 @@ protected function basic_nack_from_server($args)
774776

775777
if (false === isset($this->published_messages[$delivery_tag])) {
776778
throw new AMQPRuntimeException(sprintf(
777-
'Server nack\'ed unknown delivery_tag %s',
778-
$delivery_tag
779-
)
780-
);
779+
'Server nack\'ed unknown delivery_tag %s',
780+
$delivery_tag
781+
));
781782
}
782783

783784
$this->internal_ack_handler($delivery_tag, $multiple, $this->nack_handler);
@@ -1281,6 +1282,7 @@ protected function tx_commit_ok($args)
12811282

12821283
/**
12831284
* Rollbacks the current transaction
1285+
*
12841286
* @return mixed
12851287
*/
12861288
public function tx_rollback()
@@ -1312,7 +1314,7 @@ public function confirm_select($nowait = false)
13121314
$this->send_method_frame(array($class_id, $method_id), $args);
13131315

13141316
if ($nowait) {
1315-
return NULL;
1317+
return null;
13161318
}
13171319

13181320
$this->wait(array($this->waitHelper->get_wait('confirm.select_ok')));

PhpAmqpLib/Channel/AbstractChannel.php

+24-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use PhpAmqpLib\Connection\AbstractConnection;
55
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
6+
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
67
use PhpAmqpLib\Exception\AMQPRuntimeException;
78
use PhpAmqpLib\Helper\MiscHelper;
89
use PhpAmqpLib\Helper\Protocol\MethodMap080;
@@ -16,6 +17,9 @@
1617

1718
class AbstractChannel
1819
{
20+
const PROTO_080 = '0.8';
21+
const PROTO_091 = '0.9.1';
22+
1923
public static $PROTOCOL_CONSTANTS_CLASS;
2024

2125
/** @var array */
@@ -79,17 +83,17 @@ public function __construct(AbstractConnection $connection, $channel_id)
7983
$this->wait_content_reader = new AMQPReader(null);
8084
$this->dispatch_reader = new AMQPReader(null);
8185

82-
$this->protocolVersion = defined('AMQP_PROTOCOL') ? AMQP_PROTOCOL : '0.9.1';
86+
$this->protocolVersion = self::getProtocolVersion();
8387
switch ($this->protocolVersion) {
84-
case '0.9.1':
88+
case self::PROTO_091:
8589
self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091';
8690
$c = self::$PROTOCOL_CONSTANTS_CLASS;
8791
$this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
8892
$this->protocolWriter = new Protocol091();
8993
$this->waitHelper = new Wait091();
9094
$this->methodMap = new MethodMap091();
9195
break;
92-
case '0.8':
96+
case self::PROTO_080:
9397
self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080';
9498
$c = self::$PROTOCOL_CONSTANTS_CLASS;
9599
$this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER;
@@ -98,10 +102,26 @@ public function __construct(AbstractConnection $connection, $channel_id)
98102
$this->methodMap = new MethodMap080();
99103
break;
100104
default:
101-
throw new AMQPRuntimeException('Protocol: ' . $this->protocolVersion . ' not implemented.');
105+
//this is logic exception (requires code changes to fix), so OutOfRange, not OutOfBounds or Runtime
106+
throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $this->protocolVersion));
102107
}
103108
}
104109

110+
/**
111+
* @return string
112+
* @throws AMQPOutOfRangeException
113+
*/
114+
public static function getProtocolVersion()
115+
{
116+
$proto = defined('AMQP_PROTOCOL') ? AMQP_PROTOCOL : self::PROTO_091;
117+
//adding check here to catch unknown protocol ASAP, as this method may be called from the outside
118+
if (!in_array($proto, array(self::PROTO_080, self::PROTO_091), TRUE)) {
119+
throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $proto));
120+
}
121+
122+
return $proto;
123+
}
124+
105125
/**
106126
* @return string
107127
*/

PhpAmqpLib/Connection/AMQPLazyConnection.php

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ protected function getIO()
3939

4040
/**
4141
* Should the connection be attempted during construction?
42+
*
4243
* @return bool
4344
*/
4445
public function connectOnConstruct()

PhpAmqpLib/Connection/AbstractConnection.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ protected function wait_channel($channel_id, $timeout = 0)
562562
if ($frame_channel === 0 && $frame_type === 8) {
563563
// skip heartbeat frames
564564
continue;
565-
565+
566566
} else {
567567

568568
if ($frame_channel == $channel_id) {
@@ -834,10 +834,10 @@ protected function connection_tune($args)
834834
}
835835

836836
// use server proposed value if not set
837-
if ($this->heartbeat === NULL) {
837+
if ($this->heartbeat === null) {
838838
$this->heartbeat = $args->read_short();
839839
}
840-
840+
841841
$this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
842842
}
843843

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
namespace PhpAmqpLib\Exception;
3+
4+
class AMQPLogicException extends \LogicException implements AMQPExceptionInterface
5+
{
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
namespace PhpAmqpLib\Exception;
3+
4+
class AMQPOutOfRangeException extends \OutOfRangeException implements AMQPExceptionInterface
5+
{
6+
7+
}

PhpAmqpLib/Helper/Protocol/Protocol080.php

+8-8
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,14 @@ public static function accessRequestOk($args)
282282
/**
283283
* @return array
284284
*/
285-
public function exchangeDeclare($ticket = 1, $exchange, $type = 'direct', $passive = false, $durable = false, $auto_delete = false, $internal = false, $nowait = false, $arguments = array())
285+
public function exchangeDeclare($ticket = 1, $exchange, $type = 'direct', $passive = false, $durable = false, $auto_delete = false, $internal = false, $nowait = false, $arguments = null)
286286
{
287287
$args = new AMQPWriter();
288288
$args->write_short($ticket);
289289
$args->write_shortstr($exchange);
290290
$args->write_shortstr($type);
291291
$args->write_bits(array($passive, $durable, $auto_delete, $internal, $nowait));
292-
$args->write_table($arguments);
292+
$args->write_table(empty($arguments)? array():$arguments);
293293
return array(40, 10, $args);
294294
}
295295

@@ -336,13 +336,13 @@ public static function exchangeDeleteOk($args)
336336
/**
337337
* @return array
338338
*/
339-
public function queueDeclare($ticket = 1, $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = array())
339+
public function queueDeclare($ticket = 1, $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = null)
340340
{
341341
$args = new AMQPWriter();
342342
$args->write_short($ticket);
343343
$args->write_shortstr($queue);
344344
$args->write_bits(array($passive, $durable, $exclusive, $auto_delete, $nowait));
345-
$args->write_table($arguments);
345+
$args->write_table(empty($arguments)? array():$arguments);
346346
return array(50, 10, $args);
347347
}
348348

@@ -366,15 +366,15 @@ public static function queueDeclareOk($args)
366366
/**
367367
* @return array
368368
*/
369-
public function queueBind($ticket = 1, $queue = '', $exchange, $routing_key = '', $nowait = false, $arguments = array())
369+
public function queueBind($ticket = 1, $queue = '', $exchange, $routing_key = '', $nowait = false, $arguments = null)
370370
{
371371
$args = new AMQPWriter();
372372
$args->write_short($ticket);
373373
$args->write_shortstr($queue);
374374
$args->write_shortstr($exchange);
375375
$args->write_shortstr($routing_key);
376376
$args->write_bits(array($nowait));
377-
$args->write_table($arguments);
377+
$args->write_table(empty($arguments)? array():$arguments);
378378
return array(50, 20, $args);
379379
}
380380

@@ -449,14 +449,14 @@ public static function queueDeleteOk($args)
449449
/**
450450
* @return array
451451
*/
452-
public function queueUnbind($ticket = 1, $queue = '', $exchange, $routing_key = '', $arguments = array())
452+
public function queueUnbind($ticket = 1, $queue = '', $exchange, $routing_key = '', $arguments = null)
453453
{
454454
$args = new AMQPWriter();
455455
$args->write_short($ticket);
456456
$args->write_shortstr($queue);
457457
$args->write_shortstr($exchange);
458458
$args->write_shortstr($routing_key);
459-
$args->write_table($arguments);
459+
$args->write_table(empty($arguments)? array():$arguments);
460460
return array(50, 50, $args);
461461
}
462462

PhpAmqpLib/Helper/Protocol/Protocol091.php

+14-14
Original file line numberDiff line numberDiff line change
@@ -280,14 +280,14 @@ public static function accessRequestOk($args)
280280
/**
281281
* @return array
282282
*/
283-
public function exchangeDeclare($ticket = 0, $exchange, $type = 'direct', $passive = false, $durable = false, $auto_delete = false, $internal = false, $nowait = false, $arguments = array())
283+
public function exchangeDeclare($ticket = 0, $exchange, $type = 'direct', $passive = false, $durable = false, $auto_delete = false, $internal = false, $nowait = false, $arguments = null)
284284
{
285285
$args = new AMQPWriter();
286286
$args->write_short($ticket);
287287
$args->write_shortstr($exchange);
288288
$args->write_shortstr($type);
289289
$args->write_bits(array($passive, $durable, $auto_delete, $internal, $nowait));
290-
$args->write_table($arguments);
290+
$args->write_table(empty($arguments)? array():$arguments);
291291
return array(40, 10, $args);
292292
}
293293

@@ -334,15 +334,15 @@ public static function exchangeDeleteOk($args)
334334
/**
335335
* @return array
336336
*/
337-
public function exchangeBind($ticket = 0, $destination, $source, $routing_key = '', $nowait = false, $arguments = array())
337+
public function exchangeBind($ticket = 0, $destination, $source, $routing_key = '', $nowait = false, $arguments = null)
338338
{
339339
$args = new AMQPWriter();
340340
$args->write_short($ticket);
341341
$args->write_shortstr($destination);
342342
$args->write_shortstr($source);
343343
$args->write_shortstr($routing_key);
344344
$args->write_bits(array($nowait));
345-
$args->write_table($arguments);
345+
$args->write_table(empty($arguments)? array():$arguments);
346346
return array(40, 30, $args);
347347
}
348348

@@ -363,15 +363,15 @@ public static function exchangeBindOk($args)
363363
/**
364364
* @return array
365365
*/
366-
public function exchangeUnbind($ticket = 0, $destination, $source, $routing_key = '', $nowait = false, $arguments = array())
366+
public function exchangeUnbind($ticket = 0, $destination, $source, $routing_key = '', $nowait = false, $arguments = null)
367367
{
368368
$args = new AMQPWriter();
369369
$args->write_short($ticket);
370370
$args->write_shortstr($destination);
371371
$args->write_shortstr($source);
372372
$args->write_shortstr($routing_key);
373373
$args->write_bits(array($nowait));
374-
$args->write_table($arguments);
374+
$args->write_table(empty($arguments)? array():$arguments);
375375
return array(40, 40, $args);
376376
}
377377

@@ -392,13 +392,13 @@ public static function exchangeUnbindOk($args)
392392
/**
393393
* @return array
394394
*/
395-
public function queueDeclare($ticket = 0, $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = array())
395+
public function queueDeclare($ticket = 0, $queue = '', $passive = false, $durable = false, $exclusive = false, $auto_delete = false, $nowait = false, $arguments = null)
396396
{
397397
$args = new AMQPWriter();
398398
$args->write_short($ticket);
399399
$args->write_shortstr($queue);
400400
$args->write_bits(array($passive, $durable, $exclusive, $auto_delete, $nowait));
401-
$args->write_table($arguments);
401+
$args->write_table(empty($arguments)? array():$arguments);
402402
return array(50, 10, $args);
403403
}
404404

@@ -422,15 +422,15 @@ public static function queueDeclareOk($args)
422422
/**
423423
* @return array
424424
*/
425-
public function queueBind($ticket = 0, $queue = '', $exchange, $routing_key = '', $nowait = false, $arguments = array())
425+
public function queueBind($ticket = 0, $queue = '', $exchange, $routing_key = '', $nowait = false, $arguments = null)
426426
{
427427
$args = new AMQPWriter();
428428
$args->write_short($ticket);
429429
$args->write_shortstr($queue);
430430
$args->write_shortstr($exchange);
431431
$args->write_shortstr($routing_key);
432432
$args->write_bits(array($nowait));
433-
$args->write_table($arguments);
433+
$args->write_table(empty($arguments)? array():$arguments);
434434
return array(50, 20, $args);
435435
}
436436

@@ -505,14 +505,14 @@ public static function queueDeleteOk($args)
505505
/**
506506
* @return array
507507
*/
508-
public function queueUnbind($ticket = 0, $queue = '', $exchange, $routing_key = '', $arguments = array())
508+
public function queueUnbind($ticket = 0, $queue = '', $exchange, $routing_key = '', $arguments = null)
509509
{
510510
$args = new AMQPWriter();
511511
$args->write_short($ticket);
512512
$args->write_shortstr($queue);
513513
$args->write_shortstr($exchange);
514514
$args->write_shortstr($routing_key);
515-
$args->write_table($arguments);
515+
$args->write_table(empty($arguments)? array():$arguments);
516516
return array(50, 50, $args);
517517
}
518518

@@ -559,14 +559,14 @@ public static function basicQosOk($args)
559559
/**
560560
* @return array
561561
*/
562-
public function basicConsume($ticket = 0, $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $arguments = array())
562+
public function basicConsume($ticket = 0, $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $arguments = null)
563563
{
564564
$args = new AMQPWriter();
565565
$args->write_short($ticket);
566566
$args->write_shortstr($queue);
567567
$args->write_shortstr($consumer_tag);
568568
$args->write_bits(array($no_local, $no_ack, $exclusive, $nowait));
569-
$args->write_table($arguments);
569+
$args->write_table(empty($arguments)? array():$arguments);
570570
return array(60, 20, $args);
571571
}
572572

0 commit comments

Comments
 (0)