Skip to content

Commit 9be8dc2

Browse files
yaauiekarenzone
andauthored
Add ECS Compatibility mode (#291)
* ecs: add ECS Compatibility mode * Apply suggestions from code review Co-authored-by: Karen Metts <[email protected]> Co-authored-by: Karen Metts <[email protected]>
1 parent 51a88e1 commit 9be8dc2

File tree

6 files changed

+105
-31
lines changed

6 files changed

+105
-31
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.3.0
2+
- Add ECS Compatibility Mode [#291](https://github.com/logstash-plugins/logstash-input-file/pull/291)
3+
14
## 4.2.4
25
- Fix: sincedb_write issue on Windows machines [#283](https://github.com/logstash-plugins/logstash-input-file/pull/283)
36

docs/index.asciidoc

+30
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ Read mode also allows for an action to take place after processing the file comp
7878
In the past attempts to simulate a Read mode while still assuming infinite streams
7979
was not ideal and a dedicated Read mode is an improvement.
8080

81+
[id="plugins-{type}s-{plugin}-ecs"]
82+
==== Compatibility with the Elastic Common Schema (ECS)
83+
84+
This plugin adds metadata about event's source, and can be configured to do so
85+
in an {ecs-ref}[ECS-compatible] way with <<plugins-{type}s-{plugin}-ecs_compatibility>>.
86+
This metadata is added after the event has been decoded by the appropriate codec,
87+
and will never overwrite existing values.
88+
89+
|========
90+
| ECS Disabled | ECS v1 | Description
91+
92+
| `host` | `[host][name]` | The name of the {ls} host that processed the event
93+
| `path` | `[log][file][path]` | The full path to the log file from which the event originates
94+
|========
95+
8196
==== Tracking of current position in watched files
8297

8398
The plugin keeps track of the current position in each file by
@@ -168,6 +183,7 @@ see <<plugins-{type}s-{plugin}-string_duration,string_duration>> for the details
168183
| <<plugins-{type}s-{plugin}-close_older>> |<<number,number>> or <<plugins-{type}s-{plugin}-string_duration,string_duration>>|No
169184
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
170185
| <<plugins-{type}s-{plugin}-discover_interval>> |<<number,number>>|No
186+
| <<plugins-{type}s-{plugin}-ecs_compatibility>> |<<string,string>>|No
171187
| <<plugins-{type}s-{plugin}-exclude>> |<<array,array>>|No
172188
| <<plugins-{type}s-{plugin}-exit_after_read>> |<<boolean,boolean>>|No
173189
| <<plugins-{type}s-{plugin}-file_chunk_count>> |<<number,number>>|No
@@ -242,6 +258,20 @@ This value is a multiple to `stat_interval`, e.g. if `stat_interval` is "500 ms"
242258
files could be discovered every 15 X 500 milliseconds - 7.5 seconds.
243259
In practice, this will be the best case because the time taken to read new content needs to be factored in.
244260

261+
[id="plugins-{type}s-{plugin}-ecs_compatibility"]
262+
===== `ecs_compatibility`
263+
264+
* Value type is <<string,string>>
265+
* Supported values are:
266+
** `disabled`: sets non-ECS metadata on event (such as top-level `host`, `path`)
267+
** `v1`: sets ECS-compatible metadata on event (such as `[host][name]`, `[log][file][path]`)
268+
* Default value depends on which version of Logstash is running:
269+
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
270+
** Otherwise, the default value is `disabled`.
271+
272+
Controls this plugin's compatibility with the
273+
{ecs-ref}[Elastic Common Schema (ECS)].
274+
245275
[id="plugins-{type}s-{plugin}-exclude"]
246276
===== `exclude`
247277

lib/logstash/inputs/file.rb

+22-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require "logstash/namespace"
33
require "logstash/inputs/base"
44
require "logstash/codecs/identity_map_codec"
5+
require 'logstash/plugin_mixins/ecs_compatibility_support'
56

67
require "pathname"
78
require "socket" # for Socket.gethostname
@@ -88,6 +89,8 @@ module LogStash module Inputs
8889
class File < LogStash::Inputs::Base
8990
config_name "file"
9091

92+
include PluginMixins::ECSCompatibilitySupport(:disabled, :v1)
93+
9194
# The path(s) to the file(s) to use as an input.
9295
# You can use filename patterns here, such as `/var/log/*.log`.
9396
# If you use a pattern like `/var/log/**/*.log`, a recursive search
@@ -325,6 +328,9 @@ def register
325328
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
326329
@completely_stopped = Concurrent::AtomicBoolean.new
327330
@queue = Concurrent::AtomicReference.new
331+
332+
@source_host_field = ecs_select[disabled: 'host', v1:'[host][name]']
333+
@source_path_field = ecs_select[disabled: 'path', v1:'[log][file][path]']
328334
end # def register
329335

330336
def completely_stopped?
@@ -369,7 +375,11 @@ def run(queue)
369375

370376
def post_process_this(event)
371377
event.set("[@metadata][host]", @host)
372-
event.set("host", @host) unless event.include?("host")
378+
attempt_set(event, @source_host_field, @host)
379+
380+
source_path = event.get('[@metadata][path]') and
381+
attempt_set(event, @source_path_field, source_path)
382+
373383
decorate(event)
374384
@queue.get << event
375385
end
@@ -407,6 +417,17 @@ def build_sincedb_base_from_settings(settings)
407417
end
408418
end
409419

420+
# Attempt to set an event's field to the provided value
421+
# without overwriting an existing value or producing an error
422+
def attempt_set(event, field_reference, value)
423+
return false if event.include?(field_reference)
424+
425+
event.set(field_reference, value)
426+
rescue => e
427+
logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.message)
428+
false
429+
end
430+
410431
def build_sincedb_base_from_env
411432
# This section is going to be deprecated eventually, as path.data will be
412433
# the default, not an environment variable (SINCEDB_DIR or LOGSTASH_HOME)

lib/logstash/inputs/file_listener.rb

-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def accept(data)
4141

4242
def process_event(event)
4343
event.set("[@metadata][path]", path)
44-
event.set("path", path) unless event.include?("path")
4544
input.post_process_this(event)
4645
end
4746

logstash-input-file.gemspec

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.2.4'
4+
s.version = '4.3.0'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -33,6 +33,7 @@ Gem::Specification.new do |s|
3333

3434
s.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
3535
s.add_runtime_dependency 'logstash-codec-multiline', ['~> 3.0']
36+
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.1'
3637

3738
s.add_development_dependency 'stud', ['~> 0.0.19']
3839
s.add_development_dependency 'logstash-devutils'

spec/inputs/file_tail_spec.rb

+48-28
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
require "helpers/spec_helper"
44
require "logstash/devutils/rspec/shared_examples"
55
require "logstash/inputs/file"
6+
require "logstash/plugin_mixins/ecs_compatibility_support/spec_helper"
67

8+
require "json"
79
require "tempfile"
810
require "stud/temporary"
911
require "logstash/codecs/multiline"
@@ -99,41 +101,59 @@
99101
end
100102
end
101103

102-
context "when path and host fields exist" do
103-
let(:name) { "C" }
104-
it "should not overwrite them" do
105-
conf = <<-CONFIG
106-
input {
107-
file {
108-
type => "blah"
109-
path => "#{path_path}"
110-
start_position => "beginning"
111-
sincedb_path => "#{sincedb_path}"
112-
delimiter => "#{TEST_FILE_DELIMITER}"
113-
codec => "json"
114-
}
115-
}
116-
CONFIG
117104

118-
File.open(tmpfile_path, "w") do |fd|
119-
fd.puts('{"path": "my_path", "host": "my_host"}')
120-
fd.puts('{"my_field": "my_val"}')
121-
fd.fsync
105+
context "when path and host fields exist", :ecs_compatibility_support do
106+
ecs_compatibility_matrix(:disabled, :v1) do |ecs_select|
107+
108+
before(:each) do
109+
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
122110
end
123111

124-
events = input(conf) do |pipeline, queue|
125-
2.times.collect { queue.pop }
112+
let(:file_path_target_field ) { ecs_select[disabled: "path", v1: '[log][file][path]'] }
113+
let(:source_host_target_field) { ecs_select[disabled: "host", v1: '[host][name]'] }
114+
115+
let(:event_with_existing) do
116+
LogStash::Event.new.tap do |e|
117+
e.set(file_path_target_field, 'my_path')
118+
e.set(source_host_target_field, 'my_host')
119+
end.to_hash
126120
end
127121

128-
existing_path_index, added_path_index = "my_val" == events[0].get("my_field") ? [1,0] : [0,1]
122+
let(:name) { "C" }
123+
it "should not overwrite them" do
124+
conf = <<-CONFIG
125+
input {
126+
file {
127+
type => "blah"
128+
path => "#{path_path}"
129+
start_position => "beginning"
130+
sincedb_path => "#{sincedb_path}"
131+
delimiter => "#{TEST_FILE_DELIMITER}"
132+
codec => "json"
133+
}
134+
}
135+
CONFIG
129136

130-
expect(events[existing_path_index].get("path")).to eq "my_path"
131-
expect(events[existing_path_index].get("host")).to eq "my_host"
132-
expect(events[existing_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
137+
File.open(tmpfile_path, "w") do |fd|
138+
fd.puts(event_with_existing.to_json)
139+
fd.puts('{"my_field": "my_val"}')
140+
fd.fsync
141+
end
133142

134-
expect(events[added_path_index].get("path")).to eq "#{tmpfile_path}"
135-
expect(events[added_path_index].get("host")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
136-
expect(events[added_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
143+
events = input(conf) do |pipeline, queue|
144+
2.times.collect { queue.pop }
145+
end
146+
147+
existing_path_index, added_path_index = "my_val" == events[0].get("my_field") ? [1,0] : [0,1]
148+
149+
expect(events[existing_path_index].get(file_path_target_field)).to eq "my_path"
150+
expect(events[existing_path_index].get(source_host_target_field)).to eq "my_host"
151+
expect(events[existing_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
152+
153+
expect(events[added_path_index].get(file_path_target_field)).to eq "#{tmpfile_path}"
154+
expect(events[added_path_index].get(source_host_target_field)).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
155+
expect(events[added_path_index].get("[@metadata][host]")).to eq "#{Socket.gethostname.force_encoding(Encoding::UTF_8)}"
156+
end
137157
end
138158
end
139159

0 commit comments

Comments
 (0)