Commit e99d419

Michal Tichák authored and teo committed
use two channels to communicate mesos REVIVE
1 parent d7ccd5b commit e99d419
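
This commit splits the Mesos offer-revival handshake between the task manager and the scheduler across two channels: callers still request a revive on reviveOffersTrg, but completion is now acknowledged on a new reviveOffersDone channel instead of being written back into the trigger channel, which removes the ambiguity of a single channel carrying traffic in both directions. The sketch below is not code from this repository; it is a minimal illustration of the same trigger/done pattern under invented names (reviver, requestRevive, with a Sleep standing in for the actual revive call).

    // Illustrative sketch only, not code from this repository: a trigger/done
    // handshake between concurrent requesters and a single worker goroutine.
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // reviver is a hypothetical stand-in for the scheduler state: one channel
    // carries "please revive offers" requests, the other carries acknowledgements.
    type reviver struct {
        trg  chan struct{} // requester -> worker
        done chan struct{} // worker -> requester
    }

    func newReviver() *reviver {
        return &reviver{
            trg:  make(chan struct{}), // unbuffered, like reviveOffersTrg
            done: make(chan struct{}), // unbuffered, like reviveOffersDone
        }
    }

    // run consumes one trigger at a time, does the work, then acknowledges.
    func (r *reviver) run() {
        for range r.trg {
            time.Sleep(10 * time.Millisecond) // stand-in for the actual revive call
            r.done <- struct{}{}
        }
    }

    // requestRevive mirrors the caller side: signal, then wait for completion.
    func (r *reviver) requestRevive(id int) {
        r.trg <- struct{}{} // signal the worker to revive offers
        <-r.done            // only continue when it's done
        fmt.Printf("caller %d: revive acknowledged\n", id)
    }

    func main() {
        r := newReviver()
        go r.run()

        // Several concurrent callers: the acknowledgement travels on its own
        // channel, so it can only be received by a caller that is waiting for
        // it, never confused with another caller's pending trigger.
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                r.requestRevive(id)
            }(i)
        }
        wg.Wait()
    }

With a single shared channel, a second caller's trigger could be received by the first caller as if it were the acknowledgement; keeping the two directions on separate channels makes that interleaving impossible.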

File tree

3 files changed: +35 −41 lines changed


core/task/manager.go

+15 −19
@@ -91,8 +91,9 @@ type Manager struct {
 
 tasksToDeploy chan<- *ResourceOffersDeploymentRequest
 
-reviveOffersTrg chan struct{}
-cq *controlcommands.CommandQueue
+reviveOffersTrg chan struct{}
+reviveOffersDone chan struct{}
+cq *controlcommands.CommandQueue
 
 tasksLaunched int
 tasksFinished int
@@ -141,6 +142,7 @@ func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *M
 taskman.cq = taskman.schedulerState.commandqueue
 taskman.tasksToDeploy = taskman.schedulerState.tasksToDeploy
 taskman.reviveOffersTrg = taskman.schedulerState.reviveOffersTrg
+taskman.reviveOffersDone = taskman.schedulerState.reviveOffersDone
 taskman.ackKilledTasks = newAcks()
 
 schedState.setupCli()
@@ -156,7 +158,8 @@ func (m *Manager) newTaskForMesosOffer(
 offer *mesos.Offer,
 descriptor *Descriptor,
 localBindMap channel.BindMap,
-executorId mesos.ExecutorID) (t *Task) {
+executorId mesos.ExecutorID,
+) (t *Task) {
 newId := uid.New().String()
 t = &Task{
 name: fmt.Sprintf("%s#%s", descriptor.TaskClassName, newId),
@@ -197,8 +200,8 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 if err != nil {
 return
 }
-repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] //get IRepo pointer from RepoManager
-if repo == nil { //should never end up here
+repo := repoManager.GetAllRepos()[tempRepo.GetIdentifier()] // get IRepo pointer from RepoManager
+if repo == nil { // should never end up here
 return nil, errors.New("getTaskClassList: repo not found for " + taskClass)
 }
 
@@ -223,7 +226,6 @@ func getTaskClassList(taskClassesRequired []string) (taskClassList []*taskclass.
 taskInfo := strings.Split(taskPath, "/tasks/")
 if len(taskInfo) == 1 {
 taskFilename = taskInfo[0]
-
 } else {
 taskFilename = taskInfo[1]
 }
@@ -280,7 +282,7 @@ func (m *Manager) removeInactiveClasses() {
 return
 }
 
-func (m *Manager) RemoveReposClasses(repoPath string) { //Currently unused
+func (m *Manager) RemoveReposClasses(repoPath string) { // Currently unused
 utils.EnsureTrailingSlash(&repoPath)
 
 _ = m.classes.Do(func(classMap *map[string]*taskclass.Class) error {
@@ -327,7 +329,6 @@ func (m *Manager) RefreshClasses(taskClassesRequired []string) (err error) {
 }
 
 func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err error) {
-
 /*
 Here's what's gonna happen:
 1) check if any tasks are already in Roster, whether they are already locked
@@ -516,7 +517,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 timeReviveOffers := time.Now()
 timeDeployMu := time.Now()
 m.reviveOffersTrg <- struct{}{} // signal scheduler to revive offers
-<-m.reviveOffersTrg // we only continue when it's done
+<-m.reviveOffersDone // we only continue when it's done
 utils.TimeTrack(timeReviveOffers, "acquireTasks: revive offers",
 log.WithField("tasksToRun", len(tasksToRun)).
 WithField("partition", envId))
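
On the manager side, acquireTasks still sends the revive request on reviveOffersTrg, but it now blocks on the dedicated reviveOffersDone channel for the acknowledgement instead of reading the answer back from the trigger channel.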
@@ -597,7 +598,7 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 // can't lock some of them, so we must roll back and keep them
 // unlocked in the roster.
 var deployedTaskIds []string
-for taskPtr, _ := range deployedTasks {
+for taskPtr := range deployedTasks {
 taskPtr.SetParent(nil)
 deployedTaskIds = append(deployedTaskIds, taskPtr.taskId)
 }
@@ -612,11 +613,11 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 }
 
 // Finally, we write to the roster. Point of no return!
-for taskPtr, _ := range deployedTasks {
+for taskPtr := range deployedTasks {
 m.roster.append(taskPtr)
 }
 if deploymentSuccess {
-for taskPtr, _ := range deployedTasks {
+for taskPtr := range deployedTasks {
 taskPtr.GetParent().SetTask(taskPtr)
 }
 for taskPtr, descriptor := range tasksAlreadyRunning {
@@ -629,7 +630,6 @@ func (m *Manager) acquireTasks(envId uid.ID, taskDescriptors Descriptors) (err e
 }
 
 func (m *Manager) releaseTasks(envId uid.ID, tasks Tasks) error {
-
 taskReleaseErrors := make(map[string]error)
 taskIdsReleased := make([]string, 0)
 
@@ -686,7 +686,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 taskPath := task.GetParentRolePath()
 for inbChName, endpoint := range task.GetLocalBindMap() {
 var bindMapKey string
-if strings.HasPrefix(inbChName, "::") { //global channel alias
+if strings.HasPrefix(inbChName, "::") { // global channel alias
 bindMapKey = inbChName
 
 // deduplication
@@ -785,7 +785,6 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
 func (m *Manager) transitionTasks(envId uid.ID, tasks Tasks, src string, event string, dest string, commonArgs controlcommands.PropertyMap) error {
 notify := make(chan controlcommands.MesosCommandResponse)
 receivers, err := tasks.GetMesosCommandTargets()
-
 if err != nil {
 return err
 }
@@ -870,7 +869,6 @@ func (m *Manager) TriggerHooks(envId uid.ID, tasks Tasks) error {
 
 notify := make(chan controlcommands.MesosCommandResponse)
 receivers, err := tasks.GetMesosCommandTargets()
-
 if err != nil {
 return err
 }
@@ -935,7 +933,6 @@ func (m *Manager) GetTask(id string) *Task {
 }
 
 func (m *Manager) updateTaskState(taskId string, state string) {
-
 taskPtr := m.roster.getByTaskId(taskId)
 if taskPtr == nil {
 log.WithField("taskId", taskId).
@@ -989,7 +986,7 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 }
 if ack, ok := m.ackKilledTasks.getValue(taskId); ok {
 ack <- struct{}{}
-//close(ack) // It can even be left open?
+// close(ack) // It can even be left open?
 }
 
 return
@@ -1030,7 +1027,6 @@ func (m *Manager) updateTaskStatus(status *mesos.TaskStatus) {
 
 // Kill all tasks outside an environment (all unlocked tasks)
 func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
-
 toKill := m.roster.filtered(func(t *Task) bool {
 return !t.IsLocked()
 })

core/task/scheduler.go

+15 −17
@@ -84,7 +84,8 @@ var schedEventsCh = make(chan scheduler.Event_Type)
 
 func runSchedulerController(ctx context.Context,
 state *schedulerState,
-fidStore store.Singleton) error {
+fidStore store.Singleton,
+) error {
 // Set up communication from controller to state machine.
 go func() {
 for {
@@ -103,7 +104,7 @@ func runSchedulerController(ctx context.Context,
 for {
 <-state.reviveOffersTrg
 doReviveOffers(ctx, state)
-state.reviveOffersTrg <- struct{}{}
+state.reviveOffersDone <- struct{}{}
 }
 }()
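The scheduler-side loop above mirrors that handshake: it consumes one trigger from reviveOffersTrg, performs the revive via doReviveOffers, and then signals completion on reviveOffersDone, so a concurrent trigger from another caller can no longer be consumed as if it were the acknowledgement.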

@@ -272,7 +273,6 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 // only one entry in the list, we signal back to commandqueue
 // otherwise, we log and ignore.
 return func(ctx context.Context, e *scheduler.Event) (err error) {
-
 mesosMessage := e.GetMessage()
 if mesosMessage == nil {
 err = errors.New("message handler got bad MESSAGE")
@@ -336,7 +336,7 @@ func (state *schedulerState) incomingMessageHandler() events.HandlerFunc {
 return
 }
 state.taskman.internalEventCh <- ev
-//state.handleDeviceEvent(ev)
+// state.handleDeviceEvent(ev)
 } else {
 log.WithFields(logrus.Fields{
 "type": incomingEvent.Type.String(),
@@ -437,7 +437,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 timeResourceOffersCall := time.Now()
 var (
 offers = e.GetOffers().GetOffers()
-callOption = calls.RefuseSeconds(time.Second) //calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
+callOption = calls.RefuseSeconds(time.Second) // calls.RefuseSecondsWithJitter(state.random, state.config.maxRefuseSeconds)
 tasksLaunchedThisCycle = 0
 offersDeclined = 0
 )
@@ -613,7 +613,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 var offerWaitGroup sync.WaitGroup
 offerWaitGroup.Add(len(offers))
 
-for offerIndex, _ := range offers {
+for offerIndex := range offers {
 go func(offerIndex int) {
 defer offerWaitGroup.Done()
 
@@ -1013,7 +1013,6 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 WithField("descriptorsStillToDeploy", len(descriptorsStillToDeploy)).
 WithField("offers", len(offers)).
 WithField("offerHost", offer.Hostname))
-
 }(offerIndex) // end for offer closure
 } // end for _, offer := range offers
 offerWaitGroup.Wait()
@@ -1094,7 +1093,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
 machinesUsedSlice := func(machines map[string]struct{}) []string { // StringSet to StringSlice
 out := make([]string, len(machines))
 i := 0
-for k, _ := range machines {
+for k := range machines {
 out[i] = k
 i++
 }
@@ -1184,6 +1183,7 @@ func (state *schedulerState) statusUpdate() events.HandlerFunc {
 // have set through ACCEPT or DECLINE calls, in the hope that Mesos then sends us new resource offers.
 // This should generally run when we have received a TASK_FINISHED for some tasks, and we have more
 // tasks to run.
+
 func (state *schedulerState) tryReviveOffers(ctx context.Context) {
 // limit the rate at which we request offer revival
 select {
@@ -1274,7 +1274,7 @@ func logAllEvents() eventrules.Rule {
 }
 }
 offerIds := make([]string, len(off))
-for i, _ := range off {
+for i := range off {
 offerIds[i] = off[i].GetID().Value
 }
 fields["offerIds"] = strings.Join(offerIds, ",")
@@ -1335,7 +1335,6 @@ func makeTaskForMesosResources(
 descriptorDetector string,
 offerIDsToDecline map[mesos.OfferID]struct{},
 ) (*Task, *mesos.TaskInfo) {
-
 bindMap := make(channel.BindMap)
 for _, ch := range wants.InboundChannels {
 if ch.Addressing == channel.IPC {
@@ -1368,7 +1367,7 @@ func makeTaskForMesosResources(
 Attributes: offer.Attributes,
 Hostname: offer.Hostname,
 }
-state.taskman.AgentCache.Update(agentForCache) //thread safe
+state.taskman.AgentCache.Update(agentForCache) // thread safe
 machinesUsed[offer.Hostname] = struct{}{}
 
 taskPtr := state.taskman.newTaskForMesosOffer(offer, descriptor, bindMap, targetExecutorId)
@@ -1566,12 +1565,11 @@ func makeTaskForMesosResources(
 ldLibPath, ok := agentForCache.Attributes.Get("executor_env_LD_LIBRARY_PATH")
 mesosTaskInfo.Executor.Command.Environment = &mesos.Environment{}
 if ok {
-mesosTaskInfo.Executor.Command.Environment.Variables =
-append(mesosTaskInfo.Executor.Command.Environment.Variables,
-mesos.Environment_Variable{
-Name: "LD_LIBRARY_PATH",
-Value: proto.String(ldLibPath),
-})
+mesosTaskInfo.Executor.Command.Environment.Variables = append(mesosTaskInfo.Executor.Command.Environment.Variables,
+mesos.Environment_Variable{
+Name: "LD_LIBRARY_PATH",
+Value: proto.String(ldLibPath),
+})
 }
 
 return taskPtr, &mesosTaskInfo

core/task/schedulerstate.go

+5 −5
@@ -69,8 +69,9 @@ type schedulerState struct {
 reviveTokens <-chan struct{}
 tasksToDeploy chan *ResourceOffersDeploymentRequest
 
-reviveOffersTrg chan struct{}
-random *rand.Rand
+reviveOffersTrg chan struct{}
+reviveOffersDone chan struct{}
+random *rand.Rand
 
 // shouldn't change at runtime, so thread safe:
 role string
@@ -106,8 +107,6 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 
 tasksToDeploy := make(chan *ResourceOffersDeploymentRequest, MAX_CONCURRENT_DEPLOY_REQUESTS)
 
-reviveOffersTrg := make(chan struct{})
-
 state := &schedulerState{
 taskman: taskman,
 fidStore: fidStore,
@@ -117,7 +116,8 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
 viper.GetDuration("mesosReviveWait"),
 nil),
 tasksToDeploy: tasksToDeploy,
-reviveOffersTrg: reviveOffersTrg,
+reviveOffersTrg: make(chan struct{}),
+reviveOffersDone: make(chan struct{}),
 wantsTaskResources: mesos.Resources{},
 executor: executorInfo,
 metricsAPI: metricsAPI,
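
Both channels are created unbuffered in NewScheduler, so each send blocks until the other side is ready to receive: a trigger is only accepted once the scheduler loop reaches its receive, and the acknowledgement is only delivered once the waiting caller reads it.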

0 commit comments