Skip to content

Commit b9953b6

Browse files
authored
Fix: LS failing with ssl_peer_metadata => true (#431)
1 parent 68e4878 commit b9953b6

File tree

6 files changed

+105
-27
lines changed

6 files changed

+105
-27
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
## x.y.z
1+
## 6.2.1
2+
- Fix: LS failing with `ssl_peer_metadata => true` [#431](https://github.com/logstash-plugins/logstash-input-beats/pull/431)
23
- [DOC] described `executor_threads` configuration parameter [#421](https://github.com/logstash-plugins/logstash-input-beats/pull/421)
34

45
## 6.2.0

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6.2.0
1+
6.2.1

lib/logstash/inputs/beats.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base
129129
config :executor_threads, :validate => :number, :default => LogStash::Config::CpuCoreStrategy.maximum
130130

131131
attr_reader :field_hostname, :field_hostip
132+
attr_reader :field_tls_protocol_version, :field_tls_peer_subject, :field_tls_cipher
132133

133134
def register
134135
# For Logstash 2.4 we need to make sure that the logger is correctly set for the
@@ -167,10 +168,10 @@ def register
167168

168169
# define ecs name mapping
169170
@field_hostname = ecs_select[disabled: "host", v1: "[@metadata][input][beats][host][name]"]
170-
@field_hostip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][beats][host][ip]"]
171-
@field_tls_protocol_version = ecs_select[disabled: "[@metadata][tls_peer][protocol]", v1: "[@metadata][input][beats][tls][version_protocol]"]
172-
@field_tls_peer_subject = ecs_select[disabled: "[@metadata][tls_peer][subject]", v1: "[@metadata][input][beats][tls][client][subject]"]
173-
@field_tls_cipher = ecs_select[disabled: "[@metadata][tls_peer][cipher_suite]", v1: "[@metadata][input][beats][tls][cipher]"]
171+
@field_hostip = ecs_select[disabled: "[@metadata][ip_address]", v1: "[@metadata][input][beats][host][ip]"]
172+
@field_tls_protocol_version = ecs_select[disabled: "[@metadata][tls_peer][protocol]", v1: "[@metadata][input][beats][tls][version_protocol]"]
173+
@field_tls_peer_subject = ecs_select[disabled: "[@metadata][tls_peer][subject]", v1: "[@metadata][input][beats][tls][client][subject]"]
174+
@field_tls_cipher = ecs_select[disabled: "[@metadata][tls_peer][cipher_suite]", v1: "[@metadata][input][beats][tls][cipher]"]
174175

175176
@logger.info("Starting input listener", :address => "#{@host}:#{@port}")
176177

lib/logstash/inputs/beats/message_listener.rb

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def extract_tls_peer(hash, ctx)
132132
tls_session = ctx.channel().pipeline().get("ssl-handler").engine().getSession()
133133
tls_verified = true
134134

135-
if not @input.client_authentication_required?
135+
unless @input.client_authentication_required?
136136
# throws SSLPeerUnverifiedException if unverified
137137
begin
138138
tls_session.getPeerCertificates()
@@ -144,18 +144,16 @@ def extract_tls_peer(hash, ctx)
144144
end
145145
end
146146

147+
meta_data = hash['@metadata'] ||= {}
148+
147149
if tls_verified
148-
set_nested(hash, @field_tls_protocol_version, tls_session.getProtocol())
149-
set_nested(hash, @field_tls_peer_subject, tls_session.getPeerPrincipal().getName())
150-
set_nested(hash, @field_tls_cipher, tls_session.getCipherSuite())
150+
meta_data['tls_peer'] = { :status => "verified" }
151151

152-
hash['@metadata']['tls_peer'] = {
153-
:status => "verified"
154-
}
152+
set_nested(hash, input.field_tls_protocol_version, tls_session.getProtocol())
153+
set_nested(hash, input.field_tls_peer_subject, tls_session.getPeerPrincipal().getName())
154+
set_nested(hash, input.field_tls_cipher, tls_session.getCipherSuite())
155155
else
156-
hash['@metadata']['tls_peer'] = {
157-
:status => "unverified"
158-
}
156+
meta_data['tls_peer'] = { :status => "unverified" }
159157
end
160158
end
161159
end
@@ -166,9 +164,6 @@ def set_nested(hash, field_name, value)
166164
field_ref = Java::OrgLogstash::FieldReference.from(field_name)
167165
# create @metadata sub-hash if needed
168166
if field_ref.type == Java::OrgLogstash::FieldReference::META_CHILD
169-
unless hash.key?("@metadata")
170-
hash["@metadata"] = {}
171-
end
172167
nesting_hash = hash["@metadata"]
173168
else
174169
nesting_hash = hash

spec/inputs/beats_spec.rb

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,28 @@
1212
let(:connection) { double("connection") }
1313
let(:certificate) { BeatsInputTest.certificate }
1414
let(:port) { BeatsInputTest.random_port }
15+
let(:client_inactivity_timeout) { 400 }
16+
let(:threads) { 1 + rand(9) }
1517
let(:queue) { Queue.new }
1618
let(:config) do
1719
{
18-
"port" => 0,
20+
"port" => port,
1921
"ssl_certificate" => certificate.ssl_cert,
2022
"ssl_key" => certificate.ssl_key,
23+
"client_inactivity_timeout" => client_inactivity_timeout,
24+
"executor_threads" => threads,
2125
"type" => "example",
2226
"tags" => "beats"
2327
}
2428
end
2529

30+
subject(:plugin) { LogStash::Inputs::Beats.new(config) }
31+
2632
context "#register" do
2733
context "host related configuration" do
28-
let(:config) { super().merge("host" => host, "port" => port, "client_inactivity_timeout" => client_inactivity_timeout, "executor_threads" => threads) }
34+
let(:config) { super().merge("host" => host, "port" => port) }
2935
let(:host) { "192.168.1.20" }
30-
let(:port) { 9000 }
31-
let(:client_inactivity_timeout) { 400 }
32-
let(:threads) { 10 }
33-
34-
subject(:plugin) { LogStash::Inputs::Beats.new(config) }
36+
let(:port) { 9001 }
3537

3638
it "sends the required options to the server" do
3739
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads)
@@ -158,9 +160,80 @@
158160

159161
it "raise a ConfigurationError when multiline codec is set" do
160162
plugin = LogStash::Inputs::Beats.new(config)
161-
expect {plugin.register}.to raise_error(LogStash::ConfigurationError, "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html")
163+
expect { plugin.register }.to raise_error(LogStash::ConfigurationError, "Multiline codec with beats input is not supported. Please refer to the beats documentation for how to best manage multiline data. See https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html")
164+
end
165+
end
166+
end
167+
168+
context "tls meta-data" do
169+
let(:config) { super().merge("host" => host, "ssl_peer_metadata" => true, "ssl_certificate_authorities" => [ certificate.ssl_cert ]) }
170+
let(:host) { "192.168.1.20" }
171+
let(:port) { 9002 }
172+
173+
let(:queue) { Queue.new }
174+
let(:event) { LogStash::Event.new }
175+
176+
subject(:plugin) { LogStash::Inputs::Beats.new(config) }
177+
178+
before do
179+
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads)
180+
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads).and_return @server
181+
expect( @server ).to receive(:listen)
182+
183+
subject.register
184+
subject.run(queue) # listen does nothing
185+
@message_listener = @server.getMessageListener
186+
187+
allow( ssl_engine = double('ssl_engine') ).to receive(:getSession).and_return ssl_session
188+
allow( ssl_handler = double('ssl-handler') ).to receive(:engine).and_return ssl_engine
189+
allow( pipeline = double('pipeline') ).to receive(:get).and_return ssl_handler
190+
allow( @channel = double('channel') ).to receive(:pipeline).and_return pipeline
191+
end
192+
193+
let(:ctx) do
194+
Java::io.netty.channel.ChannelHandlerContext.impl do |method, *args|
195+
fail("unexpected #{method}( #{args} )") unless method.eql?(:channel)
196+
@channel
162197
end
163198
end
199+
200+
let(:ssl_session) do
201+
Java::javax.net.ssl.SSLSession.impl do |method, *args|
202+
case method
203+
when :getPeerCertificates
204+
[].to_java(java.security.cert.Certificate)
205+
when :getProtocol
206+
'TLS-Mock'
207+
when :getCipherSuite
208+
'SSL_NULL_WITH_TEST_SPEC'
209+
when :getPeerPrincipal
210+
javax.security.auth.x500.X500Principal.new('CN=TEST, OU=RSpec, O=Logstash, C=NL', {})
211+
else
212+
fail("unexpected #{method}( #{args} )")
213+
end
214+
end
215+
end
216+
217+
let(:ssl_session_peer_principal) do
218+
javax.security.auth.x500.X500Principal
219+
end
220+
221+
let(:message) do
222+
org.logstash.beats.Message.new(0, java.util.HashMap.new('foo' => 'bar'))
223+
end
224+
225+
it 'sets tls fields' do
226+
@message_listener.onNewMessage(ctx, message)
227+
228+
expect( queue.size ).to be 1
229+
expect( event = queue.pop ).to be_a LogStash::Event
230+
231+
expect( event.get('[@metadata][tls_peer][status]') ).to eql 'verified'
232+
233+
expect( event.get('[@metadata][tls_peer][protocol]') ).to eql 'TLS-Mock'
234+
expect( event.get('[@metadata][tls_peer][cipher_suite]') ).to eql 'SSL_NULL_WITH_TEST_SPEC'
235+
expect( event.get('[@metadata][tls_peer][subject]') ).to eql 'CN=TEST,OU=RSpec,O=Logstash,C=NL'
236+
end
164237
end
165238

166239
context "when interrupting the plugin" do

src/main/java/org/logstash/beats/Server.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ private void shutdown(){
8585
}
8686
}
8787

88+
/**
89+
* @note used in tests
90+
* @return the message listener
91+
*/
92+
public IMessageListener getMessageListener() {
93+
return messageListener;
94+
}
95+
8896
public void setMessageListener(IMessageListener listener) {
8997
messageListener = listener;
9098
}

0 commit comments

Comments
 (0)