Skip to content

Commit 7fed871

Browse files
committed
Change back to non async for try_into_logical_plan
1 parent 7559c44 commit 7fed871

File tree

7 files changed

+35
-40
lines changed

7 files changed

+35
-40
lines changed

datafusion/core/src/datasource/datasource.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,5 +86,5 @@ pub trait TableProvider: Sync + Send {
8686
#[async_trait]
8787
pub trait TableProviderFactory: Sync + Send {
8888
/// Create a TableProvider with the given url
89-
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
89+
fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
9090
}

datafusion/core/src/execution/context.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ impl SessionContext {
429429
cmd.file_type
430430
))
431431
})?;
432-
let table = (*factory).create(cmd.location.as_str()).await?;
432+
let table = (*factory).create(cmd.location.as_str())?;
433433
self.register_table(cmd.name.as_str(), table)?;
434434
let plan = LogicalPlanBuilder::empty(false).build()?;
435435
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))

datafusion/core/src/test_util.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,7 @@ pub struct TestTableFactory {}
328328

329329
#[async_trait]
330330
impl TableProviderFactory for TestTableFactory {
331-
async fn create(
332-
&self,
333-
url: &str,
334-
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
331+
fn create(&self, url: &str) -> datafusion_common::Result<Arc<dyn TableProvider>> {
335332
Ok(Arc::new(TestTableProvider {
336333
url: url.to_string(),
337334
}))

datafusion/proto/examples/plan_serde.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async fn main() -> Result<()> {
2626
.await?;
2727
let plan = ctx.table("t1")?.to_logical_plan()?;
2828
let bytes = logical_plan_to_bytes(&plan)?;
29-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
29+
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
3030
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
3131
Ok(())
3232
}

datafusion/proto/src/bytes/mod.rs

+10-13
Original file line numberDiff line numberDiff line change
@@ -136,35 +136,32 @@ pub fn logical_plan_to_bytes_with_extension_codec(
136136

137137
/// Deserialize a LogicalPlan from json
138138
#[cfg(feature = "json")]
139-
pub async fn logical_plan_from_json(
140-
json: &str,
141-
ctx: &SessionContext,
142-
) -> Result<LogicalPlan> {
139+
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
143140
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
144141
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?;
145142
let extension_codec = DefaultExtensionCodec {};
146-
back.try_into_logical_plan(ctx, &extension_codec).await
143+
back.try_into_logical_plan(ctx, &extension_codec)
147144
}
148145

149146
/// Deserialize a LogicalPlan from bytes
150-
pub async fn logical_plan_from_bytes(
147+
pub fn logical_plan_from_bytes(
151148
bytes: &[u8],
152149
ctx: &SessionContext,
153150
) -> Result<LogicalPlan> {
154151
let extension_codec = DefaultExtensionCodec {};
155-
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await
152+
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
156153
}
157154

158155
/// Deserialize a LogicalPlan from bytes
159-
pub async fn logical_plan_from_bytes_with_extension_codec(
156+
pub fn logical_plan_from_bytes_with_extension_codec(
160157
bytes: &[u8],
161158
ctx: &SessionContext,
162159
extension_codec: &dyn LogicalExtensionCodec,
163160
) -> Result<LogicalPlan> {
164161
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
165162
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
166163
})?;
167-
protobuf.try_into_logical_plan(ctx, extension_codec).await
164+
protobuf.try_into_logical_plan(ctx, extension_codec)
168165
}
169166

170167
#[derive(Debug)]
@@ -189,7 +186,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
189186
))
190187
}
191188

192-
async fn try_decode_table_provider(
189+
fn try_decode_table_provider(
193190
&self,
194191
_buf: &[u8],
195192
_schema: SchemaRef,
@@ -243,12 +240,12 @@ mod test {
243240
assert_eq!(actual, expected);
244241
}
245242

246-
#[tokio::test]
243+
#[test]
247244
#[cfg(feature = "json")]
248-
async fn json_to_plan() {
245+
fn json_to_plan() {
249246
let input = r#"{"emptyRelation":{}}"#.to_string();
250247
let ctx = SessionContext::new();
251-
let actual = logical_plan_from_json(&input, &ctx).await.unwrap();
248+
let actual = logical_plan_from_json(&input, &ctx).unwrap();
252249
let result = matches!(actual, LogicalPlan::EmptyRelation(_));
253250
assert!(result, "Should parse empty relation");
254251
}

datafusion/proto/src/lib.rs

+8-9
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,7 @@ mod roundtrip_tests {
130130
let bytes =
131131
logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?;
132132
let logical_round_trip =
133-
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)
134-
.await?;
133+
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
135134
assert_eq!(
136135
format!("{:?}", topk_plan),
137136
format!("{:?}", logical_round_trip)
@@ -172,7 +171,7 @@ mod roundtrip_tests {
172171
))
173172
}
174173

175-
async fn try_decode_table_provider(
174+
fn try_decode_table_provider(
176175
&self,
177176
buf: &[u8],
178177
_schema: SchemaRef,
@@ -189,7 +188,7 @@ mod roundtrip_tests {
189188
.get("testtable")
190189
.expect("Unable to find testtable factory")
191190
.clone();
192-
let provider = (*factory).create(msg.url.as_str()).await?;
191+
let provider = (*factory).create(msg.url.as_str())?;
193192
Ok(provider)
194193
}
195194

@@ -229,7 +228,7 @@ mod roundtrip_tests {
229228
let scan = ctx.table("t")?.to_logical_plan()?;
230229
let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?;
231230
let logical_round_trip =
232-
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?;
231+
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
233232
assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip));
234233
Ok(())
235234
}
@@ -257,7 +256,7 @@ mod roundtrip_tests {
257256
println!("{:?}", plan);
258257

259258
let bytes = logical_plan_to_bytes(&plan)?;
260-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
259+
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
261260
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
262261

263262
Ok(())
@@ -270,7 +269,7 @@ mod roundtrip_tests {
270269
.await?;
271270
let plan = ctx.table("t1")?.to_logical_plan()?;
272271
let bytes = logical_plan_to_bytes(&plan)?;
273-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
272+
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
274273
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
275274
Ok(())
276275
}
@@ -284,7 +283,7 @@ mod roundtrip_tests {
284283
.await?;
285284
let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
286285
let bytes = logical_plan_to_bytes(&plan)?;
287-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
286+
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
288287
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
289288
Ok(())
290289
}
@@ -431,7 +430,7 @@ mod roundtrip_tests {
431430
}
432431
}
433432

434-
async fn try_decode_table_provider(
433+
fn try_decode_table_provider(
435434
&self,
436435
_buf: &[u8],
437436
_schema: SchemaRef,

datafusion/proto/src/logical_plan.rs

+13-11
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
8686
B: BufMut,
8787
Self: Sized;
8888

89-
async fn try_into_logical_plan(
89+
fn try_into_logical_plan(
9090
&self,
9191
ctx: &SessionContext,
9292
extension_codec: &dyn LogicalExtensionCodec,
@@ -130,7 +130,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
130130
buf: &mut Vec<u8>,
131131
) -> Result<(), DataFusionError>;
132132

133-
async fn try_decode_table_provider(
133+
fn try_decode_table_provider(
134134
&self,
135135
buf: &[u8],
136136
schema: SchemaRef,
@@ -170,7 +170,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
170170
))
171171
}
172172

173-
async fn try_decode_table_provider(
173+
fn try_decode_table_provider(
174174
&self,
175175
_buf: &[u8],
176176
_schema: SchemaRef,
@@ -196,7 +196,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
196196
macro_rules! into_logical_plan {
197197
($PB:expr, $CTX:expr, $CODEC:expr) => {{
198198
if let Some(field) = $PB.as_ref() {
199-
field.as_ref().try_into_logical_plan($CTX, $CODEC).await
199+
field.as_ref().try_into_logical_plan($CTX, $CODEC)
200200
} else {
201201
Err(proto_error("Missing required field in protobuf"))
202202
}
@@ -301,7 +301,7 @@ impl AsLogicalPlan for LogicalPlanNode {
301301
})
302302
}
303303

304-
async fn try_into_logical_plan(
304+
fn try_into_logical_plan(
305305
&self,
306306
ctx: &SessionContext,
307307
extension_codec: &dyn LogicalExtensionCodec,
@@ -487,9 +487,11 @@ impl AsLogicalPlan for LogicalPlanNode {
487487
.iter()
488488
.map(|expr| parse_expr(expr, ctx))
489489
.collect::<Result<Vec<_>, _>>()?;
490-
let provider = extension_codec
491-
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)
492-
.await?;
490+
let provider = extension_codec.try_decode_table_provider(
491+
&scan.custom_table_data,
492+
schema,
493+
ctx,
494+
)?;
493495

494496
LogicalPlanBuilder::scan_with_filters(
495497
&scan.table_name,
@@ -591,7 +593,7 @@ impl AsLogicalPlan for LogicalPlanNode {
591593
.input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
592594
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
593595
)))?
594-
.try_into_logical_plan(ctx, extension_codec).await?;
596+
.try_into_logical_plan(ctx, extension_codec)?;
595597
let definition = if !create_view.definition.is_empty() {
596598
Some(create_view.definition.clone())
597599
} else {
@@ -716,7 +718,7 @@ impl AsLogicalPlan for LogicalPlanNode {
716718
LogicalPlanType::Union(union) => {
717719
let mut input_plans: Vec<LogicalPlan> = vec![];
718720
for i in union.inputs.iter() {
719-
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
721+
let res = i.try_into_logical_plan(ctx, extension_codec)?;
720722
input_plans.push(res);
721723
}
722724

@@ -744,7 +746,7 @@ impl AsLogicalPlan for LogicalPlanNode {
744746
LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
745747
let mut input_plans: Vec<LogicalPlan> = vec![];
746748
for i in inputs.iter() {
747-
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
749+
let res = i.try_into_logical_plan(ctx, extension_codec)?;
748750
input_plans.push(res);
749751
}
750752

0 commit comments

Comments
 (0)