Skip to content

Commit 381b95c

Browse files
committed
Use aws-sdk-go-v2 in dynamodb
Signed-off-by: Friedrich Gonzalez <[email protected]>
1 parent 2c07f00 commit 381b95c

File tree

2 files changed

+190
-161
lines changed

2 files changed

+190
-161
lines changed

pkg/ring/kv/dynamodb/dynamodb.go

Lines changed: 111 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import (
88
"strconv"
99
"time"
1010

11-
"github.com/aws/aws-sdk-go/aws"
12-
"github.com/aws/aws-sdk-go/aws/session"
13-
"github.com/aws/aws-sdk-go/service/dynamodb"
14-
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
11+
"github.com/aws/aws-sdk-go-v2/aws"
12+
"github.com/aws/aws-sdk-go-v2/config"
13+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
14+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
1515
"github.com/go-kit/log"
1616
)
1717

@@ -34,8 +34,15 @@ type dynamoDbClient interface {
3434
Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem, delete []dynamodbKey) (bool, error)
3535
}
3636

37+
type dynamoDBAPI interface {
38+
dynamodb.QueryAPIClient
39+
DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
40+
PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
41+
TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
42+
}
43+
3744
type dynamodbKV struct {
38-
ddbClient dynamodbiface.DynamoDBAPI
45+
ddbClient dynamoDBAPI
3946
logger log.Logger
4047
tableName *string
4148
ttlValue time.Duration
@@ -59,69 +66,65 @@ func newDynamodbKV(cfg Config, logger log.Logger) (dynamodbKV, error) {
5966
return dynamodbKV{}, err
6067
}
6168

62-
sess, err := session.NewSession()
69+
awsCfg, err := config.LoadDefaultConfig(context.Background(),
70+
config.WithRegion(cfg.Region),
71+
)
6372
if err != nil {
6473
return dynamodbKV{}, err
6574
}
6675

67-
awsCfg := aws.NewConfig()
68-
if len(cfg.Region) > 0 {
69-
awsCfg = awsCfg.WithRegion(cfg.Region)
70-
}
71-
72-
dynamoDB := dynamodb.New(sess, awsCfg)
76+
dynamoDB := dynamodb.NewFromConfig(awsCfg)
7377

74-
ddbKV := &dynamodbKV{
78+
return dynamodbKV{
7579
ddbClient: dynamoDB,
7680
logger: logger,
7781
tableName: aws.String(cfg.TableName),
7882
ttlValue: cfg.TTL,
79-
}
80-
81-
return *ddbKV, nil
83+
}, nil
8284
}
8385

8486
func validateConfigInput(cfg Config) error {
8587
if len(cfg.TableName) < 3 {
8688
return fmt.Errorf("invalid dynamodb table name: %s", cfg.TableName)
8789
}
88-
8990
return nil
9091
}
9192

92-
// for testing
9393
func (kv dynamodbKV) getTTL() time.Duration {
9494
return kv.ttlValue
9595
}
9696

9797
func (kv dynamodbKV) List(ctx context.Context, key dynamodbKey) ([]string, float64, error) {
9898
var keys []string
9999
var totalCapacity float64
100+
100101
input := &dynamodb.QueryInput{
101102
TableName: kv.tableName,
102-
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
103-
KeyConditions: map[string]*dynamodb.Condition{
103+
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
104+
KeyConditions: map[string]types.Condition{
104105
primaryKey: {
105-
ComparisonOperator: aws.String("EQ"),
106-
AttributeValueList: []*dynamodb.AttributeValue{
107-
{
108-
S: aws.String(key.primaryKey),
109-
},
106+
ComparisonOperator: types.ComparisonOperatorEq,
107+
AttributeValueList: []types.AttributeValue{
108+
&types.AttributeValueMemberS{Value: key.primaryKey},
110109
},
111110
},
112111
},
113-
AttributesToGet: []*string{aws.String(sortKey)},
112+
AttributesToGet: []string{sortKey},
114113
}
115114

116-
err := kv.ddbClient.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool {
117-
totalCapacity += getCapacityUnits(output.ConsumedCapacity)
118-
for _, item := range output.Items {
119-
keys = append(keys, item[sortKey].String())
115+
paginator := dynamodb.NewQueryPaginator(kv.ddbClient, input)
116+
117+
for paginator.HasMorePages() {
118+
page, err := paginator.NextPage(ctx)
119+
if err != nil {
120+
return nil, totalCapacity, err
121+
}
122+
totalCapacity += getCapacityUnits(page.ConsumedCapacity)
123+
for _, item := range page.Items {
124+
if v, ok := item[sortKey].(*types.AttributeValueMemberS); ok {
125+
keys = append(keys, v.Value)
126+
}
120127
}
121-
return true
122-
})
123-
if err != nil {
124-
return nil, totalCapacity, err
125128
}
126129

127130
return keys, totalCapacity, nil
@@ -130,48 +133,54 @@ func (kv dynamodbKV) List(ctx context.Context, key dynamodbKey) ([]string, float
130133
func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool) (map[string]dynamodbItem, float64, error) {
131134
keys := make(map[string]dynamodbItem)
132135
var totalCapacity float64
133-
co := dynamodb.ComparisonOperatorEq
136+
137+
co := types.ComparisonOperatorEq
134138
if isPrefix {
135-
co = dynamodb.ComparisonOperatorBeginsWith
139+
co = types.ComparisonOperatorBeginsWith
136140
}
141+
137142
input := &dynamodb.QueryInput{
138143
TableName: kv.tableName,
139-
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
140-
KeyConditions: map[string]*dynamodb.Condition{
144+
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
145+
KeyConditions: map[string]types.Condition{
141146
primaryKey: {
142-
ComparisonOperator: aws.String(co),
143-
AttributeValueList: []*dynamodb.AttributeValue{
144-
{
145-
S: aws.String(key.primaryKey),
146-
},
147+
ComparisonOperator: co,
148+
AttributeValueList: []types.AttributeValue{
149+
&types.AttributeValueMemberS{Value: key.primaryKey},
147150
},
148151
},
149152
},
150153
}
151154

152-
err := kv.ddbClient.QueryPagesWithContext(ctx, input, func(output *dynamodb.QueryOutput, _ bool) bool {
153-
totalCapacity += getCapacityUnits(output.ConsumedCapacity)
154-
for _, item := range output.Items {
155+
paginator := dynamodb.NewQueryPaginator(kv.ddbClient, input)
156+
157+
for paginator.HasMorePages() {
158+
page, err := paginator.NextPage(ctx)
159+
if err != nil {
160+
return nil, totalCapacity, err
161+
}
162+
totalCapacity += getCapacityUnits(page.ConsumedCapacity)
163+
164+
for _, item := range page.Items {
155165
itemVersion := int64(0)
156-
if item[version] != nil {
157-
parsedVersion, err := strconv.ParseInt(*item[version].N, 10, 0)
166+
if v, ok := item[version].(*types.AttributeValueMemberN); ok {
167+
parsedVersion, err := strconv.ParseInt(v.Value, 10, 0)
158168
if err != nil {
159-
kv.logger.Log("msg", "failed to parse item version", "version", *item[version].N, "err", err)
169+
kv.logger.Log("msg", "failed to parse item version", "version", v.Value, "err", err)
160170
} else {
161171
itemVersion = parsedVersion
162172
}
163173
}
164174

165-
keys[*item[sortKey].S] = dynamodbItem{
166-
data: item[contentData].B,
167-
version: itemVersion,
175+
if d, ok := item[contentData].(*types.AttributeValueMemberB); ok {
176+
if s, ok := item[sortKey].(*types.AttributeValueMemberS); ok {
177+
keys[s.Value] = dynamodbItem{
178+
data: d.Value,
179+
version: itemVersion,
180+
}
181+
}
168182
}
169-
170183
}
171-
return true
172-
})
173-
if err != nil {
174-
return nil, totalCapacity, err
175184
}
176185

177186
return keys, totalCapacity, nil
@@ -180,28 +189,22 @@ func (kv dynamodbKV) Query(ctx context.Context, key dynamodbKey, isPrefix bool)
180189
func (kv dynamodbKV) Delete(ctx context.Context, key dynamodbKey) (float64, error) {
181190
input := &dynamodb.DeleteItemInput{
182191
TableName: kv.tableName,
183-
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
192+
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
184193
Key: generateItemKey(key),
185194
}
186-
totalCapacity := float64(0)
187-
output, err := kv.ddbClient.DeleteItemWithContext(ctx, input)
188-
if err != nil {
189-
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
190-
}
195+
output, err := kv.ddbClient.DeleteItem(ctx, input)
196+
totalCapacity := getCapacityUnits(output.ConsumedCapacity)
191197
return totalCapacity, err
192198
}
193199

194200
func (kv dynamodbKV) Put(ctx context.Context, key dynamodbKey, data []byte) (float64, error) {
195201
input := &dynamodb.PutItemInput{
196202
TableName: kv.tableName,
197-
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
203+
ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal,
198204
Item: kv.generatePutItemRequest(key, dynamodbItem{data: data}),
199205
}
200-
totalCapacity := float64(0)
201-
output, err := kv.ddbClient.PutItemWithContext(ctx, input)
202-
if err != nil {
203-
totalCapacity = getCapacityUnits(output.ConsumedCapacity)
204-
}
206+
output, err := kv.ddbClient.PutItem(ctx, input)
207+
totalCapacity := getCapacityUnits(output.ConsumedCapacity)
205208
return totalCapacity, err
206209
}
207210

@@ -212,75 +215,72 @@ func (kv dynamodbKV) Batch(ctx context.Context, put map[dynamodbKey]dynamodbItem
212215
return totalCapacity, false, nil
213216
}
214217

215-
writeRequestsSlices := make([][]*dynamodb.TransactWriteItem, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
216-
for i := 0; i < len(writeRequestsSlices); i++ {
217-
writeRequestsSlices[i] = make([]*dynamodb.TransactWriteItem, 0, DdbBatchSizeLimit)
218+
writeRequestsSlices := make([][]types.TransactWriteItem, int(math.Ceil(float64(writeRequestSize)/float64(DdbBatchSizeLimit))))
219+
for i := range writeRequestsSlices {
220+
writeRequestsSlices[i] = make([]types.TransactWriteItem, 0, DdbBatchSizeLimit)
218221
}
222+
219223
currIdx := 0
220224
for key, ddbItem := range put {
221225
item := kv.generatePutItemRequest(key, ddbItem)
222-
ddbPut := &dynamodb.Put{
223-
TableName: kv.tableName,
224-
Item: item,
225-
}
226-
// condition for optimistic locking; DynamoDB will only succeed the request if either the version attribute does not exist
227-
// (for backwards compatibility) or the object version has not changed since it was last read
228-
ddbPut.ConditionExpression = aws.String("attribute_not_exists(version) OR version = :v")
229-
ddbPut.ExpressionAttributeValues = map[string]*dynamodb.AttributeValue{
230-
":v": {N: aws.String(strconv.FormatInt(ddbItem.version, 10))},
226+
ddbPut := &types.Put{
227+
TableName: kv.tableName,
228+
Item: item,
229+
ConditionExpression: aws.String("attribute_not_exists(version) OR version = :v"),
230+
ExpressionAttributeValues: map[string]types.AttributeValue{
231+
":v": &types.AttributeValueMemberN{Value: strconv.FormatInt(ddbItem.version, 10)},
232+
},
231233
}
232234

233-
writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{Put: ddbPut})
235+
writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], types.TransactWriteItem{Put: ddbPut})
234236
if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit {
235237
currIdx++
236238
}
237239
}
238240

239241
for _, key := range delete {
240242
item := generateItemKey(key)
241-
writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], &dynamodb.TransactWriteItem{
242-
Delete: &dynamodb.Delete{
243-
TableName: kv.tableName,
244-
Key: item,
245-
},
246-
})
243+
ddbDelete := &types.Delete{
244+
TableName: kv.tableName,
245+
Key: item,
246+
}
247+
writeRequestsSlices[currIdx] = append(writeRequestsSlices[currIdx], types.TransactWriteItem{Delete: ddbDelete})
247248
if len(writeRequestsSlices[currIdx]) == DdbBatchSizeLimit {
248249
currIdx++
249250
}
250251
}
251252

252253
for _, slice := range writeRequestsSlices {
253-
transactItems := &dynamodb.TransactWriteItemsInput{
254-
TransactItems: slice,
254+
if len(slice) == 0 {
255+
continue
255256
}
256-
resp, err := kv.ddbClient.TransactWriteItemsWithContext(ctx, transactItems)
257+
resp, err := kv.ddbClient.TransactWriteItems(ctx, &dynamodb.TransactWriteItemsInput{
258+
TransactItems: slice,
259+
})
257260
if err != nil {
258-
var checkFailed *dynamodb.ConditionalCheckFailedException
259-
isCheckFailedException := errors.As(err, &checkFailed)
260-
if isCheckFailedException {
261-
kv.logger.Log("msg", "conditional check failed on DynamoDB Batch", "item", fmt.Sprintf("%v", checkFailed.Item), "err", err)
261+
var checkFailed *types.ConditionalCheckFailedException
262+
isCheckFailed := errors.As(err, &checkFailed)
263+
if isCheckFailed {
264+
kv.logger.Log("msg", "conditional check failed on DynamoDB Batch", "err", err)
262265
}
263-
return totalCapacity, isCheckFailedException, err
266+
return totalCapacity, isCheckFailed, err
264267
}
265268
for _, consumedCapacity := range resp.ConsumedCapacity {
266-
totalCapacity += getCapacityUnits(consumedCapacity)
269+
totalCapacity += getCapacityUnits(&consumedCapacity)
267270
}
268271
}
269272

270273
return totalCapacity, false, nil
271274
}
272275

273-
func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbItem) map[string]*dynamodb.AttributeValue {
276+
func (kv dynamodbKV) generatePutItemRequest(key dynamodbKey, ddbItem dynamodbItem) map[string]types.AttributeValue {
274277
item := generateItemKey(key)
275-
item[contentData] = &dynamodb.AttributeValue{
276-
B: ddbItem.data,
277-
}
278-
item[version] = &dynamodb.AttributeValue{
279-
N: aws.String(strconv.FormatInt(ddbItem.version+1, 10)),
280-
}
278+
item[contentData] = &types.AttributeValueMemberB{Value: ddbItem.data}
279+
item[version] = &types.AttributeValueMemberN{Value: strconv.FormatInt(ddbItem.version+1, 10)}
280+
281281
if kv.getTTL() > 0 {
282-
item[timeToLive] = &dynamodb.AttributeValue{
283-
N: aws.String(strconv.FormatInt(time.Now().UTC().Add(kv.getTTL()).Unix(), 10)),
282+
item[timeToLive] = &types.AttributeValueMemberN{
283+
Value: strconv.FormatInt(time.Now().UTC().Add(kv.getTTL()).Unix(), 10),
284284
}
285285
}
286286

@@ -326,22 +326,17 @@ func (d *dynamodbKVWithTimeout) Batch(ctx context.Context, put map[dynamodbKey]d
326326
return d.ddbClient.Batch(ctx, put, delete)
327327
}
328328

329-
func generateItemKey(key dynamodbKey) map[string]*dynamodb.AttributeValue {
330-
resp := map[string]*dynamodb.AttributeValue{
331-
primaryKey: {
332-
S: aws.String(key.primaryKey),
333-
},
329+
func generateItemKey(key dynamodbKey) map[string]types.AttributeValue {
330+
resp := map[string]types.AttributeValue{
331+
primaryKey: &types.AttributeValueMemberS{Value: key.primaryKey},
334332
}
335333
if len(key.sortKey) > 0 {
336-
resp[sortKey] = &dynamodb.AttributeValue{
337-
S: aws.String(key.sortKey),
338-
}
334+
resp[sortKey] = &types.AttributeValueMemberS{Value: key.sortKey}
339335
}
340-
341336
return resp
342337
}
343338

344-
func getCapacityUnits(cap *dynamodb.ConsumedCapacity) float64 {
339+
func getCapacityUnits(cap *types.ConsumedCapacity) float64 {
345340
if cap != nil && cap.CapacityUnits != nil {
346341
return *cap.CapacityUnits
347342
}

0 commit comments

Comments
 (0)