Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 141 additions & 70 deletions store/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
SOH = "\x01"

defaultTimeout = 10 * time.Second

syncRetryLimit = 5
)

// Zookeeper is the receiver type for
Expand Down Expand Up @@ -66,21 +68,12 @@ func (s *Zookeeper) setTimeout(time time.Duration) {
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
resp, meta, err := s.client.Get(s.normalize(key))

resp, meta, err := s.get(key)
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}

// FIXME handle very rare cases where Get returns the
// SOH control character instead of the actual value
if string(resp) == SOH {
return s.Get(store.Normalize(key))
}

pair = &store.KVPair{
Key: key,
Value: resp,
Expand All @@ -91,14 +84,21 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
}

// createFullPath creates the entire path for a directory
// that does not exist
func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
// that does not exist and sets the value of the last
// znode to data
func (s *Zookeeper) createFullPath(path []string, data []byte, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))

if i == len(path) {
flag := 0
if ephemeral {
flag = zk.FlagEphemeral
}
_, err := s.client.Create(newpath, data, int32(flag), zk.WorldACL(zk.PermAll))
return err
}

_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
Expand All @@ -121,13 +121,14 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro

if !exists {
if opts != nil && opts.TTL > 0 {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, true)
} else {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, false)
}
} else {
_, err = s.client.Set(fkey, value, -1)
}

_, err = s.client.Set(fkey, value, -1)
return err
}

Expand Down Expand Up @@ -155,32 +156,30 @@ func (s *Zookeeper) Exists(key string) (bool, error) {
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
// Get the key first
pair, err := s.Get(key)
if err != nil {
return nil, err
}

// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *store.KVPair)
go func() {
defer close(watchCh)

// Get returns the current value to the channel prior
// to listening to any event that may occur on that key
watchCh <- pair
var fireEvt = true
for {
_, _, eventCh, err := s.client.GetW(s.normalize(key))
resp, meta, eventCh, err := s.getW(key)
if err != nil {
return
}
if fireEvt {
watchCh <- &store.KVPair{
Key: key,
Value: resp,
LastIndex: uint64(meta.Version),
}
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
// Only fire an event if the data in the node changed.
// Simply reset the watch if this is any other event
// (e.g. a session event).
fireEvt = e.Type == zk.EventNodeDataChanged
case <-stopCh:
// There is no way to stop GetW so just quit
return
Expand All @@ -197,36 +196,35 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP
// will be sent to the channel .Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
// List the childrens first
entries, err := s.List(directory)
if err != nil {
return nil, err
}

// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*store.KVPair)
go func() {
defer close(watchCh)

// List returns the children values to the channel
// prior to listening to any events that may occur
// on those keys
watchCh <- entries

var fireEvt = true
for {
_, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
WATCH:
keys, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
if err != nil {
return
}
if fireEvt {
kvs, err := s.getKVPairs(directory, keys)
if err != nil {
// Failed to get values for one or more of the keys,
// the list may be out of date so try again.
goto WATCH
}
watchCh <- kvs
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(directory); err == nil {
watchCh <- kv
}
}
// Only fire an event if the children have changed.
// Simply reset the watch if this is any other event
// (e.g. a session event).
fireEvt = e.Type == zk.EventNodeChildrenChanged
case <-stopCh:
// There is no way to stop GetW so just quit
// There is no way to stop ChildrenW so just quit
return
}
}
Expand All @@ -237,35 +235,24 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan

// List child nodes of a given directory
func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
keys, stat, err := s.client.Children(s.normalize(directory))
keys, _, err := s.client.Children(s.normalize(directory))
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}

kv := []*store.KVPair{}

// FIXME Costly Get request for each child key..
for _, key := range keys {
pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
if err != nil {
// If node is not found: List is out of date, retry
if err == store.ErrKeyNotFound {
return s.List(directory)
}
return nil, err
kvs, err := s.getKVPairs(directory, keys)
if err != nil {
// If node is not found: List is out of date, retry
if err == store.ErrKeyNotFound {
return s.List(directory)
}

kv = append(kv, &store.KVPair{
Key: key,
Value: []byte(pair.Value),
LastIndex: uint64(stat.Version),
})
return nil, err
}

return kv, nil
return kvs, nil
}

// DeleteTree deletes a range of keys under a given directory
Expand Down Expand Up @@ -313,7 +300,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair,
// Create the directory
parts := store.SplitKey(strings.TrimSuffix(key, "/"))
parts = parts[:len(parts)-1]
if err = s.createFullPath(parts, false); err != nil {
if err = s.createFullPath(parts, []byte{}, false); err != nil {
// Failed to create the directory.
return false, nil, err
}
Expand Down Expand Up @@ -427,3 +414,87 @@ func (s *Zookeeper) normalize(key string) string {
key = store.Normalize(key)
return strings.TrimSuffix(key, "/")
}

func (s *Zookeeper) get(key string) ([]byte, *zk.Stat, error) {
var resp []byte
var meta *zk.Stat
var err error

// To guard against older versions of libkv
// creating and writing to znodes non-atomically,
// We try to resync few times if we read SOH or
// an empty string
for i := 0; i <= syncRetryLimit; i++ {
resp, meta, err = s.client.Get(s.normalize(key))

if err != nil {
if err == zk.ErrNoNode {
return nil, nil, store.ErrKeyNotFound
}
return nil, nil, err
}

if string(resp) != SOH && string(resp) != "" {
return resp, meta, nil
}

if i < syncRetryLimit {
if _, err = s.client.Sync(s.normalize(key)); err != nil {
return nil, nil, err
}
}
}
return resp, meta, nil
}

func (s *Zookeeper) getW(key string) ([]byte, *zk.Stat, <-chan zk.Event, error) {
var resp []byte
var meta *zk.Stat
var ech <-chan zk.Event
var err error

// To guard against older versions of libkv
// creating and writing to znodes non-atomically,
// We try to resync few times if we read SOH or
// an empty string
for i := 0; i <= syncRetryLimit; i++ {
resp, meta, ech, err = s.client.GetW(s.normalize(key))

if err != nil {
if err == zk.ErrNoNode {
return nil, nil, nil, store.ErrKeyNotFound
}
return nil, nil, nil, err
}

if string(resp) != SOH && string(resp) != "" {
return resp, meta, ech, nil
}

if i < syncRetryLimit {
if _, err = s.client.Sync(s.normalize(key)); err != nil {
return nil, nil, nil, err
}
}
}
return resp, meta, ech, nil
}

func (s *Zookeeper) getKVPairs(directory string, keys []string) ([]*store.KVPair, error) {
kvs := []*store.KVPair{}

for _, key := range keys {
pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
if err != nil {
return nil, err
}

kvs = append(kvs, &store.KVPair{
Key: key,
Value: pair.Value,
LastIndex: pair.LastIndex,
})
}

return kvs, nil
}