Skip to content

Commit 9ae6a59

Browse files
author
Michael Gelfand
committed
Added support for "update" action
Including doc and doc_as_upsert support, scripted updated and upsert.
1 parent aeeff35 commit 9ae6a59

File tree

2 files changed

+54
-3
lines changed

2 files changed

+54
-3
lines changed

lib/logstash/codecs/es_bulk.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,32 @@ def decode(data)
2727
line = LogStash::Json.load(bulk.get("message"))
2828
case state
2929
when :metadata
30-
event = LogStash::Event.new(line)
30+
if metadata["action"] == 'update'
31+
if line.has_key?("doc")
32+
event = LogStash::Event.new(line["doc"])
33+
if line.has_key?("doc_as_upsert")
34+
metadata["doc_as_upsert"] = line["doc_as_upsert"]
35+
end
36+
elsif line.has_key?("params")
37+
event = LogStash::Event.new(line["params"])
38+
metadata["scripted_upsert"] = true
39+
if line.has_key?("script")
40+
metadata["script"] = line["script"]
41+
metadata["script_type"] = "inline"
42+
elsif line.has_key?("script_id")
43+
metadata["script"] = line["script_id"]
44+
metadata["script_type"] = "indexed"
45+
end
46+
if line.has_key?("lang")
47+
metadata["script_lang"] = line["lang"]
48+
end
49+
if line.has_key?("upsert")
50+
metadata["upsert"] = LogStash::Json.dump(line["upsert"])
51+
end
52+
end
53+
else
54+
event = LogStash::Event.new(line)
55+
end
3156
event.set("@metadata", metadata)
3257
yield event
3358
state = :initial

spec/codecs/es_bulk_spec.rb

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
end
99

1010
context "#decode" do
11-
it "should return 4 events from json data" do
11+
it "should return 7 events from json data" do
1212
data = <<-HERE
1313
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
1414
{ "field1" : "value1" }
@@ -17,6 +17,12 @@
1717
{ "field1" : "value3" }
1818
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
1919
{ "doc" : {"field2" : "value2"} }
20+
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
21+
{ "doc" : {"field2" : "value2"}, "doc_as_upsert": true }
22+
{ "update" : {"_id" : "5", "_type" : "type1", "_index" : "index1"} }
23+
{ "params" : {"field1" : "value5"}, "script": "some script", "upsert": {} }
24+
{ "update" : {"_id" : "6", "_type" : "type1", "_index" : "index1"} }
25+
{ "params" : {"field1" : "value6"}, "script_id": "some_script_id", "lang": "js", "upsert": { "field2": "value7"} }
2026
HERE
2127

2228
count = 0
@@ -37,10 +43,30 @@
3743
insist { event.get("[@metadata][_id]") } == "1"
3844
insist { event.get("[@metadata][action]") } == "update"
3945
insist { event.get("[doc][field2]") } == "value2"
46+
when 4
47+
insist { event.get("[@metadata][doc_as_upsert]") } == true
48+
insist { event.get("[doc][field2]") } == "value2"
49+
when 5
50+
insist { event.get("[@metadata][_id]") } == "5"
51+
insist { event.get("[@metadata][action]") } == "update"
52+
insist { event.get("[@metadata][script]") } == "some script"
53+
insist { event.get("[@metadata][script_type]") } == "inline"
54+
insist { event.get("[@metadata][scripted_upsert]") } == true
55+
insist { event.get("[@metadata][upsert]") } == "{}"
56+
insist { event.get("[doc][field1]") } == "value5"
57+
when 6
58+
insist { event.get("[@metadata][_id]") } == "6"
59+
insist { event.get("[@metadata][action]") } == "update"
60+
insist { event.get("[@metadata][script]") } == "some_script_id"
61+
insist { event.get("[@metadata][script_type]") } == "indexed"
62+
insist { event.get("[@metadata][script_lang]") } == "js"
63+
insist { event.get("[@metadata][scripted_upsert]") } == true
64+
insist { event.get("[@metadata][upsert]") } == "{\"field2\":\"value7\"}"
65+
insist { event.get("[doc][field1]") } == "value6"
4066
end
4167
count += 1
4268
end
43-
insist { count } == 4
69+
insist { count } == 7
4470
end
4571
end
4672

0 commit comments

Comments
 (0)