17
17
*/
18
18
package com .dtstack .jlogstash .outputs ;
19
19
20
+ import java .io .IOException ;
20
21
import java .net .InetAddress ;
21
22
import java .net .UnknownHostException ;
22
23
import java .util .List ;
23
24
import java .util .Map ;
25
+ import java .util .Set ;
26
+ import java .util .HashMap ;
24
27
import java .util .concurrent .ExecutorService ;
25
28
import java .util .concurrent .Executors ;
26
29
import java .util .concurrent .TimeUnit ;
27
30
import java .util .concurrent .atomic .AtomicBoolean ;
28
- //import java.util.concurrent.atomic.AtomicLong;
29
-
31
+ import java .util .Iterator ;
32
+ import java .util .HashSet ;
33
+ import java .util .Arrays ;
30
34
import org .apache .commons .lang3 .StringUtils ;
31
35
import org .elasticsearch .action .ActionFuture ;
32
36
import org .elasticsearch .action .ActionRequest ;
48
52
import org .elasticsearch .transport .client .PreBuiltTransportClient ;
49
53
import org .slf4j .Logger ;
50
54
import org .slf4j .LoggerFactory ;
55
+ import org .elasticsearch .ElasticsearchException ;
56
+ import org .elasticsearch .index .mapper .MapperException ;
51
57
52
58
import com .dtstack .jlogstash .annotation .Required ;
53
59
import com .dtstack .jlogstash .outputs .BaseOutput ;
@@ -81,6 +87,8 @@ public class Elasticsearch5 extends BaseOutput {
// target ES nodes, e.g. "host:port" entries supplied by configuration
public static List<String> hosts;

// whether the transport client sniffs additional cluster nodes
private static boolean sniff = true;

// event keys that must survive when a failed event is trimmed down in
// doErrorCandidate; populated in prepare() from the protectionKeys setting
private static Set<String> protectionKeySet;

// number of requests buffered before the bulk processor flushes
private static int bulkActions = 20000;
@@ -93,28 +101,23 @@ public class Elasticsearch5 extends BaseOutput {
private BulkProcessor bulkProcessor;

private TransportClient esclient;

// comma-separated list of keys preserved when a failed event is trimmed
// before being re-queued (see doErrorCandidate)
private static String protectionKeys = "message";

// flipped by the ClusterMonitor background task; checkNeedWait() blocks
// while this is false
private AtomicBoolean isClusterOn = new AtomicBoolean(true);

// runs the ClusterMonitor health-check task
private ExecutorService executor;
108
110
109
- @ SuppressWarnings ("rawtypes" )
110
111
/**
 * Creates the Elasticsearch5 output with the plugin configuration map.
 *
 * @param config raw configuration map supplied by the jlogstash framework
 */
@SuppressWarnings("rawtypes") // framework hands the config over as a raw Map
public Elasticsearch5(Map config) {
    super(config);
}
113
114
114
115
public void prepare () {
115
116
try {
117
+ protectionKeySet = new HashSet <>(Arrays .asList (protectionKeys .split ("," )));
116
118
executor = Executors .newSingleThreadExecutor ();
117
119
this .initESClient ();
120
+
118
121
} catch (Exception e ) {
119
122
logger .error (e .getMessage ());
120
123
System .exit (1 );
@@ -164,27 +167,28 @@ public void afterBulk(long arg0, BulkRequest arg1,
164
167
case TOO_MANY_REQUESTS :
165
168
if (totalFailed == 0 ) {
166
169
logger .error ("too many request {}:{}" ,item .getIndex (),item .getFailureMessage ());
167
- }
168
- addFailedMsg (requests .get (item .getItemId ()));
170
+ }
171
+ addFailedMsg ((( IndexRequest ) requests .get (item .getItemId ())). sourceAsMap ( ));
169
172
break ;
170
173
case SERVICE_UNAVAILABLE :
171
174
if (toberetry == 0 ) {
172
175
logger .error ("sevice unavaible cause {}:{}" ,item .getIndex (),item .getFailureMessage ());
173
176
}
174
- // toberetry++;
175
- addFailedMsg (requests .get (item .getItemId ()));
177
+ addFailedMsg (((IndexRequest ) requests .get (item .getItemId ())).sourceAsMap ());
176
178
break ;
177
179
default :
178
- if (totalFailed == 0 ) {
179
- logger .error ("data formate cause {}:{}:{}" ,item .getIndex (),((IndexRequest )requests .get (item .getItemId ())).sourceAsMap (),item .getFailureMessage ());
180
- }
180
+ Map <String , Object > sourceEvent = ((IndexRequest ) requests .get (item .getItemId ()))
181
+ .sourceAsMap ();
182
+ logger .warn ("bulk error,fail status={}, message={}, sourceEvent={}" ,
183
+ item .getFailure ().getStatus (), item .getFailure ().getMessage (), sourceEvent );
184
+ logger .error ("bulk error" , item .getFailure ().getCause ());
185
+ doError (sourceEvent , item .getFailure ().getCause ());
181
186
break ;
182
187
}
183
188
totalFailed ++;
184
189
}
185
190
}
186
191
187
- // addAckSeqs(requests.size());
188
192
189
193
if (totalFailed > 0 ) {
190
194
logger .info (totalFailed + " doc failed, "
@@ -193,13 +197,6 @@ public void afterBulk(long arg0, BulkRequest arg1,
193
197
logger .debug ("no failed docs" );
194
198
}
195
199
196
- // if (toberetry > 0) {
197
- // logger.info("sleep " + toberetry / 2
198
- // + "millseconds after bulk failure");
199
- // setDelayTime(toberetry / 2);
200
- // } else {
201
- // logger.debug("no docs need to retry");
202
- // }
203
200
204
201
}
205
202
@@ -212,8 +209,6 @@ public void afterBulk(long arg0, BulkRequest arg1,
212
209
addFailedMsg (request );
213
210
}
214
211
215
- // addAckSeqs(arg1.requests().size());
216
- // setDelayTime(1000);
217
212
}
218
213
219
214
@ Override
@@ -229,8 +224,7 @@ public void beforeBulk(long arg0, BulkRequest arg1) {
229
224
.setConcurrentRequests (concurrentRequests ).build ();
230
225
}
231
226
232
- @ SuppressWarnings ("rawtypes" )
233
- public void emit (Map event ) {
227
+ public void doEmit (Map event ) {
234
228
String _index = Formatter .format (event , index , indexTimezone );
235
229
String _indexType = Formatter .format (event , documentType , indexTimezone );
236
230
IndexRequest indexRequest ;
@@ -245,64 +239,120 @@ public void emit(Map event) {
245
239
}
246
240
}
247
241
this .bulkProcessor .add (indexRequest );
248
- checkNeedWait ();
249
242
}
250
-
243
+
244
+ public void emit (Map event ) {
245
+
246
+ logger .info ("event enter,event={}" , event );
247
+ try {
248
+ checkNeedWait ();
249
+ doEmit (event );
250
+ } catch (Exception e ) {
251
+ logger .warn ("emit error, event={}" , event );
252
+ logger .error ("emit error" , e );
253
+
254
+ doError (event , e );
255
+
256
+ }
257
+ }
258
+
259
+ public void doError (Map event , Throwable e ) {
260
+ if (!(e instanceof MapperException ) && (e instanceof ElasticsearchException || e instanceof IOException )) {
261
+ doErrorFirst (event );
262
+ } else {
263
+ doErrorCandidate (event );
264
+ }
265
+ }
266
+
267
/**
 * Primary retry path: queues the complete failed event, unchanged, for a
 * later replay via sendFailedMsg.
 *
 * @param event the event that failed to index
 */
public void doErrorFirst(Map<String, Object> event) {

    // NOTE(review): this logs the full event payload at ERROR on every
    // failure — confirm that is acceptable for log volume
    logger.error("doErrorFirst event ={}", event);

    addFailedMsg(event);
}
273
+
274
+ public void doErrorCandidate (Map <String , Object > event ) {
275
+
276
+ boolean flag = false ;
277
+ for (Map .Entry <String , Object > entry : event .entrySet ()) {
278
+ if (!protectionKeySet .contains (entry .getKey ())) {
279
+ flag = true ;
280
+ break ;
281
+ }
282
+ }
283
+
284
+ if (flag == false ) {
285
+ logger .error ("size equal protectionKeySet, not save, event={}" , event );
286
+ return ;
287
+ }
288
+
289
+ Map <String , Object > newEvent = new HashMap ();
290
+ for (Iterator <String > s = protectionKeySet .iterator (); s .hasNext ();) {
291
+ String k = s .next ();
292
+ if (event .containsKey (k )) {
293
+ newEvent .put (k , event .get (k ));
294
+ }
295
+ }
296
+
297
+ addFailedMsg (newEvent );
298
+
299
+ logger .info ("doErrorCandidate end,newEvent={}" , newEvent );
300
+ }
301
+
251
302
@ Override
252
- public void sendFailedMsg (Object msg ){
253
-
254
- // if(needDelayTime.get() > 0){//不需要sleep影响性能
255
- // try {
256
- // Thread.sleep(needDelayTime.get());
257
- // } catch (InterruptedException e) {
258
- // logger.error("", e);
259
- // }
260
- // }
261
-
262
- this .bulkProcessor .add ((IndexRequest )msg );
263
- // needDelayTime.set(0);
264
- checkNeedWait ();
303
+ public void addFailedMsg (Object msg ) {
304
+ if (msg instanceof Map ) {
305
+ super .addFailedMsg (msg );
306
+ return ;
307
+ }
308
+
309
+ throw new IllegalArgumentException ("addFailedMsg only accept Map instance" );
265
310
}
266
-
267
-
311
+
312
+ @ Override
313
+ public void sendFailedMsg (Object msg ) {
314
+
315
+ try {
316
+
317
+ checkNeedWait ();
318
+
319
+ Map <String , Object > event = (Map ) msg ;
320
+
321
+ // 加入时间戳,用于计算耗时
322
+ // event.put(esCreatedTimeKey, Public.getTimeStamp(DateTimeZone.UTC));
323
+
324
+ logger .error ("sendFailedMsg,msg={}" , msg );
325
+ emit (event );
326
+
327
+ } catch (Exception e ) {
328
+ logger .error ("sendFailedMsg error" , e );
329
+
330
+ if (!(e instanceof MapperException ) && (e instanceof ElasticsearchException || e instanceof IOException )) {
331
+ addFailedMsg (msg );
332
+ }
333
+ }
334
+
335
+ }
336
+
337
+
268
338
@ Override
269
339
public void release (){
270
340
if (bulkProcessor !=null )bulkProcessor .close ();
271
341
}
272
-
273
- public void checkNeedWait (){
274
- while (!isClusterOn .get ()){//等待集群可用
275
- try {
276
- logger .warn ("wait cluster avaliable..." );
277
- Thread .sleep (1000 );//FIXME
278
- } catch (InterruptedException e ) {
279
- logger .error ("" , e );
280
- }
281
- }
282
- // sendReqs.incrementAndGet();
283
- // if(sendReqs.get() - ackReqs.get() < maxLag){
284
- // return;
285
- // }
286
- // while(sendReqs.get() - ackReqs.get() > maxLag){
287
- // try {
288
- // logger.warn("wait sendReqs less than ackReqs...");
289
- // Thread.sleep(1000);
290
- // } catch (InterruptedException e) {
291
- // logger.error("", e);
292
- // }
293
- // }
342
+
343
+ public void checkNeedWait () {
344
+ while (!isClusterOn .get ()) {// 等待集群可用
345
+ try {
346
+ logger .warn ("wait cluster avaliable..." );
347
+ Thread .sleep (3000 );
348
+ } catch (InterruptedException e ) {
349
+ logger .error ("" , e );
350
+ }
351
+ }
352
+
294
353
}
295
-
296
- // public void addAckSeqs(int num){
297
- // ackReqs.addAndGet(num);
298
- // }
299
-
300
- // public void setDelayTime(long delayTime){
301
- // if(delayTime > needDelayTime.get()){
302
- // needDelayTime.set(delayTime);
303
- // }
304
- // }
305
-
354
+
355
+
306
356
class ClusterMonitor implements Runnable {
307
357
308
358
private TransportClient transportClient ;
0 commit comments