Skip to content

Commit

Permalink
Add doc for Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
sewenew committed Jul 2, 2018
1 parent fd7a0ea commit 2f84f60
Showing 1 changed file with 98 additions and 1 deletion.
99 changes: 98 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This is a C++ client for Redis. It's based on [hiredis](https://github.com/redis
- Connection pool.
- Redis scripting.
- Thread safe unless otherwise stated.
- Redis publish/subscribe (unstable).
- Redis publish/subscribe.
- Redis pipeline (unstable).
- Redis transaction (unstable).
- Redis Cluster (coming soon).
Expand Down Expand Up @@ -175,6 +175,11 @@ connection_options.port = 6666; // Optional. The default port is 6379.
connection_options.password = "auth"; // Optional. No password by default.
connection_options.db = 1; // Optional. Use the 0th database by default.
// Optional. Timeout before we successfully send request to or receive response from redis.
// By default, the timeout is 0ms, i.e. no timeout and block until we send or receive successfuly.
// NOTE: if any command is timed out, we throw a TimeoutError exception.
connection_options.socket_timeout = std::chrono::milliseconds(200);
// Connect to Redis server with a single connection.
Redis redis1(connection_options);
Expand Down Expand Up @@ -354,6 +359,9 @@ redis.zrangebyscore("zset",
vector<pair<string, string>> kvs = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};
redis.mset(kvs.begin(), kvs.end());
unordered_map<string, string> kv_map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};
redis.mset(kv_map.begin(), kv_map.end());
unordered_map<string, string> str_map = {{"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}};
redis.hmset("hash", str_map.begin(), str_map.end());
Expand Down Expand Up @@ -558,6 +566,95 @@ redis.georadius("geo",

Please see [redis.h](https://github.com/sewenew/redis-plus-plus/blob/master/src/sw/redis%2B%2B/redis.h) for more API references, and see the [tests](https://github.com/sewenew/redis-plus-plus/tree/master/test/src/sw/redis%2B%2B) for more examples.

### Publish/Subscribe

You can use `Redis::publish` to publish messages to channels. *redis-plus-plus* picks a connection from the connection pool, and publishes message with that connection. So you might publish two messages with two different connections.

When you subscribe to a channel with a connection, all messages published to the channel are sent back to that connection. So there's **NO** `Redis::subscribe` method. Instead, you can use `Redis::subscriber` to create a `Subscriber` and the `Subscriber` maintains a connection to Redis. The underlying connection is a new connection, NOT picked from the connection pool. This new connection has the same `ConnectionOptions` as the `Redis` object.

With `Subscriber`, you can call `Subscriber::subscribe`, `Subscriber::unsubscribe`, `Subscriber::psubscribe` and `Subscriber::punsubscribe` to send *SUBSCRIBE*, `UNSUBSCRIBE`, *PSUBSCRIBE* and *PUNSUBSCRIBE* commands to Redis.

`Subscriber` is **NOT** thread-safe. If you want to call its member functions in multi-thread environment, you need to synchronize between threads manually.

#### Subscriber Callbacks

There are 6 kinds of messages:
- *MESSAGE*: message sent to a channel.
- *PMESSAGE*: message sent to channels of a given pattern.
- *SUBSCRIBE*: message sent when we successfully subscribe to a channel.
- *UNSUBSCRIBE*: message sent when we successfully unsubscribe to a channel.
- *PSUBSCRIBE*: message sent when we successfully subscribe to a channel pattern.
- *PUNSUBSCRIBE*: message sent when we successfully unsubscribe to a channel pattern.

We call messages of *SUBSCRIBE*, *UNSUBSCRIBE*, *PSUBSCRIBE* and *PUNSUBSCRIBE* types as *META MESSAGE*s.

In order to process these messages, you can set callback functions on `Subscriber`:
- `Subscriber::on_message(MsgCallback)`: set callback function for messages of *MESSAGE* type, and the callback interface is: `void (std::string channel, std::string msg)`.
- `Subscriber::on_pmessage(PatternMsgCallback)`: set the callback function for messages of *PMESSAGE* type, and the callback interface is: `void (std::string pattern, std::string channel, std::string msg)`.
- `Subscriber::on_meta(MetaCallback)`: set callback function for messages of *META MESSAGE* type, and the callback interface is: `void (Subscriber::MsgType type, OptionalString channel, long long num)`. If you haven't subscribe/psubscribe to any channel/pattern, and try to unsubscribe/punsubscribe without any parameter, i.e. unsubscribe/punsubscribe all channels/patterns, *channel* will be null. So the second parameter of meta callback is of type *OptionalString*.

All these callback interfaces pass `std::string` by value, and you can take their ownership (i.e. std::move) safely.

#### Consume Messages

You can call `Subscriber::consume` to consume messages published to channels/patterns that the `Subscriber` has been subscribed.

`Subscriber::consume` waits for message from the underlying connection. If the `ConnectionOptions::socket_timeout` is reached, and there's no message sent to this connection, `Subscriber::consume` throws `TimeoutError` exception. If `ConnectionOptions::socket_timeout` is 0ms, `Subscriber::consume` blocks until it receives a message.

After receiving the message, `Subscriber::consume` calls the callback function to process the message based on message type. However, if you don't set callback for a specific kind of message, `Subscriber::consume` will ignore the received message, i.e. no callback will be called.

#### Examples

The following example is a common pattern for using `Subscriber`:

```
// Create a Subscriber.
auto sub = redis.subscriber();
// Set callback functions.
sub.on_message([](std::string channel, std::string msg) {
// Process message of MESSAGE type.
});
sub.on_pmessage([](std::string pattern, std::string channel, std::string msg) {
// Process message of PMESSAGE type.
});
sub.on_meta([](Subscriber::MsgType type, OptionalString channel, long long num) {
// Process message of META type.
});
// Subscribe to channels and patterns.
sub.subscribe("channel1");
sub.subscribe({"channel2", "channel3"});
sub.psubscribe("pattern1*");
// Consume messages in a loop.
while (true) {
try {
sub.consume();
} catch (...) {
// Handle exceptions.
}
}
```

If `ConnectionOptions::socket_timeout` is set, you might get `TimeoutError` exception before receiving a message:

```
while (true) {
try {
sub.consume();
} catch (const TimeoutError &e) {
// Try again.
continue;
} catch (...) {
// Handle other exceptions.
}
}
```

## Author

*redis-plus-plus* is written by sewenew, who is also active on [StackOverflow](https://stackoverflow.com/users/5384363/for-stack).

0 comments on commit 2f84f60

Please sign in to comment.