diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfo.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfo.scala
index 7035ce541..79f49c5d7 100644
--- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfo.scala
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfo.scala
@@ -1,7 +1,11 @@
 package edu.uci.ics.cloudberry.zion.model.impl
 
-import edu.uci.ics.cloudberry.zion.model.schema.{Query, Schema}
+import edu.uci.ics.cloudberry.zion.model.datastore.JsonRequestException
+import edu.uci.ics.cloudberry.zion.model.schema._
 import org.joda.time.{DateTime, Interval}
+import play.api.libs.functional.syntax._
+import play.api.libs.json.Reads._
+import play.api.libs.json._
 
 case class Stats(createTime: DateTime,
                  lastModifyTime: DateTime,
@@ -13,3 +17,125 @@ case class DataSetInfo(name: String,
                        schema: Schema,
                        dataInterval: Interval,
                        stats: Stats)
+
+object DataSetInfo {
+
+  def parse(json: JsValue): DataSetInfo = {
+    json.validate[DataSetInfo] match {
+      case js: JsSuccess[DataSetInfo] => js.get
+      case e: JsError => throw JsonRequestException(JsError.toJson(e).toString())
+    }
+  }
+
+  def write(dataSetInfo: DataSetInfo): JsValue = Json.toJson(dataSetInfo)
+
+  implicit val intervalFormat: Format[Interval] = new Format[Interval] {
+    override def reads(json: JsValue) = {
+      val start = (json \ "start").as[DateTime]
+      val end = (json \ "end").as[DateTime]
+      JsSuccess(new Interval(start.getMillis, end.getMillis))
+    }
+
+    override def writes(interval: Interval): JsValue = {
+      JsObject(List("start" -> Json.toJson(interval.getStart), "end" -> Json.toJson(interval.getEnd)))
+    }
+  }
+
+  //Used by HierarchyField to write its levels (a Seq[(String, String)]) to JSON
+  implicit def tuple2Writes: Writes[Tuple2[String, String]] = Writes[(String, String)](t => Json.obj("level" -> t._1, "field" -> t._2))
+
+  def parseLevels(levelSeq: Seq[Map[String, String]]): Seq[(String, String)] = {
+    levelSeq.map {
+      levelMap => (levelMap("level"), levelMap("field"))
+    }
+  }
+
+  implicit val fieldFormat: Format[Field] = new Format[Field] {
+    override def reads(json: JsValue): JsResult[Field] = {
+      val name = (json \ "name").as[String]
+      val isOptional = (json \ "isOptional").as[Boolean]
+      DataType.withName((json \ "datatype").as[String]) match {
+        case DataType.Number =>
+          JsSuccess(NumberField(name, isOptional))
+        case DataType.Record => ??? //TODO think about the Record type later
+        case DataType.Point =>
+          JsSuccess(PointField(name, isOptional))
+        case DataType.Bag =>
+          val innerType = (json \ "innerType").as[String]
+          JsSuccess(BagField(name, DataType.withName(innerType), isOptional))
+        case DataType.Boolean =>
+          JsSuccess(BooleanField(name, isOptional))
+        case DataType.Hierarchy =>
+          val innerType = (json \ "innerType").as[String]
+          val levelSeq = (json \ "levels").as[Seq[Map[String, String]]]
+          JsSuccess(HierarchyField(name, DataType.withName(innerType), parseLevels(levelSeq)))
+        case DataType.Text =>
+          JsSuccess(TextField(name, isOptional))
+        case DataType.String =>
+          JsSuccess(StringField(name, isOptional))
+        case DataType.Time =>
+          JsSuccess(TimeField(name, isOptional))
+        case unknown: DataType.Value => JsError(s"field datatype invalid: $unknown")
+      }
+    }
+
+    override def writes(field: Field): JsValue = {
+      val name = field.name
+      val isOptional = field.isOptional
+      val dataType = field.dataType.toString
+      field match {
+        case record: RecordField => JsNull
+        case bag: BagField => JsObject(List(
+          "name" -> JsString(name),
+          "isOptional" -> JsBoolean(isOptional),
+          "datatype" -> JsString(dataType),
+          "innerType" -> JsString(bag.innerType.toString)))
+        case hierarchy: HierarchyField => JsObject(List(
+          "name" -> JsString(name),
+          "isOptional" -> JsBoolean(isOptional),
+          "datatype" -> JsString(dataType),
+          "innerType" -> JsString(hierarchy.innerType.toString),
+          "levels" -> Json.toJson(hierarchy.levels)))
+        // every other field type carries only name, isOptional, and datatype
+        case basicField: Field => JsObject(List(
+          "name" -> JsString(name),
+          "isOptional" -> JsBoolean(isOptional),
+          "datatype" -> JsString(dataType)))
+      }
+    }
+  }
+
+  implicit val datetimeFormat: Format[DateTime] = new Format[DateTime] {
+    override def reads(json: JsValue) = {
+      val datetime = IQuery.TimeFormat.parseDateTime(json.as[String])
+      JsSuccess(datetime)
+    }
+
+    override def writes(dateTime: DateTime): JsValue = JsString(dateTime.toString(IQuery.TimeFormat))
+  }
+
+  implicit val statsFormat: Format[Stats] = (
+    (JsPath \ "createTime").format[DateTime] and
+      (JsPath \ "lastModifyTime").format[DateTime] and
+      (JsPath \ "lastReadTime").format[DateTime] and
+      (JsPath \ "cardinality").format[Int]
+    ) (Stats.apply, unlift(Stats.unapply))
+
+  implicit val queryFormat: Format[Query] = JSONParser.queryFormat
+
+  implicit val schemaFormat: Format[Schema] = (
+    (JsPath \ "typeName").format[String] and
+      (JsPath \ "dimension").format[Seq[Field]] and
+      (JsPath \ "measurement").format[Seq[Field]] and
+      (JsPath \ "primaryKey").format[Seq[String]] and
+      (JsPath \ "timeField").format[String]
+    ) (Schema.apply, unlift(Schema.unapply))
+
+  implicit val dataSetInfoFormat: Format[DataSetInfo] = (
+    (JsPath \ "name").format[String] and
+      (JsPath \ "createQuery").formatNullable[Query] and
+      (JsPath \ "schema").format[Schema] and
+      (JsPath \ "dataInterval").format[Interval] and
+      (JsPath \ "stats").format[Stats]
+    ) (DataSetInfo.apply, unlift(DataSetInfo.unapply))
+}
\ No newline at end of file
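The DataSetInfo companion object above becomes the single entry point for (de)serializing dataset metadata: parse validates and throws JsonRequestException on malformed input, while write delegates to dataSetInfoFormat. A minimal round-trip sketch (the literal values are illustrative, borrowed from the test fixtures added later in this patch; signatures are as introduced here):

    import edu.uci.ics.cloudberry.zion.model.impl.{DataSetInfo, Stats}
    import edu.uci.ics.cloudberry.zion.model.schema.Schema
    import org.joda.time.{DateTime, DateTimeZone, Interval}

    DateTimeZone.setDefault(DateTimeZone.UTC)
    val t = new DateTime(2016, 1, 1, 0, 0, 0, 0)
    val info = DataSetInfo("twitter.ds_tweet", None,
      Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""),
      new Interval(new DateTime(2004, 12, 25, 0, 0, 0, 0), t),
      Stats(t, t, t, 0))
    // write then parse should reproduce the original value
    assert(DataSetInfo.parse(DataSetInfo.write(info)) == info)
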
diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParser.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParser.scala
index 07d266760..03dbc4389 100644
--- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParser.scala
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParser.scala
@@ -4,8 +4,7 @@ import edu.uci.ics.cloudberry.zion.model.datastore.{IJSONParser, JsonRequestExce
 import edu.uci.ics.cloudberry.zion.model.schema.Relation.Relation
 import edu.uci.ics.cloudberry.zion.model.schema._
 import play.api.libs.functional.syntax._
-import play.api.libs.json.Reads._
-import play.api.libs.json._
+import play.api.libs.json._
 
 class JSONParser extends IJSONParser {
 
@@ -22,7 +21,7 @@ class JSONParser {
 
 object JSONParser {
   //Warn: the order of implicit values matters. The dependence should be initialized earlier
-  implicit val seqAnyValue: Reads[Seq[Any]] = new Reads[Seq[Any]] {
+  implicit val seqAnyValue: Format[Seq[Any]] = new Format[Seq[Any]] {
     override def reads(json: JsValue): JsResult[Seq[Any]] = {
       json.asOpt[JsArray] match {
         case Some(array) =>
@@ -51,13 +50,30 @@ object JSONParser {
         case None => JsSuccess(Seq.empty)
       }
     }
+
+    override def writes(seq: Seq[Any]): JsValue = {
+      JsArray(seq.map {
+        case b: Boolean => JsBoolean(b)
+        case s: String => JsString(s)
+        case i: Int => JsNumber(i)
+        case d: Double => JsNumber(d)
+        case l: Long => JsNumber(l)
+        case fs: FilterStatement => filterFormat.writes(fs)
+        case by: ByStatement => byFormat.writes(by)
+        case ags: AggregateStatement => aggFormat.writes(ags)
+        case unS: UnnestStatement => unnestFormat.writes(unS)
+        case other => throw JsonRequestException(s"unknown data type: $other")
+      })
+    }
   }
 
-  implicit val transformFuncReads: Reads[TransformFunc] = new Reads[TransformFunc] {
+  implicit val transformFuncFormat: Format[TransformFunc] = new Format[TransformFunc] {
     override def reads(json: JsValue): JsResult[TransformFunc] = ???
+
+    override def writes(transformFunc: TransformFunc): JsValue = ???
   }
 
-  implicit val relationReads: Reads[Relation] = new Reads[Relation] {
+  implicit val relationFormat: Format[Relation] = new Format[Relation] {
     override def reads(json: JsValue): JsResult[Relation] = {
       try {
         JsSuccess(Relation.withName(json.as[String]))
@@ -65,12 +81,15 @@ object JSONParser {
       } catch {
         case e: NoSuchElementException => JsError(s"unknown relation: $json")
       }
     }
-  }
+
+    override def writes(relation: Relation): JsValue = JsString(relation.toString)
+  }
 
-  implicit val groupFuncReads: Reads[GroupFunc] = new Reads[GroupFunc] {
+  implicit val groupFuncFormat: Format[GroupFunc] = new Format[GroupFunc] {
     override def reads(json: JsValue): JsResult[GroupFunc] = (json \ "name").as[String] match {
-      case GroupFunc.Bin => ???
+      case GroupFunc.Bin =>
+        val scale = (json \ "args" \ "scale").as[Int]
+        JsSuccess(Bin(scale))
       case GroupFunc.Level =>
         val level = (json \ "args" \ "level").as[String]
         JsSuccess(Level(level))
@@ -89,83 +108,112 @@ object JSONParser {
       case GroupFunc.GeoCellThousandth => JsSuccess(GeoCellThousandth)
       case unknown: String => JsError(s"group function not found: $unknown")
     }
+
+    override def writes(groupFunc: GroupFunc): JsValue = {
+      groupFunc match {
+        case fBin: Bin => JsObject(Seq("name" -> JsString(fBin.name), "args" -> JsObject(Seq("scale" -> JsNumber(fBin.scale)))))
+        case fLevel: Level => JsObject(Seq("name" -> JsString(fLevel.name), "args" -> JsObject(Seq("level" -> JsString(fLevel.levelTag)))))
+        case fInterval: Interval => JsObject(Seq("name" -> JsString(fInterval.name), "args" -> JsObject(Seq("unit" -> JsString(fInterval.unit.toString)))))
+        case fGeoCellScale: GeoCellScale => JsObject(Seq("name" -> JsString(fGeoCellScale.name)))
+      }
+    }
   }
 
-  implicit val aggFuncReads: Reads[AggregateFunc] = new Reads[AggregateFunc] {
+  implicit val aggFuncFormat: Format[AggregateFunc] = new Format[AggregateFunc] {
     override def reads(json: JsValue): JsResult[AggregateFunc] = {
       (json \ "name").as[String] match {
         case AggregateFunc.Count => JsSuccess(Count)
         case AggregateFunc.TopK => ???
-        case AggregateFunc.Sum => ???
+        case AggregateFunc.Sum => JsSuccess(Sum)
         case AggregateFunc.Max => JsSuccess(Max)
         case AggregateFunc.Min => JsSuccess(Min)
-        case AggregateFunc.Avg => ???
+        case AggregateFunc.Avg => JsSuccess(Avg)
        case AggregateFunc.DistinctCount => ???
        case unknown: String => JsError(s"unknown aggregation function: $unknown")
      }
    }
+
+    override def writes(aggregateFunc: AggregateFunc): JsValue = {
+      JsObject(List("name" -> JsString(aggregateFunc.name)))
+    }
   }
 
-  implicit val aggReads: Reads[AggregateStatement] = {
-    (JsPath \ "field").read[String] and
-      (JsPath \ "apply").read[AggregateFunc] and
-      (JsPath \ "as").read[String]
-  }.apply(AggregateStatement.apply _)
-
-  implicit val byReads: Reads[ByStatement] = {
-    (JsPath \ "field").read[String] and
-      (JsPath \ "apply").readNullable[GroupFunc] and
-      (JsPath \ "as").readNullable[String]
-  }.apply(ByStatement.apply _)
-
-  implicit val groupReads: Reads[GroupStatement] = {
-    (JsPath \ "by").read[Seq[ByStatement]] and
-      (JsPath \ "aggregate").read[Seq[AggregateStatement]]
-  }.apply(GroupStatement.apply _)
-
-  implicit val globalReads: Reads[GlobalAggregateStatement] = {
-    (JsPath \ "globalAggregate").read[AggregateStatement].map(GlobalAggregateStatement.apply)
+  implicit val aggFormat: Format[AggregateStatement] = (
+    (JsPath \ "field").format[String] and
+      (JsPath \ "apply").format[AggregateFunc] and
+      (JsPath \ "as").format[String]
+    ) (AggregateStatement.apply, unlift(AggregateStatement.unapply))
+
+  implicit val byFormat: Format[ByStatement] = (
+    (JsPath \ "field").format[String] and
+      (JsPath \ "apply").formatNullable[GroupFunc] and
+      (JsPath \ "as").formatNullable[String]
+    ) (ByStatement.apply, unlift(ByStatement.unapply))
+
+  implicit val groupFormat: Format[GroupStatement] = (
+    (JsPath \ "by").format[Seq[ByStatement]] and
+      (JsPath \ "aggregate").format[Seq[AggregateStatement]]
+    ) (GroupStatement.apply, unlift(GroupStatement.unapply))
+
+  implicit val globalFormat: Format[GlobalAggregateStatement] = {
+    (JsPath \ "globalAggregate").format[AggregateStatement].inmap(GlobalAggregateStatement.apply, unlift(GlobalAggregateStatement.unapply))
   }
 
-  implicit val selectReads: Reads[SelectStatement] = {
-    (JsPath \ "order").read[Seq[String]] and
-      (JsPath \ "limit").read[Int] and
-      (JsPath \ "offset").read[Int] and
-      (JsPath \ "field").readNullable[Seq[String]].map(_.getOrElse(Seq.empty))
-  }.apply(SelectStatement.apply _)
-
-  implicit val lookupReads: Reads[LookupStatement] = {
-    (JsPath \ "sourceKey").read[Seq[String]] and
-      (JsPath \ "dataset").read[String] and
-      (JsPath \ "lookupKey").read[Seq[String]] and
-      (JsPath \ "select").read[Seq[String]] and
-      (JsPath \ "as").read[Seq[String]]
-  }.apply(LookupStatement.apply _)
-
-  implicit val unnestReads: Reads[Seq[UnnestStatement]] = new Reads[Seq[UnnestStatement]] {
-    override def reads(json: JsValue): JsResult[Seq[UnnestStatement]] = {
+  implicit val selectFormat: Format[SelectStatement] = (
+    (JsPath \ "order").format[Seq[String]] and
+      (JsPath \ "limit").format[Int] and
+      (JsPath \ "offset").format[Int] and
+      (JsPath \ "field").formatNullable[Seq[String]].inmap[Seq[String]](
+        o => o.getOrElse(Seq.empty[String]),
+        s => if (s.isEmpty) None else Some(s)
+      )
+    ) (SelectStatement.apply, unlift(SelectStatement.unapply))
+
+  implicit val lookupFormat: Format[LookupStatement] = (
+    (JsPath \ "sourceKey").format[Seq[String]] and
+      (JsPath \ "dataset").format[String] and
+      (JsPath \ "lookupKey").format[Seq[String]] and
+      (JsPath \ "select").format[Seq[String]] and
+      (JsPath \ "as").format[Seq[String]]
+    ) (LookupStatement.apply, unlift(LookupStatement.unapply))
+
+  implicit val unnestFormat: Format[UnnestStatement] = new Format[UnnestStatement] {
+    override def reads(json: JsValue): JsResult[UnnestStatement] = {
       JsSuccess(json.as[JsObject].value.map {
         case (key, jsValue: JsValue) => UnnestStatement(key, jsValue.as[String])
-      }.toSeq)
+      }.head)
+    }
+
+    override def writes(unnestStatement: UnnestStatement): JsValue = {
+      JsObject(Seq(unnestStatement.fieldName -> JsString(unnestStatement.as)))
     }
   }
 
-  implicit val filterReads: Reads[FilterStatement] = {
-    (JsPath \ "field").read[String] and
-      (JsPath \ "apply").readNullable[TransformFunc] and
-      (JsPath \ "relation").read[Relation] and
-      (JsPath \ "values").read[Seq[Any]]
-  }.apply(FilterStatement.apply _)
+  implicit val filterFormat: Format[FilterStatement] = (
+    (JsPath \ "field").format[String] and
+      (JsPath \ "apply").formatNullable[TransformFunc] and
+      (JsPath \ "relation").format[Relation] and
+      (JsPath \ "values").format[Seq[Any]]
+    ) (FilterStatement.apply, unlift(FilterStatement.unapply))
 
   // TODO find better name for 'global'
-  implicit val queryReads: Reads[Query] = {
-    (JsPath \ "dataset").read[String] and
-      (JsPath \ "lookup").readNullable[Seq[LookupStatement]].map(_.getOrElse(Seq.empty)) and
-      (JsPath \ "filter").readNullable[Seq[FilterStatement]].map(_.getOrElse(Seq.empty)) and
-      (JsPath \ "unnest").readNullable[Seq[UnnestStatement]].map(_.getOrElse(Seq.empty)) and
-      (JsPath \ "group").readNullable[GroupStatement] and
-      (JsPath \ "select").readNullable[SelectStatement] and
-      (JsPath \ "global").readNullable[GlobalAggregateStatement]
-  }.apply(Query.apply _)
+  implicit val queryFormat: Format[Query] = (
+    (JsPath \ "dataset").format[String] and
+      (JsPath \ "lookup").formatNullable[Seq[LookupStatement]].inmap[Seq[LookupStatement]](
+        o => o.getOrElse(Seq.empty[LookupStatement]),
+        s => if (s.isEmpty) None else Some(s)
+      ) and
+      (JsPath \ "filter").formatNullable[Seq[FilterStatement]].inmap[Seq[FilterStatement]](
+        o => o.getOrElse(Seq.empty[FilterStatement]),
+        s => if (s.isEmpty) None else Some(s)
+      ) and
+      (JsPath \ "unnest").formatNullable[Seq[UnnestStatement]].inmap[Seq[UnnestStatement]](
+        o => o.getOrElse(Seq.empty[UnnestStatement]),
+        s => if (s.isEmpty) None else Some(s)
+      ) and
+      (JsPath \ "group").formatNullable[GroupStatement] and
+      (JsPath \ "select").formatNullable[SelectStatement] and
+      (JsPath \ "global").formatNullable[GlobalAggregateStatement]
+    ) (Query.apply, unlift(Query.unapply))
 }
diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
index e0f9a98da..97503cc52 100644
--- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Query.scala
@@ -10,7 +10,7 @@ trait IQuery {
 }
 
 object IQuery {
-  val TimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'")
+  val TimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 }
 
 case class Query(dataset: String,
diff --git a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Schema.scala b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Schema.scala
index 136e4f753..663b77c11 100644
--- a/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Schema.scala
+++ b/zion/src/main/scala/edu/uci/ics/cloudberry/zion/model/schema/Schema.scala
@@ -1,12 +1,19 @@
 package edu.uci.ics.cloudberry.zion.model.schema
 
 import edu.uci.ics.cloudberry.zion.model.schema.DataType.DataType
-import org.joda.time.DateTime
 
 //TODO support nested type
 object DataType extends Enumeration {
   type DataType = Value
-  val Number, Time, Point, Boolean, String, Text, Bag, Hierarchy, Record = Value
+  val Number = Value("Number")
+  val Time = Value("Time")
+  val Point = Value("Point")
+  val Boolean = Value("Boolean")
+  val String = Value("String")
+  val Text = Value("Text")
+  val Bag = Value("Bag")
+  val Hierarchy = Value("Hierarchy")
+  val Record = Value("Record")
 }
 
 object Relation extends Enumeration {
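The TimeFormat change above is the reason every AQL datetime literal in the tests below gains a ".000" millisecond component. A quick Joda-Time sketch of the new pattern (note that 'Z' is quoted, i.e. a literal character rather than a UTC offset, which is why the tests pin the default time zone to UTC):

    import org.joda.time.{DateTime, DateTimeZone}
    import org.joda.time.format.DateTimeFormat

    val TimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    val t = new DateTime(2016, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC)
    TimeFormat.print(t)                                  // "2016-01-01T00:00:00.000Z"; the old pattern printed no ".000"
    TimeFormat.parseDateTime("2016-01-01T00:00:00.000Z") // parsed in the default zone, since 'Z' is a literal
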
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/AQLGeneratorTest.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/AQLGeneratorTest.scala
index d790cd1d8..b07d64335 100644
--- a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/AQLGeneratorTest.scala
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/AQLGeneratorTest.scala
@@ -18,7 +18,7 @@ class AQLGeneratorTest extends Specification {
       removeEmptyLine(result) must_== unifyNewLine(
         """
          |for $t in dataset twitter.ds_tweet
-         |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+         |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
          |let $taggr := $t
          |group by $g0 := get-interval-start-datetime(interval-bin($t.'create_at', datetime('1990-01-01T00:00:00.000Z'), day-time-duration("PT1H") )) with $taggr
          |return {
@@ -73,7 +73,7 @@ class AQLGeneratorTest extends Specification {
        """
          |for $t in dataset twitter.ds_tweet
          |where similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
-         |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z') and true
+         |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z') and true
          |for $setgeo_tag_stateID in [ 37,51,24,11,10,34,42,9,44 ]
          |where $t.'geo_tag'.'stateID' = $setgeo_tag_stateID
          |let $taggr := $t
@@ -92,7 +92,7 @@ class AQLGeneratorTest extends Specification {
        """
          |for $t in dataset twitter.ds_tweet
          |where similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
-         |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z') and true
+         |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z') and true
          |for $setgeo_tag_stateID in [ 37,51,24,11,10,34,42,9,44 ]
          |where $t.'geo_tag'.'stateID' = $setgeo_tag_stateID
          |order by $t.'create_at' desc
@@ -113,7 +113,7 @@ class AQLGeneratorTest extends Specification {
         |for $g in (
         |for $t in dataset twitter.ds_tweet
         |where similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
-        |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z') and true
+        |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z') and true
        |for $setgeo_tag_stateID in [ 37,51,24,11,10,34,42,9,44 ]
        |where $t.'geo_tag'.'stateID' = $setgeo_tag_stateID
        |where not(is-null($t.'hashtags'))
@@ -140,7 +140,7 @@ class AQLGeneratorTest extends Specification {
       removeEmptyLine(result) must_== unifyNewLine(
         """
          |for $t in dataset twitter.ds_tweet
-         |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+         |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
          |let $taggr := $t.'id'
          |group by $g0 := get-interval-start-datetime(interval-bin($t.'create_at', datetime('1990-01-01T00:00:00.000Z'), day-time-duration("PT1H") )) with $taggr
          |return {
@@ -157,7 +157,7 @@ class AQLGeneratorTest extends Specification {
       removeEmptyLine(result) must_== unifyNewLine(
         """
          |for $t in dataset twitter.ds_tweet
-         |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+         |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
          |let $taggr := $t.'id'
          |group by $g0 := get-interval-start-datetime(interval-bin($t.'create_at', datetime('1990-01-01T00:00:00.000Z'), day-time-duration("PT1H") )) with $taggr
          |return {
@@ -174,7 +174,7 @@ class AQLGeneratorTest extends Specification {
       removeEmptyLine(result) must_== unifyNewLine(
         """
          |for $t in dataset twitter.ds_tweet
-         |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+         |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
          |let $taggr := $t.'id'
          |group by $g0 := get-interval-start-datetime(interval-bin($t.'create_at', datetime('1990-01-01T00:00:00.000Z'), day-time-duration("PT1H") )) with $taggr
          |return {
@@ -191,7 +191,7 @@ class AQLGeneratorTest extends Specification {
       removeEmptyLine(result) must_== unifyNewLine(
         """
          |for $t in dataset twitter.ds_tweet
-         |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+         |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
          |let $taggr := $t.'id'
          |group by $g0 := get-interval-start-datetime(interval-bin($t.'create_at', datetime('1990-01-01T00:00:00.000Z'), day-time-duration("PT1H") )) with $taggr
          |return {
@@ -449,7 +449,7 @@ class AQLGeneratorTest extends Specification {
        """{"count": count (
        |for $c in (
        |for $t in dataset twitter.ds_tweet
-       |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+       |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
        |return $t
        |)
        |return $c
@@ -467,7 +467,7 @@ class AQLGeneratorTest extends Specification {
        """{"min": min (
        |for $c in (
        |for $t in dataset twitter.ds_tweet
-       |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z')
+       |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z')
        |return $t
        |)
        |return $c.'id'
@@ -489,7 +489,7 @@ class AQLGeneratorTest extends Specification {
        |for $g in (
        |for $t in dataset twitter.ds_tweet
        |where similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
-       |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z') and true
+       |and contains($t.'text', "virus") and $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z') and true
        |for $setgeo_tag_stateID in [ 37,51,24,11,10,34,42,9,44 ]
        |where $t.'geo_tag'.'stateID' = $setgeo_tag_stateID
        |where not(is-null($t.'hashtags'))
@@ -594,7 +594,7 @@ class AQLGeneratorTest extends Specification {
       """
        |upsert into dataset zika (
        |for $t in dataset twitter.ds_tweet
-       |where $t.'create_at' >= datetime('2016-01-01T00:00:00Z') and $t.'create_at' < datetime('2016-12-01T00:00:00Z') and similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
+       |where $t.'create_at' >= datetime('2016-01-01T00:00:00.000Z') and $t.'create_at' < datetime('2016-12-01T00:00:00.000Z') and similarity-jaccard(word-tokens($t.'text'), word-tokens('zika')) > 0.0
        |return $t
        |)
       """.stripMargin.trim)
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfoTest.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfoTest.scala
new file mode 100644
index 000000000..7192db831
--- /dev/null
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/DataSetInfoTest.scala
@@ -0,0 +1,84 @@
+package edu.uci.ics.cloudberry.zion.model.impl
+
+
+import edu.uci.ics.cloudberry.zion.model.impl.TestDataSetInfo._
+import org.specs2.mutable.Specification
+
+class DataSetInfoTest extends Specification {
+
+  val parser = DataSetInfo
+
+  "DataSetInfo" should {
+    "read a sample dataSetInfo" in {
+      val actualDataSetInfo = parser.parse(simpleDataSetInfoJSON)
+      val expectDataSetInfo = simpleDataSetInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "read dataSetInfo containing Schema fields" in {
+      val actualDataSetInfo = parser.parse(fieldsDataSetInfoJSON)
+      val expectDataSetInfo = fieldsDataSetInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "read dataSetInfo containing a createQuery" in {
+      val actualDataSetInfo = parser.parse(queryDataSetInfoJSON)
+      val expectDataSetInfo = queryDataSetInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "read dataSetInfo with createQuery by (state, hour) count request" in {
+      val actualDataSetInfo = parser.parse(complexQueryDataSetInfoJSON)
+      val expectDataSetInfo = berryAggrByTagViewDataSetInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "read dataSetInfo containing dimension and measurement fields" in {
+      val actualDataSetInfo = parser.parse(sourceDataSetInfoJSON)
+      val expectDataSetInfo = TestQuery.sourceInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "read dataSetInfo containing twitter schema and zika filter query" in {
+      val actualDataSetInfo = parser.parse(zikaDataSetInfoJSON)
+      val expectDataSetInfo = TestQuery.zikaHalfYearViewInfo
+      actualDataSetInfo must_== expectDataSetInfo
+    }
+    "write a sample dataSetInfo" in {
+      val actualJSON = parser.write(simpleDataSetInfo)
+      val expectJSON = simpleDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo containing Schema fields" in {
+      val actualJSON = parser.write(fieldsDataSetInfo)
+      val expectJSON = fieldsDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo containing a simple createQuery" in {
+      val actualJSON = parser.write(queryDataSetInfo)
+      val expectJSON = queryDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo with createQuery by (state, hour) count request" in {
+      val actualJSON = parser.write(berryAggrByTagViewDataSetInfo)
+      val expectJSON = complexQueryDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+
+    "write dataSetInfo with createQuery by topK hashtag request" in {
+      val actualJSON = parser.write(unnestQueryDataSetInfo)
+      val expectJSON = unnestQueryDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo containing dimension and measurement fields" in {
+      val actualJSON = parser.write(TestQuery.sourceInfo)
+      val expectJSON = sourceDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo containing twitter schema and zika filter query" in {
+      val actualJSON = parser.write(TestQuery.zikaHalfYearViewInfo)
+      val expectJSON = zikaDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+    "write dataSetInfo containing twitter schema and group by bin query" in {
+      val actualJSON = parser.write(byBinDataSetInfo)
+      val expectJSON = BinDataSetInfoJSON
+      actualJSON must_== expectJSON
+    }
+  }
+}
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParserTest.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParserTest.scala
index c0f6dad7a..bf508b6de 100644
--- a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParserTest.scala
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/JSONParserTest.scala
@@ -37,6 +37,11 @@ class JSONParserTest extends Specification {
       val expectQuery = Query(TwitterDataSet, Seq.empty, filter, Seq.empty, None, Some(selectRecent))
       actualQuery must_== expectQuery
     }
+    "parse the group by bin" in {
+      val actualQuery = parser.parse(groupByBinJSON)
+      val expectQuery = Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(GroupStatement(Seq(byBin), Seq(aggrCount))), None)
+      actualQuery must_== expectQuery
+    }
     "parse int values " in {
       val actualQuery = parser.parse(intValuesJSON)
       val expectQuery = new Query(TwitterDataSet, Seq.empty, Seq(intFilter), Seq.empty, None, None)
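One detail worth calling out from the two test diffs above: the unnest clause is now a JSON array of single-pair objects ({"field": "alias"}), matching the new unnestFormat, which reads exactly one pair per object. A hedged sketch (the Seq case relies on Play's derived Reads for sequences):

    import play.api.libs.json._
    import edu.uci.ics.cloudberry.zion.model.impl.JSONParser._
    import edu.uci.ics.cloudberry.zion.model.schema.UnnestStatement

    val one = Json.parse("""{ "hashtags": "tag" }""").as[UnnestStatement]       // UnnestStatement("hashtags", "tag")
    val many = Json.parse("""[{ "hashtags": "tag" }]""").as[Seq[UnnestStatement]]
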
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestDataSetInfo.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestDataSetInfo.scala
new file mode 100644
index 000000000..fdc6533d6
--- /dev/null
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestDataSetInfo.scala
@@ -0,0 +1,258 @@
+package edu.uci.ics.cloudberry.zion.model.impl
+
+import edu.uci.ics.cloudberry.zion.model.impl.TestQuery._
+import edu.uci.ics.cloudberry.zion.model.schema._
+import org.joda.time.{DateTime, DateTimeZone, Interval}
+import play.api.libs.json.Json
+
+object TestDataSetInfo {
+
+  DateTimeZone.setDefault(DateTimeZone.UTC)
+  val startDateTime = new DateTime(2004, 12, 25, 0, 0, 0, 0)
+  val endDateTime = new DateTime(2016, 1, 1, 0, 0, 0, 0)
+  val endTimeString = "2016-01-01T00:00:00.000Z"
+  val startTimeString = "2004-12-25T00:00:00.000Z"
+
+  val interval = new Interval(startDateTime, endDateTime)
+  val fields = Seq(NumberField("id"), StringField("name"))
+  val globalAggr = GlobalAggregateStatement(aggrCount)
+  val filter = Seq(stateFilter, timeFilter, textFilter)
+  val group = GroupStatement(Seq(byState, byHour), Seq(aggrCount))
+  val groupByTag = GroupStatement(Seq(byTag), Seq(aggrCount))
+  val groupByBin = GroupStatement(Seq(byBin), Seq(aggrCount))
+
+  val createQuery = new Query(dataset = TwitterDataSet, globalAggr = Some(globalAggr))
+  val berryAggrByTagQuery = new Query(TwitterDataSet, Seq.empty, filter, Seq.empty, Some(group), None)
+  val unnestQuery = new Query(TwitterDataSet, Seq.empty, filter, Seq(unnestHashTag), Some(groupByTag), Some(selectTop10Tag))
+  val groupByBinQuery = new Query(TwitterDataSet, Seq.empty, Seq.empty, Seq.empty, Some(groupByBin), None)
+
+
+  val simpleDataSetInfo = new DataSetInfo("twitter.ds_tweet", None, Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+  val fieldsDataSetInfo = new DataSetInfo("twitter.ds_tweet", None, Schema("tweet", fields, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+  val queryDataSetInfo = new DataSetInfo("twitter.ds_tweet", Some(createQuery), Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+  val berryAggrByTagViewDataSetInfo = new DataSetInfo("twitter.ds_tweet", Some(berryAggrByTagQuery), Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+  val unnestQueryDataSetInfo = new DataSetInfo("twitter.ds_tweet", Some(unnestQuery), Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+  val byBinDataSetInfo = new DataSetInfo("twitter.ds_tweet", Some(groupByBinQuery), Schema("tweet", Seq.empty, Seq.empty, Seq.empty, ""), interval, new Stats(endDateTime, endDateTime, endDateTime, 0))
+
+
+  val simpleDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val fieldsDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [{
+       |      "name": "id",
+       |      "isOptional": false,
+       |      "datatype": "Number"},
+       |      {
+       |      "name": "name",
+       |      "isOptional": false,
+       |      "datatype": "String"}
+       |    ],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val queryDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "createQuery":
+       |$globalCountJSON,
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val BinDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "createQuery":
+       |$groupByBinJSON,
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val complexQueryDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "createQuery":
+       |$filterSelectJSON,
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val unnestQueryDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |  "name": "twitter.ds_tweet",
+       |  "createQuery":
+       |$topKHashTagJSON,
+       |  "schema": {
+       |    "typeName": "tweet",
+       |    "dimension": [],
+       |    "measurement": [],
+       |    "primaryKey": [],
+       |    "timeField": ""
+       |  },
+       |  "dataInterval": {"start":"$startTimeString",
+       |                   "end":"$endTimeString"},
+       |  "stats": { "createTime": "$endTimeString",
+       |             "lastModifyTime": "$endTimeString",
+       |             "lastReadTime": "$endTimeString",
+       |             "cardinality": 0
+       |  }
+       |}
+    """.stripMargin)
+
+  val sourceDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |"name":"twitter.ds_tweet",
+       |"schema":{
+       |  "typeName":"twitter.typeTweet",
+       |  "dimension":[{"name":"create_at","isOptional":false,"datatype":"Time"},
+       |               {"name":"id","isOptional":false,"datatype":"Number"},
+       |               {"name":"coordinate","isOptional":false,"datatype":"Point"},
+       |               {"name":"lang","isOptional":false,"datatype":"String"},
+       |               {"name":"is_retweet","isOptional":false,"datatype":"Boolean"},
+       |               {"name":"hashtags","isOptional":true,"datatype":"Bag","innerType":"String"},
+       |               {"name":"user_mentions","isOptional":true,"datatype":"Bag","innerType":"Number"},
+       |               {"name":"user.id","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.stateID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.countyID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.cityID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo","isOptional":false,"datatype":"Hierarchy","innerType":"Number",
+       |                "levels":[{"level":"state","field":"geo_tag.stateID"},
+       |                          {"level":"county","field":"geo_tag.countyID"},
+       |                          {"level":"city","field":"geo_tag.cityID"}]}],
+       |  "measurement":[{"name":"text","isOptional":false,"datatype":"Text"},
+       |                 {"name":"in_reply_to_status","isOptional":false,"datatype":"Number"},
+       |                 {"name":"in_reply_to_user","isOptional":false,"datatype":"Number"},
+       |                 {"name":"favorite_count","isOptional":false,"datatype":"Number"},
+       |                 {"name":"retweet_count","isOptional":false,"datatype":"Number"},
+       |                 {"name":"user.status_count","isOptional":false,"datatype":"Number"}],
+       |  "primaryKey":["id"],"timeField":"create_at"},
+       |"dataInterval":{"start":"2015-01-01T00:00:00.000Z",
+       |                "end":"2017-01-01T00:00:00.000Z"},
+       |"stats":{"createTime":"2015-01-01T00:00:00.000Z",
+       |         "lastModifyTime":"2017-01-01T00:00:00.000Z",
+       |         "lastReadTime":"2017-01-01T00:00:00.000Z",
+       |         "cardinality":10000
+       |  }
+       |}
+    """.stripMargin)
+
+  val zikaDataSetInfoJSON = Json.parse(
+    s"""
+       |{
+       |"name":"zika",
+       |"createQuery":
+       |$zikaJSON,
+       |"schema":{
+       |  "typeName":"twitter.typeTweet",
+       |  "dimension":[{"name":"create_at","isOptional":false,"datatype":"Time"},
+       |               {"name":"id","isOptional":false,"datatype":"Number"},
+       |               {"name":"coordinate","isOptional":false,"datatype":"Point"},
+       |               {"name":"lang","isOptional":false,"datatype":"String"},
+       |               {"name":"is_retweet","isOptional":false,"datatype":"Boolean"},
+       |               {"name":"hashtags","isOptional":true,"datatype":"Bag","innerType":"String"},
+       |               {"name":"user_mentions","isOptional":true,"datatype":"Bag","innerType":"Number"},
+       |               {"name":"user.id","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.stateID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.countyID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo_tag.cityID","isOptional":false,"datatype":"Number"},
+       |               {"name":"geo","isOptional":false,"datatype":"Hierarchy","innerType":"Number",
+       |                "levels":[{"level":"state","field":"geo_tag.stateID"},
+       |                          {"level":"county","field":"geo_tag.countyID"},
+       |                          {"level":"city","field":"geo_tag.cityID"}]}],
+       |  "measurement":[{"name":"text","isOptional":false,"datatype":"Text"},
+       |                 {"name":"in_reply_to_status","isOptional":false,"datatype":"Number"},
+       |                 {"name":"in_reply_to_user","isOptional":false,"datatype":"Number"},
+       |                 {"name":"favorite_count","isOptional":false,"datatype":"Number"},
+       |                 {"name":"retweet_count","isOptional":false,"datatype":"Number"},
+       |                 {"name":"user.status_count","isOptional":false,"datatype":"Number"}],
+       |  "primaryKey":["id"],"timeField":"create_at"},
+       |"dataInterval":{"start":"2015-01-01T00:00:00.000Z",
+       |                "end":"2016-06-01T00:00:00.000Z"},
+       |"stats":{"createTime":"2015-01-01T00:00:00.000Z",
+       |         "lastModifyTime":"2016-06-01T00:00:00.000Z",
+       |         "lastReadTime":"2016-06-01T00:00:00.000Z",
+       |         "cardinality":50
+       |  }
+       |}
+    """.stripMargin)
+
+
+}
\ No newline at end of file
diff --git a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
index 5319f177c..0c49b8af9 100644
--- a/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
+++ b/zion/src/test/scala/edu/uci/ics/cloudberry/zion/model/impl/TestQuery.scala
@@ -1,15 +1,16 @@
 package edu.uci.ics.cloudberry.zion.model.impl
 
 import edu.uci.ics.cloudberry.zion.model.schema._
-import org.joda.time.DateTime
+import org.joda.time.{DateTime, DateTimeZone}
 import play.api.libs.json.Json
 
 object TestQuery {
+  DateTimeZone.setDefault(DateTimeZone.UTC)
 
   val TwitterDataSet = TwitterDataStore.DatasetName
   val schema = TwitterDataStore.TwitterSchema
-  val startTime = "2016-01-01T00:00:00Z"
-  val endTime = "2016-12-01T00:00:00Z"
+  val startTime = "2016-01-01T00:00:00.000Z"
+  val endTime = "2016-12-01T00:00:00.000Z"
 
   val textValue = Seq("zika", "virus")
   val stateValue = Seq(37, 51, 24, 11, 10, 34, 42, 9, 44)
@@ -95,6 +96,17 @@ object TestQuery {
     |  ]
   """.stripMargin
 
+  val filterZikaJSON =
+    s"""
+       |"filter": [
+       |  {
+       |    "field": "text",
+       |    "relation": "contains",
+       |    "values": ["zika"]
+       |  }
+       |  ]
+  """.stripMargin
+
   val filterWrongValueJSON =
     s"""
       |"filter": [
@@ -225,13 +237,42 @@ object TestQuery {
     |}
   """.stripMargin
   )
+  val groupByBinJSON = Json.parse(
+    s"""
+       |{
+       |  "dataset": "twitter.ds_tweet",
+       |  "group": {
+       |    "by": [
+       |      {
+       |        "field": "geo_tag.stateID",
+       |        "apply": {
+       |          "name": "bin",
+       |          "args": {
+       |            "scale": 10
+       |          }
+       |        },
+       |        "as": "state"
+       |      }
+       |    ],
+       |    "aggregate": [
+       |      {
+       |        "field": "*",
+       |        "apply": {
+       |          "name": "count"
+       |        },
+       |        "as": "count"
+       |      }
+       |    ]
+       |  }
+       |}
+  """.stripMargin)
 
   val topKHashTagJSON = Json.parse(
     s"""
      |{
      |  "dataset": "twitter.ds_tweet",
      |  $filterJSON,
-     |  "unnest" : { "hashtags": "tag"},
+     |  "unnest" : [{ "hashtags": "tag"}],
      |  "group": {
      |    "by": [
      |      {
@@ -526,6 +567,14 @@ object TestQuery {
     |}
   """.stripMargin)
 
+  val zikaJSON = Json.parse(
+    s"""
+       |{
+       |  "dataset": "twitter.ds_tweet",
+       |  $filterZikaJSON
+       |}
+  """.stripMargin)
+
   def removeEmptyLine(string: String): String = string.split("\\r?\\n").filterNot(_.trim.isEmpty).mkString("\n")
 
   def unifyNewLine(string: String): String = string.replaceAll("\\r?\\n", "\n")
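Taken together, the two new test helpers give an end-to-end harness: TestQuery supplies the query JSON and statement fixtures, TestDataSetInfo wraps them in metadata records, and DataSetInfoTest exercises both directions of the codec. A compact sanity sketch in the same spirit (a hypothetical loop, not part of the patch):

    import edu.uci.ics.cloudberry.zion.model.impl.{DataSetInfo, TestDataSetInfo => T}

    // write-then-parse must be the identity on every fixture
    for (info <- Seq(T.simpleDataSetInfo, T.fieldsDataSetInfo, T.queryDataSetInfo)) {
      assert(DataSetInfo.parse(DataSetInfo.write(info)) == info)
    }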