1
1
package org .logstash .beats ;
2
2
3
+ import org .apache .logging .log4j .Level ;
4
+ import org .apache .logging .log4j .LogManager ;
5
+ import org .apache .logging .log4j .Logger ;
6
+
3
7
import io .netty .channel .ChannelHandlerContext ;
4
8
import io .netty .channel .SimpleChannelInboundHandler ;
5
9
import io .netty .util .AttributeKey ;
10
14
import javax .net .ssl .SSLHandshakeException ;
11
15
12
16
public class BeatsHandler extends SimpleChannelInboundHandler <Batch > {
13
- private final static Logger logger = LogManager .getLogger (BeatsHandler .class );
17
+
18
+ private final static Logger logger = LogManager .getLogger ();
14
19
private final IMessageListener messageListener ;
15
20
private ChannelHandlerContext context ;
16
21
@@ -22,33 +27,25 @@ public BeatsHandler(IMessageListener listener) {
22
27
@ Override
23
28
public void channelActive (final ChannelHandlerContext ctx ) throws Exception {
24
29
context = ctx ;
25
- if (logger .isTraceEnabled ()){
26
- logger .trace (format ("Channel Active" ));
27
- }
30
+ logger .trace ("{}" , () -> format ("Channel Active" ));
28
31
super .channelActive (ctx );
29
32
messageListener .onNewConnection (ctx );
30
33
}
31
34
32
35
@ Override
33
36
public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
34
37
super .channelInactive (ctx );
35
- if (logger .isTraceEnabled ()){
36
- logger .trace (format ("Channel Inactive" ));
37
- }
38
+ logger .trace ("{}" , () -> format ("Channel Inactive" ));
38
39
messageListener .onConnectionClose (ctx );
39
40
}
40
41
41
42
42
43
@ Override
43
44
public void channelRead0 (ChannelHandlerContext ctx , Batch batch ) throws Exception {
44
- if (logger .isDebugEnabled ()) {
45
- logger .debug (format ("Received a new payload" ));
46
- }
45
+ logger .debug ("{}" , () -> format ("Received a new payload" ));
47
46
try {
48
47
for (Message message : batch ) {
49
- if (logger .isDebugEnabled ()) {
50
- logger .debug (format ("Sending a new message for the listener, sequence: " + message .getSequence ()));
51
- }
48
+ logger .debug ("{}" , () -> format ("Sending a new message for the listener, sequence: " + message .getSequence ()));
52
49
messageListener .onNewMessage (ctx , message );
53
50
54
51
if (needAck (message )) {
@@ -58,9 +55,9 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio
58
55
}finally {
59
56
//this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive
60
57
ctx .channel ().attr (ConnectionHandler .CHANNEL_SEND_KEEP_ALIVE ).get ().set (false );
61
- if ( logger .isDebugEnabled ()) {
62
- logger . debug ( "{}: batches pending: {}" , ctx .channel ().id ().asShortText (),ctx . channel (). attr ( ConnectionHandler . CHANNEL_SEND_KEEP_ALIVE ). get (). get ());
63
- }
58
+ logger .debug ( "{}: batches pending: {}" ,
59
+ () -> ctx .channel ().id ().asShortText (),
60
+ () -> ctx . channel (). attr ( ConnectionHandler . CHANNEL_SEND_KEEP_ALIVE ). get (). get ());
64
61
batch .release ();
65
62
ctx .flush ();
66
63
}
@@ -83,11 +80,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
83
80
}
84
81
String causeMessage = cause .getMessage () == null ? cause .getClass ().toString () : cause .getMessage ();
85
82
86
- if (logger .isDebugEnabled ()){
87
- logger .debug (format ("Handling exception: " + causeMessage ), cause );
88
- }
89
- logger .info (format ("Handling exception: " + causeMessage ));
90
- } finally {
83
+ logger .info ("{}" , () -> format ("Handling exception: " + causeMessage ));
84
+ logger .catching (Level .DEBUG , cause );
85
+ } finally {
91
86
super .exceptionCaught (ctx , cause );
92
87
ctx .flush ();
93
88
ctx .close ();
@@ -99,9 +94,7 @@ private boolean needAck(Message message) {
99
94
}
100
95
101
96
private void ack (ChannelHandlerContext ctx , Message message ) {
102
- if (logger .isTraceEnabled ()){
103
- logger .trace (format ("Acking message number " + message .getSequence ()));
104
- }
97
+ logger .trace ("{}" , () -> format ("Acking message number " + message .getSequence ()));
105
98
writeAck (ctx , message .getBatch ().getProtocol (), message .getSequence ());
106
99
}
107
100
0 commit comments