Skip to content

Commit 000a9bb

Browse files
committed
Implement pubsub, though pub currently doesn't work
1 parent 6b3ca09 commit 000a9bb

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

src/main/java/io/ipfs/api/IPFS.java

+56
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.net.*;
99
import java.nio.file.*;
1010
import java.util.*;
11+
import java.util.concurrent.*;
12+
import java.util.function.*;
1113
import java.util.stream.*;
1214

1315
public class IPFS {
@@ -36,6 +38,7 @@ public enum PinType {all, direct, indirect, recursive}
3638
public final File file = new File();
3739
public final Stats stats = new Stats();
3840
public final Name name = new Name();
41+
public final Pubsub pubsub = new Pubsub();
3942

4043
public IPFS(String host, int port) {
4144
this(host, port, "/api/v0/");
@@ -217,6 +220,32 @@ public Object gc() throws IOException {
217220
}
218221
}
219222

223+
public class Pubsub {
224+
public Object ls() throws IOException {
225+
return retrieveAndParse("pubsub/ls");
226+
}
227+
228+
public Object peers() throws IOException {
229+
return retrieveAndParse("pubsub/peers");
230+
}
231+
232+
public Object peers(String topic) throws IOException {
233+
return retrieveAndParse("pubsub/peers?topic="+topic);
234+
}
235+
236+
public Object pub(String topic, String data) throws IOException {
237+
return retrieveAndParse("pubsub/peers?arg="+topic + "&data=" + data);
238+
}
239+
240+
public Stream<Object> sub(String topic) throws IOException {
241+
return sub(topic, ForkJoinPool.commonPool());
242+
}
243+
244+
public Stream<Object> sub(String topic, ForkJoinPool threadSupplier) throws IOException {
245+
return retrieveAndParseStream("pubsub/sub?arg="+topic, threadSupplier);
246+
}
247+
}
248+
220249
/* 'ipfs block' is a plumbing command used to manipulate raw ipfs blocks.
221250
*/
222251
public class Block {
@@ -527,6 +556,12 @@ private Object retrieveAndParse(String path) throws IOException {
527556
return JSONParser.parse(new String(res));
528557
}
529558

559+
private Stream<Object> retrieveAndParseStream(String path, ForkJoinPool executor) throws IOException {
560+
Queue<byte[]> objects = new LinkedBlockingQueue<>();
561+
executor.submit(() -> getObjectStream(path, objects::add));
562+
return Stream.generate(() -> JSONParser.parse(new String(objects.poll())));
563+
}
564+
530565
private byte[] retrieve(String path) throws IOException {
531566
URL target = new URL("http", host, port, version + path);
532567
return IPFS.get(target);
@@ -554,6 +589,27 @@ private static byte[] get(URL target) throws IOException {
554589
}
555590
}
556591

592+
private void getObjectStream(String path, Consumer<byte[]> processor) {
593+
byte LINE_FEED = (byte)10;
594+
595+
try {
596+
InputStream in = retrieveStream(path);
597+
ByteArrayOutputStream resp = new ByteArrayOutputStream();
598+
599+
byte[] buf = new byte[4096];
600+
int r;
601+
while ((r = in.read(buf)) >= 0) {
602+
resp.write(buf, 0, r);
603+
if (buf[r - 1] == LINE_FEED) {
604+
processor.accept(resp.toByteArray());
605+
resp.reset();
606+
}
607+
}
608+
} catch (IOException e) {
609+
e.printStackTrace();
610+
}
611+
}
612+
557613
private InputStream retrieveStream(String path) throws IOException {
558614
URL target = new URL("http", host, port, version + path);
559615
return IPFS.getStream(target);

src/test/java/io/ipfs/api/APITest.java

+12
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.*;
88

99
import java.io.*;
10+
import java.net.*;
1011
import java.nio.file.*;
1112
import java.util.*;
1213
import java.util.function.*;
@@ -396,6 +397,17 @@ public void bulkBlockTest() {
396397
}
397398
}
398399

400+
@Test
401+
public void pubsub() throws IOException {
402+
Object ls = ipfs.pubsub.ls();
403+
Object peers = ipfs.pubsub.peers();
404+
Stream<Object> sub = ipfs.pubsub.sub("orbit" + System.nanoTime());
405+
String data = "Gday All!";
406+
Object pub = ipfs.pubsub.pub("orbit", data);
407+
Optional<Object> first = sub.findFirst();
408+
Assert.assertTrue(first.isPresent() && first.get().equals(data));
409+
}
410+
399411
private static String toEscapedHex(byte[] in) throws IOException {
400412
StringBuilder res = new StringBuilder();
401413
for (byte b : in) {

0 commit comments

Comments
 (0)