@@ -3,6 +3,7 @@ package stackitprovider
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "sync"
6
7
7
8
stackitdnsclient "github.com/stackitcloud/stackit-sdk-go/services/dns"
8
9
"go.uber.org/zap"
@@ -12,43 +13,99 @@ import (
12
13
13
14
// ApplyChanges applies a given set of changes in a given zone.
14
15
func (d * StackitDNSProvider ) ApplyChanges (ctx context.Context , changes * plan.Changes ) error {
16
+ var tasks []changeTask
15
17
// create rr set. POST /v1/projects/{projectId}/zones/{zoneId}/rrsets
16
- err := d .createRRSets (ctx , changes .Create )
17
- if err != nil {
18
- return err
19
- }
20
-
18
+ tasks = append (tasks , d .buildRRSetTasks (changes .Create , CREATE )... )
21
19
// update rr set. PATCH /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
22
- err = d .updateRRSets (ctx , changes .UpdateNew )
20
+ tasks = append (tasks , d .buildRRSetTasks (changes .UpdateNew , UPDATE )... )
21
+ d .logger .Info ("records to delete" , zap .String ("records" , fmt .Sprintf ("%v" , changes .Delete )))
22
+ // delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
23
+ tasks = append (tasks , d .buildRRSetTasks (changes .Delete , DELETE )... )
24
+
25
+ zones , err := d .zoneFetcherClient .zones (ctx )
23
26
if err != nil {
24
27
return err
25
28
}
26
29
27
- // delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
28
- err = d .deleteRRSets (ctx , changes .Delete )
29
- if err != nil {
30
- return err
30
+ return d .handleRRSetWithWorkers (ctx , tasks , zones )
31
+ }
32
+
33
+ // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
34
+ func (d * StackitDNSProvider ) buildRRSetTasks (
35
+ endpoints []* endpoint.Endpoint ,
36
+ action string ,
37
+ ) []changeTask {
38
+ tasks := make ([]changeTask , 0 , len (endpoints ))
39
+
40
+ for _ , change := range endpoints {
41
+ tasks = append (tasks , changeTask {
42
+ action : action ,
43
+ change : change ,
44
+ })
31
45
}
32
46
33
- return nil
47
+ return tasks
34
48
}
35
49
36
- // createRRSets creates new record sets in the stackitprovider for the given endpoints that are in the
37
- // creation field.
38
- func (d * StackitDNSProvider ) createRRSets (
50
+ // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
51
+ func (d * StackitDNSProvider ) handleRRSetWithWorkers (
39
52
ctx context.Context ,
40
- endpoints []* endpoint.Endpoint ,
53
+ tasks []changeTask ,
54
+ zones []stackitdnsclient.Zone ,
41
55
) error {
42
- if len (endpoints ) == 0 {
43
- return nil
56
+ workerChannel := make (chan changeTask , len (tasks ))
57
+ errorChannel := make (chan error , len (tasks ))
58
+
59
+ var wg sync.WaitGroup
60
+ for i := 0 ; i < d .workers ; i ++ {
61
+ wg .Add (1 )
62
+ go d .changeWorker (ctx , workerChannel , errorChannel , zones , & wg )
44
63
}
45
64
46
- zones , err := d .zoneFetcherClient .zones (ctx )
47
- if err != nil {
48
- return err
65
+ for _ , task := range tasks {
66
+ workerChannel <- task
67
+ }
68
+ close (workerChannel )
69
+
70
+ // capture first error
71
+ var err error
72
+ for i := 0 ; i < len (tasks ); i ++ {
73
+ err = <- errorChannel
74
+ if err != nil {
75
+ break
76
+ }
77
+ }
78
+
79
+ // wait until all workers have finished
80
+ wg .Wait ()
81
+
82
+ return err
83
+ }
84
+
85
+ // changeWorker is a worker that handles changes passed by a channel.
86
+ func (d * StackitDNSProvider ) changeWorker (
87
+ ctx context.Context ,
88
+ changes chan changeTask ,
89
+ errorChannel chan error ,
90
+ zones []stackitdnsclient.Zone ,
91
+ wg * sync.WaitGroup ,
92
+ ) {
93
+ defer wg .Done ()
94
+
95
+ for change := range changes {
96
+ var err error
97
+ switch change .action {
98
+ case CREATE :
99
+ err = d .createRRSet (ctx , change .change , zones )
100
+ case UPDATE :
101
+ err = d .updateRRSet (ctx , change .change , zones )
102
+ case DELETE :
103
+ err = d .deleteRRSet (ctx , change .change , zones )
104
+ }
105
+ errorChannel <- err
49
106
}
50
107
51
- return d . handleRRSetWithWorkers ( ctx , endpoints , zones , CREATE )
108
+ d . logger . Debug ( "change worker finished" )
52
109
}
53
110
54
111
// createRRSet creates a new record set in the stackitprovider for the given endpoint.
@@ -88,24 +145,6 @@ func (d *StackitDNSProvider) createRRSet(
88
145
return nil
89
146
}
90
147
91
- // updateRRSets patches (overrides) contents in the record sets in the stackitprovider for the given
92
- // endpoints that are in the update new field.
93
- func (d * StackitDNSProvider ) updateRRSets (
94
- ctx context.Context ,
95
- endpoints []* endpoint.Endpoint ,
96
- ) error {
97
- if len (endpoints ) == 0 {
98
- return nil
99
- }
100
-
101
- zones , err := d .zoneFetcherClient .zones (ctx )
102
- if err != nil {
103
- return err
104
- }
105
-
106
- return d .handleRRSetWithWorkers (ctx , endpoints , zones , UPDATE )
107
- }
108
-
109
148
// updateRRSet patches (overrides) contents in the record set in the stackitprovider.
110
149
func (d * StackitDNSProvider ) updateRRSet (
111
150
ctx context.Context ,
@@ -142,28 +181,6 @@ func (d *StackitDNSProvider) updateRRSet(
142
181
return nil
143
182
}
144
183
145
- // deleteRRSets deletes record sets in the stackitprovider for the given endpoints that are in the
146
- // deletion field.
147
- func (d * StackitDNSProvider ) deleteRRSets (
148
- ctx context.Context ,
149
- endpoints []* endpoint.Endpoint ,
150
- ) error {
151
- if len (endpoints ) == 0 {
152
- d .logger .Debug ("no endpoints to delete" )
153
-
154
- return nil
155
- }
156
-
157
- d .logger .Info ("records to delete" , zap .String ("records" , fmt .Sprintf ("%v" , endpoints )))
158
-
159
- zones , err := d .zoneFetcherClient .zones (ctx )
160
- if err != nil {
161
- return err
162
- }
163
-
164
- return d .handleRRSetWithWorkers (ctx , endpoints , zones , DELETE )
165
- }
166
-
167
184
// deleteRRSet deletes a record set in the stackitprovider for the given endpoint.
168
185
func (d * StackitDNSProvider ) deleteRRSet (
169
186
ctx context.Context ,
@@ -197,62 +214,3 @@ func (d *StackitDNSProvider) deleteRRSet(
197
214
198
215
return nil
199
216
}
200
-
201
- // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
202
- func (d * StackitDNSProvider ) handleRRSetWithWorkers (
203
- ctx context.Context ,
204
- endpoints []* endpoint.Endpoint ,
205
- zones []stackitdnsclient.Zone ,
206
- action string ,
207
- ) error {
208
- workerChannel := make (chan changeTask , len (endpoints ))
209
- errorChannel := make (chan error , len (endpoints ))
210
-
211
- for i := 0 ; i < d .workers ; i ++ {
212
- go d .changeWorker (ctx , workerChannel , errorChannel , zones )
213
- }
214
-
215
- for _ , change := range endpoints {
216
- workerChannel <- changeTask {
217
- action : action ,
218
- change : change ,
219
- }
220
- }
221
-
222
- for i := 0 ; i < len (endpoints ); i ++ {
223
- err := <- errorChannel
224
- if err != nil {
225
- close (workerChannel )
226
-
227
- return err
228
- }
229
- }
230
-
231
- close (workerChannel )
232
-
233
- return nil
234
- }
235
-
236
- // changeWorker is a worker that handles changes passed by a channel.
237
- func (d * StackitDNSProvider ) changeWorker (
238
- ctx context.Context ,
239
- changes chan changeTask ,
240
- errorChannel chan error ,
241
- zones []stackitdnsclient.Zone ,
242
- ) {
243
- for change := range changes {
244
- switch change .action {
245
- case CREATE :
246
- err := d .createRRSet (ctx , change .change , zones )
247
- errorChannel <- err
248
- case UPDATE :
249
- err := d .updateRRSet (ctx , change .change , zones )
250
- errorChannel <- err
251
- case DELETE :
252
- err := d .deleteRRSet (ctx , change .change , zones )
253
- errorChannel <- err
254
- }
255
- }
256
-
257
- d .logger .Debug ("change worker finished" )
258
- }
0 commit comments