---
layout: default
title: Pull-based ingestion
parent: Document APIs
has_children: true
nav_order: 60
---

# Pull-based ingestion
**Introduced 3.0**
{: .label .label-purple }

This is an experimental feature and is not recommended for use in a production environment. For updates on the progress of the feature or if you want to leave feedback, join the discussion on the [OpenSearch forum](https://forum.opensearch.org/).
{: .warning}

Pull-based ingestion enables OpenSearch to ingest data from streaming sources such as Apache Kafka or Amazon Kinesis. Unlike traditional ingestion methods, in which clients actively push data to OpenSearch through REST APIs, pull-based ingestion lets OpenSearch control the data flow by retrieving data directly from streaming sources. This approach provides exactly-once ingestion semantics and native backpressure handling, which helps prevent server overload during traffic spikes.

## Prerequisites

Before using pull-based ingestion, ensure that the following prerequisites are met:

* Install an ingestion plugin for your streaming source using the command `bin/opensearch-plugin install <plugin-name>`, as shown in the example following this list. For more information, see [Additional plugins]({{site.url}}{{site.baseurl}}/install-and-configure/additional-plugins/index/). OpenSearch supports the following ingestion plugins:
  - `ingestion-kafka`
  - `ingestion-kinesis`
* Enable [segment replication]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/segment-replication/index/) with [remote-backed storage]({{site.url}}{{site.baseurl}}/tuning-your-cluster/availability-and-recovery/remote-store/index/). Pull-based ingestion is not compatible with document replication.
* Configure pull-based ingestion during [index creation](#creating-an-index-for-pull-based-ingestion). You cannot convert an existing push-based index to a pull-based one.

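For example, the following command installs the Kafka ingestion plugin:

```bash
bin/opensearch-plugin install ingestion-kafka
```
{% include copy.html %}
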
## Creating an index for pull-based ingestion

To ingest data from a streaming source, first create an index with pull-based ingestion settings. The following request creates an index that pulls data from a Kafka topic:

```json
PUT /my-index
{
  "settings": {
    "ingestion_source": {
      "type": "kafka",
      "pointer.init.reset": "earliest",
      "param": {
        "topic": "test",
        "bootstrap_servers": "localhost:49353"
      }
    },
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1,
    "index": {
      "replication.type": "SEGMENT"
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      }
    }
  }
}
```
{% include copy-curl.html %}

### Ingestion source parameters

The `ingestion_source` parameters control how OpenSearch pulls data from the streaming source. A _poll_ is an operation in which OpenSearch actively requests a batch of data from the streaming source. The following table lists all parameters that `ingestion_source` supports.

| Parameter | Description |
| :--- | :--- |
| `type` | The streaming source type. Required. Valid values are `kafka` or `kinesis`. |
| `pointer.init.reset` | Determines where to start reading from the stream. Optional. Valid values are `earliest`, `latest`, `rewind_by_offset`, `rewind_by_timestamp`, or `none`. See [Stream position](#stream-position). |
| `pointer.init.reset.value` | Required only for `rewind_by_offset` or `rewind_by_timestamp`. Specifies the offset value or timestamp in milliseconds. See [Stream position](#stream-position). |
| `error_strategy` | How to handle failed messages. Optional. Valid values are `DROP` (failed messages are skipped and ingestion continues) and `BLOCK` (when a message fails, ingestion stops). Default is `DROP`. We recommend using `DROP` for the current experimental release. |
| `max_batch_size` | The maximum number of records to retrieve in each poll operation. Optional. |
| `poll.timeout` | The maximum time to wait for data in each poll operation. Optional. |
| `num_processor_threads` | The number of threads for processing ingested data. Optional. Default is 1. |
| `param` | Source-specific configuration parameters. Required. <br> • The `ingestion-kafka` plugin requires:<br>&nbsp;&nbsp;- `topic`: The Kafka topic to consume from<br>&nbsp;&nbsp;- `bootstrap_servers`: The Kafka server addresses<br>&nbsp;&nbsp;Optionally, you can provide additional standard Kafka consumer parameters (such as `fetch.min.bytes`). These parameters are passed directly to the Kafka consumer. <br> • The `ingestion-kinesis` plugin requires:<br>&nbsp;&nbsp;- `stream`: The Kinesis stream name<br>&nbsp;&nbsp;- `region`: The AWS Region<br>&nbsp;&nbsp;- `access_key`: The AWS access key<br>&nbsp;&nbsp;- `secret_key`: The AWS secret key<br>&nbsp;&nbsp;Optionally, you can provide an `endpoint_override`. |

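For illustration, the following request sketches a Kinesis-backed index using the required `param` values from the preceding table. The stream name, Region, and credentials are placeholders that you would replace with your own:

```json
PUT /my-kinesis-index
{
  "settings": {
    "ingestion_source": {
      "type": "kinesis",
      "pointer.init.reset": "earliest",
      "param": {
        "stream": "test-stream",
        "region": "us-east-1",
        "access_key": "<your AWS access key>",
        "secret_key": "<your AWS secret key>"
      }
    },
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1,
    "index": {
      "replication.type": "SEGMENT"
    }
  }
}
```
{% include copy-curl.html %}
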
### Stream position

When creating an index, you can specify where OpenSearch should start reading from the stream by configuring the `pointer.init.reset` and `pointer.init.reset.value` settings in the `ingestion_source` parameter. For existing indexes, OpenSearch resumes reading from the last committed position.

The following table provides the valid `pointer.init.reset` values and their corresponding `pointer.init.reset.value` values.

| `pointer.init.reset` | Starting ingestion point | `pointer.init.reset.value` |
| :--- | :--- | :--- |
| `earliest` | Beginning of stream | None |
| `latest` | Current end of stream | None |
| `rewind_by_offset` | Specific offset in the stream | A positive integer offset. Required. |
| `rewind_by_timestamp` | Specific point in time | A Unix timestamp in milliseconds. Required. <br> For Kafka streams, defaults to Kafka's `auto.offset.reset` policy if no messages are found for the given timestamp. |
| `none` | Last committed position for existing indexes | None |

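For example, to replay the stream from a specific point in time at index creation, you might combine these settings as follows. This is a sketch: the timestamp is a placeholder (shown as a string, matching how the other settings values are passed), and the Kafka connection values reuse those from the earlier example:

```json
PUT /my-index
{
  "settings": {
    "ingestion_source": {
      "type": "kafka",
      "pointer.init.reset": "rewind_by_timestamp",
      "pointer.init.reset.value": "1739459500000",
      "param": {
        "topic": "test",
        "bootstrap_servers": "localhost:49353"
      }
    },
    "index.number_of_shards": 1,
    "index.number_of_replicas": 1,
    "index": {
      "replication.type": "SEGMENT"
    }
  }
}
```
{% include copy-curl.html %}
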
### Stream partitioning

When using partitioned streams (such as Kafka topics or Kinesis shards), note the following relationships between stream partitions and OpenSearch shards:

- OpenSearch shards map one-to-one to stream partitions.
- The number of index shards must be greater than or equal to the number of stream partitions.
- Extra shards beyond the number of partitions remain empty.
- Documents must be sent to the same partition for successful updates (see the example following this note).

When using pull-based ingestion, traditional REST API--based ingestion is disabled for the index.
{: .note}
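
Because updates to a document must arrive on the same partition, one way to achieve this with Kafka is to key each message by document ID so that the default partitioner routes all operations for a document to the same partition. The following is a minimal sketch using the standard Kafka console producer and the message format described in [Message format](#message-format); the broker address, topic name, and `|` key separator are assumptions, not values required by OpenSearch:

```bash
# Messages are formatted as <key>|<value>. Keying by document ID keeps
# all operations for a given document on the same partition.
echo '1|{"_id":"1", "_source":{"name": "alice", "age": 30}, "_op_type": "index"}' | \
  kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test \
    --property parse.key=true --property 'key.separator=|'
```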

### Updating the error policy

You can use the [Update Settings API]({{site.url}}{{site.baseurl}}/api-reference/index-apis/update-settings/) to dynamically update the error policy by setting `index.ingestion_source.error_strategy` to either `DROP` or `BLOCK`.

The following example demonstrates how to update the error policy:

```json
PUT /my-index/_settings
{
  "index.ingestion_source.error_strategy": "DROP"
}
```
{% include copy-curl.html %}

## Message format

To be correctly processed by OpenSearch, messages in the streaming source must have the following format:

```json
{"_id":"1", "_version":"1", "_source":{"name": "alice", "age": 30}, "_op_type": "index"}
{"_id":"2", "_version":"2", "_source":{"name": "alice", "age": 30}, "_op_type": "delete"}
```

Each data unit in the streaming source (Kafka message or Kinesis record) must include the following fields that specify how to create or modify an OpenSearch document.

| Field | Data type | Required | Description |
| :--- | :--- | :--- | :--- |
| `_id` | String | No | A unique identifier for a document. If not provided, OpenSearch auto-generates an ID. Required for document updates or deletions. |
| `_version` | Long | No | A document version number, which must be maintained externally. If provided, OpenSearch drops messages with versions earlier than the current document version. If not provided, no version checking occurs. |
| `_op_type` | String | No | The operation to perform. Valid values are:<br>- `index`: Creates a new document or updates an existing one<br>- `delete`: Soft deletes a document |
| `_source` | Object | Yes | The message payload containing the document data. |

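For example, assuming the `ingestion-kinesis` plugin and a stream named `test-stream` (a placeholder), a message in this format could be published using AWS CLI v2 as follows:

```bash
# AWS CLI v2 expects binary payloads as Base64 by default;
# raw-in-base64-out allows passing the JSON message directly.
aws kinesis put-record \
  --stream-name test-stream \
  --partition-key 1 \
  --cli-binary-format raw-in-base64-out \
  --data '{"_id":"1", "_source":{"name": "alice", "age": 30}, "_op_type": "index"}'
```
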
## Pull-based ingestion metrics

Pull-based ingestion provides metrics that can be used to monitor the ingestion process. The `polling_ingest_stats` metric is currently supported and is available at the shard level.

The following table lists the available `polling_ingest_stats` metrics.

| Metric | Description |
| :--- | :--- |
| `message_processor_stats.total_processed_count` | The total number of messages processed by the message processor. |
| `consumer_stats.total_polled_count` | The total number of messages polled from the stream consumer. |

To retrieve shard-level pull-based ingestion metrics, use the [Nodes Stats API]({{site.url}}{{site.baseurl}}/api-reference/nodes-apis/nodes-stats/):

```json
GET /_nodes/stats/indices?level=shards&pretty
```
{% include copy-curl.html %}
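
In the response, each shard entry includes a `polling_ingest_stats` object. The following excerpt is illustrative; the counts are placeholder values:

```json
"polling_ingest_stats": {
  "message_processor_stats": {
    "total_processed_count": 1000
  },
  "consumer_stats": {
    "total_polled_count": 1000
  }
}
```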