[[pipeline-to-pipeline]]
=== Configuring Pipeline-to-Pipeline Communication

When using the multiple pipeline feature of Logstash, you may want to connect multiple pipelines within the same Logstash instance. This can be useful to isolate the execution of these pipelines, as well as to help break up the logic of complex pipelines. The `pipeline` input and output enable a number of advanced architectural patterns discussed later in this document.

When communication is needed between Logstash instances, you will need to use either {logstash-ref}/ls-to-ls.html[Logstash-to-Logstash] communication or an intermediary queue, such as Kafka or Redis.

[[pipeline-to-pipeline-overview]]
==== Configuration overview

Use the `pipeline` input and `pipeline` output to connect two pipelines running within the same Logstash instance. These plugins use a client-server approach, where the `pipeline` input registers a virtual address that a `pipeline` output can connect to.

. Create a 'downstream' pipeline that listens for events on a virtual address.
. Create an 'upstream' pipeline that produces events, sending them through a `pipeline` output to one or more virtual addresses.

A simple example of this configuration is shown below.

[source,yaml]
----
# config/pipelines.yml
- pipeline.id: upstream
  config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
  config.string: input { pipeline { address => myVirtualAddress } }
----

[[how-it-works]]
===== How it works

The `pipeline` input acts as a virtual server listening on a single virtual address in the local process. Only `pipeline` outputs running on the same local Logstash instance can send events to this address. A `pipeline` output can send events to a list of virtual addresses, and will block if the downstream pipeline is either unavailable or blocked.

When events are sent across pipelines, their data is fully copied. Modifications to an event in a downstream pipeline will not affect any other pipelines that event may be used within.

Copying events does, however, incur a performance cost. While the `pipeline` plugin may be the most efficient way to communicate between pipelines, it still incurs a cost: Logstash must duplicate each event in full on the Java heap for each downstream pipeline a `pipeline` output sends to. Beware that using this feature may affect the heap memory utilization of Logstash.
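
For example, here is a minimal sketch (the pipeline ids and addresses are illustrative) of one upstream pipeline fanning out to two downstream pipelines; every event is copied in full once per downstream address:

[source,yaml]
----
# config/pipelines.yml (sketch; ids and addresses are illustrative)
- pipeline.id: producer
  config.string: |
    input { stdin {} }
    # Each event is duplicated on the heap: one copy per downstream address
    output { pipeline { send_to => [archiver, alerter] } }
- pipeline.id: archiver
  config.string: |
    input { pipeline { address => archiver } }
    output { stdout {} }
- pipeline.id: alerter
  config.string: |
    input { pipeline { address => alerter } }
    output { stdout {} }
----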

[[delivery-guarantees]]
===== Delivery guarantees

In its standard configuration, the `pipeline` input/output has at-least-once delivery guarantees. The output will block if the address is unavailable or blocked.

By default, the `ensure_delivery` option on the `pipeline` output is set to `true`. If the `ensure_delivery` flag is set to `false`, an unavailable downstream pipeline will cause the sent message to be discarded. A blocked downstream pipeline will block the sending output/pipeline regardless of the value of the `ensure_delivery` flag.
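
For example, a minimal sketch (the pipeline ids and the address are illustrative) of an output configured to drop events rather than wait for a downstream pipeline that is not running:

[source,yaml]
----
# config/pipelines.yml (sketch; ids and the address are illustrative)
- pipeline.id: best-effort-sender
  config.string: |
    input { stdin {} }
    # If the 'monitoring' pipeline is unavailable, discard events instead of blocking.
    # A blocked (but available) downstream pipeline still blocks this output.
    output { pipeline { send_to => [monitoring] ensure_delivery => false } }
- pipeline.id: monitoring
  config.string: |
    input { pipeline { address => monitoring } }
    output { stdout {} }
----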

[[avoid-cycles]]
===== Avoid cycles

When connecting pipelines, it is important that data flows in only one direction. Looping data back around, or connecting the pipelines into a cyclic graph, can cause problems. Logstash waits for each pipeline's work to complete before shutting down, so if pipelines loop data between them, Logstash may be prevented from shutting down cleanly.
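
For illustration, here is a minimal sketch of a cyclic configuration to avoid (the pipeline ids are made up):

[source,yaml]
----
# config/pipelines.yml -- anti-pattern, do NOT do this (ids are illustrative)
- pipeline.id: first
  config.string: |
    # Receives external events AND events looped back from 'second'
    input { stdin {} pipeline { address => first } }
    output { pipeline { send_to => [second] } }
- pipeline.id: second
  config.string: |
    input { pipeline { address => second } }
    # Sending back to 'first' creates a cycle: events loop between the two
    # pipelines and Logstash cannot drain them to shut down cleanly
    output { pipeline { send_to => [first] } }
----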

[[architectural-patterns]]
==== Architectural patterns

You can use the `pipeline` input and output to better organize code, streamline control flow, and isolate the performance of complex configurations. Pipelines can be connected in countless ways; the patterns presented here are hardly comprehensive.

* <<distributor-pattern>>
* <<output-isolator-pattern>>
* <<forked-path-pattern>>
* <<collector-pattern>>

[[distributor-pattern]]
===== The distributor pattern

The distributor pattern is used in situations where multiple types of data come through a single input, each with its own complex set of processing rules. With the distributor pattern, one pipeline routes data to other pipelines based on type, and each type is routed to a pipeline containing only the logic for handling that type. In this way, each type's logic can be isolated.

As an example, in many organizations a single beats input may be used to receive traffic from a variety of sources, each with its own processing logic. A common way of dealing with this type of data is to have a number of `if` conditions separating the traffic and processing each type differently. This approach can quickly get messy when configs are long and complex.
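
For illustration, that single-pipeline approach might look something like the following sketch (the types and hosts are placeholders), with routing and processing logic for every type interleaved in one config:

[source,yaml]
----
# config/pipelines.yml -- everything in one pipeline (illustrative sketch)
- pipeline.id: monolith
  config.string: |
    input { beats { port => 5044 } }
    filter {
      if [type] == "apache" {
        # Weblog filter statements here...
      } else if [type] == "system" {
        # Syslog filter statements here...
      }
    }
    output {
      if [type] == "apache" {
        elasticsearch { hosts => [es_cluster_a_host] }
      } else {
        elasticsearch { hosts => [es_cluster_b_host] }
      }
    }
----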

An example distributor configuration is listed below:

[source,yaml]
----
# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
----

Notice how following the flow of data is simple, because each pipeline works on only a single, specific task.

[[output-isolator-pattern]]
===== The output isolator pattern

The output isolator pattern is used to prevent Logstash from blocking when there are multiple outputs and one output experiences a temporary failure. For example, a server might be configured to send log data to both Elasticsearch and an HTTP endpoint, and the HTTP endpoint might be frequently unavailable due to regular service or some other reason.

Logstash, by default, will block when any single output is down. This is an important behavior in guaranteeing at-least-once delivery of data. Unfortunately, in the scenario above, it also means that whenever the HTTP endpoint is down, data stops being sent to Elasticsearch as well. Using the `pipeline` input and output, along with persistent queues, we can continue sending to Elasticsearch even when one output is down by applying the output isolator pattern.

We could employ this pattern for the scenario described above with the following config:

[source,yaml]
----
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
----

Note that in this architecture, each stage has its own queue, with its own tuning and settings. This uses up to three times as much disk space, and incurs three times as much serialization/deserialization cost, as a single pipeline.
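
If the extra disk usage is a concern, queue settings can be tuned per pipeline in `pipelines.yml`; for example, a sketch capping one stage's queue with `queue.max_bytes` (the size shown is illustrative):

[source,yaml]
----
# config/pipelines.yml (sketch; the 512mb cap is illustrative)
- pipeline.id: buffered-http
  queue.type: persisted
  # Cap this stage's persistent queue so the three queues don't exhaust the disk
  queue.max_bytes: 512mb
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
----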

[[forked-path-pattern]]
===== The forked path pattern

The forked path pattern is used in situations where a single event must be processed more than once according to different sets of rules. Without the `pipeline` input and output, this is commonly solved through creative use of the `clone` filter and `if/else` rules.

As an example, imagine a use case where we receive data and index the full event in our own systems, but publish a redacted version of the data to a partner's S3 bucket. We might use the output isolator pattern described above to decouple our writes to either system. The distinguishing feature of the forked path pattern is the existence of additional rules in the downstream pipelines.

An example of this pattern is in the following config:

[source,yaml]
----
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, partner] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => partner } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
----

Review comment: note: we may want to add a "collector" pattern (the opposite of the distributor pattern, in many senses)

Reply: Good point! I'll add that :)

[[collector-pattern]]
===== The collector pattern

The collector pattern is used in situations where you want to define a common set of outputs and pre-output filters that many disparate pipelines might use. This is the opposite of the distributor pattern: many pipelines fan in to a single pipeline where outputs and other processing are shared. This pattern simplifies configuration at the cost of reduced isolation, since all data is sent through a single pipeline.

An example of this pattern can be seen below:

[source,yaml]
----
# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
----

Review comment: note: how do we handle pipeline "reloads" when `ensure_delivery: false`? Is the restarting pipeline considered blocked or unavailable?

Reply: Good question, it's considered to be unavailable, so you'll lose messages. I do wonder if we should keep `ensure_delivery`. I feel like maybe it's more useful if it drops messages when blocked too, but that could be confusing in its current limited incarnation. We need timeouts to have it drop on blocked pipelines, but if we add that feature iteratively we'd need another option on top of `ensure_delivery`, like `drop_after_timeout`. WDYT? Do we keep it and add a dual option later? I feel like letting users configure a timeout is too fiddly; we probably just want an option that acts like UDP. If there's even a whiff of trouble, we just drop processing.

Reply: Thinking through it further, I'm OK with `ensure_delivery`. People may actually want to configure the timeout separately. I'm OK with those controls being a little weird but more precise.