Skip to content

Commit 85560b6

Browse files
bug fixed
1 parent 3b99d17 commit 85560b6

File tree

1 file changed

+24
-24
lines changed

1 file changed

+24
-24
lines changed

elasticsearch/src/main/java/com/dtstack/jlogstash/outputs/Elasticsearch.java

+24-24
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.atomic.AtomicBoolean;
28-
import java.util.concurrent.atomic.AtomicLong;
28+
//import java.util.concurrent.atomic.AtomicLong;
2929

3030
import org.apache.commons.lang3.StringUtils;
3131
import org.elasticsearch.action.ActionFuture;
@@ -95,11 +95,11 @@ public class Elasticsearch extends BaseOutput {
9595

9696
private TransportClient esclient;
9797

98-
private AtomicLong sendReqs = new AtomicLong(0);
99-
100-
private AtomicLong ackReqs = new AtomicLong(0);
101-
102-
private int maxLag = bulkActions;
98+
// private AtomicLong sendReqs = new AtomicLong(0);
99+
//
100+
// private AtomicLong ackReqs = new AtomicLong(0);
101+
//
102+
// private int maxLag = bulkActions;
103103

104104
// private AtomicLong needDelayTime = new AtomicLong(0l);
105105

@@ -184,7 +184,7 @@ public void afterBulk(long arg0, BulkRequest arg1,
184184
}
185185
}
186186

187-
addAckSeqs(requests.size());
187+
// addAckSeqs(requests.size());
188188

189189
if (totalFailed > 0) {
190190
logger.info(totalFailed + " doc failed, "
@@ -213,7 +213,7 @@ public void afterBulk(long arg0, BulkRequest arg1,
213213
addFailedMsg(request);
214214
}
215215

216-
addAckSeqs(arg1.requests().size());
216+
// addAckSeqs(arg1.requests().size());
217217
// setDelayTime(1000);
218218
}
219219

@@ -279,24 +279,24 @@ public void checkNeedWait(){
279279
logger.error("", e);
280280
}
281281
}
282-
283-
sendReqs.incrementAndGet();
284-
if(sendReqs.get() - ackReqs.get() < maxLag){
285-
return;
286-
}
287-
288-
while(sendReqs.get() - ackReqs.get() > maxLag){
289-
try {
290-
Thread.sleep(1000);
291-
} catch (InterruptedException e) {
292-
logger.error("", e);
293-
}
294-
}
282+
//
283+
// sendReqs.incrementAndGet();
284+
// if(sendReqs.get() - ackReqs.get() < maxLag){
285+
// return;
286+
// }
287+
//
288+
// while(sendReqs.get() - ackReqs.get() > maxLag){
289+
// try {
290+
// Thread.sleep(1000);
291+
// } catch (InterruptedException e) {
292+
// logger.error("", e);
293+
// }
294+
// }
295295
}
296296

297-
public void addAckSeqs(int num){
298-
ackReqs.addAndGet(num);
299-
}
297+
// public void addAckSeqs(int num){
298+
// ackReqs.addAndGet(num);
299+
// }
300300

301301
// public void setDelayTime(long delayTime){
302302
// if(delayTime > needDelayTime.get()){

0 commit comments

Comments
 (0)