Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public int shutdown(int fd, int how) throws NativeErrorException {

public int available(int fd) throws NativeErrorException {
int result = availableNative(fd);
if (result < 0) {
return returnOrThrow(pollReadNative(fd, 0), 0);
if (result >= 0) {
return result;
} else if (pollRead(fd, 0)) {
return 1;
} else {
return returnOrThrow(result, 0);
return 0;
}
}

Expand Down
29 changes: 10 additions & 19 deletions src/main/java/org/scalasbt/ipcsocket/SocketChannels.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,8 @@ public static String readLine(SocketChannel channel, int readTimeoutMillis) thro
numOfKeys = sel.select(readTimeoutMillis);
}
} else {
if (readTimeoutMillis > 0) {
if (ServerSocketChannels.isWin) {
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
}
} else {
throw new IOException("timeout requires JDK 17 or Windows");
}
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
}
}
if (numOfKeys == 0) {
Expand Down Expand Up @@ -152,17 +146,14 @@ public static ByteBuffer readAll(SocketChannel channel, int readTimeoutMillis)
numOfKeys = sel.select(readTimeoutMillis);
}
} else {
if (readTimeoutMillis > 0) {
// The following operation gets blocked on JDK 8
// channel.register(sel, SelectionKey.OP_READ);
// numOfKeys = sel.select(readTimeoutMillis);
if (ServerSocketChannels.isWin) {
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
}
} else {
throw new IOException("timeout requires JDK 17 or Windows");
}
// if (readTimeoutMillis > 0) {
// throw new IOException("timeout requires JDK 17 and non-Windows");
// }
// The following operation gets blocked on JDK 8
// channel.register(sel, SelectionKey.OP_READ);
// numOfKeys = sel.select(readTimeoutMilis);
if (channel.supportedOptions().contains(SO_TIMEOUT)) {
channel.setOption(SO_TIMEOUT, Integer.valueOf(readTimeoutMillis));
}
}
if (numOfKeys == 0) {
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/org/scalasbt/ipcsocket/UnixDomainSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.ByteBuffer;

import java.net.Socket;
import java.net.SocketTimeoutException;

import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -137,7 +138,7 @@ public int available() throws IOException {
public int read() throws IOException {
byte[] buf = new byte[1];
int result;
if (doRead(buf, 0, 1) == 0) {
if (doRead(buf, 0, 1, getSoTimeout()) == 0) {
result = -1;
} else {
// Make sure to & with 0xFF to avoid sign extension
Expand All @@ -152,7 +153,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}
int socketFd = fd.acquire();
try {
int result = doRead(b, off, len);
int result = doRead(b, off, len, getSoTimeout());
if (result == 0) {
try {
provider.close(socketFd);
Expand All @@ -168,12 +169,17 @@ public int read(byte[] b, int off, int len) throws IOException {
}
}

private int doRead(byte[] buf, int offset, int len) throws IOException {
private int doRead(byte[] buf, int offset, int len, int timeoutMillis) throws IOException {
try {
int fdToRead = fd.acquire();
if (fdToRead == -1) {
return -1;
}
if (timeoutMillis > 0) {
if (!provider.pollRead(fdToRead, timeoutMillis)) {
throw new SocketTimeoutException("read timed out");
}
}
return provider.read(fdToRead, buf, offset, len);
} catch (NativeErrorException e) {
throw new IOException(e);
Expand Down
16 changes: 6 additions & 10 deletions src/test/java/org/scalasbt/ipcsocket/SocketChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,26 @@ static boolean isJava17Plus() {
return SocketChannels.isJava17Plus();
}

/** Test the non-blocking echo server using JDK 17 Unix Domain Socket. */
/** Test the non-blocking echo server. */
@Test
public void testNonBlockingEchoServer() throws IOException, InterruptedException {
System.out.println(
"SocketChannelTest#testNonBlockingEchoServer(" + Boolean.toString(useJNI()) + ")");
withSocket(
sock -> {
if (isJava17Plus() || ServerSocketChannels.isWin) {
String line = nonBlockingEchoServerTest(sock, 100, 600);
assertEquals("echo did not return the content", "hello", line);
}
String line = nonBlockingEchoServerTest(sock, 100, 600);
assertEquals("echo did not return the content", "hello", line);
});
}

/** Test the non-blocking echo server using JDK 17 Unix Domain Socket. */
/** Test the non-blocking echo server. */
@Test
public void testTimeout() throws IOException, InterruptedException {
System.out.println("SocketChannelTest#testTimeout(" + Boolean.toString(useJNI()) + ")");
withSocket(
sock -> {
if (isJava17Plus() || ServerSocketChannels.isWin) {
String line = nonBlockingEchoServerTest(sock, 6000, 600);
assertEquals("echo did not timeout", "<unavailable>", line);
}
String line = nonBlockingEchoServerTest(sock, 6000, 600);
assertEquals("echo did not timeout", "<unavailable>", line);
});
}

Expand Down
Loading