Skip to content

Commit

Permalink
Pass call contexts to Consul API operations
Browse files Browse the repository at this point in the history
As of v0.9.0, the official Consul API supports canceling its operations
using Contexts. This commit removes the hacked-up implementation of
operation canceling I had previously.
  • Loading branch information
bwester committed Jul 27, 2017
1 parent cece853 commit 9baa8d6
Showing 1 changed file with 55 additions and 143 deletions.
198 changes: 55 additions & 143 deletions consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
)

// ConsulCanceler defines an API for accessing a Consul Key-Value store. It's mostly a
// clone of `github.com/hashicorp/consul/api.KV`, but it adds the ability to stop waiting
// for an operation if that operation has expired.
// clone of `github.com/hashicorp/consul/api.KV`, but it adds a parameter for a context
// to method signatures.
//
// Using this interface is deprecated. As of v0.9.0, the official Consul API package
// allows API calls be canceled using a Context, making this interface unnecessary.
type ConsulCanceler interface {
CAS(
ctx context.Context,
Expand Down Expand Up @@ -43,9 +46,7 @@ type ConsulCanceler interface {
}

// CancelConsulKV is the concrete implementation of ConsulCanceler. It takes a Consul
// `Client` object and performs all operations using that client. When an operation is
// "canceled", the method call will return immediately with the context error. The
// underlying HTTP call is not aborted.
// `Client` object and performs all operations using that client.
type CancelConsulKV struct {
// The Consul client to use for executing operations.
Client *consul.Client
Expand All @@ -59,35 +60,16 @@ func (cckv *CancelConsulKV) CAS(
p *consul.KVPair,
q *consul.WriteOptions,
) (bool, *consul.WriteMeta, error) {
successCh := make(chan bool, 1)
metaCh := make(chan *consul.WriteMeta, 1)
errCh := make(chan error, 1)
go func() {
cckv.Logger.WithField("key", p.Key).Debug(" => CAS")
success, meta, err := cckv.Client.KV().CAS(p, q)
cckv.Logger.WithFields(logrus.Fields{
"key": p.Key,
"kv": p,
"success": success,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= CAS")
if err != nil {
errCh <- err
} else {
successCh <- success
metaCh <- meta
}
}()
select {
case success := <-successCh:
meta := <-metaCh
return success, meta, nil
case err := <-errCh:
return false, nil, err
case <-ctx.Done():
return false, nil, ctx.Err()
}
cckv.Logger.WithField("key", p.Key).Debug(" => CAS")
success, meta, err := cckv.Client.KV().CAS(p, q.WithContext(ctx))
cckv.Logger.WithFields(logrus.Fields{
"key": p.Key,
"kv": p,
"success": success,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= CAS")
return success, meta, err
}

// Delete removes a key and its data.
Expand All @@ -96,31 +78,15 @@ func (cckv *CancelConsulKV) Delete(
key string,
w *consul.WriteOptions,
) (*consul.WriteMeta, error) {
metaCh := make(chan *consul.WriteMeta, 1)
errCh := make(chan error, 1)
go func() {
cckv.Logger.WithField("key", key).Debug(" => Delete")
meta, err := cckv.Client.KV().Delete(key, w)
cckv.Logger.WithFields(logrus.Fields{
"key": key,
"options": w,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Delete")
if err != nil {
errCh <- err
} else {
metaCh <- meta
}
}()
select {
case meta := <-metaCh:
return meta, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
cckv.Logger.WithField("key", key).Debug(" => Delete")
meta, err := cckv.Client.KV().Delete(key, w.WithContext(ctx))
cckv.Logger.WithFields(logrus.Fields{
"key": key,
"options": w,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Delete")
return meta, err
}

// Get returns the current value of a key.
Expand All @@ -129,35 +95,16 @@ func (cckv *CancelConsulKV) Get(
key string,
q *consul.QueryOptions,
) (*consul.KVPair, *consul.QueryMeta, error) {
pairCh := make(chan *consul.KVPair, 1)
metaCh := make(chan *consul.QueryMeta, 1)
errCh := make(chan error, 1)
go func() {
cckv.Logger.WithField("key", key).Debug(" => Get")
pair, meta, err := cckv.Client.KV().Get(key, q)
cckv.Logger.WithFields(logrus.Fields{
"key": key,
"options": q,
"kv": pair,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Get")
if err != nil {
errCh <- err
} else {
pairCh <- pair
metaCh <- meta
}
}()
select {
case pair := <-pairCh:
meta := <-metaCh
return pair, meta, nil
case err := <-errCh:
return nil, nil, err
case <-ctx.Done():
return nil, nil, ctx.Err()
}
cckv.Logger.WithField("key", key).Debug(" => Get")
pair, meta, err := cckv.Client.KV().Get(key, q.WithContext(ctx))
cckv.Logger.WithFields(logrus.Fields{
"key": key,
"options": q,
"kv": pair,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Get")
return pair, meta, err
}

// Keys lists all keys under a prefix
Expand All @@ -167,35 +114,16 @@ func (cckv *CancelConsulKV) Keys(
separator string,
q *consul.QueryOptions,
) ([]string, *consul.QueryMeta, error) {
keysCh := make(chan []string, 1)
metaCh := make(chan *consul.QueryMeta, 1)
errCh := make(chan error, 1)
go func() {
cckv.Logger.WithField("prefix", prefix).Debug(" => Keys")
keys, meta, err := cckv.Client.KV().Keys(prefix, separator, q)
cckv.Logger.WithFields(logrus.Fields{
"prefix": prefix,
"options": q,
"keys": keys,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Keys")
if err != nil {
errCh <- err
} else {
keysCh <- keys
metaCh <- meta
}
}()
select {
case keys := <-keysCh:
meta := <-metaCh
return keys, meta, nil
case err := <-errCh:
return nil, nil, err
case <-ctx.Done():
return nil, nil, ctx.Err()
}
cckv.Logger.WithField("prefix", prefix).Debug(" => Keys")
keys, meta, err := cckv.Client.KV().Keys(prefix, separator, q.WithContext(ctx))
cckv.Logger.WithFields(logrus.Fields{
"prefix": prefix,
"options": q,
"keys": keys,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Keys")
return keys, meta, err
}

// Put writes a key-value pair to the store
Expand All @@ -204,30 +132,14 @@ func (cckv *CancelConsulKV) Put(
p *consul.KVPair,
q *consul.WriteOptions,
) (*consul.WriteMeta, error) {
metaCh := make(chan *consul.WriteMeta, 1)
errCh := make(chan error, 1)
go func() {
cckv.Logger.WithField("key", p.Key).Debug(" => Put")
meta, err := cckv.Client.KV().Put(p, q)
cckv.Logger.WithFields(logrus.Fields{
"key": p.Key,
"kv": p,
"options": q,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Put")
if err != nil {
errCh <- err
} else {
metaCh <- meta
}
}()
select {
case meta := <-metaCh:
return meta, nil
case err := <-errCh:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
cckv.Logger.WithField("key", p.Key).Debug(" => Put")
meta, err := cckv.Client.KV().Put(p, q.WithContext(ctx))
cckv.Logger.WithFields(logrus.Fields{
"key": p.Key,
"kv": p,
"options": q,
"meta": meta,
logrus.ErrorKey: err,
}).Debug(" <= Put")
return meta, err
}

0 comments on commit 9baa8d6

Please sign in to comment.