Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentinel support #79

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/main/scala/com/redis/IO.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package com.redis

import java.io._
import java.net.{Socket, InetSocketAddress}
import java.net.{InetAddress, Socket, InetSocketAddress}

import serialization.Parse.parseStringSafe

trait IO extends Log {
val host: String
val port: Int
val addr: NodeAddress
def host: String = addr.addr._1
def port: Int = addr.addr._2

var socket: Socket = _
addr onChange onAddrChange

@volatile var socket: Socket = _
var out: OutputStream = _
var in: InputStream = _
var db: Int = _
Expand All @@ -21,9 +24,17 @@ trait IO extends Log {
disconnect && connect
}

protected def onAddrChange(addr: InetSocketAddress) {
val sock = socket
if (sock != null && sock.getRemoteSocketAddress != addr) {
sock.close() // just close the socket (pretend the server closed it)
}
}

// Connects the socket, and sets the input and output streams.
def connect: Boolean = {
try {
val (host, port) = addr.addr
socket = new Socket(host, port)

socket.setSoTimeout(0)
Expand Down
100 changes: 100 additions & 0 deletions src/main/scala/com/redis/NodeAddress.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.redis

import java.net.InetSocketAddress

abstract class NodeAddress {
def addr: (String, Int)

def onChange(callback: InetSocketAddress => Unit)
override def toString = {
val (host, port) = addr
host + ":" + String.valueOf(port)
}
}

class FixedAddress(host: String, port: Int) extends NodeAddress {
val addr = (host, port)
override def onChange(callback: InetSocketAddress => Unit) { }
}

class SentinelMonitoredMasterAddress(val sentinels: Seq[(String, Int)], val masterName: String) extends NodeAddress
with Log {

var master: Option[(String, Int)] = None

override def addr = master.synchronized {
master match {
case Some((h, p)) => (h, p)
case _ => throw new RuntimeException("All sentinels are down.")
}
}

private var onChangeCallbacks: List[InetSocketAddress => Unit] = Nil

override def onChange(callback: InetSocketAddress => Unit) = synchronized {
onChangeCallbacks = callback :: onChangeCallbacks
}

private def fireCallbacks(addr: InetSocketAddress) = synchronized {
onChangeCallbacks foreach (_(addr))
}

def stopMonitoring() {
sentinelListeners foreach (_.stop())
}

private val sentinelClients = sentinels.map { case (h, p) =>
val client = new SentinelClient(h, p)
master match { // this can be done without synchronization because the threads are not yet live
case Some(_) =>
case None =>
try {
master = client.getMasterAddrByName(masterName)
} catch {
case e: Throwable => error("Error connecting to sentinel.", e)
}
}
client
}
private val sentinelListeners = sentinelClients map { client =>
val listener = new SentinelListener(client)
new Thread(listener).start()
listener
}

private class SentinelListener(val client: SentinelClient) extends Runnable {
@volatile var running: Boolean = false

def run() {
running = true
while (running) {
try {
client.synchronized {
client.send("SUBSCRIBE", List("+switch-master"))(())
}
new client.Consumer((msg: PubSubMessage) =>
msg match {
case M(chan, msgText) =>
val tokens = msgText split ' '
val addr = tokens(3)
val port = tokens(4).toInt
master.synchronized {
master = Some(addr, port)
}
fireCallbacks(new InetSocketAddress(addr, port))
case _ =>
}).run() // synchronously read, so we know when a disconnect happens
} catch {
case e: Throwable => error("Error connecting to sentinel.", e)
}
}
}

def stop() {
client.synchronized {
client.unsubscribe
}
running = false
}
}
}
18 changes: 12 additions & 6 deletions src/main/scala/com/redis/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import org.apache.commons.pool._
import org.apache.commons.pool.impl._
import com.redis.cluster.ClusterNode

private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None)
private [redis] class RedisClientFactory(val addr: NodeAddress, val database: Int = 0, val secret: Option[Any] = None)
extends PoolableObjectFactory[RedisClient] {

// when we make an object it's already connected
def makeObject = {
val cl = new RedisClient(host, port)
val cl = new RedisClient(addr)
if (database != 0)
cl.select(database)
secret.foreach(cl auth _)
Expand All @@ -30,9 +30,15 @@ private [redis] class RedisClientFactory(val host: String, val port: Int, val da
def activateObject(rc: RedisClient): Unit = {}
}

class RedisClientPool(val host: String, val port: Int, val maxIdle: Int = 8, val database: Int = 0, val secret: Option[Any] = None) {
val pool = new StackObjectPool(new RedisClientFactory(host, port, database, secret), maxIdle)
override def toString = host + ":" + String.valueOf(port)
class RedisClientPool(val addr: NodeAddress, val maxIdle: Int = 8, val database: Int = 0, val secret: Option[Any] = None) {
def host: String = addr.addr._1
def port: Int = addr.addr._2

def this(host: String, port: Int) =
this(new FixedAddress(host, port))

val pool = new StackObjectPool(new RedisClientFactory(addr, database, secret), maxIdle)
override def toString = addr.toString

def withClient[T](body: RedisClient => T) = {
val client = pool.borrowObject
Expand All @@ -52,6 +58,6 @@ class RedisClientPool(val host: String, val port: Int, val maxIdle: Int = 8, val
* @param poolname must be unique
*/
class IdentifiableRedisClientPool(val node: ClusterNode)
extends RedisClientPool (node.host, node.port, node.maxIdle, node.database, node.secret){
extends RedisClientPool (new FixedAddress(node.host, node.port), node.maxIdle, node.database, node.secret) {
override def toString = node.nodename
}
6 changes: 3 additions & 3 deletions src/main/scala/com/redis/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ trait RedisCommand extends Redis
with EvalOperations


class RedisClient(override val host: String, override val port: Int)
class RedisClient(val addr: NodeAddress)
extends RedisCommand with PubSub {

connect

def this(host: String, port: Int) = this(new FixedAddress(host, port))
def this() = this("localhost", 6379)
override def toString = host + ":" + String.valueOf(port)

Expand Down Expand Up @@ -153,8 +154,7 @@ class RedisClient(override val host: String, override val port: Int)
null.asInstanceOf[A]
}

val host = parent.host
val port = parent.port
lazy val addr = parent.addr

// TODO: Find a better abstraction
override def connected = parent.connected
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/com/redis/RedisProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private [redis] trait Reply {
type Reply[T] = PartialFunction[(Char, Array[Byte]), T]
type SingleReply = Reply[Option[Array[Byte]]]
type MultiReply = Reply[Option[List[Option[Array[Byte]]]]]
type MultiMultiReply = Reply[Option[List[Option[List[Option[Array[Byte]]]]]]]

def readLine: Array[Byte]
def readCounted(c: Int): Array[Byte]
Expand Down Expand Up @@ -80,6 +81,14 @@ private [redis] trait Reply {
}
}

val multiMultiBulkReply: MultiMultiReply = {
case (MULTI, str) =>
Parsers.parseInt(str) match {
case -1 => None
case n => Some(List.fill(n)(receive(multiBulkReply)))
}
}

def execReply(handlers: Seq[() => Any]): PartialFunction[(Char, Array[Byte]), Option[List[Any]]] = {
case (MULTI, str) =>
Parsers.parseInt(str) match {
Expand Down Expand Up @@ -146,6 +155,12 @@ private [redis] trait R extends Reply {
case _ => Iterator.single(None)
}.toList)

def asListOfListPairs[A,B](implicit parseA: Parse[A], parseB: Parse[B]): Option[List[Option[List[Option[(A,B)]]]]] =
receive(multiMultiBulkReply).map(_.map(_.map(_.grouped(2).map {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new MapOverflowException :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, will probably need refactoring if it gets to ListOfListOfListPairs.

case List(Some(a), Some(b)) => Some((parseA(a), parseB(b)))
case _ => None
}.toList)))

def asQueuedList: Option[List[Option[String]]] = receive(queuedReplyList).map(_.map(_.map(Parsers.parseString)))

def asExec(handlers: Seq[() => Any]): Option[List[Any]] = receive(execReply(handlers))
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/com/redis/SentinelClient.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.redis

class SentinelClient(override val host: String, override val port: Int) extends Redis
with SentinelOperations
with PubSub {

lazy val addr: NodeAddress = new FixedAddress(host, port) // can only be fixed, not a dynamic master

def this() = this("localhost", 26379)
override def toString = host + ":" + String.valueOf(port)

// publishing is not allowed on a sentinel's pub/sub channel
override def publish(channel: String, msg: String): Option[Long] =
throw new RuntimeException("Publishing is not supported on a sentinel.")
}
32 changes: 32 additions & 0 deletions src/main/scala/com/redis/SentinelOperations.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.redis

import serialization._

trait SentinelOperations { self: Redis =>

def masters[K,V](implicit format: Format, parseK: Parse[K], parseV: Parse[V]): Option[List[Option[Map[K,V]]]] =
send("SENTINEL", List("MASTERS"))(asListOfListPairs[K,V].map(_.map(_.map(_.flatten.toMap))))

def slaves[K,V](name: String)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]):
Option[List[Option[Map[K,V]]]] =
send("SENTINEL", List("SLAVES", name))(asListOfListPairs[K,V].map(_.map(_.map(_.flatten.toMap))))

def isMasterDownByAddr(host: String, port: Int): Option[(Boolean, String)] =
send("SENTINEL", List("IS-MASTER-DOWN-BY-ADDR", host, port))(asList) match {
case Some(List(Some(down), Some(leader))) => Some(down.toInt == 1, leader)
case _ => None
}

def getMasterAddrByName(name: String): Option[(String, Int)] =
send("SENTINEL", List("GET-MASTER-ADDR-BY-NAME", name))(asList[String]) match {
case Some(List(Some(h), Some(p))) => Some(h, p.toInt)
case _ => None
}

def reset(pattern: String): Option[Int] =
send("SENTINEL", List("RESET", pattern))(asInt)

def failover(name: String): Boolean =
send("SENTINEL", List("FAILOVER", name))(asBoolean)

}
3 changes: 1 addition & 2 deletions src/main/scala/com/redis/cluster/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ case class ClusterNode(nodename: String, host: String, port: Int, database: Int
abstract class RedisCluster(hosts: ClusterNode*) extends RedisCommand {

// not needed at cluster level
override val host = null
override val port = 0
lazy val addr = new FixedAddress(null, 0)

// abstract val
val keyTag: Option[KeyTag]
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/com/redis/cluster/RedisShards.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import scala.util.matching.Regex
abstract class RedisShards(val hosts: List[ClusterNode]) extends RedisCommand {

// not needed at cluster level
override val host = null
override val port = 0
lazy val addr = new FixedAddress(null, 0)

// abstract val
val keyTag: Option[KeyTag]
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/com/redis/ds/Deque.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ abstract class RedisDeque[A](val blocking: Boolean = false, val timeoutInSecs: I
}
}

import com.redis.{Redis, ListOperations}
import com.redis.{Redis, ListOperations, NodeAddress, FixedAddress}

class RedisDequeClient(val h: String, val p: Int) {
class RedisDequeClient(val a: NodeAddress) {
def this(h: String, p: Int) = this(new FixedAddress(h, p))
def getDeque[A](k: String, blocking: Boolean = false, timeoutInSecs: Int = 0)(implicit format: Format, parse: Parse[A]) =
new RedisDeque(blocking, timeoutInSecs)(format, parse) with ListOperations with Redis {
val host = h
val port = p
lazy val addr = a
val key = k
connect
}
Expand Down
Loading