Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ source 'https://rubygems.org/'
gemspec

gem 'benchmark'
gem 'json', git: 'https://github.com/byroot/json.git', branch: 'resumable-parser'

local_gemfile = File.join(File.dirname(__FILE__), "Gemfile.local")
if File.exist?(local_gemfile)
Expand Down
1 change: 0 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ Gem::Specification.new do |gem|

gem.add_runtime_dependency("bundler")
gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 2.3.2", "< 3.0.0"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.9.0"])
Expand Down
13 changes: 9 additions & 4 deletions lib/fluent/compat/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

require 'msgpack'
require 'json'
require 'yajl'

require 'fluent/engine'
require 'fluent/plugin'
Expand Down Expand Up @@ -78,9 +77,15 @@ def each_line(line)

class JSONParser < Parser
def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
parser = JSON::Ext::ResumableParser.new({})
buffer_size = 8192

while (chunk = io.read(buffer_size))
parser << chunk
while parser.parse
@on_message.call(parser.value)
end
end
end
end

Expand Down
1 change: 0 additions & 1 deletion lib/fluent/config/literal_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
require 'stringio'

require 'json'
require 'yajl'
require 'socket'
require 'ripper'

Expand Down
1 change: 0 additions & 1 deletion lib/fluent/load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require 'stringio'
require 'fileutils'
require 'json'
require 'yajl'
require 'uri'
require 'msgpack'
require 'strptime'
Expand Down
16 changes: 9 additions & 7 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

require 'fluent/plugin/input'
require 'fluent/msgpack_factory'
require 'yajl'
require 'json'
require 'digest'
require 'securerandom'

Expand Down Expand Up @@ -248,13 +248,15 @@ def read_messages(conn, &block)
unless feeder
first = data[0]
if first == '{' || first == '[' # json
parser = Yajl::Parser.new
parser.on_parse_complete = ->(obj){
block.call(obj, bytes, serializer)
bytes = 0
}
parser = JSON::Ext::ResumableParser.new({})
serializer = :to_json.to_proc
feeder = ->(d){ parser << d }
feeder = ->(d){
parser << d
while parser.parse
block.call(parser.value, bytes, serializer)
bytes = 0
end
}
else # msgpack
parser = Fluent::MessagePackFactory.msgpack_unpacker
serializer = :to_msgpack.to_proc
Expand Down
8 changes: 5 additions & 3 deletions lib/fluent/plugin/in_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
require 'fluent/msgpack_factory'

require 'cool.io'
require 'yajl'
require 'json'
require 'fileutils'
require 'socket'

Expand Down Expand Up @@ -158,8 +158,7 @@ def on_read(data)
first = data[0]
if first == '{'.freeze || first == '['.freeze
m = method(:on_read_json)
@parser = Yajl::Parser.new
@parser.on_parse_complete = @on_message
@parser = JSON::Ext::ResumableParser.new({})
else
m = method(:on_read_msgpack)
@parser = Fluent::MessagePackFactory.msgpack_unpacker
Expand All @@ -173,6 +172,9 @@ def on_read(data)

def on_read_json(data)
@parser << data
while @parser.parse
@on_message.call(@parser.value)
end
rescue => e
@log.error "unexpected error in json payload", error: e.to_s
@log.error_backtrace
Expand Down
25 changes: 13 additions & 12 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
require 'fluent/time'
require 'fluent/oj_options'

require 'yajl'
require 'json'

module Fluent
Expand All @@ -28,12 +27,10 @@ class JSONParser < Parser

config_set_default :time_key, 'time'
desc 'Set JSON parser'
# NOTE: Contains yajl for backward compatibility
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj

# The Yajl library defines a default buffer size of 8KiB when parsing
# from IO streams, so maintain this for backwards-compatibility.
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
desc 'Set the buffer size that Yajl will use when parsing streaming input'
desc 'Set the buffer size that JSON parser will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192

config_set_default :time_type, :float
Expand All @@ -54,8 +51,8 @@ def configure_json_parser(name)

log&.info "Oj is not installed, and failing back to JSON for json parser"
configure_json_parser(:json)
when :json then [JSON.method(:parse), JSON::ParserError]
when :yajl then [Yajl.method(:load), Yajl::ParseError]
when :yajl, :json # NOTE: Fallback yajl to json for backward compatibility
[JSON.method(:parse), JSON::ParserError]
else
raise "BUG: unknown json parser specified: #{name}"
end
Expand Down Expand Up @@ -94,11 +91,15 @@ def parser_type
end

def parse_io(io, &block)
y = Yajl::Parser.new
y.on_parse_complete = ->(record){
block.call(parse_time(record), record)
}
y.parse(io, @stream_buffer_size)
parser = JSON::Ext::ResumableParser.new({})
while (chunk = io.read(@stream_buffer_size))
parser << chunk

while parser.parse
record = parser.value
block.call(parse_time(record), record)
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def setup
sub_test_case "configure_json_parser" do
data("oj", [:oj, [Oj.method(:load), Oj::ParseError]])
data("json", [:json, [JSON.method(:parse), JSON::ParserError]])
data("yajl", [:yajl, [Yajl.method(:load), Yajl::ParseError]])
data("yajl", [:yajl, [JSON.method(:parse), JSON::ParserError]])
def test_return_each_loader((input, expected_return))
result = @parser.instance.configure_json_parser(input)
assert_equal expected_return, result
Expand Down
8 changes: 1 addition & 7 deletions test/test_event_time.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require_relative 'helper'
require 'timecop'
require 'oj'
require 'yajl'
require 'json'

class EventTimeTest < Test::Unit::TestCase
setup do
Expand Down Expand Up @@ -70,12 +70,6 @@ class EventTimeTest < Test::Unit::TestCase
assert_equal('["tag",100,{"key":"value"}]', Oj.dump(["tag", time, {"key" => "value"}], mode: :compat))
end

test 'Yajl.dump' do
time = Fluent::EventTime.new(100)
assert_equal('{"time":100}', Yajl.dump({'time' => time}))
assert_equal('["tag",100,{"key":"value"}]', Yajl.dump(["tag", time, {"key" => "value"}]))
end

test '.from_time' do
sec = 1000
usec = 2
Expand Down
12 changes: 6 additions & 6 deletions test/test_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,28 @@ def test_format
configure({})
formatted = @formatter.format(tag, @time, record)

assert_equal("#{time2str(@time)}\t#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{time2str(@time)}\t#{tag}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time
configure('output_time' => 'false')
formatted = @formatter.format(tag, @time, record)

assert_equal("#{tag}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{tag}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_tag
configure('output_tag' => 'false')
formatted = @formatter.format(tag, @time, record)

assert_equal("#{time2str(@time)}\t#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{time2str(@time)}\t#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time_and_tag
configure('output_tag' => 'false', 'output_time' => 'false')
formatted = @formatter.format('tag', @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end

def test_format_without_time_and_tag_against_string_literal_configure
Expand All @@ -100,7 +100,7 @@ def test_format_without_time_and_tag_against_string_literal_configure
])
formatted = @formatter.format('tag', @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end
end

Expand All @@ -122,7 +122,7 @@ def test_format(data)
@formatter.configure('json_parser' => data)
formatted = @formatter.format(tag, @time, record)

assert_equal("#{Yajl.dump(record)}#{@newline}", formatted)
assert_equal("#{JSON.generate(record)}#{@newline}", formatted)
end
end

Expand Down
Loading