|
9 | 9 | import org.apache.hc.client5.http.classic.methods.HttpGet; |
10 | 10 | import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; |
11 | 11 | import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; |
| 12 | +import org.apache.hc.core5.io.CloseMode; |
12 | 13 | import org.slf4j.Logger; |
13 | 14 | import org.slf4j.LoggerFactory; |
14 | 15 |
|
@@ -115,19 +116,18 @@ public boolean isOpen() { |
115 | 116 | } |
116 | 117 |
|
117 | 118 | public void close() { |
| 119 | + _log.debug("closing SSE client"); |
118 | 120 | try { |
119 | 121 | lock.lock(); |
120 | 122 | _forcedStop.set(true); |
121 | 123 | if (_state.compareAndSet(ConnectionState.OPEN, ConnectionState.CLOSED)) { |
122 | 124 | if (_ongoingResponse.get() != null) { |
123 | | - try { |
124 | | - _ongoingRequest.get().abort(); |
125 | | - _ongoingResponse.get().close(); |
126 | | - } catch (IOException e) { |
127 | | - _log.debug(String.format("SSEClient close forced: %s", e.getMessage())); |
128 | | - } |
| 125 | + _ongoingRequest.get().abort(); |
| 126 | + _ongoingResponse.get().close(CloseMode.IMMEDIATE); |
129 | 127 | } |
130 | 128 | } |
| 129 | + } catch (Exception e) { |
| 130 | + _log.debug("Exception in closing SSE client: " + e.getMessage()); |
131 | 131 | } finally { |
132 | 132 | lock.unlock(); |
133 | 133 | } |
@@ -184,19 +184,15 @@ private void connectAndLoop(URI uri, CountDownLatch signal) { |
184 | 184 | } |
185 | 185 | } |
186 | 186 | } catch (Exception e) { // Any other error non related to the connection disables streaming altogether |
| 187 | + _log.debug(String.format("SSE connection exception: %s", e.getMessage())); |
187 | 188 | _telemetryRuntimeProducer |
188 | 189 | .recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), |
189 | 190 | StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), |
190 | 191 | System.currentTimeMillis())); |
191 | 192 | _log.warn(e.getMessage(), e); |
192 | 193 | _statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR); |
193 | 194 | } finally { |
194 | | - try { |
195 | | - _ongoingResponse.get().close(); |
196 | | - } catch (IOException e) { |
197 | | - _log.debug(e.getMessage()); |
198 | | - } |
199 | | - |
| 195 | + _ongoingResponse.get().close(CloseMode.IMMEDIATE); |
200 | 196 | _state.set(ConnectionState.CLOSED); |
201 | 197 | _log.debug("SSEClient finished."); |
202 | 198 | _forcedStop.set(false); |
|
0 commit comments