@@ -61,14 +61,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
61
61
log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
62
62
resource .getMetadata ().getName ());
63
63
CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
64
- scheduleEvent (event );
64
+ scheduleEventFromApi (event );
65
65
}
66
66
67
- void scheduleEvent (CustomResourceEvent event ) {
68
- log .trace ("Current queue size {}" , executor .getQueue ().size ());
69
- log .debug ("Scheduling event: {}" , event );
67
+ void scheduleEventFromApi (CustomResourceEvent event ) {
70
68
try {
71
69
lock .lock ();
70
+ log .debug ("Scheduling event from Api: {}" , event );
72
71
if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
73
72
// Note that we always use finalizers, we want to process delete event just in corner case,
74
73
// when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -89,30 +88,39 @@ void scheduleEvent(CustomResourceEvent event) {
89
88
eventStore .addOrReplaceEventAsNotScheduled (event );
90
89
return ;
91
90
}
91
+ scheduleEventForExecution (event );
92
+ log .trace ("Scheduling event from API finished: {}" , event );
93
+ } finally {
94
+ lock .unlock ();
95
+ }
96
+ }
92
97
98
+ private void scheduleEventForExecution (CustomResourceEvent event ) {
99
+ try {
100
+ lock .lock ();
101
+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
102
+ log .debug ("Scheduling event for execution: {}" , event );
93
103
Optional <Long > nextBackOff = event .nextBackOff ();
94
104
if (!nextBackOff .isPresent ()) {
95
105
log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
96
106
return ;
97
107
}
98
- log .debug ("Creating scheduled task for event: {}" , event );
99
108
eventStore .addEventUnderProcessing (event );
100
109
executor .schedule (new EventConsumer (event , eventDispatcher , this ),
101
110
nextBackOff .get (), TimeUnit .MILLISECONDS );
111
+ log .trace ("Scheduled task for event: {}" , event );
102
112
} finally {
103
- log .debug ("Scheduling event finished: {}" , event );
104
113
lock .unlock ();
105
114
}
106
115
}
107
116
108
117
void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
109
118
try {
110
119
lock .lock ();
111
- log .debug ("Event processing successful for event: {}" , event );
112
120
eventStore .removeEventUnderProcessing (event .resourceUid ());
113
121
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
114
- log .debug ("Scheduling recent event for processing processing : {}" , event );
115
- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
122
+ log .debug ("Scheduling recent event for processing: {}" , event );
123
+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
116
124
}
117
125
} finally {
118
126
lock .unlock ();
@@ -124,19 +132,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
124
132
lock .lock ();
125
133
eventStore .removeEventUnderProcessing (event .resourceUid ());
126
134
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
127
- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
128
- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
129
- " Most recent event: {}" , event , notScheduledEvent );
130
- scheduleEvent (notScheduledEvent );
135
+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
136
+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
131
137
} else {
132
138
log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
133
- scheduleEvent (event );
139
+ scheduleEventForExecution (event );
134
140
}
135
141
} finally {
136
142
lock .unlock ();
137
143
}
138
144
}
139
145
146
+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
147
+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
148
+ scheduleEventForExecution (notScheduledEvent );
149
+ }
150
+
140
151
@ Override
141
152
public void onClose (KubernetesClientException e ) {
142
153
log .error ("Error: " , e );
0 commit comments