Skip to content

Commit fdab75c

Browse files
feat: simple read write new json type values (#7175)
feat: basic json read and write Signed-off-by: luofucong <[email protected]>
1 parent 4c07d2d commit fdab75c

File tree

35 files changed

+1049
-289
lines changed

35 files changed

+1049
-289
lines changed

src/api/src/helper.rs

Lines changed: 30 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,9 @@ use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth
2323
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
2424
use datatypes::prelude::{ConcreteDataType, ValueRef};
2525
use datatypes::types::{
26-
IntervalType, JsonFormat, StructField, StructType, TimeType, TimestampType,
27-
};
28-
use datatypes::value::{
29-
ListValue, ListValueRef, OrderedF32, OrderedF64, StructValue, StructValueRef, Value,
26+
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
3027
};
28+
use datatypes::value::{ListValueRef, OrderedF32, OrderedF64, StructValueRef, Value};
3129
use datatypes::vectors::VectorRef;
3230
use greptime_proto::v1::column_data_type_extension::TypeExt;
3331
use greptime_proto::v1::ddl_request::Expr;
@@ -82,6 +80,10 @@ impl ColumnDataTypeWrapper {
8280
pub fn to_parts(&self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
8381
(self.datatype, self.datatype_ext.clone())
8482
}
83+
84+
pub fn into_parts(self) -> (ColumnDataType, Option<ColumnDataTypeExtension>) {
85+
(self.datatype, self.datatype_ext)
86+
}
8587
}
8688

8789
impl From<ColumnDataTypeWrapper> for ConcreteDataType {
@@ -127,6 +129,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
127129
};
128130
ConcreteDataType::json_native_datatype(inner_type.into())
129131
}
132+
None => ConcreteDataType::Json(JsonType::null()),
130133
_ => {
131134
// invalid state, type extension is missing or invalid
132135
ConcreteDataType::null_datatype()
@@ -441,18 +444,22 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
441444
JsonFormat::Jsonb => Some(ColumnDataTypeExtension {
442445
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
443446
}),
444-
JsonFormat::Native(inner) => {
445-
let inner_type = ColumnDataTypeWrapper::try_from(
446-
ConcreteDataType::from(inner.as_ref()),
447-
)?;
448-
Some(ColumnDataTypeExtension {
449-
type_ext: Some(TypeExt::JsonNativeType(Box::new(
450-
JsonNativeTypeExtension {
451-
datatype: inner_type.datatype.into(),
452-
datatype_extension: inner_type.datatype_ext.map(Box::new),
453-
},
454-
))),
455-
})
447+
JsonFormat::Native(native_type) => {
448+
if native_type.is_null() {
449+
None
450+
} else {
451+
let native_type = ConcreteDataType::from(native_type.as_ref());
452+
let (datatype, datatype_extension) =
453+
ColumnDataTypeWrapper::try_from(native_type)?.into_parts();
454+
Some(ColumnDataTypeExtension {
455+
type_ext: Some(TypeExt::JsonNativeType(Box::new(
456+
JsonNativeTypeExtension {
457+
datatype: datatype as i32,
458+
datatype_extension: datatype_extension.map(Box::new),
459+
},
460+
))),
461+
})
462+
}
456463
}
457464
}
458465
} else {
@@ -887,111 +894,6 @@ pub fn is_column_type_value_eq(
887894
.unwrap_or(false)
888895
}
889896

890-
/// Convert value into proto's value.
891-
pub fn to_proto_value(value: Value) -> v1::Value {
892-
match value {
893-
Value::Null => v1::Value { value_data: None },
894-
Value::Boolean(v) => v1::Value {
895-
value_data: Some(ValueData::BoolValue(v)),
896-
},
897-
Value::UInt8(v) => v1::Value {
898-
value_data: Some(ValueData::U8Value(v.into())),
899-
},
900-
Value::UInt16(v) => v1::Value {
901-
value_data: Some(ValueData::U16Value(v.into())),
902-
},
903-
Value::UInt32(v) => v1::Value {
904-
value_data: Some(ValueData::U32Value(v)),
905-
},
906-
Value::UInt64(v) => v1::Value {
907-
value_data: Some(ValueData::U64Value(v)),
908-
},
909-
Value::Int8(v) => v1::Value {
910-
value_data: Some(ValueData::I8Value(v.into())),
911-
},
912-
Value::Int16(v) => v1::Value {
913-
value_data: Some(ValueData::I16Value(v.into())),
914-
},
915-
Value::Int32(v) => v1::Value {
916-
value_data: Some(ValueData::I32Value(v)),
917-
},
918-
Value::Int64(v) => v1::Value {
919-
value_data: Some(ValueData::I64Value(v)),
920-
},
921-
Value::Float32(v) => v1::Value {
922-
value_data: Some(ValueData::F32Value(*v)),
923-
},
924-
Value::Float64(v) => v1::Value {
925-
value_data: Some(ValueData::F64Value(*v)),
926-
},
927-
Value::String(v) => v1::Value {
928-
value_data: Some(ValueData::StringValue(v.as_utf8().to_string())),
929-
},
930-
Value::Binary(v) => v1::Value {
931-
value_data: Some(ValueData::BinaryValue(v.to_vec())),
932-
},
933-
Value::Date(v) => v1::Value {
934-
value_data: Some(ValueData::DateValue(v.val())),
935-
},
936-
Value::Timestamp(v) => match v.unit() {
937-
TimeUnit::Second => v1::Value {
938-
value_data: Some(ValueData::TimestampSecondValue(v.value())),
939-
},
940-
TimeUnit::Millisecond => v1::Value {
941-
value_data: Some(ValueData::TimestampMillisecondValue(v.value())),
942-
},
943-
TimeUnit::Microsecond => v1::Value {
944-
value_data: Some(ValueData::TimestampMicrosecondValue(v.value())),
945-
},
946-
TimeUnit::Nanosecond => v1::Value {
947-
value_data: Some(ValueData::TimestampNanosecondValue(v.value())),
948-
},
949-
},
950-
Value::Time(v) => match v.unit() {
951-
TimeUnit::Second => v1::Value {
952-
value_data: Some(ValueData::TimeSecondValue(v.value())),
953-
},
954-
TimeUnit::Millisecond => v1::Value {
955-
value_data: Some(ValueData::TimeMillisecondValue(v.value())),
956-
},
957-
TimeUnit::Microsecond => v1::Value {
958-
value_data: Some(ValueData::TimeMicrosecondValue(v.value())),
959-
},
960-
TimeUnit::Nanosecond => v1::Value {
961-
value_data: Some(ValueData::TimeNanosecondValue(v.value())),
962-
},
963-
},
964-
Value::IntervalYearMonth(v) => v1::Value {
965-
value_data: Some(ValueData::IntervalYearMonthValue(v.to_i32())),
966-
},
967-
Value::IntervalDayTime(v) => v1::Value {
968-
value_data: Some(ValueData::IntervalDayTimeValue(v.to_i64())),
969-
},
970-
Value::IntervalMonthDayNano(v) => v1::Value {
971-
value_data: Some(ValueData::IntervalMonthDayNanoValue(
972-
convert_month_day_nano_to_pb(v),
973-
)),
974-
},
975-
Value::Decimal128(v) => v1::Value {
976-
value_data: Some(ValueData::Decimal128Value(convert_to_pb_decimal128(v))),
977-
},
978-
Value::List(list_value) => v1::Value {
979-
value_data: Some(ValueData::ListValue(v1::ListValue {
980-
items: convert_list_to_pb_values(list_value),
981-
})),
982-
},
983-
Value::Struct(struct_value) => v1::Value {
984-
value_data: Some(ValueData::StructValue(v1::StructValue {
985-
items: convert_struct_to_pb_values(struct_value),
986-
})),
987-
},
988-
Value::Json(v) => v1::Value {
989-
value_data: Some(ValueData::JsonValue(encode_json_value(*v))),
990-
},
991-
Value::Duration(_) => v1::Value { value_data: None },
992-
}
993-
}
994-
995897
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
996898
fn helper(json: JsonVariant) -> v1::JsonValue {
997899
let value = match json {
@@ -1052,22 +954,6 @@ fn decode_json_value(value: &v1::JsonValue) -> JsonValueRef<'_> {
1052954
}
1053955
}
1054956

1055-
fn convert_list_to_pb_values(list_value: ListValue) -> Vec<v1::Value> {
1056-
list_value
1057-
.take_items()
1058-
.into_iter()
1059-
.map(to_proto_value)
1060-
.collect()
1061-
}
1062-
1063-
fn convert_struct_to_pb_values(struct_value: StructValue) -> Vec<v1::Value> {
1064-
struct_value
1065-
.take_items()
1066-
.into_iter()
1067-
.map(to_proto_value)
1068-
.collect()
1069-
}
1070-
1071957
/// Returns the [ColumnDataTypeWrapper] of the value.
1072958
///
1073959
/// If value is null, returns `None`.
@@ -1114,14 +1000,14 @@ pub fn vectors_to_rows<'a>(
11141000
let mut rows = vec![Row { values: vec![] }; row_count];
11151001
for column in columns {
11161002
for (row_index, row) in rows.iter_mut().enumerate() {
1117-
row.values.push(value_to_grpc_value(column.get(row_index)))
1003+
row.values.push(to_grpc_value(column.get(row_index)))
11181004
}
11191005
}
11201006

11211007
rows
11221008
}
11231009

1124-
pub fn value_to_grpc_value(value: Value) -> GrpcValue {
1010+
pub fn to_grpc_value(value: Value) -> GrpcValue {
11251011
GrpcValue {
11261012
value_data: match value {
11271013
Value::Null => None,
@@ -1161,15 +1047,15 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
11611047
let items = list_value
11621048
.take_items()
11631049
.into_iter()
1164-
.map(value_to_grpc_value)
1050+
.map(to_grpc_value)
11651051
.collect();
11661052
Some(ValueData::ListValue(v1::ListValue { items }))
11671053
}
11681054
Value::Struct(struct_value) => {
11691055
let items = struct_value
11701056
.take_items()
11711057
.into_iter()
1172-
.map(value_to_grpc_value)
1058+
.map(to_grpc_value)
11731059
.collect();
11741060
Some(ValueData::StructValue(v1::StructValue { items }))
11751061
}
@@ -1269,6 +1155,7 @@ mod tests {
12691155
use common_time::interval::IntervalUnit;
12701156
use datatypes::scalars::ScalarVector;
12711157
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
1158+
use datatypes::value::{ListValue, StructValue};
12721159
use datatypes::vectors::{
12731160
BooleanVector, DateVector, Float32Vector, PrimitiveVector, StringVector,
12741161
};
@@ -1872,7 +1759,7 @@ mod tests {
18721759
Arc::new(ConcreteDataType::boolean_datatype()),
18731760
));
18741761

1875-
let pb_value = to_proto_value(value);
1762+
let pb_value = to_grpc_value(value);
18761763

18771764
match pb_value.value_data.unwrap() {
18781765
ValueData::ListValue(pb_list_value) => {
@@ -1901,7 +1788,7 @@ mod tests {
19011788
.unwrap(),
19021789
);
19031790

1904-
let pb_value = to_proto_value(value);
1791+
let pb_value = to_grpc_value(value);
19051792

19061793
match pb_value.value_data.unwrap() {
19071794
ValueData::StructValue(pb_struct_value) => {

src/common/recordbatch/src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,13 @@ pub enum Error {
188188
#[snafu(implicit)]
189189
location: Location,
190190
},
191+
192+
#[snafu(display("Failed to align JSON array, reason: {reason}"))]
193+
AlignJsonArray {
194+
reason: String,
195+
#[snafu(implicit)]
196+
location: Location,
197+
},
191198
}
192199

193200
impl ErrorExt for Error {
@@ -203,7 +210,8 @@ impl ErrorExt for Error {
203210
| Error::ToArrowScalar { .. }
204211
| Error::ProjectArrowRecordBatch { .. }
205212
| Error::PhysicalExpr { .. }
206-
| Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
213+
| Error::RecordBatchSliceIndexOverflow { .. }
214+
| Error::AlignJsonArray { .. } => StatusCode::Internal,
207215

208216
Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
209217

src/common/recordbatch/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub mod adapter;
1818
pub mod cursor;
1919
pub mod error;
2020
pub mod filter;
21-
mod recordbatch;
21+
pub mod recordbatch;
2222
pub mod util;
2323

2424
use std::fmt;

0 commit comments

Comments
 (0)