Skip to content

Commit 957a19d

Browse files
committed
Added support for "update" action
Including doc and doc_as_upsert support, scripted updated and upsert.
1 parent aeeff35 commit 957a19d

File tree

2 files changed

+103
-8
lines changed

2 files changed

+103
-8
lines changed

lib/logstash/codecs/es_bulk.rb

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
1212
config_name "es_bulk"
1313

14+
# Parse update action source line (doc, doc_as_upsert)
15+
config :parse_update, :validate => :boolean, :default => false
16+
17+
# Parse scripted_upsert update action source line (script, params, upsert). Relevant only when parse_update=true
18+
config :parse_scripted_upsert, :validate => :boolean, :default => true
19+
1420
public
1521
def initialize(params={})
1622
super(params)
@@ -27,7 +33,35 @@ def decode(data)
2733
line = LogStash::Json.load(bulk.get("message"))
2834
case state
2935
when :metadata
30-
event = LogStash::Event.new(line)
36+
if metadata["action"] == 'update' and @parse_update
37+
if line.has_key?("doc")
38+
event = LogStash::Event.new(line["doc"])
39+
if line.has_key?("doc_as_upsert")
40+
metadata["doc_as_upsert"] = line["doc_as_upsert"]
41+
end
42+
elsif line.has_key?("params") and @parse_scripted_upsert
43+
event = LogStash::Event.new(line["params"])
44+
metadata["scripted_upsert"] = true
45+
if line.has_key?("script")
46+
metadata["script"] = line["script"]
47+
metadata["script_type"] = "inline"
48+
elsif line.has_key?("script_id")
49+
metadata["script"] = line["script_id"]
50+
metadata["script_type"] = "indexed"
51+
end
52+
if line.has_key?("lang")
53+
metadata["script_lang"] = line["lang"]
54+
end
55+
if line.has_key?("upsert")
56+
metadata["upsert"] = LogStash::Json.dump(line["upsert"])
57+
end
58+
else
59+
# fallback to default
60+
event = LogStash::Event.new(line)
61+
end
62+
else
63+
event = LogStash::Event.new(line)
64+
end
3165
event.set("@metadata", metadata)
3266
yield event
3367
state = :initial

spec/codecs/es_bulk_spec.rb

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@
33
require "insist"
44

55
describe LogStash::Codecs::ESBulk do
6-
subject do
7-
next LogStash::Codecs::ESBulk.new
8-
end
9-
106
context "#decode" do
7+
let(:config) { {} }
8+
let(:subject) { LogStash::Codecs::ESBulk.new(config) }
9+
1110
it "should return 4 events from json data" do
1211
data = <<-HERE
1312
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
@@ -42,16 +41,78 @@
4241
end
4342
insist { count } == 4
4443
end
45-
end
4644

47-
context "fail to process non-bulk event then continue" do
48-
it "continues after a fail" do
45+
it "fail to process non-bulk event then continue" do
4946
decoded = false
5047
subject.decode("something that isn't a bulk event\n") do |event|
5148
decoded = true
5249
end
5350
insist { decoded } == false
5451
end
52+
53+
it "when parse_update=true, should return 7 events from json data" do
54+
config.update("parse_update" => true)
55+
56+
data = <<-HERE
57+
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
58+
{ "field1" : "value1" }
59+
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
60+
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
61+
{ "field1" : "value3" }
62+
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
63+
{ "doc" : {"field2" : "value2"} }
64+
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
65+
{ "doc" : {"field2" : "value2"}, "doc_as_upsert": true }
66+
{ "update" : {"_id" : "5", "_type" : "type1", "_index" : "index1"} }
67+
{ "params" : {"field1" : "value5"}, "script": "some script", "upsert": {} }
68+
{ "update" : {"_id" : "6", "_type" : "type1", "_index" : "index1"} }
69+
{ "params" : {"field1" : "value6"}, "script_id": "some_script_id", "lang": "js", "upsert": { "field2": "value7"} }
70+
HERE
71+
72+
count = 0
73+
subject.decode(data) do |event|
74+
case count
75+
when 0
76+
insist { event.get("[@metadata][_id]") } == "1"
77+
insist { event.get("[@metadata][action]") } == "index"
78+
insist { event.get("field1") } == "value1"
79+
when 1
80+
insist { event.get("[@metadata][_id]") } == "2"
81+
insist { event.get("[@metadata][action]") } == "delete"
82+
when 2
83+
insist { event.get("[@metadata][_id]") } == "3"
84+
insist { event.get("[@metadata][action]") } == "create"
85+
insist { event.get("field1") } == "value3"
86+
when 3
87+
insist { event.get("[@metadata][_id]") } == "1"
88+
insist { event.get("[@metadata][action]") } == "update"
89+
insist { event.get("[field2]") } == "value2"
90+
when 4
91+
insist { event.get("[@metadata][doc_as_upsert]") } == true
92+
insist { event.get("[field2]") } == "value2"
93+
when 5
94+
insist { event.get("[@metadata][_id]") } == "5"
95+
insist { event.get("[@metadata][action]") } == "update"
96+
insist { event.get("[@metadata][script]") } == "some script"
97+
insist { event.get("[@metadata][script_type]") } == "inline"
98+
insist { event.get("[@metadata][scripted_upsert]") } == true
99+
insist { event.get("[@metadata][upsert]") } == "{}"
100+
insist { event.get("[field1]") } == "value5"
101+
when 6
102+
insist { event.get("[@metadata][_id]") } == "6"
103+
insist { event.get("[@metadata][action]") } == "update"
104+
insist { event.get("[@metadata][script]") } == "some_script_id"
105+
insist { event.get("[@metadata][script_type]") } == "indexed"
106+
insist { event.get("[@metadata][script_lang]") } == "js"
107+
insist { event.get("[@metadata][scripted_upsert]") } == true
108+
insist { event.get("[@metadata][upsert]") } == "{\"field2\":\"value7\"}"
109+
insist { event.get("[field1]") } == "value6"
110+
end
111+
count += 1
112+
end
113+
insist { count } == 7
114+
end
115+
55116
end
56117

57118
end

0 commit comments

Comments
 (0)