Refine the logical and physical plan serialization and deserialization #4659

Merged: 6 commits, Dec 19, 2022
datafusion/proto/proto/datafusion.proto: 156 changes (45 additions & 111 deletions)
@@ -100,8 +100,8 @@ message ListingTableScanNode {
repeated string paths = 2;
string file_extension = 3;
ProjectionColumns projection = 4;
datafusion.Schema schema = 5;
repeated datafusion.LogicalExprNode filters = 6;
Schema schema = 5;
repeated LogicalExprNode filters = 6;
repeated string table_partition_cols = 7;
bool collect_stat = 8;
uint32 target_partitions = 9;
@@ -110,13 +110,13 @@ message ListingTableScanNode {
ParquetFormat parquet = 11;
AvroFormat avro = 12;
}
repeated datafusion.LogicalExprNode file_sort_order = 13;
repeated LogicalExprNode file_sort_order = 13;
}

message ViewTableScanNode {
string table_name = 1;
LogicalPlanNode input = 2;
datafusion.Schema schema = 3;
Schema schema = 3;
ProjectionColumns projection = 4;
string definition = 5;
}
@@ -125,27 +125,27 @@ message ViewTableScanNode {
message CustomTableScanNode {
string table_name = 1;
ProjectionColumns projection = 2;
datafusion.Schema schema = 3;
repeated datafusion.LogicalExprNode filters = 4;
Schema schema = 3;
repeated LogicalExprNode filters = 4;
bytes custom_table_data = 5;
}

message ProjectionNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
repeated LogicalExprNode expr = 2;
oneof optional_alias {
string alias = 3;
}
}

message SelectionNode {
LogicalPlanNode input = 1;
datafusion.LogicalExprNode expr = 2;
LogicalExprNode expr = 2;
}

message SortNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
repeated LogicalExprNode expr = 2;
// Maximum number of highest/lowest rows to fetch; negative means no limit
int64 fetch = 3;
}
@@ -159,7 +159,7 @@ message RepartitionNode {
}

message HashRepartition {
repeated datafusion.LogicalExprNode hash_expr = 1;
repeated LogicalExprNode hash_expr = 1;
uint64 partition_count = 2;
}

@@ -173,7 +173,7 @@ message CreateExternalTableNode {
string location = 2;
string file_type = 3;
bool has_header = 4;
datafusion.DfSchema schema = 5;
DfSchema schema = 5;
repeated string table_partition_cols = 6;
bool if_not_exists = 7;
string delimiter = 8;
@@ -191,13 +191,13 @@ message PrepareNode {
message CreateCatalogSchemaNode {
string schema_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
DfSchema schema = 3;
}

message CreateCatalogNode {
string catalog_name = 1;
bool if_not_exists = 2;
datafusion.DfSchema schema = 3;
DfSchema schema = 3;
}

message CreateViewNode {
@@ -212,7 +212,7 @@ message CreateViewNode {
// the list is flattened, and with the field n_cols it can be parsed and partitioned into rows
message ValuesNode {
uint64 n_cols = 1;
repeated datafusion.LogicalExprNode values_list = 2;
repeated LogicalExprNode values_list = 2;
}
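
The comment above ValuesNode describes the layout: every cell is appended to one flattened values_list, and n_cols is what lets a reader cut that flat list back into rows. A minimal sketch of that reconstruction step, in plain Rust with illustrative names (this is not the actual deserialization code), could look like this:

fn partition_into_rows<T: Clone>(values_list: &[T], n_cols: usize) -> Vec<Vec<T>> {
    // The flattened list must divide evenly into rows of n_cols cells.
    assert!(n_cols > 0 && values_list.len() % n_cols == 0);
    values_list.chunks(n_cols).map(|row| row.to_vec()).collect()
}

fn main() {
    // VALUES (1, 2, 3), (4, 5, 6) would be stored flattened with n_cols = 3.
    let flat = vec![1, 2, 3, 4, 5, 6];
    assert_eq!(partition_into_rows(&flat, 3), vec![vec![1, 2, 3], vec![4, 5, 6]]);
}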

message AnalyzeNode {
@@ -227,13 +227,13 @@ message ExplainNode {

message AggregateNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode group_expr = 2;
repeated datafusion.LogicalExprNode aggr_expr = 3;
repeated LogicalExprNode group_expr = 2;
repeated LogicalExprNode aggr_expr = 3;
}

message WindowNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode window_expr = 2;
repeated LogicalExprNode window_expr = 2;
}

enum JoinType {
@@ -285,7 +285,7 @@ message LimitNode {
}

message SelectionExecNode {
datafusion.LogicalExprNode expr = 1;
LogicalExprNode expr = 1;
}

message SubqueryAliasNode {
@@ -679,7 +679,7 @@ message WindowFrameBound {
///////////////////////////////////////////////////////////////////////////////////////////////////

message Schema {
repeated datafusion.Field columns = 1;
repeated Field columns = 1;
}

message Field {
@@ -993,7 +993,7 @@ message PhysicalExprNode {
// column references
PhysicalColumn column = 1;

datafusion.ScalarValue literal = 2;
ScalarValue literal = 2;

// binary expressions
PhysicalBinaryExprNode binary_expr = 3;
@@ -1026,19 +1026,19 @@ message PhysicalExprNode {
message PhysicalScalarUdfNode {
string name = 1;
repeated PhysicalExprNode args = 2;
datafusion.ArrowType return_type = 4;
ArrowType return_type = 4;
}

message PhysicalAggregateExprNode {
datafusion.AggregateFunction aggr_function = 1;
AggregateFunction aggr_function = 1;
repeated PhysicalExprNode expr = 2;
bool distinct = 3;
}

message PhysicalWindowExprNode {
oneof window_function {
datafusion.AggregateFunction aggr_function = 1;
datafusion.BuiltInWindowFunction built_in_function = 2;
AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
// udaf = 3
}
PhysicalExprNode expr = 4;
@@ -1098,19 +1098,19 @@ message PhysicalCaseNode {

message PhysicalScalarFunctionNode {
string name = 1;
datafusion.ScalarFunction fun = 2;
ScalarFunction fun = 2;
repeated PhysicalExprNode args = 3;
datafusion.ArrowType return_type = 4;
ArrowType return_type = 4;
}

message PhysicalTryCastNode {
PhysicalExprNode expr = 1;
datafusion.ArrowType arrow_type = 2;
ArrowType arrow_type = 2;
}

message PhysicalCastNode {
PhysicalExprNode expr = 1;
datafusion.ArrowType arrow_type = 2;
ArrowType arrow_type = 2;
}

message PhysicalNegativeNode {
@@ -1133,17 +1133,24 @@ message ScanLimit {

message FileScanExecConf {
repeated FileGroup file_groups = 1;
datafusion.Schema schema = 2;
Schema schema = 2;
repeated uint32 projection = 4;
ScanLimit limit = 5;
Statistics statistics = 6;
repeated string table_partition_cols = 7;
string object_store_url = 8;
repeated PhysicalSortExprNode output_ordering = 9;
repeated ConfigOption options = 10;
Review comment from a contributor:

In general, I am not sure it is a good idea to serialize the config options, as upon deserialization they will be their own copy.

That being said, perhaps the issue is that FileScanExecConf has a copy of the config options in the first place. I think that should get better over time as we consolidate the configuration more.

}

message ConfigOption {
string key = 1;
ScalarValue value = 2;
}
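
To make the review comment above concrete, here is a small self-contained sketch of why serialized options end up as an independent copy. It is plain Rust with made-up names (ConfigOptionEntry, to_entries, from_entries) standing in for the generated proto types and the session configuration; it is not DataFusion's actual API:

use std::collections::HashMap;

// Stand-in for the ConfigOption message; the real one pairs a key with a ScalarValue.
#[derive(Clone, Debug, PartialEq)]
struct ConfigOptionEntry {
    key: String,
    value: String,
}

fn to_entries(options: &HashMap<String, String>) -> Vec<ConfigOptionEntry> {
    options
        .iter()
        .map(|(k, v)| ConfigOptionEntry { key: k.clone(), value: v.clone() })
        .collect()
}

fn from_entries(entries: &[ConfigOptionEntry]) -> HashMap<String, String> {
    entries.iter().map(|e| (e.key.clone(), e.value.clone())).collect()
}

fn main() {
    let mut session_options = HashMap::new();
    session_options.insert("example.option".to_string(), "true".to_string()); // illustrative key

    let serialized = to_entries(&session_options);

    // Mutating the session configuration after serialization has no effect on
    // the copy that the deserializing side will reconstruct.
    session_options.insert("example.option".to_string(), "false".to_string());

    assert_eq!(from_entries(&serialized)["example.option"], "true");
}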

message ParquetScanExecNode {
FileScanExecConf base_conf = 1;
datafusion.LogicalExprNode pruning_predicate = 2;
LogicalExprNode pruning_predicate = 2;
}

message CsvScanExecNode {
@@ -1166,7 +1173,7 @@ message HashJoinExecNode {
PhysicalPlanNode left = 1;
PhysicalPlanNode right = 2;
repeated JoinOn on = 3;
datafusion.JoinType join_type = 4;
JoinType join_type = 4;
PartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
@@ -1177,8 +1184,8 @@ message UnionExecNode {
}

message ExplainExecNode {
datafusion.Schema schema = 1;
repeated datafusion.StringifiedPlan stringified_plans = 2;
Schema schema = 1;
repeated StringifiedPlan stringified_plans = 2;
bool verbose = 3;
}

@@ -1199,7 +1206,7 @@ message JoinOn {

message EmptyExecNode {
bool produce_one_row = 1;
datafusion.Schema schema = 2;
Schema schema = 2;
}

message ProjectionExecNode {
@@ -1218,7 +1225,7 @@ message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalExprNode window_expr = 2;
repeated string window_expr_name = 3;
datafusion.Schema input_schema = 4;
Schema input_schema = 4;
}

message AggregateExecNode {
@@ -1229,7 +1236,7 @@ message AggregateExecNode {
repeated string group_expr_name = 5;
repeated string aggr_expr_name = 6;
// we need the input schema to the partial aggregate to pass to the final aggregate
datafusion.Schema input_schema = 7;
Schema input_schema = 7;
repeated PhysicalExprNode null_expr = 8;
repeated bool groups = 9;
}
@@ -1285,7 +1292,7 @@ message RepartitionExecNode{
message JoinFilter{
PhysicalExprNode expression = 1;
repeated ColumnIndex column_indices = 2;
datafusion.Schema schema = 3;
Schema schema = 3;
}

message ColumnIndex{
@@ -1330,77 +1337,4 @@ message ColumnStats {
ScalarValue max_value = 2;
uint32 null_count = 3;
uint32 distinct_count = 4;
}

message PartitionLocation {
Review comment from a contributor:
👍 -- I believe these are Ballista-specific.

// partition_id of the map stage who produces the shuffle.
uint32 map_partition_id = 1;
// partition_id of the shuffle, a composition of(job_id + map_stage_id + partition_id).
PartitionId partition_id = 2;
ExecutorMetadata executor_meta = 3;
PartitionStats partition_stats = 4;
string path = 5;
}

// Unique identifier for a materialized partition of data
message PartitionId {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 4;
}

// Used by scheduler
message ExecutorMetadata {
string id = 1;
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}

// Used by grpc
message ExecutorRegistration {
string id = 1;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
oneof optional_host {
string host = 2;
}
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}
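
The optional_host comment above relies on a single-field oneof being binary compatible with proto3's "optional" keyword. A hand-rolled sketch of the wire bytes for field 2 (illustrative only; this is not prost's generated code and it only handles strings shorter than 128 bytes) shows why: a present string encodes to the same tag/length/payload bytes under either declaration, and an unset one contributes nothing.

fn encode_host(host: Option<&str>) -> Vec<u8> {
    let mut out = Vec::new();
    if let Some(h) = host {
        out.push((2 << 3) | 2); // field number 2, wire type 2 (length-delimited)
        out.push(h.len() as u8); // single-byte varint length
        out.extend_from_slice(h.as_bytes());
    }
    // When host is unset, neither the oneof form nor the optional form emits any bytes.
    out
}

fn main() {
    assert_eq!(
        encode_host(Some("node-1")),
        vec![0x12, 0x06, b'n', b'o', b'd', b'e', b'-', b'1']
    );
    assert!(encode_host(None).is_empty());
}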

message ExecutorHeartbeat {
string executor_id = 1;
// Unix epoch-based timestamp in seconds
uint64 timestamp = 2;
repeated ExecutorMetric metrics = 3;
ExecutorStatus status = 4;
}

message ExecutorSpecification {
repeated ExecutorResource resources = 1;
}

message ExecutorResource {
// TODO add more resources
oneof resource {
uint32 task_slots = 1;
}
}

message ExecutorMetric {
// TODO add more metrics
oneof metric {
uint64 available_memory = 1;
}
}

message ExecutorStatus {
oneof status {
string active = 1;
string dead = 2;
string unknown = 3;
}
}