@@ -55,14 +55,13 @@ public void eventReceived(Watcher.Action action, CustomResource resource) {
55
55
log .debug ("Event received for action: {}, {}: {}" , action .toString ().toLowerCase (), resource .getClass ().getSimpleName (),
56
56
resource .getMetadata ().getName ());
57
57
CustomResourceEvent event = new CustomResourceEvent (action , resource , retry );
58
- scheduleEvent (event );
58
+ scheduleEventFromApi (event );
59
59
}
60
60
61
- void scheduleEvent (CustomResourceEvent event ) {
62
- log .trace ("Current queue size {}" , executor .getQueue ().size ());
63
- log .debug ("Scheduling event: {}" , event );
61
+ void scheduleEventFromApi (CustomResourceEvent event ) {
64
62
try {
65
63
lock .lock ();
64
+ log .debug ("Scheduling event from Api: {}" , event );
66
65
if (event .getResource ().getMetadata ().getDeletionTimestamp () != null && event .getAction () == Action .DELETED ) {
67
66
// Note that we always use finalizers, we want to process delete event just in corner case,
68
67
// when we are not able to add finalizer (lets say because of optimistic locking error, and the resource was deleted instantly).
@@ -83,30 +82,39 @@ void scheduleEvent(CustomResourceEvent event) {
83
82
eventStore .addOrReplaceEventAsNotScheduled (event );
84
83
return ;
85
84
}
85
+ scheduleEventForExecution (event );
86
+ log .trace ("Scheduling event from API finished: {}" , event );
87
+ } finally {
88
+ lock .unlock ();
89
+ }
90
+ }
86
91
92
+ private void scheduleEventForExecution (CustomResourceEvent event ) {
93
+ try {
94
+ lock .lock ();
95
+ log .trace ("Current queue size {}" , executor .getQueue ().size ());
96
+ log .debug ("Scheduling event for execution: {}" , event );
87
97
Optional <Long > nextBackOff = event .nextBackOff ();
88
98
if (!nextBackOff .isPresent ()) {
89
99
log .warn ("Event max retry limit reached. Will be discarded. {}" , event );
90
100
return ;
91
101
}
92
- log .debug ("Creating scheduled task for event: {}" , event );
93
102
eventStore .addEventUnderProcessing (event );
94
103
executor .schedule (new EventConsumer (event , eventDispatcher , this ),
95
104
nextBackOff .get (), TimeUnit .MILLISECONDS );
105
+ log .trace ("Scheduled task for event: {}" , event );
96
106
} finally {
97
- log .debug ("Scheduling event finished: {}" , event );
98
107
lock .unlock ();
99
108
}
100
109
}
101
110
102
111
void eventProcessingFinishedSuccessfully (CustomResourceEvent event ) {
103
112
try {
104
113
lock .lock ();
105
- log .debug ("Event processing successful for event: {}" , event );
106
114
eventStore .removeEventUnderProcessing (event .resourceUid ());
107
115
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
108
- log .debug ("Scheduling recent event for processing processing : {}" , event );
109
- scheduleEvent ( eventStore . removeEventNotScheduled ( event .resourceUid () ));
116
+ log .debug ("Scheduling recent event for processing: {}" , event );
117
+ scheduleNotYetScheduledEventForExecution ( event .resourceUid ());
110
118
}
111
119
} finally {
112
120
lock .unlock ();
@@ -118,19 +126,22 @@ void eventProcessingFailed(CustomResourceEvent event) {
118
126
lock .lock ();
119
127
eventStore .removeEventUnderProcessing (event .resourceUid ());
120
128
if (eventStore .containsNotScheduledEvent (event .resourceUid ())) {
121
- CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (event .resourceUid ());
122
- log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}," +
123
- " Most recent event: {}" , event , notScheduledEvent );
124
- scheduleEvent (notScheduledEvent );
129
+ log .debug ("Event processing failed. Scheduling the most recent event. Failed event: {}" , event );
130
+ scheduleNotYetScheduledEventForExecution (event .resourceUid ());
125
131
} else {
126
132
log .debug ("Event processing failed. Attempting to re-schedule the event: {}" , event );
127
- scheduleEvent (event );
133
+ scheduleEventForExecution (event );
128
134
}
129
135
} finally {
130
136
lock .unlock ();
131
137
}
132
138
}
133
139
140
+ private void scheduleNotYetScheduledEventForExecution (String uuid ) {
141
+ CustomResourceEvent notScheduledEvent = eventStore .removeEventNotScheduled (uuid );
142
+ scheduleEventForExecution (notScheduledEvent );
143
+ }
144
+
134
145
@ Override
135
146
public void onClose (KubernetesClientException e ) {
136
147
log .error ("Error: " , e );
0 commit comments