Skip to content

Commit ac053f9

Browse files
committed
Merge branch 'release/0.7.1'
2 parents 548a7f9 + 9c99a6c commit ac053f9

File tree

10 files changed

+112
-44
lines changed

10 files changed

+112
-44
lines changed

AUTHORS

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1 @@
11
Raül Pérez <[email protected]> - http://repejota.com - @repejota
2-
Adrià Cidre <[email protected]> - http://oridoki.com - @adriacidre
3-
José Gil <[email protected]> - @josgilmo
4-
Gorka López de Torre <[email protected]> - http://gorka.io - @glopezdetorre

CONTRIBUTORS

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Adrià Cidre <[email protected]> - http://oridoki.com - @adriacidre
2+
José Gil <[email protected]> - @josgilmo
3+
Gorka López de Torre <[email protected]> - http://gorka.io - @glopezdetorre
4+
Issel Guberna - http://isselguberna.com/ - @octante
5+
Donal Byrne - <[email protected]> - http://www.byrnedo.com/
6+
Dominique Feyer - <[email protected]> - http://www.ttree.ch/
7+
Oliver Mack -
8+
Superbug - http://www.superbug.co/
9+
netroby - http://www.netroby.com/
10+

README.md

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,6 @@ Creators
113113
- <https://twitter.com/repejota>
114114
- <https://github.com/repejota>
115115

116-
**Adrià Cidre**
117-
118-
- <https://twitter.com/adriacidre>
119-
- <https://github.com/adriacidre>
120-
121-
**José Gil**
122-
123-
- <https://twitter.com/josgilmo>
124-
- <https://github.com/josgilmo>
125-
126-
**Gorka López de Torre**
127-
128-
- <https://twitter.com/glopezdetorre>
129-
- <https://github.com/glopezdetorre>
130-
131-
132-
133116
License
134117
-------
135118

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.7.0
1+
0.7.1

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"minimum-stability": "dev",
66
"license": "MIT",
77
"require": {
8+
"ircmaxell/random-lib": "^1.1"
89
},
910
"require-dev": {
1011
"phpunit/phpunit": "4.7.*",

examples/connectauth.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
require_once __DIR__ . "/../vendor/autoload.php";
3+
4+
$connectionOptions = new \Nats\ConnectionOptions();
5+
$connectionOptions
6+
->setHost('localhost')
7+
->setPort(4222)
8+
->setUser("foo")
9+
->setPass("bar");
10+
$c = new Nats\Connection($connectionOptions);
11+
$c->connect();
12+
$c->close();

src/Connection.php

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
<?php
22
namespace Nats;
33

4+
use RandomLib\Factory;
5+
use RandomLib\Generator;
6+
47
/**
58
* Connection Class.
69
*/
@@ -13,6 +16,12 @@ class Connection
1316
*/
1417
private $pings = 0;
1518

19+
/**
20+
* Chunk size in bytes to use when reading with fread.
21+
* @var int
22+
*/
23+
private $chunkSize = 8192;
24+
1625
/**
1726
* Return the number of pings.
1827
*
@@ -91,6 +100,13 @@ public function getSubscriptions()
91100
*/
92101
private $options = null;
93102

103+
/**
104+
* Connection timeout
105+
*
106+
* @var float
107+
*/
108+
private $timeout = null;
109+
94110
/**
95111
* Stream File Pointer.
96112
*
@@ -105,6 +121,11 @@ public function getSubscriptions()
105121
*/
106122
private $streamWrapper;
107123

124+
/**
125+
* @var Generator
126+
*/
127+
private $randomGenerator;
128+
108129
/**
109130
* Constructor.
110131
*
@@ -117,6 +138,8 @@ public function __construct(ConnectionOptions $options = null)
117138
$this->subscriptions = [];
118139
$this->options = $options;
119140
$this->streamWrapper = new StreamWrapper();
141+
$randomFactory = new Factory();
142+
$this->randomGenerator = $randomFactory->getLowStrengthGenerator();
120143

121144
if (is_null($options)) {
122145
$this->options = new ConnectionOptions();
@@ -159,7 +182,21 @@ private function receive($len = null)
159182
{
160183

161184
if ($len) {
162-
$line = fread($this->streamSocket, $len);
185+
$chunkSize = $this->chunkSize;
186+
$line = null;
187+
$receivedBytes = 0;
188+
while ($receivedBytes < $len) {
189+
$bytesLeft = $len - $receivedBytes;
190+
if ( $bytesLeft < 1500 ) {
191+
$chunkSize = $bytesLeft;
192+
}
193+
194+
$line .= fread($this->streamSocket, $chunkSize);
195+
$receivedBytes += $chunkSize;
196+
}
197+
if (strlen($line) > 2) {
198+
$line = substr($line, 0, -2);
199+
}
163200
} else {
164201
$line = fgets($this->streamSocket);
165202
}
@@ -170,7 +207,7 @@ private function receive($len = null)
170207
* Returns an stream socket to the desired server.
171208
*
172209
* @param string $address Server url string.
173-
* @param integer $timeout Number of seconds until the connect() system call should timeout.
210+
* @param float $timeout Number of seconds until the connect() system call should timeout.
174211
*
175212
* @return resource
176213
* @throws \Exception Exception raised if connection fails.
@@ -206,13 +243,14 @@ public function isConnected()
206243
/**
207244
* Connect to server.
208245
*
209-
* @param integer $timeout Number of seconds until the connect() system call should timeout.
246+
* @param float $timeout Number of seconds until the connect() system call should timeout.
210247
*
211248
* @throws \Exception Exception raised if connection fails.
212249
* @return void
213250
*/
214251
public function connect($timeout = null)
215252
{
253+
$this->timeout = $timeout;
216254
$this->streamSocket = $this->getStream($this->options->getAddress(), $timeout);
217255
$msg = 'CONNECT '.$this->options;
218256
$this->send($msg);
@@ -271,7 +309,7 @@ public function request($subject, $payload, $callback, $wait = 1)
271309
*
272310
* @return void
273311
*/
274-
public function publish($subject, $payload)
312+
public function publish($subject, $payload = null)
275313
{
276314
$msg = 'PUB '.$subject.' '.strlen($payload);
277315
$this->send($msg . "\r\n" . $payload);
@@ -288,7 +326,7 @@ public function publish($subject, $payload)
288326
*/
289327
public function subscribe($subject, \Closure $callback)
290328
{
291-
$sid = uniqid();
329+
$sid = $this->randomGenerator->generateString(16);
292330
$msg = 'SUB '.$subject.' '.$sid;
293331
$this->send($msg);
294332
$this->subscriptions[$sid] = $callback;
@@ -307,7 +345,7 @@ public function subscribe($subject, \Closure $callback)
307345
*/
308346
public function queueSubscribe($subject, $queue, \Closure $callback)
309347
{
310-
$sid = uniqid();
348+
$sid = $this->randomGenerator->generateString(16);
311349
$msg = 'SUB '.$subject.' '.$queue.' '. $sid;
312350
$this->send($msg);
313351
$this->subscriptions[$sid] = $callback;
@@ -345,7 +383,8 @@ private function handlePING()
345383
*
346384
* @param string $line Message command from NATS.
347385
*
348-
* @return \Exception|void
386+
* @return void
387+
* @throws Exception
349388
* @codeCoverageIgnore
350389
*/
351390
private function handleMSG($line)
@@ -366,11 +405,15 @@ private function handleMSG($line)
366405
$payload = $this->receive($length);
367406
$msg = new Message($subject, $payload, $sid, $this);
368407

408+
if (!isset($this->subscriptions[$sid])) {
409+
throw new Exception('subscription not found');
410+
}
411+
369412
$func = $this->subscriptions[$sid];
370413
if (is_callable($func)) {
371414
$func($msg);
372415
} else {
373-
return new \Exception('not callable');
416+
throw new Exception('not callable');
374417
}
375418

376419
return;
@@ -413,7 +456,7 @@ public function wait($quantity = 0)
413456
/**
414457
* Set Stream Timeout.
415458
*
416-
* @param integer $seconds Before timeout on stream.
459+
* @param float $seconds Before timeout on stream.
417460
*
418461
* @return boolean
419462
*/
@@ -441,7 +484,14 @@ public function reconnect()
441484
{
442485
$this->reconnects += 1;
443486
$this->close();
444-
$this->connect();
487+
$this->connect($this->timeout);
488+
}
489+
490+
/**
491+
* @param integer $chunkSize Set byte chunk len to read when reading from wire
492+
*/
493+
public function setChunkSize($chunkSize){
494+
$this->chunkSize = $chunkSize;
445495
}
446496

447497
/**

src/Exception.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
namespace Nats;
3+
4+
/**
5+
* Exception Class.
6+
*/
7+
class Exception extends \Exception
8+
{
9+
10+
}

src/Message.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class Message
3030
/**
3131
* Message related connection.
3232
*
33-
* @var string
33+
* @var Connection
3434
*/
3535
private $conn;
3636

@@ -146,7 +146,7 @@ public function setConn(Connection $conn)
146146
/**
147147
* Get Conn.
148148
*
149-
* @return string
149+
* @return Connection
150150
*/
151151
public function getConn()
152152
{

src/StreamWrapper.php

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,30 +10,35 @@ class StreamWrapper
1010
/**
1111
* Wrapper for stream_socket_client
1212
*
13-
* @param string $address Address to connect the socket.
14-
* @param integer $errno Number of error.
15-
* @param string $errstr Description of error.
16-
* @param integer $timeout Timeout.
13+
* @param string $address Address to connect the socket.
14+
* @param integer $errno Number of error.
15+
* @param string $errstr Description of error.
16+
* @param float $timeout Timeout.
1717
* @param integer $typeStream Type of stream.
1818
*
19-
* @return stream
19+
* @return resource
2020
*/
2121
public function getStreamSocketClient($address, &$errno, &$errstr, $timeout, $typeStream)
2222
{
23-
return stream_socket_client($address, $errno, $errstr, $timeout, $typeStream);
23+
$stream = stream_socket_client($address, $errno, $errstr, $timeout, $typeStream);
24+
$this->setStreamTimeout($stream, $timeout);
25+
return $stream;
2426
}
25-
27+
2628
/**
2729
* Wrapper for stream_set_timeout
2830
*
29-
* @param mixed $stream Stream.
30-
* @param integer $seconds Seconds for timeout.
31+
* @param mixed $stream Stream.
32+
* @param float $seconds Seconds for timeout.
3133
*
3234
* @return boolean
3335
*
34-
*/
36+
*/
3537
public function setStreamTimeout($stream, $seconds)
3638
{
37-
return stream_set_timeout($stream, $seconds);
39+
$timeout = number_format($seconds, 3);
40+
$seconds = floor($timeout);
41+
$microseconds = ($timeout - $seconds) * 1000;
42+
return stream_set_timeout($stream, $seconds, $microseconds);
3843
}
3944
}

0 commit comments

Comments
 (0)