Skip to content

Commit 09a2cae

Browse files
committed
Added support for "update" action
Including doc and doc_as_upsert support, scripted updated and upsert.
1 parent aeeff35 commit 09a2cae

File tree

2 files changed

+113
-10
lines changed

2 files changed

+113
-10
lines changed

lib/logstash/codecs/es_bulk.rb

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111
class LogStash::Codecs::ESBulk < LogStash::Codecs::Base
1212
config_name "es_bulk"
1313

14+
# Parse update action source line (doc, doc_as_upsert)
15+
config :parse_update, :validate => :boolean, :default => false
16+
17+
# Parse scripted_upsert update action source line (script, params, upsert). Relevant only when parse_update=true
18+
config :parse_scripted_upsert, :validate => :boolean, :default => true
19+
1420
public
1521
def initialize(params={})
1622
super(params)
@@ -27,7 +33,35 @@ def decode(data)
2733
line = LogStash::Json.load(bulk.get("message"))
2834
case state
2935
when :metadata
30-
event = LogStash::Event.new(line)
36+
if metadata["action"] == 'update' and @parse_update
37+
if line.has_key?("doc")
38+
event = LogStash::Event.new(line["doc"])
39+
if line.has_key?("doc_as_upsert")
40+
metadata["doc_as_upsert"] = line["doc_as_upsert"]
41+
end
42+
elsif line.has_key?("params") and @parse_scripted_upsert
43+
event = LogStash::Event.new(line["params"])
44+
metadata["scripted_upsert"] = true
45+
if line.has_key?("script")
46+
metadata["script"] = line["script"]
47+
metadata["script_type"] = "inline"
48+
elsif line.has_key?("script_id")
49+
metadata["script"] = line["script_id"]
50+
metadata["script_type"] = "indexed"
51+
end
52+
if line.has_key?("lang")
53+
metadata["script_lang"] = line["lang"]
54+
end
55+
if line.has_key?("upsert")
56+
metadata["upsert"] = LogStash::Json.dump(line["upsert"])
57+
end
58+
else
59+
# fallback to default
60+
event = LogStash::Event.new(line)
61+
end
62+
else
63+
event = LogStash::Event.new(line)
64+
end
3165
event.set("@metadata", metadata)
3266
yield event
3367
state = :initial

spec/codecs/es_bulk_spec.rb

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
require "insist"
44

55
describe LogStash::Codecs::ESBulk do
6-
subject do
7-
next LogStash::Codecs::ESBulk.new
8-
end
9-
106
context "#decode" do
11-
it "should return 4 events from json data" do
7+
let(:config) { {} }
8+
let(:subject) { LogStash::Codecs::ESBulk.new(config) }
9+
10+
it "should return 5 events from json data" do
1211
data = <<-HERE
1312
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
1413
{ "field1" : "value1" }
@@ -17,6 +16,8 @@
1716
{ "field1" : "value3" }
1817
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
1918
{ "doc" : {"field2" : "value2"} }
19+
{ "update" : {"_id" : "5", "_type" : "type1", "_index" : "index1"} }
20+
{ "upsert": {}, "params" : {"field2" : "value2"}, "scripted_upsert": true, "script": "test_script" }
2021
HERE
2122

2223
count = 0
@@ -37,21 +38,89 @@
3738
insist { event.get("[@metadata][_id]") } == "1"
3839
insist { event.get("[@metadata][action]") } == "update"
3940
insist { event.get("[doc][field2]") } == "value2"
41+
when 3
42+
insist { event.get("[@metadata][_id]") } == "5"
43+
insist { event.get("[@metadata][action]") } == "update"
44+
insist { event.get("[params][field2]") } == "value2"
45+
insist { event.get("[scripted_upsert]") } == true
46+
insist { event.get("[script]") } == "test_script"
4047
end
4148
count += 1
4249
end
43-
insist { count } == 4
50+
insist { count } == 5
4451
end
45-
end
4652

47-
context "fail to process non-bulk event then continue" do
48-
it "continues after a fail" do
53+
it "fail to process non-bulk event then continue" do
4954
decoded = false
5055
subject.decode("something that isn't a bulk event\n") do |event|
5156
decoded = true
5257
end
5358
insist { decoded } == false
5459
end
60+
61+
it "when parse_update=true, should return 7 events from json data" do
62+
config.update("parse_update" => true)
63+
64+
data = <<-HERE
65+
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
66+
{ "field1" : "value1" }
67+
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
68+
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
69+
{ "field1" : "value3" }
70+
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
71+
{ "doc" : {"field2" : "value2"} }
72+
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
73+
{ "doc" : {"field2" : "value2"}, "doc_as_upsert": true }
74+
{ "update" : {"_id" : "5", "_type" : "type1", "_index" : "index1"} }
75+
{ "params" : {"field1" : "value5"}, "script": "some script", "upsert": {} }
76+
{ "update" : {"_id" : "6", "_type" : "type1", "_index" : "index1"} }
77+
{ "params" : {"field1" : "value6"}, "script_id": "some_script_id", "lang": "js", "upsert": { "field2": "value7"} }
78+
HERE
79+
80+
count = 0
81+
subject.decode(data) do |event|
82+
case count
83+
when 0
84+
insist { event.get("[@metadata][_id]") } == "1"
85+
insist { event.get("[@metadata][action]") } == "index"
86+
insist { event.get("field1") } == "value1"
87+
when 1
88+
insist { event.get("[@metadata][_id]") } == "2"
89+
insist { event.get("[@metadata][action]") } == "delete"
90+
when 2
91+
insist { event.get("[@metadata][_id]") } == "3"
92+
insist { event.get("[@metadata][action]") } == "create"
93+
insist { event.get("field1") } == "value3"
94+
when 3
95+
insist { event.get("[@metadata][_id]") } == "1"
96+
insist { event.get("[@metadata][action]") } == "update"
97+
insist { event.get("[field2]") } == "value2"
98+
when 4
99+
insist { event.get("[@metadata][doc_as_upsert]") } == true
100+
insist { event.get("[field2]") } == "value2"
101+
when 5
102+
insist { event.get("[@metadata][_id]") } == "5"
103+
insist { event.get("[@metadata][action]") } == "update"
104+
insist { event.get("[@metadata][script]") } == "some script"
105+
insist { event.get("[@metadata][script_type]") } == "inline"
106+
insist { event.get("[@metadata][scripted_upsert]") } == true
107+
insist { event.get("[@metadata][upsert]") } == "{}"
108+
insist { event.get("[field1]") } == "value5"
109+
when 6
110+
insist { event.get("[@metadata][_id]") } == "6"
111+
insist { event.get("[@metadata][action]") } == "update"
112+
insist { event.get("[@metadata][script]") } == "some_script_id"
113+
insist { event.get("[@metadata][script_type]") } == "indexed"
114+
insist { event.get("[@metadata][script_lang]") } == "js"
115+
insist { event.get("[@metadata][scripted_upsert]") } == true
116+
insist { event.get("[@metadata][upsert]") } == "{\"field2\":\"value7\"}"
117+
insist { event.get("[field1]") } == "value6"
118+
end
119+
count += 1
120+
end
121+
insist { count } == 7
122+
end
123+
55124
end
56125

57126
end

0 commit comments

Comments
 (0)