Skip to content

Commit 548a7f9

Browse files
committed
Merge branch 'release/0.7.0'
2 parents 1e5f357 + 3cda861 commit 548a7f9

File tree

4 files changed

+55
-12
lines changed

4 files changed

+55
-12
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Now let's tell composer about our project's dependancies, in this case, PHPNats.
3939
```
4040
{
4141
"require": {
42-
"repejota/nats": "master"
42+
"repejota/nats": "dev-master"
4343
}
4444
}
4545
```

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.6.3
1+
0.7.0

src/Connection.php

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -157,17 +157,13 @@ private function send($payload)
157157
*/
158158
private function receive($len = null)
159159
{
160+
160161
if ($len) {
161-
$line = fgets($this->streamSocket, $len + 1);
162+
$line = fread($this->streamSocket, $len);
162163
} else {
163164
$line = fgets($this->streamSocket);
164165
}
165-
166-
if ($line === false) {
167-
return $line;
168-
} else {
169-
return trim($line);
170-
}
166+
return $line;
171167
}
172168

173169
/**
@@ -300,6 +296,25 @@ public function subscribe($subject, \Closure $callback)
300296
return $sid;
301297
}
302298

299+
/**
300+
* Subscribes to an specific event given a subject and a queue.
301+
*
302+
* @param string $subject Message topic.
303+
* @param string $queue Queue name.
304+
* @param \Closure $callback Closure to be executed as callback.
305+
*
306+
* @return string
307+
*/
308+
public function queueSubscribe($subject, $queue, \Closure $callback)
309+
{
310+
$sid = uniqid();
311+
$msg = 'SUB '.$subject.' '.$queue.' '. $sid;
312+
$this->send($msg);
313+
$this->subscriptions[$sid] = $callback;
314+
315+
return $sid;
316+
}
317+
303318
/**
304319
* Unsubscribe from a event given a subject.
305320
*
@@ -337,14 +352,14 @@ private function handleMSG($line)
337352
{
338353
$parts = explode(' ', $line);
339354
$subject = null;
340-
$length = $parts[3];
355+
$length = trim($parts[3]);
341356
$sid = $parts[2];
342357

343358
if (count($parts) == 5) {
344-
$length = $parts[4];
359+
$length = trim($parts[4]);
345360
$subject = $parts[3];
346361
} elseif (count($parts) == 4) {
347-
$length = $parts[3];
362+
$length = trim($parts[3]);
348363
$subject = $parts[1];
349364
}
350365

@@ -373,6 +388,7 @@ public function wait($quantity = 0)
373388
$count = 0;
374389
while (!feof($this->streamSocket)) {
375390
$line = $this->receive();
391+
376392
if ($line === false) {
377393
return null;
378394
}

tests/Unit/ConnectionTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,33 @@ public function testSubscription()
157157
$this->c->wait(1);
158158
}
159159

160+
/**
161+
* Test Queue Subscription command.
162+
*
163+
* @return void
164+
*/
165+
public function testQueueSubscription()
166+
{
167+
$callback = function ($message) {
168+
$this->assertNotNull($message);
169+
$this->assertEquals($message, 'bar');
170+
};
171+
172+
$this->c->queueSubscribe('foo', 'bar', $callback);
173+
$this->assertGreaterThan(0, $this->c->subscriptionsCount());
174+
$subscriptions = $this->c->getSubscriptions();
175+
$this->assertInternalType('array', $subscriptions);
176+
177+
$this->c->publish('foo', 'bar');
178+
$this->assertEquals(1, $this->c->pubsCount());
179+
/*
180+
$process = new BackgroundProcess('/usr/bin/php ./tests/Util/ClientServerStub.php ');
181+
$process->run();
182+
*/
183+
// time_nanosleep(1, 0);
184+
$this->c->wait(1);
185+
}
186+
160187
/**
161188
* Test Request command.
162189
*

0 commit comments

Comments
 (0)