@@ -62,7 +62,7 @@ type Scheduler struct {
62
62
63
63
// New creates a new data retrieval.
64
64
func New (cfg * dblabCfg.Config , docker * client.Client , pm * pool.Manager , tm * telemetry.Agent , runner runners.Runner ) * Retrieval {
65
- return & Retrieval {
65
+ r := & Retrieval {
66
66
cfg : & cfg .Retrieval ,
67
67
global : & cfg .Global ,
68
68
docker : docker ,
@@ -74,6 +74,20 @@ func New(cfg *dblabCfg.Config, docker *client.Client, pm *pool.Manager, tm *tele
74
74
Status : models .Inactive ,
75
75
},
76
76
}
77
+
78
+ for _ , jobName := range r .cfg .Jobs {
79
+ jobSpec , ok := r .cfg .JobsSpec [jobName ]
80
+ if ! ok {
81
+ continue
82
+ }
83
+
84
+ jobSpec .Name = jobName
85
+ r .jobSpecs [jobName ] = jobSpec
86
+ }
87
+
88
+ r .defineRetrievalMode ()
89
+
90
+ return r
77
91
}
78
92
79
93
// Reload reloads retrieval configuration.
@@ -100,7 +114,9 @@ func (r *Retrieval) Run(ctx context.Context) error {
100
114
runCtx , cancel := context .WithCancel (ctx )
101
115
r .ctxCancel = cancel
102
116
103
- fsManager , err := r .getPoolToDataRefresh ()
117
+ log .Msg ("Retrieval mode:" , r .State .Mode )
118
+
119
+ fsManager , err := r .getPoolToDataRetrieving ()
104
120
if err != nil {
105
121
var skipError * SkipRefreshingError
106
122
if errors .As (err , & skipError ) {
@@ -118,7 +134,7 @@ func (r *Retrieval) Run(ctx context.Context) error {
118
134
return fmt .Errorf ("failed to choose pool to refresh: %w" , err )
119
135
}
120
136
121
- log .Msg ("Pool to perform a full refresh : " , fsManager .Pool ().Name )
137
+ log .Msg ("Pool to perform data retrieving : " , fsManager .Pool ().Name )
122
138
123
139
if err := r .run (runCtx , fsManager ); err != nil {
124
140
r .tm .SendEvent (ctx , telemetry .AlertEvent , telemetry.Alert {Level : models .RefreshFailed , Message : err .Error ()})
@@ -130,7 +146,7 @@ func (r *Retrieval) Run(ctx context.Context) error {
130
146
return nil
131
147
}
132
148
133
- func (r * Retrieval ) getPoolToDataRefresh () (pool.FSManager , error ) {
149
+ func (r * Retrieval ) getPoolToDataRetrieving () (pool.FSManager , error ) {
134
150
firstPool := r .poolManager .First ()
135
151
if firstPool == nil {
136
152
return nil , errors .New ("no available pools" )
@@ -140,6 +156,12 @@ func (r *Retrieval) getPoolToDataRefresh() (pool.FSManager, error) {
140
156
return firstPool , nil
141
157
}
142
158
159
+ // For physical or unknown modes, changing the pool is possible only by the refresh timetable.
160
+ if r .State .Mode != models .Logical {
161
+ return firstPool , nil
162
+ }
163
+
164
+ // For logical mode try to find another pool to avoid rewriting prepared data.
143
165
elementToRefresh := r .poolManager .GetPoolToUpdate ()
144
166
145
167
if elementToRefresh == nil || elementToRefresh .Value == nil {
@@ -220,8 +242,6 @@ func (r *Retrieval) configure(fsm pool.FSManager) error {
220
242
return errors .Wrap (err , "invalid data retrieval configuration" )
221
243
}
222
244
223
- r .defineRetrievalMode ()
224
-
225
245
return nil
226
246
}
227
247
@@ -237,14 +257,11 @@ func (r *Retrieval) parseJobs(fsm pool.FSManager) error {
237
257
r .jobs = make ([]components.JobRunner , 0 , len (r .cfg .Jobs ))
238
258
239
259
for _ , jobName := range r .cfg .Jobs {
240
- jobSpec , ok := r .cfg . JobsSpec [jobName ]
260
+ jobSpec , ok := r .jobSpecs [jobName ]
241
261
if ! ok {
242
262
return errors .Errorf ("Job %q not found" , jobName )
243
263
}
244
264
245
- jobSpec .Name = jobName
246
- r .jobSpecs [jobName ] = jobSpec
247
-
248
265
jobCfg := config.JobConfig {
249
266
Spec : jobSpec ,
250
267
Docker : r .docker ,
@@ -315,11 +332,15 @@ func (r *Retrieval) hasPhysicalJob() bool {
315
332
func (r * Retrieval ) defineRetrievalMode () {
316
333
if r .hasPhysicalJob () {
317
334
r .State .Mode = models .Physical
335
+ return
318
336
}
319
337
320
338
if r .hasLogicalJob () {
321
339
r .State .Mode = models .Logical
340
+ return
322
341
}
342
+
343
+ r .State .Mode = models .Unknown
323
344
}
324
345
325
346
func (r * Retrieval ) prepareEnvironment (fsm pool.FSManager ) error {
0 commit comments