Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not use $lookup for query with variables if only single variable set #147

Draft
wants to merge 10 commits into
base: v1
Choose a base branch
from
Draft
3 changes: 2 additions & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
push:
branches:
- main
- 'v*'
tags:
- 'v*'

Expand Down Expand Up @@ -34,7 +35,7 @@ jobs:
run: nix build --print-build-logs

- name: Create release 🚀
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: mongodb-connector
path: result/bin/mongodb-connector
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ This changelog documents the changes between release versions.

## [Unreleased]

### Changed

- Query plans for remote joins no longer use a `$lookup` stage when there is exactly one incoming variable set - this allows native queries used in remote joins to take advantage of `$vectorSearch` in certain circumstances ([#147](https://github.com/hasura/ndc-mongodb/pull/147))

## [1.6.0] - 2025-01-17

### Added
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions crates/integration-tests/src/tests/native_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,65 @@ async fn runs_native_query_with_variable_sets() -> anyhow::Result<()> {
);
Ok(())
}

#[tokio::test]
async fn runs_native_query_with_a_single_variable_set() -> anyhow::Result<()> {
    // Exercises the single-variable-set fast path introduced by this PR: with
    // exactly one variable set the query plan should not use a `$lookup` stage.
    // The expected result rows are pinned by the committed insta snapshot file;
    // note that the snapshot also records this macro argument's stringified
    // expression, so the tokens below must not change without re-snapshotting.
    assert_yaml_snapshot!(
        run_connector_query(
            Connector::SampleMflix,
            query_request()
                // A single variable set — the case under test.
                .variables([[("count", 3)]])
                .collection("title_word_frequency")
                .query(
                    query()
                        .predicate(binop("_eq", target!("count"), variable!(count)))
                        // Deterministic ordering so the snapshot is stable.
                        .order_by([asc!("_id")])
                        .limit(20)
                        .fields([field!("_id"), field!("count")]),
                )
        )
        .await?
    );
    Ok(())
}

#[tokio::test]
async fn runs_native_query_without_input_collection_with_variable_sets() -> anyhow::Result<()> {
    // Multiple variable sets against a native query with no input collection
    // (`extended_json_test_data`) — the plan must still produce one row group
    // per variable set (see the two `rows:` groups in the snapshot).
    // The committed insta snapshot records this macro argument's stringified
    // expression, so the tokens below must not change without re-snapshotting.
    assert_yaml_snapshot!(
        run_connector_query(
            Connector::SampleMflix,
            query_request()
                // Two variable sets — expect two result sets in the response.
                .variables([[("type", "decimal")], [("type", "date")]])
                .collection("extended_json_test_data")
                .query(
                    query()
                        .predicate(binop("_eq", target!("type"), variable!(type)))
                        // Deterministic ordering so the snapshot is stable.
                        .order_by([asc!("type"), asc!("value")])
                        .fields([field!("type"), field!("value")]),
                )
        )
        .await?
    );
    Ok(())
}

#[tokio::test]
async fn runs_native_query_without_input_collection_with_single_variable_set() -> anyhow::Result<()>
{
    // Combines both special cases: a native query with no input collection AND
    // exactly one variable set, which should take the no-`$lookup` plan shape.
    // Expected rows are pinned by the committed insta snapshot file; the
    // snapshot also records this macro argument's stringified expression, so
    // the tokens below must not change without re-snapshotting.
    assert_yaml_snapshot!(
        run_connector_query(
            Connector::SampleMflix,
            query_request()
                // A single variable set — the case under test.
                .variables([[("type", "decimal")]])
                .collection("extended_json_test_data")
                .query(
                    query()
                        .predicate(binop("_eq", target!("type"), variable!(type)))
                        // Deterministic ordering so the snapshot is stable.
                        .order_by([asc!("type"), asc!("value")])
                        .fields([field!("type"), field!("value")]),
                )
        )
        .await?
    );
    Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
source: crates/integration-tests/src/tests/native_query.rs
expression: "run_connector_query(Connector::SampleMflix,\nquery_request().variables([[(\"count\",\n3)]]).collection(\"title_word_frequency\").query(query().predicate(binop(\"_eq\",\ntarget!(\"count\"),\nvariable!(count))).order_by([asc!(\"_id\")]).limit(20).fields([field!(\"_id\"),\nfield!(\"count\")]),)).await?"
---
- rows:
- _id: "#1"
count: 3
- _id: "'n"
count: 3
- _id: "'n'"
count: 3
- _id: (Not)
count: 3
- _id: "100"
count: 3
- _id: 10th
count: 3
- _id: "15"
count: 3
- _id: "174"
count: 3
- _id: "23"
count: 3
- _id: 3-D
count: 3
- _id: "42"
count: 3
- _id: "420"
count: 3
- _id: "72"
count: 3
- _id: Abandoned
count: 3
- _id: Abendland
count: 3
- _id: Absence
count: 3
- _id: Absent
count: 3
- _id: Abu
count: 3
- _id: Accident
count: 3
- _id: Accidental
count: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
source: crates/integration-tests/src/tests/native_query.rs
expression: "run_connector_query(Connector::SampleMflix,\nquery_request().variables([[(\"type\",\n\"decimal\")]]).collection(\"extended_json_test_data\").query(query().predicate(binop(\"_eq\",\ntarget!(\"type\"),\nvariable!(type))).order_by([asc!(\"type\"),\nasc!(\"value\")]).fields([field!(\"type\"), field!(\"value\")]),)).await?"
---
- rows:
- type: decimal
value:
$numberDecimal: "1"
- type: decimal
value:
$numberDecimal: "2"
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
source: crates/integration-tests/src/tests/native_query.rs
expression: "run_connector_query(Connector::SampleMflix,\nquery_request().variables([[(\"type\", \"decimal\")],\n[(\"type\",\n\"date\")]]).collection(\"extended_json_test_data\").query(query().predicate(binop(\"_eq\",\ntarget!(\"type\"),\nvariable!(type))).order_by([asc!(\"type\"),\nasc!(\"value\")]).fields([field!(\"type\"), field!(\"value\")]),)).await?"
---
- rows:
- type: decimal
value:
$numberDecimal: "1"
- type: decimal
value:
$numberDecimal: "2"
- rows:
- type: date
value:
$date:
$numberLong: "1637571600000"
- type: date
value:
$date:
$numberLong: "1724164680000"
32 changes: 20 additions & 12 deletions crates/mongodb-agent-common/src/explain.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::collections::BTreeMap;

use mongodb::bson::{doc, to_bson, Bson};
use mongodb_support::aggregate::AggregateCommand;
use ndc_models::{ExplainResponse, QueryRequest};
use ndc_query_plan::plan_for_query_request;

use crate::{
interface_types::MongoAgentError,
mongo_query_plan::MongoConfiguration,
query::{self, QueryTarget},
interface_types::MongoAgentError, mongo_query_plan::MongoConfiguration, query,
state::ConnectorState,
};

Expand All @@ -19,19 +18,28 @@ pub async fn explain_query(
let db = state.database();
let query_plan = plan_for_query_request(config, query_request)?;

let pipeline = query::pipeline_for_query_request(config, &query_plan)?;
let AggregateCommand {
collection,
pipeline,
let_vars,
} = query::command_for_query_request(config, &query_plan)?;
let pipeline_bson = to_bson(&pipeline)?;

let target = QueryTarget::for_request(config, &query_plan);
let aggregate_target = match (target.input_collection(), query_plan.has_variables()) {
(Some(collection_name), false) => Bson::String(collection_name.to_string()),
_ => Bson::Int32(1),
let aggregate_target = match collection {
Some(collection_name) => Bson::String(collection_name.to_string()),
None => Bson::Int32(1),
};

let query_command = doc! {
"aggregate": aggregate_target,
"pipeline": pipeline_bson,
"cursor": {},
let query_command = {
let mut cmd = doc! {
"aggregate": aggregate_target,
"pipeline": pipeline_bson,
"cursor": {},
};
if let Some(let_vars) = let_vars {
cmd.insert("let", let_vars);
}
cmd
};

let explain_command = doc! {
Expand Down
38 changes: 21 additions & 17 deletions crates/mongodb-agent-common/src/query/execute_query_request.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use futures::Stream;
use futures_util::TryStreamExt as _;
use mongodb::bson;
use mongodb_support::aggregate::Pipeline;
use mongodb::{bson, options::AggregateOptions};
use mongodb_support::aggregate::AggregateCommand;
use ndc_models::{QueryRequest, QueryResponse};
use ndc_query_plan::plan_for_query_request;
use tracing::{instrument, Instrument};

use super::{pipeline::pipeline_for_query_request, response::serialize_query_response};
use super::{pipeline::command_for_query_request, response::serialize_query_response};
use crate::{
interface_types::MongoAgentError,
mongo_query_plan::{MongoConfiguration, QueryPlan},
mongodb::{CollectionTrait as _, DatabaseTrait},
query::QueryTarget,
};

type Result<T> = std::result::Result<T, MongoAgentError>;
Expand All @@ -31,8 +30,8 @@ pub async fn execute_query_request(
);
let query_plan = preprocess_query_request(config, query_request)?;
tracing::debug!(?query_plan, "abstract query plan");
let pipeline = pipeline_for_query_request(config, &query_plan)?;
let documents = execute_query_pipeline(database, config, &query_plan, pipeline).await?;
let command = command_for_query_request(config, &query_plan)?;
let documents = execute_query_command(database, command).await?;
let response = serialize_query_response(config.extended_json_mode(), &query_plan, documents)?;
Ok(response)
}
Expand All @@ -47,32 +46,37 @@ fn preprocess_query_request(
}

#[instrument(name = "Execute Query Pipeline", skip_all, fields(internal.visibility = "user"))]
async fn execute_query_pipeline(
async fn execute_query_command(
database: impl DatabaseTrait,
config: &MongoConfiguration,
query_plan: &QueryPlan,
pipeline: Pipeline,
AggregateCommand {
collection,
pipeline,
let_vars,
}: AggregateCommand,
) -> Result<Vec<bson::Document>> {
let target = QueryTarget::for_request(config, query_plan);
tracing::debug!(
?target,
?collection,
pipeline = %serde_json::to_string(&pipeline).unwrap(),
let_vars = %serde_json::to_string(&let_vars).unwrap(),
"executing query"
);

let aggregate_options =
let_vars.map(|let_vars| AggregateOptions::builder().let_vars(let_vars).build());

// The target of a query request might be a collection, or it might be a native query. In the
// latter case there is no collection to perform the aggregation against. So instead of sending
// the MongoDB API call `db.<collection>.aggregate` we instead call `db.aggregate`.
//
// If the query request includes variable sets then instead of specifying the target collection
// up front that is deferred until the `$lookup` stage of the aggregation pipeline. That is
// another case where we call `db.aggregate` instead of `db.<collection>.aggregate`.
let documents = match (target.input_collection(), query_plan.has_variables()) {
(Some(collection_name), false) => {
let documents = match collection {
Some(collection_name) => {
let collection = database.collection(collection_name.as_str());
collect_response_documents(
collection
.aggregate(pipeline, None)
.aggregate(pipeline, aggregate_options)
.instrument(tracing::info_span!(
"MongoDB Aggregate Command",
internal.visibility = "user"
Expand All @@ -81,10 +85,10 @@ async fn execute_query_pipeline(
)
.await
}
_ => {
None => {
collect_response_documents(
database
.aggregate(pipeline, None)
.aggregate(pipeline, aggregate_options)
.instrument(tracing::info_span!(
"MongoDB Aggregate Command",
internal.visibility = "user"
Expand Down
Loading
Loading