forked from kstyrc/embedded-redis
-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathAbstractRedisInstance.java
154 lines (133 loc) · 4.88 KB
/
AbstractRedisInstance.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package redis.embedded;
import org.apache.commons.io.IOUtils;
import redis.embedded.exceptions.EmbeddedRedisException;
import java.io.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
/**
 * Base class for Redis instances that are launched as a child process.
 *
 * <p>Subclasses fill in {@link #args} (the command line, whose first element is the
 * executable path) and supply {@link #redisReadyPattern()}, a regular expression that
 * a line of the process' standard output must fully match before the instance is
 * considered started.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized on this instance; an
 * instance may be started again after it has been stopped.
 */
abstract class AbstractRedisInstance implements Redis {
    /** Command line used to launch the process; element 0 is the executable path. */
    protected List<String> args = Collections.emptyList();
    private volatile boolean active = false;
    private Process redisProcess;
    private final int port;
    /**
     * Runs the background stream pumps. Created on every {@link #start()} because an
     * executor that has been shut down by {@link #stop()} rejects new tasks.
     */
    private ExecutorService executor;
    private PrintStream out = null; //Ignore Redis output.
    private PrintStream err = System.err; //Forward Redis error messages to STDERR.

    /**
     * @param port the port reported by {@link #ports()}
     */
    protected AbstractRedisInstance(int port) {
        this.port = port;
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the matching {@link #stop()}
     */
    public boolean isActive() {
        return active;
    }

    /**
     * Launches the Redis process and blocks until its standard output contains a line
     * matching {@link #redisReadyPattern()}.
     *
     * <p>If startup fails for any reason, the half-started process is destroyed and the
     * stream pumps are shut down, so a failed start does not leave a process behind.
     *
     * @throws EmbeddedRedisException if the instance is already running or the process
     *                                could not be launched or read from
     * @throws RuntimeException       if the process' output ended before the ready line appeared
     */
    public synchronized void start() throws EmbeddedRedisException {
        if (active) {
            throw new EmbeddedRedisException("This redis server instance is already running...");
        }
        Process launched = null;
        try {
            launched = createRedisProcessBuilder().start();
            redisProcess = launched;
            // Two workers: one pump for stderr and, when stdout is forwarded, one for stdout.
            // With a single worker the stdout pump would stay queued behind the stderr pump
            // until the process exits.
            executor = Executors.newFixedThreadPool(2);
            logErrors();
            awaitRedisServerReady();
            active = true;
        } catch (IOException e) {
            throw new EmbeddedRedisException("Failed to start Redis instance", e);
        } finally {
            if (!active) {
                // Startup failed: stop() is a no-op while inactive, so clean up here.
                if (launched != null) {
                    launched.destroy();
                }
                shutdownExecutor();
            }
        }
    }

    /** Starts forwarding the process' standard error to {@link #err}. */
    private void logErrors() {
        copyStreamInBackground(redisProcess.getErrorStream(), err);
    }

    /** Wraps {@code copyFrom} in a line reader and pumps it to {@code copyTo} on the executor. */
    private void copyStreamInBackground(final InputStream copyFrom, PrintStream copyTo) {
        copyReaderInBackground(new BufferedReader(new InputStreamReader(copyFrom)), copyTo);
    }

    /** Pumps the remaining lines of {@code reader} to {@code copyTo} on the executor. */
    private void copyReaderInBackground(BufferedReader reader, PrintStream copyTo) {
        executor.submit(new PrintReaderRunnable(reader, copyTo));
    }

    /**
     * Sets where the process' standard output is forwarded to.
     *
     * @param out target stream, or {@code null} to ignore the output
     */
    public void outTo(PrintStream out) {
        this.out = out;
    }

    /**
     * Sets where the process' standard error is forwarded to.
     *
     * @param err target stream, or {@code null} to discard the output
     */
    public void errTo(PrintStream err) {
        this.err = err;
    }

    /**
     * Reads the process' standard output line by line until a line fully matches
     * {@link #redisReadyPattern()}, echoing each line to {@link #out} when one is set.
     *
     * <p>On success with an output sink configured, the same reader is handed to a
     * background pump so that lines it has already buffered are not lost. In every
     * other case the reader is closed.
     *
     * @throws IOException      if reading the process output fails
     * @throws RuntimeException if the output ends before a matching line is seen
     */
    private void awaitRedisServerReady() throws IOException {
        final PrintStream stdoutSink = out;
        BufferedReader reader = new BufferedReader(new InputStreamReader(redisProcess.getInputStream()));
        boolean handedOver = false;
        try {
            // Compile once instead of on every line; matcher(...).matches() has the same
            // full-match semantics as String.matches.
            Pattern readyPattern = Pattern.compile(redisReadyPattern());
            StringBuilder processLog = new StringBuilder();
            String outputLine;
            do {
                outputLine = reader.readLine();
                if (outputLine == null) {
                    //Something goes wrong. Stream is ended before server was activated.
                    throw new RuntimeException("Can't start redis server. Check logs for details. Redis process log: " + processLog);
                }
                processLog.append("\n").append(outputLine);
                if (stdoutSink != null) {
                    stdoutSink.println(outputLine);
                }
            } while (!readyPattern.matcher(outputLine).matches());

            if (stdoutSink != null) {
                /* Continue reading of STDOUT in a background thread. */
                copyReaderInBackground(reader, stdoutSink);
                handedOver = true;
            }
        } finally {
            if (!handedOver) {
                IOUtils.closeQuietly(reader);
            }
        }
    }

    /**
     * @return a regular expression that a standard-output line must fully match for the
     *         instance to be considered started
     */
    protected abstract String redisReadyPattern();

    /** Builds the process from {@link #args}, using the executable's parent directory as working directory. */
    private ProcessBuilder createRedisProcessBuilder() {
        File executable = new File(args.get(0));
        ProcessBuilder pb = new ProcessBuilder(args);
        pb.directory(executable.getParentFile());
        return pb;
    }

    /**
     * Stops a running instance: shuts down the stream pumps, destroys the process and
     * waits for it to exit. Does nothing if the instance is not active.
     *
     * @throws EmbeddedRedisException if the calling thread is interrupted while waiting
     */
    public synchronized void stop() throws EmbeddedRedisException {
        if (active) {
            shutdownExecutor();
            redisProcess.destroy();
            tryWaitFor();
            active = false;
        }
    }

    /** Requests an orderly shutdown of the pump executor, if there is one still running. */
    private void shutdownExecutor() {
        if (executor != null && !executor.isShutdown()) {
            executor.shutdown();
        }
    }

    /** Waits for the process to exit, converting an interrupt into {@link EmbeddedRedisException}. */
    private void tryWaitFor() {
        try {
            redisProcess.waitFor();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new EmbeddedRedisException("Failed to stop redis instance", e);
        }
    }

    /**
     * @return a single-element list holding the port passed to the constructor
     */
    public List<Integer> ports() {
        return Arrays.asList(port);
    }

    /**
     * Copies lines from a reader to a print stream until the reader is exhausted or
     * fails, then closes the reader.
     */
    private static class PrintReaderRunnable implements Runnable {
        private final BufferedReader reader;
        /** Target stream; {@code null} means lines are read and discarded. */
        private final PrintStream outputStream;

        private PrintReaderRunnable(BufferedReader reader, PrintStream outputStream) {
            this.reader = reader;
            this.outputStream = outputStream;
        }

        @Override
        public void run() {
            try {
                readLines();
            } finally {
                IOUtils.closeQuietly(reader);
            }
        }

        /** Reads until end of stream, printing each line to the target when one is set. */
        public void readLines() {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (outputStream != null) {
                        outputStream.println(line);
                    }
                }
            } catch (IOException ignored) {
                /* reader has been closed. The connected Redis instance has possibly shut down. */
            }
        }
    }
}