@@ -234,14 +234,14 @@ public Object peers(String topic) throws IOException {
234
234
}
235
235
236
236
public Object pub (String topic , String data ) throws IOException {
237
- return retrieveAndParse ("pubsub/peers?arg=" +topic + "&data =" + data );
237
+ return retrieveAndParse ("pubsub/peers?arg=" +topic + "&arg =" + data );
238
238
}
239
239
240
- public Stream <Object > sub (String topic ) throws IOException {
240
+ public Supplier <Object > sub (String topic ) throws IOException {
241
241
return sub (topic , ForkJoinPool .commonPool ());
242
242
}
243
243
244
- public Stream <Object > sub (String topic , ForkJoinPool threadSupplier ) throws IOException {
244
+ public Supplier <Object > sub (String topic , ForkJoinPool threadSupplier ) throws IOException {
245
245
return retrieveAndParseStream ("pubsub/sub?arg=" +topic , threadSupplier );
246
246
}
247
247
}
@@ -556,10 +556,16 @@ private Object retrieveAndParse(String path) throws IOException {
556
556
return JSONParser .parse (new String (res ));
557
557
}
558
558
559
- private Stream <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
560
- Queue <byte []> objects = new LinkedBlockingQueue <>();
559
+ private Supplier <Object > retrieveAndParseStream (String path , ForkJoinPool executor ) throws IOException {
560
+ BlockingQueue <byte []> objects = new LinkedBlockingQueue <>();
561
561
executor .submit (() -> getObjectStream (path , objects ::add ));
562
- return Stream .generate (() -> JSONParser .parse (new String (objects .poll ())));
562
+ return () -> {
563
+ try {
564
+ return JSONParser .parse (new String (objects .take ()));
565
+ } catch (InterruptedException e ) {
566
+ throw new RuntimeException (e );
567
+ }
568
+ };
563
569
}
564
570
565
571
private byte [] retrieve (String path ) throws IOException {
0 commit comments