Skip to content

Commit 616b436

Browse files
authored
Batch operations in pipeline (#283)
* Refactoring - getting rid of compiler warnings * Implements batch operations in pipeline mode, refactoring, removes compiler warnings
1 parent ff0cf99 commit 616b436

27 files changed

+184
-96
lines changed

src/main/scala/com/redis/BaseOperations.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ trait BaseOperations extends BaseApi {
4747
send("KEYS", List(pattern))(asList)
4848

4949
override def time[A](implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
50-
send("TIME")(asList)
50+
send("TIME", false)(asList)
5151

5252
@deprecated("use randomkey", "2.8")
5353
def randkey[A](implicit parse: Parse[A]): Option[A] =
54-
send("RANDOMKEY")(asBulk)
54+
send("RANDOMKEY", false)(asBulk)
5555

5656
override def randomkey[A](implicit parse: Parse[A]): Option[A] =
57-
send("RANDOMKEY")(asBulk)
57+
send("RANDOMKEY", false)(asBulk)
5858

5959
override def rename(oldkey: Any, newkey: Any)(implicit format: Format): Boolean =
6060
send("RENAME", List(oldkey, newkey))(asBoolean)
@@ -63,7 +63,7 @@ trait BaseOperations extends BaseApi {
6363
send("RENAMENX", List(oldkey, newkey))(asBoolean)
6464

6565
override def dbsize: Option[Long] =
66-
send("DBSIZE")(asLong)
66+
send("DBSIZE", false)(asLong)
6767

6868
override def exists(key: Any)(implicit format: Format): Boolean =
6969
send("EXISTS", List(key))(asBoolean)
@@ -104,16 +104,16 @@ trait BaseOperations extends BaseApi {
104104
})
105105

106106
override def flushdb: Boolean =
107-
send("FLUSHDB")(asBoolean)
107+
send("FLUSHDB", false)(asBoolean)
108108

109109
override def flushall: Boolean =
110-
send("FLUSHALL")(asBoolean)
110+
send("FLUSHALL", false)(asBoolean)
111111

112112
override def move(key: Any, db: Int)(implicit format: Format): Boolean =
113113
send("MOVE", List(key, db))(asBoolean)
114114

115115
override def quit: Boolean =
116-
send("QUIT")(disconnect)
116+
send("QUIT", false)(disconnect)
117117

118118
override def auth(secret: Any)(implicit format: Format): Boolean =
119119
send("AUTH", List(secret))(asBoolean)
@@ -125,13 +125,13 @@ trait BaseOperations extends BaseApi {
125125
send("SCAN", cursor :: ((x: List[Any]) => if (pattern == "*") x else "match" :: pattern :: x) (if (count == 10) Nil else List("count", count)))(asPair)
126126

127127
override def ping: Option[String] =
128-
send("PING")(asString)
128+
send("PING", false)(asString)
129129

130130
override def watch(key: Any, keys: Any*)(implicit format: Format): Boolean =
131131
send("WATCH", key :: keys.toList)(asBoolean)
132132

133133
override def unwatch(): Boolean =
134-
send("UNWATCH")(asBoolean)
134+
send("UNWATCH", false)(asBoolean)
135135

136136
override def getConfig(key: Any = "*")(implicit format: Format): Option[Map[String, Option[String]]] =
137137
send("CONFIG", List("GET", key))(asList).map { ls =>

src/main/scala/com/redis/IO.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ trait IO extends Log {
115115
disconnect
116116
throw ex
117117
}
118-
build.result
118+
build.result()
119119
}
120120

121121
def readCounted(count: Int): Array[Byte] = {

src/main/scala/com/redis/NodeOperations.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,25 @@ trait NodeOperations extends NodeApi {
66
self: Redis =>
77

88
override def save: Boolean =
9-
send("SAVE")(asBoolean)
9+
send("SAVE", false)(asBoolean)
1010

1111
override def bgsave: Boolean =
12-
send("BGSAVE")(asBoolean)
12+
send("BGSAVE", false)(asBoolean)
1313

1414
override def lastsave: Option[Long] =
15-
send("LASTSAVE")(asLong)
15+
send("LASTSAVE", false)(asLong)
1616

1717
override def shutdown: Boolean =
18-
send("SHUTDOWN")(asBoolean)
18+
send("SHUTDOWN", false)(asBoolean)
1919

2020
override def bgrewriteaof: Boolean =
21-
send("BGREWRITEAOF")(asBoolean)
21+
send("BGREWRITEAOF", false)(asBoolean)
2222

2323
override def info: Option[String] =
24-
send("INFO")(asBulk)
24+
send("INFO", false)(asBulk)
2525

2626
override def monitor: Boolean =
27-
send("MONITOR")(asBoolean)
27+
send("MONITOR", false)(asBoolean)
2828

2929
override def slaveof(options: Any): Boolean = options match {
3030
case (h: String, p: Int) =>

src/main/scala/com/redis/PubSub.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ trait PubSub extends PubOperations { self: Redis =>
7777
}
7878

7979
def pUnsubscribe(): Unit = {
80-
send("PUNSUBSCRIBE")(())
80+
send("PUNSUBSCRIBE", false)(())
8181
}
8282

8383
def pUnsubscribe(channel: String, channels: String*): Unit = {
@@ -98,7 +98,7 @@ trait PubSub extends PubOperations { self: Redis =>
9898
}
9999

100100
def unsubscribe(): Unit = {
101-
val r = send("UNSUBSCRIBE")(())
101+
val r = send("UNSUBSCRIBE", false)(())
102102
pubSub = false
103103
r
104104
}

src/main/scala/com/redis/RedisClient.scala

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ object RedisClient {
1515
case object MIN extends Aggregate
1616
case object MAX extends Aggregate
1717

18+
sealed trait Mode
19+
case object SINGLE extends Mode
20+
case object BATCH extends Mode
21+
1822
private def extractDatabaseNumber(connectionUri: java.net.URI): Int = {
1923
Option(connectionUri.getPath).map(path =>
2024
if (path.isEmpty) 0
@@ -24,11 +28,22 @@ object RedisClient {
2428
}
2529
}
2630

27-
trait Redis extends IO with Protocol {
31+
import RedisClient._
32+
abstract class Redis(batch: Mode) extends IO with Protocol {
33+
var handlers: Vector[(String, () => Any)] = Vector.empty
34+
var commandBuffer: StringBuffer = new StringBuffer
35+
val crlf = "\r\n"
36+
2837

2938
def send[A](command: String, args: Seq[Any])(result: => A)(implicit format: Format): A = try {
30-
write(Commands.multiBulk(command.getBytes("UTF-8") +: (args map (format.apply))))
31-
result
39+
if (batch == BATCH) {
40+
handlers :+= ((command, () => result))
41+
commandBuffer.append((List(command) ++ args.toList).mkString(" ") ++ crlf)
42+
null.asInstanceOf[A] // hack
43+
} else {
44+
write(Commands.multiBulk(command.getBytes("UTF-8") +: (args map (format.apply))))
45+
result
46+
}
3247
} catch {
3348
case e: RedisConnectionException =>
3449
if (disconnect) send(command, args)(result)
@@ -38,15 +53,26 @@ trait Redis extends IO with Protocol {
3853
else throw e
3954
}
4055

41-
def send[A](command: String)(result: => A): A = try {
42-
write(Commands.multiBulk(List(command.getBytes("UTF-8"))))
43-
result
56+
def send[A](command: String, submissionMode: Boolean = false)(result: => A): A = try {
57+
if (batch == BATCH) {
58+
if (!submissionMode) {
59+
handlers :+= ((command, () => result))
60+
commandBuffer.append(command ++ crlf)
61+
null.asInstanceOf[A]
62+
} else {
63+
write(command.getBytes("UTF-8"))
64+
result
65+
}
66+
} else {
67+
write(Commands.multiBulk(List(command.getBytes("UTF-8"))))
68+
result
69+
}
4470
} catch {
4571
case e: RedisConnectionException =>
46-
if (disconnect) send(command)(result)
72+
if (disconnect) send(command, submissionMode)(result)
4773
else throw e
4874
case e: SocketException =>
49-
if (disconnect) send(command)(result)
75+
if (disconnect) send(command, submissionMode)(result)
5076
else throw e
5177
}
5278

@@ -57,7 +83,7 @@ trait Redis extends IO with Protocol {
5783

5884
}
5985

60-
trait RedisCommand extends Redis
86+
abstract class RedisCommand(batch: Mode) extends Redis(batch)
6187
with BaseOperations
6288
with GeoOperations
6389
with NodeOperations
@@ -74,7 +100,7 @@ trait RedisCommand extends Redis
74100
val database: Int = 0
75101
val secret: Option[Any] = None
76102

77-
override def onConnect: Unit = {
103+
override def onConnect(): Unit = {
78104
secret.foreach {s =>
79105
auth(s)
80106
}
@@ -89,13 +115,12 @@ trait RedisCommand extends Redis
89115
private def authenticate(): Unit = {
90116
secret.foreach(auth _)
91117
}
92-
93118
}
94119

95-
96120
class RedisClient(override val host: String, override val port: Int,
97-
override val database: Int = 0, override val secret: Option[Any] = None, override val timeout : Int = 0, override val sslContext: Option[SSLContext] = None)
98-
extends RedisCommand with PubSub {
121+
override val database: Int = 0, override val secret: Option[Any] = None, override val timeout : Int = 0,
122+
override val sslContext: Option[SSLContext] = None, val batch: Mode = RedisClient.SINGLE)
123+
extends RedisCommand(batch) with PubSub {
99124

100125
def this() = this("localhost", 6379)
101126
def this(connectionUri: java.net.URI) = this(
@@ -110,18 +135,19 @@ class RedisClient(override val host: String, override val port: Int,
110135
)
111136
override def toString: String = host + ":" + String.valueOf(port) + "/" + database
112137

138+
// with MULTI/EXEC
113139
def pipeline(f: PipelineClient => Any): Option[List[Any]] = {
114-
send("MULTI")(asString) // flush reply stream
140+
send("MULTI", false)(asString) // flush reply stream
115141
try {
116142
val pipelineClient = new PipelineClient(this)
117143
try {
118144
f(pipelineClient)
119145
} catch {
120146
case e: Exception =>
121-
send("DISCARD")(asString)
147+
send("DISCARD", false)(asString)
122148
throw e
123149
}
124-
send("EXEC")(asExec(pipelineClient.handlers))
150+
send("EXEC", false)(asExec(pipelineClient.responseHandlers))
125151
} catch {
126152
case e: RedisMultiExecException =>
127153
None
@@ -135,7 +161,7 @@ class RedisClient(override val host: String, override val port: Int,
135161
/**
136162
* Redis pipelining API without the transaction semantics. The implementation has a non-blocking
137163
* semantics and returns a <tt>List</tt> of <tt>Promise</tt>. The caller may use <tt>Future.firstCompletedOf</tt> to get the
138-
* first completed task before all tasks have been completed.
164+
* first completed task before all tasks have been completed. However the commands are submitted one by one and NOT in batch.
139165
* If an exception is raised in executing any of the commands, then the corresponding <tt>Promise</tt> holds
140166
* the exception. Here's a sample usage:
141167
* <pre>
@@ -179,20 +205,32 @@ class RedisClient(override val host: String, override val port: Int,
179205
ps
180206
}
181207

182-
class PipelineClient(parent: RedisClient) extends RedisCommand with PubOperations {
208+
// batched pipelines : all commands submitted in batch
209+
def batchedPipeline(commands: List[() => Any]): Option[List[Any]] = {
210+
assert(batch == BATCH)
211+
commands.foreach { command =>
212+
command()
213+
}
214+
val r = send(commandBuffer.toString, true)(Some(handlers.map(_._2).map(_()).toList))
215+
handlers = Vector.empty
216+
commandBuffer.setLength(0)
217+
r
218+
}
219+
220+
class PipelineClient(parent: RedisClient) extends RedisCommand(parent.batch) with PubOperations {
183221
import com.redis.serialization.Parse
184222

185-
var handlers: Vector[() => Any] = Vector.empty
223+
var responseHandlers: Vector[() => Any] = Vector.empty
186224

187225
override def send[A](command: String, args: Seq[Any])(result: => A)(implicit format: Format): A = {
188226
write(Commands.multiBulk(command.getBytes("UTF-8") +: (args map (format.apply))))
189-
handlers :+= (() => result)
227+
responseHandlers :+= (() => result)
190228
receive(singleLineReply).map(Parse.parseDefault)
191229
null.asInstanceOf[A] // ugh... gotta find a better way
192230
}
193-
override def send[A](command: String)(result: => A): A = {
231+
override def send[A](command: String, submissionMode: Boolean = false)(result: => A): A = {
194232
write(Commands.multiBulk(List(command.getBytes("UTF-8"))))
195-
handlers :+= (() => result)
233+
responseHandlers :+= (() => result)
196234
receive(singleLineReply).map(Parse.parseDefault)
197235
null.asInstanceOf[A]
198236
}
@@ -207,7 +245,7 @@ class RedisClient(override val host: String, override val port: Int,
207245
override def connected = parent.connected
208246
override def connect = parent.connect
209247
override def disconnect = parent.disconnect
210-
override def clearFd = parent.clearFd
248+
override def clearFd() = parent.clearFd()
211249
override def write(data: Array[Byte]) = parent.write(data)
212250
override def readLine = parent.readLine
213251
override def readCounted(count: Int) = parent.readCounted(count)

src/main/scala/com/redis/RedisProtocol.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ private [redis] object Commands {
3232
b ++= arg
3333
b ++= LS
3434
}
35-
b.result
35+
b.result()
3636
}
3737
}
3838

@@ -111,7 +111,7 @@ private [redis] trait Reply {
111111
Parsers.parseInt(str) match {
112112
case -1 => None
113113
case n if n == handlers.size =>
114-
Some(handlers.map(_.apply).toList)
114+
Some(handlers.map(_.apply()).toList)
115115
case n => throw new Exception("Protocol error: Expected "+handlers.size+" results, but got "+n)
116116
}
117117
}
@@ -286,6 +286,7 @@ private [redis] trait R extends Reply {
286286
}
287287

288288
def asAny = receive(integerReply orElse singleLineReply orElse bulkReply orElse multiBulkReply)
289+
def asAnyMany(count: Int) = (0 to count - 1).map(_ => asAny).toList
289290
}
290291

291292
trait Protocol extends R

src/main/scala/com/redis/cluster/HashRing.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ case class HashRing[T](nodes: List[T], replicas: Int) {
7676
def isEmpty: Boolean = ring.isEmpty
7777

7878
private def nodeHashFor(node: T, replica: Int): Long = {
79-
calculateChecksum((node + ":" + replica).getBytes("UTF-8").toIndexedSeq)
79+
calculateChecksum((s"$node:$replica").getBytes("UTF-8").toIndexedSeq)
8080
}
8181

8282
}

src/main/scala/com/redis/ds/Deque.scala

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,14 @@ import com.redis.ListOperations
3636
import com.redis.serialization.Parse.Implicits._
3737
import com.redis.serialization._
3838

39-
abstract class RedisDeque[A](val blocking: Boolean = false, val timeoutInSecs: Int = 0)(implicit private val format: Format, private val parse: Parse[A])
39+
trait RedisDeque[A]
4040
extends Deque[A] { self: ListOperations =>
4141

42+
val blocking: Boolean = false
43+
val timeoutInSecs: Int = 0
44+
implicit val f: Format
45+
implicit val pr: Parse[A]
46+
4247
val key: String
4348

4449
def addFirst(a: A): Option[Long] = lpush(key, a)
@@ -73,18 +78,27 @@ abstract class RedisDeque[A](val blocking: Boolean = false, val timeoutInSecs: I
7378
}
7479
}
7580

76-
import com.redis.RedisCommand
81+
import com.redis._
82+
83+
abstract class MyRedisDeque[A](bloking: Boolean, timoutInSecs: Int)(implicit format: Format, parse: Parse[A])
84+
extends RedisCommand(RedisClient.SINGLE) with RedisDeque[A] {
85+
override val blocking = bloking
86+
override val timeoutInSecs: Int = timoutInSecs
87+
}
7788

7889
class RedisDequeClient(val h: String, val p: Int, val d: Int = 0, val s: Option[Any] = None, val t : Int =0) {
79-
def getDeque[A](k: String, blocking: Boolean = false, timeoutInSecs: Int = 0)(implicit format: Format, parse: Parse[A]) =
80-
new RedisDeque(blocking, timeoutInSecs)(format, parse) with RedisCommand {
90+
def getDeque[A](k: String, blocking: Boolean = false, timeoutInSecs: Int = 0)(implicit format: Format, parse: Parse[A]) = {
91+
92+
new MyRedisDeque[A](blocking, timeoutInSecs)(format, parse) {
93+
implicit val f = format
94+
implicit val pr = parse
8195
val host = h
8296
val port = p
8397
val timeout = t
8498
val key = k
8599
override val database = d
86100
override val secret = s
87-
88101
override def close(): Unit = disconnect
89102
}
103+
}
90104
}

src/test/scala/com/redis/Bench.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ object Bench {
1313
import Values._
1414
clients.withClient {
1515
client => {
16-
(1 to count) foreach (i => client.rpush(key, values.next))
16+
(1 to count) foreach (i => client.rpush(key, values.next()))
1717
assert(client.llen(key) == Some(count))
1818
(1 to count) foreach (i => client.lpop(key))
1919
assert(client.llen(key) == Some(0))

0 commit comments

Comments
 (0)