
Commit accc201

Revert "Backport PR #16482 to 8.x: Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit (#16569)" (#16705)
This reverts commit 27bd2a0.
1 parent adfa02b · commit accc201

4 files changed: +12 additions, −322 deletions

logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java

Lines changed: 12 additions & 55 deletions
@@ -22,7 +22,6 @@
 
 import org.jruby.Ruby;
 import org.jruby.RubyArray;
-import org.jruby.RubyBoolean;
 import org.jruby.RubyClass;
 import org.jruby.RubyObject;
 import org.jruby.RubyString;
@@ -41,12 +40,10 @@ public class BufferedTokenizerExt extends RubyObject {
         freeze(RubyUtil.RUBY.getCurrentContext());
 
     private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
-    private StringBuilder headToken = new StringBuilder();
     private RubyString delimiter = NEW_LINE;
     private int sizeLimit;
     private boolean hasSizeLimit;
     private int inputSize;
-    private boolean bufferFullErrorNotified = false;
 
     public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
         super(runtime, metaClass);
@@ -69,6 +66,7 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
      * Extract takes an arbitrary string of input data and returns an array of
      * tokenized entities, provided there were any available to extract. This
      * makes for easy processing of datagrams using a pattern like:
+     *
      * {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
      *
      * @param context ThreadContext
@@ -79,63 +77,22 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
     @SuppressWarnings("rawtypes")
     public RubyArray extract(final ThreadContext context, IRubyObject data) {
         final RubyArray entities = data.convertToString().split(delimiter, -1);
-        if (!bufferFullErrorNotified) {
-            input.clear();
-            input.addAll(entities);
-        } else {
-            // after a full buffer signal
-            if (input.isEmpty()) {
-                // after a buffer full error, the remaining part of the line, till next delimiter,
-                // has to be consumed, unless the input buffer doesn't still contain fragments of
-                // subsequent tokens.
-                entities.shift(context);
-                input.addAll(entities);
-            } else {
-                // merge last of the input with first of incoming data segment
-                if (!entities.isEmpty()) {
-                    RubyString last = ((RubyString) input.pop(context));
-                    RubyString nextFirst = ((RubyString) entities.shift(context));
-                    entities.unshift(last.concat(nextFirst));
-                    input.addAll(entities);
-                }
-            }
-        }
-
         if (hasSizeLimit) {
-            if (bufferFullErrorNotified) {
-                bufferFullErrorNotified = false;
-                if (input.isEmpty()) {
-                    return RubyUtil.RUBY.newArray();
-                }
-            }
-            final int entitiesSize = ((RubyString) input.first()).size();
+            final int entitiesSize = ((RubyString) entities.first()).size();
             if (inputSize + entitiesSize > sizeLimit) {
-                bufferFullErrorNotified = true;
-                headToken = new StringBuilder();
-                inputSize = 0;
-                input.shift(context); // consume the token fragment that generates the buffer full
                 throw new IllegalStateException("input buffer full");
             }
             this.inputSize = inputSize + entitiesSize;
         }
-
-        if (input.getLength() < 2) {
-            // this is a specialization case which avoid adding and removing from input accumulator
-            // when it contains just one element
-            headToken.append(input.shift(context)); // remove head
+        input.append(entities.shift(context));
+        if (entities.isEmpty()) {
             return RubyUtil.RUBY.newArray();
         }
-
-        if (headToken.length() > 0) {
-            // if there is a pending token part, merge it with the first token segment present
-            // in the accumulator, and clean the pending token part.
-            headToken.append(input.shift(context)); // append buffer to first element and
-            input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
-            headToken = new StringBuilder();
-        }
-        headToken.append(input.pop(context)); // put the leftovers in headToken for later
-        inputSize = headToken.length();
-        return input;
+        entities.unshift(input.join(context));
+        input.clear();
+        input.append(entities.pop(context));
+        inputSize = ((RubyString) input.first()).size();
+        return entities;
     }
 
     /**
@@ -147,14 +104,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
      */
     @JRubyMethod
     public IRubyObject flush(final ThreadContext context) {
-        final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
-        headToken = new StringBuilder();
+        final IRubyObject buffer = input.join(context);
+        input.clear();
         return buffer;
     }
 
     @JRubyMethod(name = "empty?")
     public IRubyObject isEmpty(final ThreadContext context) {
-        return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
+        return input.empty_p();
     }
 
 }
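
For orientation, here is a minimal plain-Java sketch of the accumulator logic this revert restores. It is a simplification, not the Logstash implementation: String and ArrayList stand in for the JRuby RubyString/RubyArray types, and the class name SketchTokenizer is invented for illustration.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified analogue of the restored BufferedTokenizerExt behavior.
class SketchTokenizer {
    private final List<String> input = new ArrayList<>(); // pending partial token
    private final String delimiter = "\n";
    private final int sizeLimit;
    private int inputSize = 0;

    SketchTokenizer(int sizeLimit) { this.sizeLimit = sizeLimit; }

    List<String> extract(String data) {
        // limit -1 keeps a trailing empty fragment, so "a\n" splits into ["a", ""]
        List<String> entities =
            new ArrayList<>(Arrays.asList(data.split(delimiter, -1)));
        if (sizeLimit > 0) {
            int firstSize = entities.get(0).length();
            if (inputSize + firstSize > sizeLimit) {
                // restored behavior: fail fast, with no skip-to-next-delimiter recovery
                throw new IllegalStateException("input buffer full");
            }
            inputSize += firstSize;
        }
        input.add(entities.remove(0));            // grow the pending token
        if (entities.isEmpty()) {
            return new ArrayList<>();             // no delimiter seen yet
        }
        entities.add(0, String.join("", input));  // complete the first token
        input.clear();
        input.add(entities.remove(entities.size() - 1)); // keep the tail fragment
        inputSize = input.get(0).length();
        return entities;
    }

    String flush() {
        String buffered = String.join("", input);
        input.clear();
        return buffered;
    }
}

With a sizeLimit of 10, extract("foo\nba") returns ["foo"] and buffers "ba"; a following extract("r\n") returns ["bar"]; flush() then returns the empty remainder. Note the restored overflow behavior: extract throws without consuming the oversized fragment, which is the edge case the reverted PR had changed.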

logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java

Lines changed: 0 additions & 91 deletions
This file was deleted.

logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java

Lines changed: 0 additions & 66 deletions
This file was deleted.

logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java

Lines changed: 0 additions & 110 deletions
This file was deleted.
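
The three deleted test classes covered basic tokenization, custom delimiters, and the size-limit path. As a rough illustration only (JUnit 4.13 style is assumed, and these assertions target the SketchTokenizer sketch above, not the deleted files' actual contents), the size-limit behavior restored by this revert can be pinned down like this:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.util.List;
import org.junit.Test;

public class SketchTokenizerSizeLimitTest {

    @Test
    public void throwsWhenATokenFragmentExceedsTheSizeLimit() {
        SketchTokenizer tokenizer = new SketchTokenizer(10);
        // 11 characters with no delimiter: over the limit of 10
        assertThrows(IllegalStateException.class,
            () -> tokenizer.extract("aaaaaaaaaaa"));
    }

    @Test
    public void tokenizesAcrossChunksUnderTheLimit() {
        SketchTokenizer tokenizer = new SketchTokenizer(10);
        assertEquals(List.of("foo"), tokenizer.extract("foo\nba"));
        assertEquals(List.of("bar"), tokenizer.extract("r\n"));
        assertEquals("", tokenizer.flush());
    }
}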
