Skip to content

Commit a52e94c

Browse files
andsel authored and logstashmachine committed
Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger than sizeLimit (#16482)
Fixes the behaviour of the tokenizer so that it works properly when buffer full conditions are met. Updates BufferedTokenizerExt so that it can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it records this state in a local field so that on the next data segment it can consume all the token fragments up to the next token delimiter. Changes the accumulation variable from a RubyArray containing strings to a StringBuilder which contains the head token, while the remaining token fragments are stored in the input array. Furthermore, it translates the `buftok_spec` tests into JUnit tests. (cherry picked from commit 85493ce)
1 parent 6a573f4 commit a52e94c

File tree

4 files changed

+322
-12
lines changed

4 files changed

+322
-12
lines changed

logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.jruby.Ruby;
2424
import org.jruby.RubyArray;
25+
import org.jruby.RubyBoolean;
2526
import org.jruby.RubyClass;
2627
import org.jruby.RubyObject;
2728
import org.jruby.RubyString;
@@ -40,10 +41,12 @@ public class BufferedTokenizerExt extends RubyObject {
4041
freeze(RubyUtil.RUBY.getCurrentContext());
4142

4243
private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
44+
private StringBuilder headToken = new StringBuilder();
4345
private RubyString delimiter = NEW_LINE;
4446
private int sizeLimit;
4547
private boolean hasSizeLimit;
4648
private int inputSize;
49+
private boolean bufferFullErrorNotified = false;
4750

4851
public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
4952
super(runtime, metaClass);
@@ -66,7 +69,6 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
6669
* Extract takes an arbitrary string of input data and returns an array of
6770
* tokenized entities, provided there were any available to extract. This
6871
* makes for easy processing of datagrams using a pattern like:
69-
*
7072
* {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
7173
*
7274
* @param context ThreadContext
@@ -77,22 +79,63 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
7779
@SuppressWarnings("rawtypes")
7880
public RubyArray extract(final ThreadContext context, IRubyObject data) {
7981
// split the incoming data on the delimiter; the -1 limit keeps trailing empty fragments
final RubyArray entities = data.convertToString().split(delimiter, -1);
82+
// no pending buffer full error: input is replaced by the fragments of this data segment
if (!bufferFullErrorNotified) {
83+
input.clear();
84+
input.addAll(entities);
85+
} else {
86+
// a buffer full error was signalled by a previous call
87+
if (input.isEmpty()) {
88+
// after a buffer full error, the remaining part of the oversized line, up to the next
89+
// delimiter, has to be discarded; this branch applies when the input buffer holds no
90+
// fragments of subsequent tokens.
91+
entities.shift(context);
92+
input.addAll(entities);
93+
} else {
94+
// merge the last fragment held in input with the first fragment of the incoming data segment
95+
if (!entities.isEmpty()) {
96+
RubyString last = ((RubyString) input.pop(context));
97+
RubyString nextFirst = ((RubyString) entities.shift(context));
98+
entities.unshift(last.concat(nextFirst));
99+
input.addAll(entities);
100+
}
101+
}
102+
}
103+
80104
if (hasSizeLimit) {
81-
final int entitiesSize = ((RubyString) entities.first()).size();
105+
// reset the flag left by a previous buffer full error; when nothing is left in input
// there are no tokens to return
if (bufferFullErrorNotified) {
106+
bufferFullErrorNotified = false;
107+
if (input.isEmpty()) {
108+
return RubyUtil.RUBY.newArray();
109+
}
110+
}
111+
// size of the first fragment currently held in input
final int entitiesSize = ((RubyString) input.first()).size();
82112
if (inputSize + entitiesSize > sizeLimit) {
113+
// remember the error, drop the accumulated head token and reset the size counter
bufferFullErrorNotified = true;
114+
headToken = new StringBuilder();
115+
inputSize = 0;
116+
input.shift(context); // discard the token fragment that triggered the buffer full condition
83117
throw new IllegalStateException("input buffer full");
84118
}
85119
this.inputSize = inputSize + entitiesSize;
86120
}
87-
input.append(entities.shift(context));
88-
if (entities.isEmpty()) {
121+
122+
if (input.getLength() < 2) {
123+
// special case that avoids adding to and removing from the input accumulator
124+
// when it contains fewer than two elements
125+
headToken.append(input.shift(context)); // remove head
89126
return RubyUtil.RUBY.newArray();
90127
}
91-
entities.unshift(input.join(context));
92-
input.clear();
93-
input.append(entities.pop(context));
94-
inputSize = ((RubyString) input.first()).size();
95-
return entities;
128+
129+
if (headToken.length() > 0) {
130+
// if there is a pending token part, merge it with the first token segment present
131+
// in the accumulator, and clean the pending token part.
132+
headToken.append(input.shift(context)); // append the first element to the pending part and
133+
input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert the merged token into the array
134+
headToken = new StringBuilder();
135+
}
136+
headToken.append(input.pop(context)); // put the leftovers in headToken for later
137+
inputSize = headToken.length();
138+
return input;
96139
}
97140

98141
/**
@@ -104,14 +147,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
104147
*/
105148
@JRubyMethod
106149
public IRubyObject flush(final ThreadContext context) {
107-
final IRubyObject buffer = input.join(context);
108-
input.clear();
150+
// hand back whatever is pending in headToken, then start over with an empty accumulator
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
151+
headToken = new StringBuilder();
109152
return buffer;
110153
}
111154

112155
@JRubyMethod(name = "empty?")
113156
public IRubyObject isEmpty(final ThreadContext context) {
114-
return input.empty_p();
157+
// true when headToken holds no characters; the content of input is not considered here
return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
115158
}
116159

117160
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyString;
24+
import org.jruby.runtime.ThreadContext;
25+
import org.jruby.runtime.builtin.IRubyObject;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.logstash.RubyTestBase;
29+
import org.logstash.RubyUtil;
30+
31+
import java.util.List;
32+
33+
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
35+
import static org.logstash.RubyUtil.RUBY;
36+
37+
/**
 * Tests for {@link BufferedTokenizerExt} initialised with no arguments; every test
 * feeds newline-delimited data to {@code extract}.
 */
@SuppressWarnings("unchecked")
38+
public final class BufferedTokenizerExtTest extends RubyTestBase {
39+
40+
private BufferedTokenizerExt sut;
41+
private ThreadContext context;
42+
43+
// creates a fresh tokenizer, initialised with an empty argument list, before each test
@Before
44+
public void setUp() {
45+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
46+
context = RUBY.getCurrentContext();
47+
IRubyObject[] args = {};
48+
sut.init(context, args);
49+
}
50+
51+
// a single delimiter-terminated segment yields exactly one token
@Test
52+
public void shouldTokenizeASingleToken() {
53+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));
54+
55+
assertEquals(List.of("foo"), tokens);
56+
}
57+
58+
// a fragment without delimiter returns nothing and is merged with the following segment
@Test
59+
public void shouldMergeMultipleToken() {
60+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo"));
61+
assertTrue(tokens.isEmpty());
62+
63+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
64+
assertEquals(List.of("foobar"), tokens);
65+
}
66+
67+
// two delimiter-terminated tokens in one segment are both returned, in order
@Test
68+
public void shouldTokenizeMultipleToken() {
69+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
70+
71+
assertEquals(List.of("foo", "bar"), tokens);
72+
}
73+
74+
// an empty payload yields no tokens; the unterminated "bar" is not part of the next result
@Test
75+
public void shouldIgnoreEmptyPayload() {
76+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
77+
assertTrue(tokens.isEmpty());
78+
79+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"));
80+
assertEquals(List.of("foo"), tokens);
81+
}
82+
83+
// every delimiter in a payload made only of delimiters yields one empty token
@Test
84+
public void shouldTokenizeEmptyPayloadWithNewline() {
85+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
86+
assertEquals(List.of(""), tokens);
87+
88+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
89+
assertEquals(List.of("", "", ""), tokens);
90+
}
91+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyString;
24+
import org.jruby.runtime.ThreadContext;
25+
import org.jruby.runtime.builtin.IRubyObject;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.logstash.RubyTestBase;
29+
import org.logstash.RubyUtil;
30+
31+
import java.util.List;
32+
33+
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
35+
import static org.logstash.RubyUtil.RUBY;
36+
37+
/**
 * Tests for {@link BufferedTokenizerExt} initialised with the single argument "||",
 * which these tests exercise as a two-character token delimiter.
 */
@SuppressWarnings("unchecked")
38+
public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase {
39+
40+
private BufferedTokenizerExt sut;
41+
private ThreadContext context;
42+
43+
// creates a fresh tokenizer, initialised with "||", before each test
@Before
44+
public void setUp() {
45+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
46+
context = RUBY.getCurrentContext();
47+
IRubyObject[] args = {RubyUtil.RUBY.newString("||")};
48+
sut.init(context, args);
49+
}
50+
51+
// a single "|" inside a token is kept; only the full "||" sequence separates tokens
@Test
52+
public void shouldTokenizeMultipleToken() {
53+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||"));
54+
55+
assertEquals(List.of("foo", "b|r"), tokens);
56+
}
57+
58+
// an empty payload yields no tokens; the unterminated "bar" is not part of the next result
@Test
59+
public void shouldIgnoreEmptyPayload() {
60+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
61+
assertTrue(tokens.isEmpty());
62+
63+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar"));
64+
assertEquals(List.of("foo"), tokens);
65+
}
66+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package org.logstash.common;
2+
/*
3+
* Licensed to Elasticsearch B.V. under one or more contributor
4+
* license agreements. See the NOTICE file distributed with
5+
* this work for additional information regarding copyright
6+
* ownership. Elasticsearch B.V. licenses this file to you under
7+
* the Apache License, Version 2.0 (the "License"); you may
8+
* not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*/
20+
21+
import org.jruby.RubyArray;
22+
import org.jruby.RubyString;
23+
import org.jruby.runtime.ThreadContext;
24+
import org.jruby.runtime.builtin.IRubyObject;
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.logstash.RubyTestBase;
28+
import org.logstash.RubyUtil;
29+
30+
import java.util.List;
31+
32+
import static org.hamcrest.MatcherAssert.assertThat;
33+
import static org.hamcrest.Matchers.containsString;
34+
import static org.junit.Assert.*;
35+
import static org.logstash.RubyUtil.RUBY;
36+
37+
/**
 * Tests for {@link BufferedTokenizerExt} initialised with "\n" and 10 (presumably the
 * delimiter and the size limit; {@code init} is not shown here, confirm against it).
 * They cover the "input buffer full" error and how tokenization continues after it.
 */
@SuppressWarnings("unchecked")
38+
public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase {
39+
40+
private BufferedTokenizerExt sut;
41+
private ThreadContext context;
42+
43+
// creates a fresh tokenizer, initialised with "\n" and 10, before each test
@Before
44+
public void setUp() {
45+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
46+
context = RUBY.getCurrentContext();
47+
IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)};
48+
sut.init(context, args);
49+
}
50+
51+
// short tokens are returned as usual
@Test
52+
public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() {
53+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
54+
55+
assertEquals(List.of("foo", "bar"), tokens);
56+
}
57+
58+
// a first token longer than 10 characters makes extract throw "input buffer full"
@Test
59+
public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() {
60+
Exception thrownException = assertThrows(IllegalStateException.class, () -> {
61+
sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom"));
62+
});
63+
assertThat(thrownException.getMessage(), containsString("input buffer full"));
64+
}
65+
66+
// after the error, the fragment that followed the oversized token ("kaboom") is still
// delivered once its delimiter arrives
@Test
67+
public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() {
68+
Exception thrownException = assertThrows(IllegalStateException.class, () -> {
69+
sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom"));
70+
});
71+
assertThat(thrownException.getMessage(), containsString("input buffer full"));
72+
73+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\nanother"));
74+
assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens);
75+
}
76+
77+
// the oversized token is spread over several segments: its tail ("aa") is dropped and
// only the next complete token ("bbbb") is returned
@Test
78+
public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() {
79+
sut.extract(context, RubyUtil.RUBY.newString("aaaa"));
80+
81+
Exception thrownException = assertThrows(IllegalStateException.class, () -> {
82+
sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa"));
83+
});
84+
assertThat(thrownException.getMessage(), containsString("input buffer full"));
85+
86+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc"));
87+
assertEquals(List.of("bbbb"), tokens);
88+
}
89+
90+
// two consecutive buffer full errors must not prevent later tokens from being returned
@Test
91+
public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() {
92+
sut.extract(context, RubyUtil.RUBY.newString("aaaa"));
93+
94+
//first buffer full on 13 "a" letters
95+
Exception thrownException = assertThrows(IllegalStateException.class, () -> {
96+
sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa"));
97+
});
98+
assertThat(thrownException.getMessage(), containsString("input buffer full"));
99+
100+
// second buffer full on 11 "b" letters
101+
Exception secondThrownException = assertThrows(IllegalStateException.class, () -> {
102+
sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc"));
103+
});
104+
assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
105+
106+
// now should resume processing on c and d
107+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n"));
108+
assertEquals(List.of("ccccc", "ddd"), tokens);
109+
}
110+
}

0 commit comments

Comments
 (0)