
Implement BufferedTokenizer to return an iterable that can verify size limit for every token emitted #17229


Merged: 30 commits, Jul 2, 2025

Changes from all commits (30 commits):
9eab0c8
First rough idea of reimplementation to return an iterator
andsel Mar 3, 2025
7213acb
Exposed Java BufferedTokenizer under FileWatch module and adapted tes…
andsel Mar 3, 2025
ef5ead0
Moved string encoding logic to outer Ruby extension BufferedTokenizer…
andsel Mar 4, 2025
51d8edd
Fixed flush behavior
andsel Mar 4, 2025
950b8d0
Minor, removed commented code
andsel Mar 4, 2025
32b05eb
Moved creation of iterable upfront in the constructor to be executed …
andsel Mar 5, 2025
f4a9d9d
Fixed tests to grab the failure on exceeded size limit on iteration …
andsel Mar 5, 2025
3f5b82c
Fixed license headers
andsel Mar 5, 2025
e40da7f
[Test] added test to verify buffer full error is notified not only on…
andsel Mar 5, 2025
49e4d9c
Updated benchmark to consume effectively the iterator
andsel Mar 5, 2025
84342a1
[Benchmark] JMH report in milliseconds instead of nanoseconds
andsel Mar 6, 2025
d5bc543
Added an isEmpty method to the iterable returned by BufferedTokenizer…
andsel Mar 12, 2025
57b6c0c
Aligned with main
andsel Mar 26, 2025
c32578d
Removes a test that's no longer valid. Not int math happens in code…
andsel Mar 31, 2025
7111559
- inverted BufferedTokenizerExt$IterableAdapterWithEmptyCheck#isEmpty
andsel Apr 10, 2025
8e9d8e9
Avoid executing a double token scan in hasNext and next methods
andsel Apr 11, 2025
74e8993
Emptied StringBuilder accumulator in flush and protected the state of…
andsel Apr 11, 2025
81fb197
[Test] Added test to verify that tokenizer condition empty is true wh…
andsel Apr 14, 2025
dc3d7b2
Switched tokenizer implementation's empty to consider also the unterm…
andsel Apr 14, 2025
e2cd80f
Minor, removed commented code
andsel Apr 14, 2025
92c3437
[Test] Added test to verify the avoidance of infinite accumulation
andsel Mar 10, 2025
602bf94
Inlined PR #17293 to check for potential OOM condition during the ap…
andsel Jun 18, 2025
0051a13
Fixed matchNextSeparatorIdx to raise the flag to start dropping on wr…
andsel Jun 18, 2025
b2c44b8
[Test] fixed test, given the flush call the accumulator is emptied so…
andsel Jun 18, 2025
1f4193a
Updated append method to avoid OOM accumulation without that token li…
andsel Jun 19, 2025
983a5ea
Removed unused code and expanded * imports in tests
andsel Jun 19, 2025
1c67ff6
[Test] remove commented code in test
andsel Jun 19, 2025
6976451
Lifted an else branch into the first if statement
andsel Jun 19, 2025
b3d4089
Removed not necessary condition, because if currentIdx is unset (min …
andsel Jun 19, 2025
1162882
Removed redundant if statement
andsel Jun 19, 2025
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash.benchmark;

import org.logstash.common.BufferedTokenizer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.TimeUnit;


@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS)
@Fork(1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class BufferedTokenizerBenchmark {

    private BufferedTokenizer sut;
    private String singleTokenPerFragment;
    private String multipleTokensPerFragment;
    private String multipleTokensSpreadMultipleFragments_1;
    private String multipleTokensSpreadMultipleFragments_2;
    private String multipleTokensSpreadMultipleFragments_3;

    @Setup(Level.Invocation)
    public void setUp() {
        sut = new BufferedTokenizer();
        singleTokenPerFragment = "a".repeat(512) + "\n";

        multipleTokensPerFragment = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n";

        multipleTokensSpreadMultipleFragments_1 = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256);
        multipleTokensSpreadMultipleFragments_2 = "c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256);
        multipleTokensSpreadMultipleFragments_3 = "f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n";
    }

    @Benchmark
    public final void onlyOneTokenPerFragment(Blackhole blackhole) {
        Iterable<String> tokens = sut.extract(singleTokenPerFragment);
        tokens.forEach(blackhole::consume);
        blackhole.consume(tokens);
    }

    @Benchmark
    public final void multipleTokenPerFragment(Blackhole blackhole) {
        Iterable<String> tokens = sut.extract(multipleTokensPerFragment);
        tokens.forEach(blackhole::consume);
        blackhole.consume(tokens);
    }

    @Benchmark
    public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) {
        Iterable<String> tokens = sut.extract(multipleTokensSpreadMultipleFragments_1);
        tokens.forEach(t -> {});
        blackhole.consume(tokens);

        tokens = sut.extract(multipleTokensSpreadMultipleFragments_2);
        tokens.forEach(t -> {});
        blackhole.consume(tokens);

        tokens = sut.extract(multipleTokensSpreadMultipleFragments_3);
        tokens.forEach(blackhole::consume);
        blackhole.consume(tokens);
    }
}
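The benchmarks above call `extract(...)` and then iterate the result, reflecting this PR's change: `extract` now returns a lazy `Iterable` whose size-limit check runs for every token as it is emitted. The following is a hypothetical minimal sketch of that idea — not the actual `org.logstash.common.BufferedTokenizer` implementation — with an assumed `SketchTokenizer` name, separator, and `sizeLimit` parameter:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical minimal sketch — NOT the actual org.logstash.common.BufferedTokenizer.
// extract() appends the fragment to an accumulator and returns a lazy Iterable;
// the size limit is checked per token, at the moment the token is emitted.
public class SketchTokenizer {
    private final StringBuilder accumulator = new StringBuilder();
    private final String separator;
    private final int sizeLimit;

    public SketchTokenizer(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    public Iterable<String> extract(String data) {
        accumulator.append(data);
        return () -> new Iterator<String>() {
            @Override
            public boolean hasNext() {
                // a token exists only once a separator is buffered
                return accumulator.indexOf(separator) >= 0;
            }

            @Override
            public String next() {
                int idx = accumulator.indexOf(separator);
                if (idx < 0) {
                    throw new NoSuchElementException();
                }
                String token = accumulator.substring(0, idx);
                accumulator.delete(0, idx + separator.length());
                if (token.length() > sizeLimit) {
                    // the check happens lazily, on iteration, for every emitted token
                    throw new IllegalStateException("token exceeds size limit: " + token.length());
                }
                return token;
            }
        };
    }
}
```

Returning a lazy view instead of a materialized array is what lets the size check fire on the token that violates the limit, rather than only when the whole fragment is split up front.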

This file was deleted.

32 changes: 21 additions & 11 deletions logstash-core/spec/logstash/util/buftok_spec.rb
@@ -20,27 +20,37 @@
 describe FileWatch::BufferedTokenizer do
   subject { FileWatch::BufferedTokenizer.new }
 
+  # A matcher that ensures the result of BufferedTokenizer#extract "quacks like" an expected ruby Array in two respects:
+  # - #empty? -> boolean: true indicates that the _next_ Enumerable#each will emit zero items.
+  # - #entries -> Array: the ordered entries
+  def emit_exactly(expected_array)
+    # note: order matters; Iterator#each and the methods that delegate to it consume the iterator
+    have_attributes(:empty? => expected_array.empty?,
+                    :entries => expected_array.entries) # consumes iterator, must be done last
+  end
+
   it "should tokenize a single token" do
-    expect(subject.extract("foo\n")).to eq(["foo"])
+    expect(subject.extract("foo\n")).to emit_exactly(["foo"])
   end
 
   it "should merge multiple token" do
-    expect(subject.extract("foo")).to eq([])
-    expect(subject.extract("bar\n")).to eq(["foobar"])
+    expect(subject.extract("foo")).to emit_exactly([])
+    expect(subject.extract("bar\n")).to emit_exactly(["foobar"])
   end
 
   it "should tokenize multiple token" do
-    expect(subject.extract("foo\nbar\n")).to eq(["foo", "bar"])
+    expect(subject.extract("foo\nbar\n")).to emit_exactly(["foo", "bar"])
   end
 
   it "should ignore empty payload" do
-    expect(subject.extract("")).to eq([])
-    expect(subject.extract("foo\nbar")).to eq(["foo"])
+    expect(subject.extract("")).to emit_exactly([])
+    expect(subject.extract("foo\nbar")).to emit_exactly(["foo"])
   end
 
   it "should tokenize empty payload with newline" do
-    expect(subject.extract("\n")).to eq([""])
-    expect(subject.extract("\n\n\n")).to eq(["", "", ""])
+    expect(subject.extract("\n")).to emit_exactly([""])
+    expect(subject.extract("\n\n\n")).to emit_exactly(["", "", ""])
   end
 
   describe 'flush' do
@@ -83,12 +93,12 @@
     let(:delimiter) { "||" }
 
     it "should tokenize multiple token" do
-      expect(subject.extract("foo||b|r||")).to eq(["foo", "b|r"])
+      expect(subject.extract("foo||b|r||")).to emit_exactly(["foo", "b|r"])
     end
 
     it "should ignore empty payload" do
-      expect(subject.extract("")).to eq([])
-      expect(subject.extract("foo||bar")).to eq(["foo"])
+      expect(subject.extract("")).to emit_exactly([])
+      expect(subject.extract("foo||bar")).to emit_exactly(["foo"])
     end
   end
 end
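The spec's `emit_exactly` matcher exists because the value returned by `extract` is no longer a plain Array: iterating it consumes it, so `#entries` must be checked last. That one-shot behavior can be illustrated with a minimal Java analogue — a hypothetical stand-in, not Logstash code — where every iterator drains the same shared buffer, mirroring the tokenizer's view over its accumulator:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

public class OneShotDemo {
    // Hypothetical stand-in for the tokenizer's accumulator-backed iterable:
    // every iterator() call reads from (and drains) the same shared buffer,
    // so the entries can only be observed once.
    public static Iterable<String> drainingIterable(Deque<String> buffer) {
        return () -> new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return !buffer.isEmpty();
            }

            @Override
            public String next() {
                return buffer.pollFirst();
            }
        };
    }
}
```

A first `forEach` over such an iterable yields every buffered entry; a second `forEach` yields nothing, which is exactly why the matcher checks `#empty?` before `#entries`.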
1 change: 1 addition & 0 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
@@ -34,6 +34,7 @@
 import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
 import org.logstash.common.AbstractDeadLetterQueueWriterExt;
 import org.logstash.common.BufferedTokenizerExt;
+import org.logstash.common.BufferedTokenizer;
 import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
 import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
 import org.logstash.config.ir.compiler.FilterDelegatorExt;