Skip to content

Commit 54de1b8

Browse files
committed
Merge pull request repejota#45 from dfeyer/queuesupport
TASK: Implement Connection::QueueSubscribe
2 parents 9004740 + e6ddcb1 commit 54de1b8

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed

src/Connection.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,25 @@ public function subscribe($subject, \Closure $callback)
296296
return $sid;
297297
}
298298

299+
/**
300+
* Subscribes to an specific event given a subject and a queue.
301+
*
302+
* @param string $subject Message topic.
303+
* @param string $queue Queue name.
304+
* @param \Closure $callback Closure to be executed as callback.
305+
*
306+
* @return string
307+
*/
308+
public function queueSubscribe($subject, $queue, \Closure $callback)
309+
{
310+
$sid = uniqid();
311+
$msg = 'SUB '.$subject.' '.$queue.' '. $sid;
312+
$this->send($msg);
313+
$this->subscriptions[$sid] = $callback;
314+
315+
return $sid;
316+
}
317+
299318
/**
300319
* Unsubscribe from a event given a subject.
301320
*

tests/Unit/ConnectionTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,33 @@ public function testSubscription()
157157
$this->c->wait(1);
158158
}
159159

160+
/**
161+
* Test Queue Subscription command.
162+
*
163+
* @return void
164+
*/
165+
public function testQueueSubscription()
166+
{
167+
$callback = function ($message) {
168+
$this->assertNotNull($message);
169+
$this->assertEquals($message, 'bar');
170+
};
171+
172+
$this->c->queueSubscribe('foo', 'bar', $callback);
173+
$this->assertGreaterThan(0, $this->c->subscriptionsCount());
174+
$subscriptions = $this->c->getSubscriptions();
175+
$this->assertInternalType('array', $subscriptions);
176+
177+
$this->c->publish('foo', 'bar');
178+
$this->assertEquals(1, $this->c->pubsCount());
179+
/*
180+
$process = new BackgroundProcess('/usr/bin/php ./tests/Util/ClientServerStub.php ');
181+
$process->run();
182+
*/
183+
// time_nanosleep(1, 0);
184+
$this->c->wait(1);
185+
}
186+
160187
/**
161188
* Test Request command.
162189
*

0 commit comments

Comments
 (0)