1
-
2
1
// Yury Kozyrev (urakozz)
3
2
// MIT License
4
3
package stream
@@ -23,9 +22,9 @@ type StreamSubscriber struct {
23
22
}
24
23
25
24
func NewStreamSubscriber (
26
- dynamoSvc * dynamodb.DynamoDB ,
27
- streamSvc * dynamodbstreams.DynamoDBStreams ,
28
- table string ) * StreamSubscriber {
25
+ dynamoSvc * dynamodb.DynamoDB ,
26
+ streamSvc * dynamodbstreams.DynamoDBStreams ,
27
+ table string ) * StreamSubscriber {
29
28
s := & StreamSubscriber {dynamoSvc : dynamoSvc , streamSvc : streamSvc , table : & table }
30
29
s .applyDefaults ()
31
30
return s
@@ -50,7 +49,7 @@ func (r *StreamSubscriber) GetStreamData() (<-chan *dynamodbstreams.Record, <-ch
50
49
ch := make (chan * dynamodbstreams.Record , 1 )
51
50
errCh := make (chan error , 1 )
52
51
53
- go func (ch chan <- * dynamodbstreams.Record , errCh chan <- error ) {
52
+ go func (ch chan <- * dynamodbstreams.Record , errCh chan <- error ) {
54
53
var shardId * string
55
54
var prevShardId * string
56
55
var streamArn * string
@@ -89,7 +88,7 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,
89
88
needUpdateChannel <- struct {}{}
90
89
91
90
allShards := make (map [string ]struct {})
92
- shardProcessingLimit := 5 ;
91
+ shardProcessingLimit := 5
93
92
shardsCh := make (chan * dynamodbstreams.GetShardIteratorInput , shardProcessingLimit )
94
93
lock := sync.Mutex {}
95
94
@@ -112,7 +111,7 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,
112
111
errCh <- err
113
112
return
114
113
}
115
- ids , err := r .getShardIds (streamArn );
114
+ ids , err := r .getShardIds (streamArn )
116
115
if err != nil {
117
116
errCh <- err
118
117
return
@@ -137,17 +136,17 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,
137
136
138
137
limit := make (chan struct {}, shardProcessingLimit )
139
138
140
- go func (){
139
+ go func () {
141
140
time .Sleep (time .Second * 10 )
142
141
for shardInput := range shardsCh {
143
142
limit <- struct {}{}
144
- go func (sInput * dynamodbstreams.GetShardIteratorInput ){
143
+ go func (sInput * dynamodbstreams.GetShardIteratorInput ) {
145
144
err := r .processShard (sInput , ch )
146
145
if err != nil {
147
146
errCh <- err
148
147
}
149
148
// TODO: think about cleaning list of shards: delete(allShards, *sInput.ShardId)
150
- <- limit
149
+ <- limit
151
150
}(shardInput )
152
151
}
153
152
}()
@@ -211,15 +210,15 @@ func (r *StreamSubscriber) getLatestStreamArn() (*string, error) {
211
210
return tableInfo .Table .LatestStreamArn , nil
212
211
}
213
212
214
- func (r * StreamSubscriber ) processShardBackport (shardId , lastStreamArn * string , ch chan <- * dynamodbstreams.Record ) error {
213
+ func (r * StreamSubscriber ) processShardBackport (shardId , lastStreamArn * string , ch chan <- * dynamodbstreams.Record ) error {
215
214
return r .processShard (& dynamodbstreams.GetShardIteratorInput {
216
215
StreamArn : lastStreamArn ,
217
216
ShardId : shardId ,
218
217
ShardIteratorType : r .ShardIteratorType ,
219
- }, ch );
218
+ }, ch )
220
219
}
221
220
222
- func (r * StreamSubscriber ) processShard (input * dynamodbstreams.GetShardIteratorInput , ch chan <- * dynamodbstreams.Record ) error {
221
+ func (r * StreamSubscriber ) processShard (input * dynamodbstreams.GetShardIteratorInput , ch chan <- * dynamodbstreams.Record ) error {
223
222
iter , err := r .streamSvc .GetShardIterator (input )
224
223
if err != nil {
225
224
return err
@@ -264,4 +263,3 @@ func (r *StreamSubscriber) processShard(input *dynamodbstreams.GetShardIteratorI
264
263
}
265
264
return nil
266
265
}
267
-
0 commit comments