Pipeline Input / Output #9225

Closed

Conversation

andrewvc
Contributor

@andrewvc andrewvc commented Mar 12, 2018

Takes over from logstash-plugins/logstash-integration-internal#1 and is a superset of the changes in #9206.

This PR is the full implementation of inter-pipeline communications in rough form.

The main goal here is to let users address complex problems where isolation and encapsulation are desired. Currently, Logstash pipelines are often a tangle of unrelated logical tasks, which leads to confusing config files. There are a number of cases where this feature is useful, described below:

Use Cases / Patterns

The Distributor

Users often want a single input listening on a single port, say the Beats input or the HTTP input, to handle different types of data in different ways.

Users will often wind up with a pipeline that looks like:

input { 
  beats {}
}

filter {
  if ([type] == "weblogs") {
    # do weblog stuff
  } else {
     # do generic stuff
  }
}

output {
  if ([type] == "weblogs") {
    elasticsearch {
      hosts => ["weblogs.local"]
    }
  } else {
    elasticsearch {
      hosts => ["generic.local"]
    }
  }
}

Note the constant repetition of conditionals. With larger configs, running to thousands of lines, this is quite confusing and hard to reason about. With this patch, we can simplify things considerably by breaking them up into distinct pipelines:

# intake.conf
input {
  beats {}
}

output {
  if ([type] == "weblogs") {
    pipeline { send_to => weblogs }
  } else {
    pipeline { send_to => generic_logs }
  }
}

# weblogs.conf
input {
  pipeline { address => weblogs }
}

output {
  elasticsearch { hosts => ["weblogs.local"] }
}

# generic.conf
input {
  pipeline { address => generic_logs }
}

output {
  elasticsearch { hosts => ["generic.local"] }
}
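
For completeness, the three files above would then be registered in pipelines.yml along the following lines (the pipeline IDs and paths here are hypothetical, not part of this PR):

- pipeline.id: intake
  path.config: "/etc/logstash/intake.conf"
- pipeline.id: weblogs
  path.config: "/etc/logstash/weblogs.conf"
- pipeline.id: generic_logs
  path.config: "/etc/logstash/generic.conf"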

The Double Buffered Output

Sometimes users have one output, among many, that has temporary outages. The Logstash pipeline strives not to lose data when an output is blocked, so it blocks and exerts backpressure. Today there is no way to opt out of this behavior.

In the wild:

- pipeline.id: senderone
  config.string: "input { http {}  } filter { #lotsoflogic  } output { pipeline { send_to => [es, s3] } }"
  queue.type: persisted
- pipeline.id: sendertwo
  config.string: "input  { pipeline { address => es } } output { elasticsearch {} }"
  queue.type: persisted
- pipeline.id: out
  config.string: "input { pipeline { address => s3 } } output { s3 {}  }"
  queue.type: persisted

The Forked Output

Another common pattern deals with the situation where multiple outputs should receive different versions of the data.

In the wild: https://discuss.elastic.co/t/can-i-filter-the-output-data-separately/36064/5

Today these pipelines look like:

input {
  http { ... }
}

filter {
  clone { clones => ["toS3"] }
  if [type] == "toS3" {
    mutate { add_field => {"forS3only" => true}}
  } else {
    mutate { add_field => {"forESonly" => true}}
  }
}

output {
  if [type] == "toS3" {
    s3 {}
  } else {
    elasticsearch {}
  }
}

With this patch this can be improved to:

# distributor.conf
input {
  http { ... }
}

output {
  pipeline { send_to => [s3, es] }
}

# s3.conf

input {
  pipeline { address => s3 }
}

filter {
  mutate { add_field => {"forS3only" => true}}
}

output {
  s3 { ... }
}

# es.conf

input {
  pipeline { address => es }
}

filter {
  mutate { add_field => {"forEsOnly" => true}}
}

output {
  elasticsearch { ... }
}

Implementation

This PR includes three distinct changes:

  • Introduction of the pipeline input / output, which can be used to send messages across pipelines
  • Parallel execution of pipeline lifecycle events, which is necessary to prevent deadlocks (this code is identical to Parallel lifecycle #9206)
  • Introduction of Copy on Write (CoW) semantics for Event#clone, which is invoked any time an Event crosses a pipeline boundary. Since changes to events must be isolated across pipelines, cheap copies are desirable.

The Challenge of CoW semantics

Edit: The latest version of this should behave correctly WRT CoW. We now copy values on Event#get. If a given root key on the ConvertedMap is marked CoW we will dereference that key and replace its value with a rubyified copy on ruby Event#get.

The CoW implementation here is scoped only to top-level keys in event data and metadata. If you change a sub-key in the Event, say [foo][bar], then the entirety of [foo] is copied. This is because preserving the CoW structure already carries overhead: an additional HashSet tracks which keys are CoW, and tracking deeper keys would compound that cost.

The CoW implementation in this PR is somewhat broken, though it does meet our API specs. According to our Event API docs, Event#get returns a read-only copy of the event. However, the below pattern is common, even in Logstash core:

arrVal = event.get("tags")
arrVal << "foo"
event.set("tags", arrVal)

The returned value is not a read-only copy at all, nor is it a clone of the underlying data. That means that with our 'proper' CoW semantics, as implemented in this patch, the following breaks the CoW promise:

original = Event.new("tags" => [])
copy = original.clone
arrVal = copy.get("tags")
arrVal << "foo"
copy.set("tags", arrVal)
original.get("tags") # ["foo"]
copy.get("tags") # ["foo"]

The only practical workaround I can think of for this is to make the cloned events CoWoR (copy on write or read), and have Event#get return a clone of the specified data. That negates a lot of the benefits of CoW. We could have the compiled pipeline conditionals use a different Event#get that doesn't make unnecessary copies, as well as things like add_tag that we control. Ultimately, the real problem here is that we return the original mutable data structures. We could traverse the structures and return an unmodifiable variant, but that adds overhead.

@kovyrin

kovyrin commented Mar 14, 2018

Great idea! We have both the "single input, multiple outputs controlled by conditionals" and the "sending different versions to different outputs" cases in our pipeline and I'd love to be able to simplify those! 👍

@yaauie
Member

yaauie commented Mar 16, 2018

I like the idea behind this a LOT. 🎉

  • The plugin name internal doesn't really ring true to me; is there a reason we don't call this pipeline?
input {
  pipeline {
    address => "foo"
  }
}
output {
  pipeline {
    send_to => ["foo", "bar"]
  }
}

I do wonder if we could somehow implement true-CoW using shallow wrappers of ImmutableRuby containers that "quacked like" their mutable counterparts. It'd be very very hard to catch all possible edge cases since ruby is ruby, especially in containers-of-containers scenarios, but maybe we could just coerce to those immutable collections for 7.0?

require 'delegate'   # Delegator
require 'immutable'  # immutable-ruby collections

module COW
  class Hash < Delegator
    def initialize(hash)
      hash = Immutable::Hash[hash] unless hash.kind_of?(Immutable::Hash)
      @reference = hash
      @lock = Mutex.new
      # ...
    end

    # reads delegate to the current immutable reference
    def __getobj__
      @reference
    end

    def []=(key, value)
      @lock.synchronize do
        updated = @reference.put(key, value)
        @reference = updated unless updated.equal?(@reference)
      end
    end

    # ...
  end
end

@yaauie
Member

yaauie commented Mar 16, 2018

@andrewvc what about performing a deep clone of the event any time we cross the pipeline boundary, at least until we can plan for true-cow or immutable structures? That would carry zero increase in cost for existing pipeline configurations, and only count against us when we're explicitly invoking this featureset.

Member

@yaauie yaauie left a comment

I took a quick first pass at the underlying code and found most of it straightforward (although I think I found a couple of bugs, or opportunities to squash future bugs).

end
rescue SystemExit => e
converge_result.add(action, e)
rescue Exception => e
logger.error("Failed to execute action", :action => action, :exception => e.class.name, :message => e.message, :backtrace => e.backtrace)
converge_result.add(action, e)
end
var
Member

nitpick: I don't think we need to emit var as the last item in the block, or to capture the output of the previous begin...end as var; neither is ever read.

end

def report_currently_running_pipelines(converge_result)
if converge_result.success? && converge_result.total > 0
with_running_pipelines do |pipelines|
running_pipelines do |pipelines|
Member

I think we mean:

-      running_pipelines do |pipelines|
+      running_pipelines.each do |pipelines|

as-proposed, I don't believe that the block will ever get executed 😩

Contributor Author

Ah, oops! This is a definite bug with the changed behavior. Good catch.

pipelines[pipeline_id] = pipeline # The pipeline is successfully started we can add it to the hash
status = nil
pipelines.compute(pipeline_id) do |id,value|
status = pipeline.start # block until the pipeline is correctly started or crashed
Member

if a value already exists for this pipeline_id in pipelines, should we error or emit a log message in some helpful way? We didn't before, but I could see this as a potential point where we could attempt to create two pipelines with the same id and get very unexpected results because we orphaned the first one.

Contributor Author

That's a good point. Theoretically, this shouldn't be possible because higher up, when we determine the actions to be run, we would do that check. Today that's safe because that code isn't concurrent. However, it may not be in the future.

I propose that we add an invariant that throws an exception if the pipeline does already exist, since that should never be the case.

return Create.new(@pipeline_config, @metric).execute(agent, pipelines)
else
return status
pipelines.compute(pipeline_id) do |k,pipeline|
Member

nitpick: common convention in ruby when capturing a variable that will not be used is to simply bind it to a lone underscore (_):

-      pipelines.compute(pipeline_id) do |k,pipeline|
+      pipelines.compute(pipeline_id) do |_,pipeline|

if the pipeline does not exist in the map, we should log or error helpfully here because we've got serious problems

pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
pipeline.thread.join
pipelines.delete(pipeline_id)
pipelines.compute(pipeline_id) do |k,pipeline|
Member

EDIT: in this case, since we are effectively deleting the pipeline in the compute block, this comment isn't particularly relevant.

I think in a lot of these cases we could use Map#computeIfPresent(K, Function) when we explicitly expect the item to be present, and use the return value to catch cases where the item wasn't there; this would allow us to catch issues here at this level instead of blindly passing a nil down to other places or sending a method to nil:

If we expect to have the pipeline in the map:

pipelines.computeIfPresent(pipeline_id) do |_,pipeline|
  # stuff
end || fail("Pipeline missing from registry: #{pipeline_id}")

OR:

pipeline = pipelines.computeIfPresent(pipeline_id) do |_,pipeline|
  # stuff
end
fail("Pipeline missing from registry: #{pipeline_id}") if pipeline.nil?

Contributor Author

I believe, given your comment, that there's nothing to do here?

One point, BTW: all the CHM#compute* functions return the new value, so the second example wouldn't work. The code as-is is buggy, as you mention below, since this should always return nil.

}));
}

public static void special(Collection<String> addresses, InternalOutput output) {
Member

special doesn't really tell me that this method unbinds the given output from the given addresses, and it may read clearer if we reverse the argument order:

-    public static void special(Collection<String> addresses, InternalOutput output) {
+    public static void unregisterSender(final InternalOutput output, final Collection<String> addresses) {

Also, if one or more of the given addresses is not registered in ADDRESS_STATES, this will NPE dereferencing state.

Contributor Author

Sorry, this is from when I was experimenting with that JRuby not finding the method issue. I renamed it at one point to see if the previous name was somehow breaking JRuby.

I should mention, this is a WIP, and the intent was to get feedback more on general direction rather than specific code.

This is obviously a garbage method of course.

public class Common {
public static class AddressState {
private final String address;
private final Set<InternalOutput> outputs = ConcurrentHashMap.newKeySet();
Member

we get a concurrent hash set, but don't seem to use any of the atomic methods it provides.

Contributor Author

Yes, but we share it across threads, hence the use of a threadsafe class.

Contributor Author

This does deserve a comment however.

return addressesByRunState;
}

public static void registerSender(Collection<String> addresses, InternalOutput output) {
Member

nitpick: this would read clearer to me if the arguments were reversed; they can also be final:

-    public static void registerSender(Collection<String> addresses, InternalOutput output) {
+    public static void registerSender(final InternalOutput output, final Collection<String> addresses) {

Contributor Author

I agree! I wound up at this weird order guessing at why JRuby couldn't find these methods. Good catch.

end

def close
# Tried to do this with a static method on Common, but somehow jruby kept saying there weren't
Member

very weird; I've occasionally had issues where IntelliJ loses track of the ruby/jruby bridge and required a restart, but I would be interested in chasing this down at least a little further before "leaking" these details over into the ruby side of things, if possible.

Contributor Author

The really weird thing is that this happened at runtime too, not just in IntelliJ 🤷‍♀️

I'll try and figure this out before the PR is closed, one way or the other.

@send_to.each do |address|
org.logstash.plugins.internal.Common.ADDRESS_STATES.compute(address, proc {|a, state|
state.getOutputs.remove(self);
return state;
Member

I think this return will return from the close method when attempting to unbind from the first address?

╭─{ yaauie@castrovel:~ }
╰─○ ruby -e 'def wrapper; %w(foo bar).each {|i| proc { return "oh:#{i}" }.call; puts "nope" }; puts "never"; end; puts wrapper'
oh:foo
[success]

I think this method should be:

  def close
    @send_to.each do |address|
      org.logstash.plugins.internal.Common.ADDRESS_STATES.compute(address, proc {|a, state|
        state.getOutputs.remove(self)
        state # put state back in the map
      })
      output.removeAddressReceiver(address)
    end
  end

Contributor Author

Ah, yes, def a bug. That's what I get for copy/pasting from the original java where return did the right thing.

@andrewvc
Contributor Author

@yaauie thanks for the feedback! I should have been more clear when I labeled this a WIP that the code wasn't really of review quality. Thanks for going through it regardless, and recommending a number of improvements :) . The feedback is much appreciated.

WRT the name, I'm down to switch to pipeline, I think that's a great suggestion.

WRT true CoW, I think we should make that a goal for 7.0; it would break too much stuff in 6.x, so unfortunately it will be a breaking change. I think figuring out how to do that is tricky. I love immutable Ruby, but my preference there would be to use Java types for both the Java API and Ruby, to remove the conversion cost of moving between them. That'd mean standardizing on Java types and leaving it to library authors to ruby-ify as needed. The good thing about Java types is that String, Integer, etc. are all immutable.

That'd be a big breaking change, however, and I'd want to hear from a number of stakeholders on that one. It'd be great if we could start that discussion; if that's an area of particular interest to you, I'd appreciate someone kicking off the thread.

@andrewvc
Contributor Author

andrewvc commented Mar 17, 2018

I took a first stab at making the CoW become CoWoR. It (should) work as-is?

Needs many tests plus benchmarking to see if this makes sense from a perf standpoint.

@andrewvc andrewvc force-pushed the internal-pipelines-plus branch from ae7f84f to ad18909 on March 21, 2018 at 14:38
@pemontto

Will the pipeline send_to property be able to use event variables?

@andrewvc
Contributor Author

@pemontto that is currently not implemented, but potentially could be. Can you share an example config of how you'd use that?

I was thinking people would have a small number of pipelines, and for that situation `if` statements would suffice. Would love to hear more about your use case, however.

@pemontto

@andrewvc we have a common enrichment "pipeline" we'd like to share across numerous pipelines, mainly so they can share a cache (well, at least the worker threads can), and then we'd like to fan back out to discrete output pipelines based on certain criteria in the event.

@andrewvc
Contributor Author

@pemontto would you be able to accomplish that goal with if statements? If so, would it be materially worse?

We can add that feature, but it does have performance implications. It also has UX implications. What would happen if the variable referenced a pipeline that did not exist? Would we block for it? Drop it? If we define behavior there, how do we discern between a reloading pipeline and one that flat out does not exist?
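
(For concreteness, the `if`-statement approach mentioned above might look roughly like this; the field and addresses are hypothetical.)

output {
  if [destination] == "alpha" {
    pipeline { send_to => ["out_alpha"] }
  } else if [destination] == "beta" {
    pipeline { send_to => ["out_beta"] }
  } else {
    pipeline { send_to => ["out_default"] }
  }
}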

@pemontto

@andrewvc just wanted to raise it for discussion, though I can see it introducing many more failure modes. Can most definitely be done with if statements in my scenario.

@andrewvc
Contributor Author

@pemontto I do think it's a great idea in many ways from a UX standpoint, but it seems like maybe it shouldn't be in the initial release. We can definitely add that feature later on.

Thanks for taking the time to give your feedback :)

@andrewvc andrewvc force-pushed the internal-pipelines-plus branch 2 times, most recently from 0389c60 to a2d81eb on March 27, 2018 at 23:56
@andrewvc andrewvc force-pushed the internal-pipelines-plus branch from a2d81eb to e8e1aba on April 4, 2018 at 15:54
@@ -17,7 +17,7 @@
require 'rack/test'
require 'rspec'
require "json"

require 'logstash/runner'
Contributor Author

@andrewvc andrewvc Apr 4, 2018

It's weird that all our specs require random bits of LS, and potentially dangerous for our testing as well. This just has us slurp the whole thing in.

@andrewvc andrewvc changed the title from [WIP] Internal Pipelines to Internal Pipelines on Apr 4, 2018
@andrewvc
Contributor Author

andrewvc commented Apr 4, 2018

This is (pretty much) out of WIP. I still need to get a QA integration test passing. The code should be good, however.

@andrewvc
Contributor Author

andrewvc commented Apr 4, 2018

This will also need docs at some point (preferably before merge, CC @karenzone).

I think I'd like to get a code / feature review in first to make sure there are no changes there before we document things.

@andrewvc
Contributor Author

andrewvc commented Apr 5, 2018

jenkins, please test this

@andrewvc andrewvc force-pushed the internal-pipelines-plus branch from c79237f to dd2589f on April 5, 2018 at 16:08
@andrewvc
Contributor Author

andrewvc commented Apr 6, 2018

jenkins, please retest this

@@ -0,0 +1,166 @@
package org.logstash.plugins.pipeline;
Contributor Author

I split tests across here and some rspec tests. The rspec tests are more comprehensive in terms of functionality.

@andrewvc
Contributor Author

andrewvc commented Apr 7, 2018

Build is finally green, woohoo!

This still needs docs, but aside from that is good for review (again) @yaauie

@andrewvc andrewvc changed the title from Internal Pipelines to Pipeline Input / Output on Apr 10, 2018
@yaauie
Member

yaauie commented Apr 10, 2018

I did a s/internal/pipeline/ on the main issue comment to reflect the changes you made in naming.

Member

@yaauie yaauie left a comment

In general, LGTM and this is going to be an amazing addition to the Logstash toolset.

I left a note on atomicity, asked for a minor clarification in docs, and found a single comment that is made irrelevant by this changeset, but am OK with a merge without further review from me.

@@ -39,9 +39,11 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@auto_reload = setting("config.reload.automatic")
@ephemeral_id = SecureRandom.uuid

# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new

# Do not use @pipelines directly. Use #with_pipelines which does locking
Member

this comment is no longer true, since this diff deletes with_pipelines

@@ -161,13 +152,17 @@ def converge_state_and_update
converge_result = nil

# we don't use the variable here, but we want the locking
Member

this comment applied to the variable yielded to the with_pipelines block, and is no longer relevant

===== Delivery Guarantees
In its standard configuration the `pipeline` input/output have at-least-once delivery guarantees. The output will block if the address is unavailable or blocked.

By default, the `ensure_delivery` option on the `pipeline` output is set to `true`. If the `ensure_delivery` flag is set to `false`, an unavailable downstream pipeline will cause the sent message to be discarded. A blocked downstream pipeline will block the sending output/pipeline regardless of the value of the `ensure_delivery` flag.
Member

note: how do we handle pipeline "reloads" when ensure_delivery: false? is the restarting pipeline considered blocked or unavailable?

Contributor Author

Good question: it's considered to be unavailable, so you'll lose messages. I do wonder if we should keep ensure_delivery. I feel like maybe it's more useful if it drops messages when blocked too, but it could be confusing in its current limited incarnation?

We need timeouts to have it drop on blocked pipelines. But, if we add that feature iteratively we'd need another option on top of ensure_delivery, like drop_after_timeout.

WDYT? Do we keep it and have a dual option later? I feel like letting users configure a timeout is too fiddly; we probably just want an option that acts like UDP: if there's even a whiff of trouble, we just drop and keep processing.

Contributor Author

Thinking through it further, I'm OK with ensure_delivery. People may actually want to configure the timeout separately. I'm OK with those controls being a little weird but more precise.
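
(For illustration, the flag being discussed would be set on the output roughly like this, per the quoted docs; the address is hypothetical.)

output {
  pipeline {
    send_to => ["partner"]
    ensure_delivery => false # drop events if the downstream pipeline is unavailable
  }
}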

mutate { remove_field => 'sensitive-data' }
}
output { s3 { } } # Output to partner's bucket
----
Member

note: we may want to add a "collector" pattern (the opposite of the distributor pattern, in many senses)

  • disparate inputs, pipelines to normalize
  • single unified output

Contributor Author

Good point! I'll add that :)
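
(For reference, a minimal sketch of that collector pattern; the addresses and file names are hypothetical.)

# beats-intake.conf
input { beats { ... } }
filter { ... } # normalize Beats-specific fields
output { pipeline { send_to => ["commonOut"] } }

# http-intake.conf
input { http { ... } }
filter { ... } # normalize HTTP-specific fields
output { pipeline { send_to => ["commonOut"] } }

# collector.conf
input { pipeline { address => "commonOut" } }
output { elasticsearch { ... } }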

public class AddressState {
private final String address;
private final Set<PipelineOutput> outputs = ConcurrentHashMap.newKeySet();
private volatile PipelineInput input = null;
Member

🎉 @andrewvc you're not leaving me much room for nitpicks. Excellent use of volatile.

* @param newInput
* @return true if successful, false if another input is listening
*/
public boolean assignInputIfMissing(PipelineInput newInput) {
Member

on atomicity: can we guarantee that two threads won't interleave calls to AddressState#assignInputIfMissing() and AddressState#unassignInput()? Is it worth marking these two methods as synchronized? Call frequency of both should be ultra-low, so sync overhead would be minimal.

Contributor Author

Good point! This should definitely be synchronized. We cannot make that guarantee.

@andrewvc andrewvc force-pushed the internal-pipelines-plus branch from 848ef52 to 93dc219 on April 10, 2018 at 22:27
This also makes the load / reload cycle of pipelines threadsafe
and concurrent in the Agent class.
@andrewvc andrewvc force-pushed the internal-pipelines-plus branch from a965cbf to 8e98911 on April 10, 2018 at 23:29
@elasticsearch-bot

Andrew Cholakian merged this into the following branches!

Branch: master (commits a1c0e41, 181000c)

andrewvc added a commit to andrewvc/logstash that referenced this pull request Apr 11, 2018
This also makes the load / reload cycle of pipelines threadsafe
and concurrent in the Agent class.

Fixes elastic#9225
andrewvc added a commit to andrewvc/logstash that referenced this pull request Apr 11, 2018
andrewvc added a commit to andrewvc/logstash that referenced this pull request Apr 11, 2018
elasticsearch-bot pushed a commit that referenced this pull request Apr 11, 2018
This also makes the load / reload cycle of pipelines threadsafe
and concurrent in the Agent class.

Fixes #9225

Fixes #9352
@andrewvc andrewvc mentioned this pull request Apr 11, 2018
@robcowart

robcowart commented May 14, 2018

@andrewvc I am curious what performance/scale testing was performed? I am not talking about just this feature in isolation. This feature has architectural ramifications that would necessitate end-to-end testing to understand where it can be deployed vs. alternatives.

Background... the current release of our solution includes 37 pipelines (and this number will grow) that can either be deployed all on a single node or distributed as needed. Redis is used for inter-pipeline communications, and can be replaced between major tiers by Kafka when scale or other requirements so dictate. It is very much a "microservices-like" pattern for deploying Logstash logic, much the same pattern that this new feature seeks to address.

It seems that with this new feature, persistent queues are a necessity to absorb backpressure-related queueing, especially when downstream pipelines are unavailable. With any UDP use case (syslog, netflow, SNMP traps, etc.) anything that slows down throughput is your enemy, because ultimately it leads to dropped packets and lost data. I am concerned that introducing persistent queues also introduces a disk I/O latency bottleneck that will be more harmful than helpful (not to mention that it completely changes the hardware requirements for a Logstash instance).

What I think is important to know is how these scenarios compare, with the key metric being which results in the least lost UDP packets as event rates increase...

udp input --> memory queue --> pipeline --> local redis --> next pipeline
udp input --> persistent queue --> pipeline --> pipeline output --> next pipeline

Barring this one concern I could see this being a very valuable feature. I wouldn't really have concerns if the Logstash memory queue was configurable, but it isn't. It will definitely be important to understand the scenarios where this might have advantages or disadvantages.

So, has there been any internal benchmarking/scale testing that you could share? It would save us the time of doing this ourselves.

@andrewvc
Contributor Author

@robcowart great questions:

So, this feature is not a blanket substitute for anything else. Redis/kafka may still be the right solution for you, but I do suspect that some of those Redis/kafka instances could be replaced.

This is far more efficient than any external queue because there is no serialization/deserialization penalty. However, the buffering semantics are not as rich as those you get from external queues. You're limited by either the amount of heap if using an MQ, or the amount of disk with the PQ. And, of course, you're limited to single machine reliability with either of those.

WRT the UDP example, that's a big "it depends". Generally, buffering to disk with the PQ is preferred because there's less risk that the filter section could block and cause packets to drop, unless such a block is long-lived and causes the PQ to fill.

I'm not certain why one scenario is MQ + Redis and the other is PQ + pipeline output; I think you could mix and match either way. I would say, however, that the pipeline output / PQ version should work pretty well.

One caveat: the PQ isn't always as fast to write/read as other queues, something we're working on. This should be able to be optimized specifically for inter-pipeline comms with #9498, which in my local tests boosted PQ speeds by 33%.

One thing you didn't mention, BTW, is what queue the next pipeline is using. Each MQ you use has a minimal cost, but PQs can be quite a bit more expensive.

At the end of the day, benchmarking your specific setup is the best thing to do (I know you knew I was going to say that :) ). If you do get some numbers I'd love to see them! Please share them either here, or on a discuss post if possible!
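
(For illustration, queue types can be mixed per pipeline in pipelines.yml, roughly along these lines; the IDs and configs are hypothetical.)

- pipeline.id: intake
  queue.type: memory
  config.string: "input { udp {} } output { pipeline { send_to => [processing] } }"
- pipeline.id: processing
  queue.type: persisted
  config.string: "input { pipeline { address => processing } } output { elasticsearch {} }"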

@robcowart

No worries... it was worth checking.

I have been spending a lot of time working on throughput optimization, specifically because of the need to clear UDP receive buffers as quickly as possible to avoid dropped packets. While tweaking things like kernel buffers, process priorities, and queue and batch sizes all makes a difference, the most important thing remains grabbing the data and pushing it to the next stage as efficiently as possible. We were able to get one customer from 85% packet loss down to 0.05%. However, at 10K eps that is still 5 lost messages every second, or 432K per day.

I will have to add this to the benchmarking work we still have planned. Given a three-tier architecture with... collect --> process --> index, I suspect that I will still have a Redis/Kafka between each tier. However, within the processing tier, using the pipeline input/output could make a lot of sense.

@redbaron4

@andrewvc This is a great feature. We normally use multiple pipelines to separate out outputs that are prone to disruption (managed by another team), such as WebHDFS. Looking at the docs, it seems that ensure_delivery: false will allow the upstream pipeline to continue working if the downstream pipeline is not available, but not if the downstream pipeline is blocked. Do you plan to cover the latter case as well, or is it intentional to exclude blocked pipelines from this behaviour?
