@@ -22,33 +22,34 @@ class KubernetesManager(Manager):
22
22
def __init__ (self , ** args ):
23
23
super ().__init__ (** args )
24
24
25
- self .thread_stop = threading .Event ()
26
-
27
- self .pod_watcher_thread = threading .Thread (
28
- target = self ._run_pod_watcher )
29
- self .job_watcher_thread = threading .Thread (
30
- target = self ._run_job_watcher )
31
- self .event_watcher_thread = threading .Thread (
32
- target = self ._run_event_watcher )
33
-
34
25
if os .environ .get ('KUBECONFIG' ):
35
26
k8s .config .load_kube_config ()
36
27
else :
37
28
k8s .config .load_incluster_config ()
38
29
39
- self .namespace = args .get ('namespace' , 'default' )
30
+ # the namespace in which to create the jobs
31
+ # and to watch for events
32
+ self .namespace = os .environ .get ('NAMESPACE' )
33
+ if self .namespace :
34
+ self .namespace = '' .join ([self .namespace , '-controller' ])
35
+ else :
36
+ self .namespace = 'villas-controller'
40
37
41
- self .my_namespace = os .environ .get ('NAMESPACE' )
38
+ self ._check_namespace (self .namespace )
39
+
40
+ # name and UID of the pod in which this controller is running
41
+ # used in kubernetes simulator to set the owner reference
42
42
self .my_pod_name = os .environ .get ('POD_NAME' )
43
43
self .my_pod_uid = os .environ .get ('POD_UID' )
44
44
45
- self ._check_namespace ( self . namespace )
45
+ self .thread_stop = threading . Event ( )
46
46
47
- # self.pod_watcher_thread.start()
48
- # self.job_watcher_thread.start( )
47
+ self .event_watcher_thread = threading . Thread (
48
+ target = self ._run_event_watcher )
49
49
self .event_watcher_thread .setDaemon (True )
50
50
self .event_watcher_thread .start ()
51
51
52
+
52
53
def _check_namespace (self , ns ):
53
54
c = k8s .client .CoreV1Api ()
54
55
@@ -59,28 +60,6 @@ def _check_namespace(self, ns):
59
60
60
61
raise RuntimeError (f'Namespace { ns } does not exist' )
61
62
62
- def _run_pod_watcher (self ):
63
- w = k8s .watch .Watch ()
64
- c = k8s .client .CoreV1Api ()
65
-
66
- for sts in w .stream (c .list_namespaced_pod ,
67
- namespace = self .namespace ):
68
- stso = sts .get ('object' )
69
- typ = sts .get ('type' )
70
-
71
- self .logger .info ('%s Pod: %s' , typ , stso .metadata .name )
72
-
73
- def _run_job_watcher (self ):
74
- w = k8s .watch .Watch ()
75
- b = k8s .client .BatchV1Api ()
76
-
77
- for sts in w .stream (b .list_namespaced_job ,
78
- namespace = self .namespace ):
79
- stso = sts .get ('object' )
80
- typ = sts .get ('type' )
81
-
82
- self .logger .info ('%s Job: %s' , typ , stso .metadata .name )
83
-
84
63
def _run_event_watcher (self ):
85
64
while not self .thread_stop .is_set ():
86
65
w = k8s .watch .Watch ()
@@ -107,6 +86,10 @@ def _run_event_watcher(self):
107
86
108
87
if _match (comp .job .metadata .name ,
109
88
eo .involved_object .name ):
89
+ if comp ._state == 'stopping' :
90
+ # incoming events are old repetitions
91
+ continue
92
+
110
93
if eo .reason == 'Completed' :
111
94
comp .change_state ('stopping' , True )
112
95
elif eo .reason == 'Started' :
0 commit comments