6
6
use PhpAmqpLib \Exception \AMQPOutOfBoundsException ;
7
7
use PhpAmqpLib \Exception \AMQPRuntimeException ;
8
8
use PhpAmqpLib \Helper \MiscHelper ;
9
- use PhpAmqpLib \Wire \AMQPReader ;
10
- use PhpAmqpLib \Message \AMQPMessage ;
11
-
9
+ use PhpAmqpLib \Helper \Protocol \MethodMap080 ;
10
+ use PhpAmqpLib \Helper \Protocol \MethodMap091 ;
12
11
use PhpAmqpLib \Helper \Protocol \Protocol080 ;
13
12
use PhpAmqpLib \Helper \Protocol \Protocol091 ;
14
13
use PhpAmqpLib \Helper \Protocol \Wait080 ;
15
14
use PhpAmqpLib \Helper \Protocol \Wait091 ;
16
- use PhpAmqpLib \Helper \ Protocol \ MethodMap080 ;
17
- use PhpAmqpLib \Helper \ Protocol \ MethodMap091 ;
15
+ use PhpAmqpLib \Message \ AMQPMessage ;
16
+ use PhpAmqpLib \Wire \ AMQPReader ;
18
17
19
18
class AbstractChannel
20
19
{
20
+
21
21
public static $ PROTOCOL_CONSTANTS_CLASS ;
22
22
23
23
protected $ debug ;
24
+
24
25
/**
25
26
*
26
27
* @var AbstractConnection
27
28
*/
28
29
protected $ connection ;
29
30
31
+ /**
32
+ * @var string
33
+ */
30
34
protected $ protocolVersion ;
31
35
36
+ /**
37
+ * @var \PhpAmqpLib\Helper\Protocol\Protocol091|\PhpAmqpLib\Helper\Protocol\Protocol080
38
+ */
32
39
protected $ protocolWriter ;
33
40
41
+ /**
42
+ * @var \PhpAmqpLib\Helper\Protocol\Wait091|\PhpAmqpLib\Helper\Protocol\Wait080
43
+ */
34
44
protected $ waitHelper ;
35
45
46
+ /**
47
+ * @var \PhpAmqpLib\Helper\Protocol\MethodMap091|\PhpAmqpLib\Helper\Protocol\MethodMap080
48
+ */
36
49
protected $ methodMap ;
37
50
51
+ /**
52
+ * @var string
53
+ */
38
54
protected $ channel_id ;
39
55
56
+ /**
57
+ * @var \PhpAmqpLib\Wire\AMQPReader
58
+ */
40
59
protected $ msg_property_reader = null ;
60
+
61
+ /**
62
+ * @var \PhpAmqpLib\Wire\AMQPReader
63
+ */
41
64
protected $ wait_content_reader = null ;
65
+
66
+ /**
67
+ * @var \PhpAmqpLib\Wire\AMQPReader
68
+ */
42
69
protected $ dispatch_reader = null ;
43
70
71
+
72
+
44
73
/**
45
74
* @param \PhpAmqpLib\Connection\AbstractConnection $connection
46
- * @param $channel_id
75
+ * @param string $channel_id
47
76
*/
48
77
public function __construct (AbstractConnection $ connection , $ channel_id )
49
78
{
50
79
$ this ->connection = $ connection ;
51
80
$ this ->channel_id = $ channel_id ;
52
81
$ connection ->channels [$ channel_id ] = $ this ;
53
- $ this ->frame_queue = array (); // Lower level queue for frames
82
+ $ this ->frame_queue = array (); // Lower level queue for frames
54
83
$ this ->method_queue = array (); // Higher level queue for methods
55
84
$ this ->auto_decode = false ;
56
85
$ this ->debug = defined ('AMQP_DEBUG ' ) ? AMQP_DEBUG : false ;
57
86
58
87
$ this ->msg_property_reader = new AMQPReader (null );
59
88
$ this ->wait_content_reader = new AMQPReader (null );
60
- $ this ->dispatch_reader = new AMQPReader (null );
89
+ $ this ->dispatch_reader = new AMQPReader (null );
61
90
62
91
$ this ->protocolVersion = defined ('AMQP_PROTOCOL ' ) ? AMQP_PROTOCOL : '0.9.1 ' ;
63
92
switch ($ this ->protocolVersion ) {
64
- case '0.9.1 ' :
65
- self ::$ PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091 ' ;
66
- $ c = self ::$ PROTOCOL_CONSTANTS_CLASS ;
67
- $ this ->amqp_protocol_header = $ c ::$ AMQP_PROTOCOL_HEADER ;
68
- $ this ->protocolWriter = new Protocol091 ();
69
- $ this ->waitHelper = new Wait091 ();
70
- $ this ->methodMap = new MethodMap091 ();
71
- break ;
72
- case '0.8 ' :
73
- self ::$ PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080 ' ;
74
- $ c = self ::$ PROTOCOL_CONSTANTS_CLASS ;
75
- $ this ->amqp_protocol_header = $ c ::$ AMQP_PROTOCOL_HEADER ;
76
- $ this ->protocolWriter = new Protocol080 ();
77
- $ this ->waitHelper = new Wait080 ();
78
- $ this ->methodMap = new MethodMap080 ();
79
- break ;
80
- default :
81
- throw new AMQPRuntimeException ('Protocol: ' . $ this ->protocolVersion . ' not implemented. ' );
93
+ case '0.9.1 ' :
94
+ self ::$ PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091 ' ;
95
+ $ c = self ::$ PROTOCOL_CONSTANTS_CLASS ;
96
+ $ this ->amqp_protocol_header = $ c ::$ AMQP_PROTOCOL_HEADER ;
97
+ $ this ->protocolWriter = new Protocol091 ();
98
+ $ this ->waitHelper = new Wait091 ();
99
+ $ this ->methodMap = new MethodMap091 ();
100
+ break ;
101
+ case '0.8 ' :
102
+ self ::$ PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080 ' ;
103
+ $ c = self ::$ PROTOCOL_CONSTANTS_CLASS ;
104
+ $ this ->amqp_protocol_header = $ c ::$ AMQP_PROTOCOL_HEADER ;
105
+ $ this ->protocolWriter = new Protocol080 ();
106
+ $ this ->waitHelper = new Wait080 ();
107
+ $ this ->methodMap = new MethodMap080 ();
108
+ break ;
109
+ default :
110
+ throw new AMQPRuntimeException ('Protocol: ' . $ this ->protocolVersion . ' not implemented. ' );
82
111
}
83
112
}
84
113
114
+
115
+
85
116
public function getChannelId ()
86
117
{
87
118
return $ this ->channel_id ;
88
119
}
89
120
121
+
122
+
123
+ /**
124
+ * @param string $method_sig
125
+ * @param string $args
126
+ * @param $content
127
+ * @return null|string
128
+ */
90
129
public function dispatch ($ method_sig , $ args , $ content )
91
130
{
92
131
if (!$ this ->methodMap ->valid_method ($ method_sig )) {
@@ -108,10 +147,12 @@ public function dispatch($method_sig, $args, $content)
108
147
return call_user_func (array ($ this , $ amqp_method ), $ this ->dispatch_reader , $ content );
109
148
}
110
149
150
+
151
+
111
152
public function next_frame ($ timeout = 0 )
112
153
{
113
154
if ($ this ->debug ) {
114
- MiscHelper::debug_msg ("waiting for a new frame " );
155
+ MiscHelper::debug_msg ("waiting for a new frame " );
115
156
}
116
157
117
158
if (!empty ($ this ->frame_queue )) {
@@ -121,19 +162,25 @@ public function next_frame($timeout = 0)
121
162
return $ this ->connection ->wait_channel ($ this ->channel_id , $ timeout );
122
163
}
123
164
124
- protected function send_method_frame ($ method_sig , $ args ="" )
165
+
166
+
167
+ protected function send_method_frame ($ method_sig , $ args = "" )
125
168
{
126
169
$ this ->connection ->send_channel_method_frame ($ this ->channel_id , $ method_sig , $ args );
127
170
}
128
171
172
+
173
+
129
174
/**
130
175
* This is here for performance reasons to batch calls to fwrite from basic.publish
131
176
*/
132
- protected function prepare_method_frame ($ method_sig , $ args= "" , $ pkt = null )
177
+ protected function prepare_method_frame ($ method_sig , $ args = "" , $ pkt = null )
133
178
{
134
179
return $ this ->connection ->prepare_channel_method_frame ($ this ->channel_id , $ method_sig , $ args , $ pkt );
135
180
}
136
181
182
+
183
+
137
184
public function wait_content ()
138
185
{
139
186
$ frm = $ this ->next_frame ();
@@ -144,58 +191,65 @@ public function wait_content()
144
191
throw new AMQPRuntimeException ("Expecting Content header " );
145
192
}
146
193
147
- $ this ->wait_content_reader ->reuse (mb_substr ($ payload ,0 , 12 ,'ASCII ' ));
194
+ $ this ->wait_content_reader ->reuse (mb_substr ($ payload , 0 , 12 , 'ASCII ' ));
148
195
149
196
// $payload_reader = new AMQPReader(substr($payload,0,12));
150
197
$ class_id = $ this ->wait_content_reader ->read_short ();
151
198
$ weight = $ this ->wait_content_reader ->read_short ();
152
199
153
200
$ body_size = $ this ->wait_content_reader ->read_longlong ();
154
201
202
+ //hack to avoid creating new instances of AMQPReader;
203
+ $ this ->msg_property_reader ->reuse (mb_substr ($ payload , 12 , mb_strlen ($ payload , 'ASCII ' ) - 12 , 'ASCII ' ));
204
+
155
205
$ msg = new AMQPMessage ();
156
- $ this ->msg_property_reader ->reuse (mb_substr ($ payload ,12 ,mb_strlen ($ payload ,'ASCII ' )-12 ,'ASCII ' )); //hack to avoid creating new instances of AMQPReader;
157
206
$ msg ->load_properties ($ this ->msg_property_reader );
158
207
159
208
$ body_parts = array ();
160
209
$ body_received = 0 ;
161
- while (bccomp ($ body_size ,$ body_received ) == 1 ) {
210
+ while (bccomp ($ body_size , $ body_received ) == 1 ) {
162
211
$ frm = $ this ->next_frame ();
163
212
$ frame_type = $ frm [0 ];
164
213
$ payload = $ frm [1 ];
165
214
166
215
if ($ frame_type != 3 ) {
167
216
$ PROTOCOL_CONSTANTS_CLASS = self ::$ PROTOCOL_CONSTANTS_CLASS ;
168
217
throw new AMQPRuntimeException ("Expecting Content body, received frame type $ frame_type ( "
169
- . $ PROTOCOL_CONSTANTS_CLASS ::$ FRAME_TYPES [$ frame_type ]. ") " );
218
+ . $ PROTOCOL_CONSTANTS_CLASS ::$ FRAME_TYPES [$ frame_type ] . ") " );
170
219
}
171
220
172
221
$ body_parts [] = $ payload ;
173
222
$ body_received = bcadd ($ body_received , mb_strlen ($ payload , 'ASCII ' ));
174
223
}
175
224
176
- $ msg ->body = implode ("" ,$ body_parts );
225
+ $ msg ->body = implode ("" , $ body_parts );
177
226
178
227
if ($ this ->auto_decode && isset ($ msg ->content_encoding )) {
179
228
try {
180
229
$ msg ->body = $ msg ->body ->decode ($ msg ->content_encoding );
181
230
} catch (\Exception $ e ) {
182
- if ($ this ->debug ) {
183
- MiscHelper::debug_msg ("Ignoring body decoding exception: " . $ e ->getMessage ());
184
- }
231
+ if ($ this ->debug ) {
232
+ MiscHelper::debug_msg ("Ignoring body decoding exception: " . $ e ->getMessage ());
233
+ }
185
234
}
186
235
}
187
236
188
237
return $ msg ;
189
238
}
190
239
240
+
241
+
191
242
/**
192
243
* Wait for some expected AMQP methods and dispatch to them.
193
244
* Unexpected methods are queued up for later calls to this PHP
194
245
* method.
195
246
*
247
+ * @param array $allowed_methods
248
+ * @param bool $non_blocking
249
+ * @param int $timeout
196
250
* @return mixed
197
251
*/
198
- public function wait ($ allowed_methods= null , $ non_blocking = false , $ timeout = 0 )
252
+ public function wait ($ allowed_methods = null , $ non_blocking = false , $ timeout = 0 )
199
253
{
200
254
$ PROTOCOL_CONSTANTS_CLASS = self ::$ PROTOCOL_CONSTANTS_CLASS ;
201
255
@@ -206,23 +260,21 @@ public function wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
206
260
}
207
261
208
262
//Process deferred methods
209
- foreach ($ this ->method_queue as $ qk=> $ queued_method ) {
210
- if ($ this ->debug ) {
211
- MiscHelper::debug_msg ("checking queue method " . $ qk );
212
- }
263
+ foreach ($ this ->method_queue as $ qk => $ queued_method ) {
264
+ if ($ this ->debug ) {
265
+ MiscHelper::debug_msg ("checking queue method " . $ qk );
266
+ }
213
267
214
268
$ method_sig = $ queued_method [0 ];
215
- if ($ allowed_methods== null || in_array ($ method_sig , $ allowed_methods )) {
269
+ if ($ allowed_methods == null || in_array ($ method_sig , $ allowed_methods )) {
216
270
unset($ this ->method_queue [$ qk ]);
217
271
218
272
if ($ this ->debug ) {
219
- MiscHelper::debug_msg ("Executing queued method: $ method_sig: " .
220
- $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
273
+ MiscHelper::debug_msg ("Executing queued method: $ method_sig: " .
274
+ $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
221
275
}
222
276
223
- return $ this ->dispatch ($ queued_method [0 ],
224
- $ queued_method [1 ],
225
- $ queued_method [2 ]);
277
+ return $ this ->dispatch ($ queued_method [0 ], $ queued_method [1 ], $ queued_method [2 ]);
226
278
}
227
279
}
228
280
@@ -234,20 +286,21 @@ public function wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
234
286
235
287
if ($ frame_type != 1 ) {
236
288
throw new AMQPRuntimeException ("Expecting AMQP method, received frame type: $ frame_type ( "
237
- . $ PROTOCOL_CONSTANTS_CLASS ::$ FRAME_TYPES [$ frame_type ]. ") " );
289
+ . $ PROTOCOL_CONSTANTS_CLASS ::$ FRAME_TYPES [$ frame_type ] . ") " );
238
290
}
239
291
240
292
if (mb_strlen ($ payload , 'ASCII ' ) < 4 ) {
241
293
throw new AMQPOutOfBoundsException ("Method frame too short " );
242
294
}
243
295
244
- $ method_sig_array = unpack ("n2 " , mb_substr ($ payload ,0 , 4 , 'ASCII ' ));
296
+ $ method_sig_array = unpack ("n2 " , mb_substr ($ payload , 0 , 4 , 'ASCII ' ));
245
297
$ method_sig = "" . $ method_sig_array [1 ] . ", " . $ method_sig_array [2 ];
246
298
247
- $ args = mb_substr ($ payload ,4 , mb_strlen ($ payload ,'ASCII ' )- 4 , 'ASCII ' );
299
+ $ args = mb_substr ($ payload , 4 , mb_strlen ($ payload , 'ASCII ' ) - 4 , 'ASCII ' );
248
300
249
301
if ($ this ->debug ) {
250
- MiscHelper::debug_msg ("> $ method_sig: " . $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
302
+ MiscHelper::debug_msg ("> $ method_sig: "
303
+ . $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
251
304
}
252
305
253
306
if (in_array ($ method_sig , $ PROTOCOL_CONSTANTS_CLASS ::$ CONTENT_METHODS )) {
@@ -257,15 +310,16 @@ public function wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
257
310
}
258
311
259
312
if ($ allowed_methods == null ||
260
- in_array ($ method_sig ,$ allowed_methods ) ||
313
+ in_array ($ method_sig , $ allowed_methods ) ||
261
314
in_array ($ method_sig , $ PROTOCOL_CONSTANTS_CLASS ::$ CLOSE_METHODS )
262
- ) {
315
+ ) {
263
316
return $ this ->dispatch ($ method_sig , $ args , $ content );
264
317
}
265
318
266
319
// Wasn't what we were looking for? save it for later
267
320
if ($ this ->debug ) {
268
- MiscHelper::debug_msg ("Queueing for later: $ method_sig: " . $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
321
+ MiscHelper::debug_msg ("Queueing for later: $ method_sig: "
322
+ . $ PROTOCOL_CONSTANTS_CLASS ::$ GLOBAL_METHOD_NAMES [MiscHelper::methodSig ($ method_sig )]);
269
323
}
270
324
$ this ->method_queue [] = array ($ method_sig , $ args , $ content );
271
325
@@ -275,10 +329,13 @@ public function wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
275
329
}
276
330
}
277
331
278
- protected function dispatch_to_handler ($ handler , array $ arguments )
279
- {
280
- if (is_callable ($ handler )) {
281
- call_user_func_array ($ handler , $ arguments );
282
- }
283
- }
332
+
333
+
334
+ protected function dispatch_to_handler ($ handler , array $ arguments )
335
+ {
336
+ if (is_callable ($ handler )) {
337
+ call_user_func_array ($ handler , $ arguments );
338
+ }
339
+ }
340
+
284
341
}
0 commit comments