Description
replaces #745
Bulk rejections are a constant pain point when running very large Hadoop/Spark jobs. Each outgoing bulk request is split on the Elasticsearch network thread and handed to each target node's bulk queue as a transport bulk request. This means that with 300 Spark nodes sending parallel requests to 30 ES nodes/shards, each ES node's bulk queue could still see up to 300 transport bulk actions after splitting, assuming the worst case of every request spreading its docs evenly over every shard. Our advice in the past has been to lower the number of concurrent writers to Elasticsearch to keep the bulk queues from overflowing; increasing the bulk queue size or retrying indefinitely tends to cause even worse congestion and longer job times.
One way to smooth this out would be to provide a "best-effort" routing mechanism, plus the ability to partition requests by the primary shard they would be sent to. This could help keep a Hadoop/Spark job from overflowing the bulk request queues. The prerequisites and steps to take would be as follows:
Approach:
- Add a best-effort routing table to the job. At startup, when we determine the locations of the primary shards, we could read the cluster state endpoint to determine the best way to route requests at that point in time. When the total size of the queues reaches the bulk flushing threshold, we launch a bulk request to each node (preferably async if possible). If the routing changes during the job, each node will simply forward the bulk request on to the correct node as usual.
- Sporadically update the routing table. For streaming technologies like Storm, where we launch the job and then never update anything, we might be able to have each task register a periodic best-effort routing table refresh.
- Partitioner Implementation. Best-effort routing alone is not enough to bring bulk queue overflows down; it only removes the need for an ES node to forward the request in most cases. To actually get bulk queue overflow under control, we should introduce partitioning functionality in the execution framework that routes documents to writers based on the shards they are expected to land on (see the sketch after this list). This, coupled with the best-effort routing, should handle most issues pertaining to out-of-date routing information, should the cluster state change during the job run.
Requirements:
- Requires effective IDs. Every data element that passes through the partitioner/writer needs a document ID, a routing value, or both present; without these, the target shard cannot be determined (the configuration sketch below shows how a job already declares these fields). We will not be extracting Elasticsearch's ID generation code. We just won't.
- Requires static indices for writes. If an index does not exist at the start of the job, it is impossible to know which shard a data element should go to, since those shards have not yet been created and allocated.