Skip to content

Invalid index_id with ingest v2 returns 429 #5721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Instant;

use hyper::StatusCode;
use quickwit_common::rate_limited_error;
use quickwit_config::INGEST_V2_SOURCE_ID;
use quickwit_config::{validate_identifier, INGEST_V2_SOURCE_ID};
use quickwit_ingest::IngestRequestV2Builder;
use quickwit_proto::ingest::router::{
IngestFailureReason, IngestResponseV2, IngestRouterService, IngestRouterServiceClient,
Expand Down Expand Up @@ -91,6 +91,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
let mut lines = lines(&body.content).enumerate();
let mut per_subrequest_doc_handles: HashMap<u32, Vec<DocHandle>> = HashMap::new();
let mut action_count = 0;
let mut invalid_index_id_items = Vec::new();
while let Some((line_no, line)) = lines.next() {
let action = serde_json::from_slice::<BulkAction>(line).map_err(|error| {
ElasticsearchError::new(
Expand Down Expand Up @@ -121,6 +122,16 @@ pub(crate) async fn elastic_bulk_ingest_v2(
Some(ElasticException::ActionRequestValidation),
)
})?;

// Validate index ID early because propagating back the right error (400)
// from deeper ingest layers is harder
if validate_identifier("", &index_id).is_err() {
let invalid_item = make_invalid_index_id_item(index_id.clone(), meta.es_doc_id);
invalid_index_id_items.push((action_count, invalid_item));
action_count += 1;
continue;
}

let (subrequest_id, doc_uid) = ingest_request_builder.add_doc(index_id, doc);

let doc_handle = DocHandle {
Expand Down Expand Up @@ -151,6 +162,7 @@ pub(crate) async fn elastic_bulk_ingest_v2(
per_subrequest_doc_handles,
now,
action_count,
invalid_index_id_items,
)
}

Expand All @@ -159,6 +171,7 @@ fn make_elastic_bulk_response_v2(
mut per_subrequest_doc_handles: HashMap<u32, Vec<DocHandle>>,
now: Instant,
action_count: usize,
invalid_index_id_items: Vec<(usize, ElasticBulkItem)>,
) -> Result<ElasticBulkResponse, ElasticsearchError> {
let mut positioned_actions: Vec<(usize, ElasticBulkAction)> = Vec::with_capacity(action_count);
let mut errors = false;
Expand Down Expand Up @@ -308,6 +321,12 @@ fn make_elastic_bulk_response_v2(
"doc handles should be empty"
);

for (position, item) in invalid_index_id_items {
errors = true;
let action = ElasticBulkAction::Index(item);
positioned_actions.push((position, action));
}

assert_eq!(
positioned_actions.len(),
action_count,
Expand Down Expand Up @@ -344,6 +363,20 @@ fn remove_doc_handles(
})
}

fn make_invalid_index_id_item(index_id: String, es_doc_id: Option<String>) -> ElasticBulkItem {
let error = ElasticBulkError {
index_id: Some(index_id.clone()),
exception: ElasticException::IllegalArgument,
reason: format!("invalid index id [{}]", index_id),
};
ElasticBulkItem {
index_id,
es_doc_id,
status: StatusCode::BAD_REQUEST,
error: Some(error),
}
}

#[cfg(test)]
mod tests {
use bytesize::ByteSize;
Expand Down Expand Up @@ -707,6 +740,7 @@ mod tests {
HashMap::new(),
Instant::now(),
0,
Vec::new(),
)
.unwrap();

Expand Down Expand Up @@ -767,6 +801,7 @@ mod tests {
per_request_doc_handles,
Instant::now(),
3,
Vec::new(),
)
.unwrap();

Expand Down Expand Up @@ -832,4 +867,82 @@ mod tests {
.reply(&handler)
.await;
}

// Sends a bulk request with three `create` actions, one of which targets the index ID "bad!",
// and checks that the response is HTTP 200 with `errors: true` and one item per action, in
// request order, where only the "bad!" item carries a 400 status.
#[tokio::test]
async fn test_bulk_api_invalid_index_id() {
let mut mock_ingest_router = MockIngestRouterService::new();
// The router must be called exactly once, with two subrequests: the "bad!" action is
// expected not to be forwarded to the ingest router.
mock_ingest_router
.expect_ingest()
.once()
.returning(|ingest_request| {
assert_eq!(ingest_request.subrequests.len(), 2);
Ok(IngestResponseV2 {
successes: vec![
IngestSuccess {
subrequest_id: 0,
index_uid: Some(IndexUid::for_test("my-index-1", 0)),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(1)),
replication_position_inclusive: Some(Position::offset(1u64)),
// NOTE(review): the payload below sends a single document to my-index-1, yet this
// mock reports 2 ingested docs — confirm this count is not relied upon.
num_ingested_docs: 2,
parse_failures: Vec::new(),
},
IngestSuccess {
subrequest_id: 1,
index_uid: Some(IndexUid::for_test("my-index-2", 0)),
source_id: INGEST_V2_SOURCE_ID.to_string(),
shard_id: Some(ShardId::from(1)),
replication_position_inclusive: Some(Position::offset(0u64)),
num_ingested_docs: 1,
parse_failures: Vec::new(),
},
],
failures: Vec::new(),
})
});
let ingest_router = IngestRouterServiceClient::from_mock(mock_ingest_router);
let handler = es_compat_bulk_handler_v2(ingest_router, ByteSize::mb(10));

// Action/document line pairs; the second action targets the index ID "bad!".
let payload = r#"
{"create": {"_index": "my-index-1"}}
{"ts": 1, "message": "my-message-1"}
{"create": {"_index": "bad!"}}
{"ts": 1, "message": "my-message-2"}
{"create": {"_index": "my-index-2", "_id" : "1"}}
{"ts": 1, "message": "my-message-3"}

"#;
let response = warp::test::request()
.path("/_elastic/_bulk")
.method("POST")
.body(payload)
.reply(&handler)
.await;
// The request as a whole succeeds; the failure is reported per item.
assert_eq!(response.status(), 200);

let bulk_response: ElasticBulkResponse = serde_json::from_slice(response.body()).unwrap();
assert!(bulk_response.errors);

// Unwrap each action to its item regardless of the action variant.
let items = bulk_response
.actions
.into_iter()
.map(|action| match action {
ElasticBulkAction::Create(item) => item,
ElasticBulkAction::Index(item) => item,
})
.collect::<Vec<_>>();
assert_eq!(items.len(), 3);

// Items must come back in the same order as the actions in the payload.
assert_eq!(items[0].index_id, "my-index-1");
assert!(items[0].es_doc_id.is_none());
assert_eq!(items[0].status, StatusCode::CREATED);

// The rejected action keeps its position between the two successful ones.
assert_eq!(items[1].index_id, "bad!");
assert!(items[1].es_doc_id.is_none());
assert_eq!(items[1].status, StatusCode::BAD_REQUEST);

assert_eq!(items[2].index_id, "my-index-2");
assert_eq!(items[2].es_doc_id.as_ref().unwrap(), "1");
assert_eq!(items[2].status, StatusCode::CREATED);
}
}
10 changes: 9 additions & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use bytes::{Buf, Bytes};
use quickwit_config::{IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_config::{validate_identifier, IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{
CommitType, DocBatchBuilder, DocBatchV2Builder, FetchResponse, IngestRequest, IngestService,
IngestServiceClient, IngestServiceError, TailRequest,
Expand Down Expand Up @@ -215,6 +215,14 @@ async fn ingest_v2(
None
};

// Validate index ID early because propagating back the right error (400)
// from deeper ingest layers is harder
if validate_identifier("", &index_id).is_err() {
return Err(IngestServiceError::BadRequest(
"invalid index ID".to_string(),
));
}

let subrequest = IngestSubrequest {
subrequest_id: 0,
index_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Bulk request with three `index` actions. The first two are expected to succeed with 201;
# the third, targeting `test-index-pattern-&1`, is expected to fail with a per-item 400.
# The overall response is still expected to be HTTP 200 with `errors: true`.
ndjson:
- index: { "_index": "test-index" }
- message: Hola, Mundo!
- index: { "_index": "test-index-pattern-11" }
- message: Hola, Mundo!
- index: { "_index": "test-index-pattern-&1" }
- message: Hola, Mundo!
status_code: 200
expected:
errors: true
items:
- index:
_index: test-index
status: 201
- index:
_index: test-index-pattern-11
status: 201
- index:
_index: test-index-pattern-&1
status: 400
Loading