From d90f9116ca7ef5ecbf2237d97c18d27b5ac97501 Mon Sep 17 00:00:00 2001 From: jangorecki Date: Sun, 18 Mar 2018 13:22:42 +0530 Subject: [PATCH] drop most suggests, simplify --- .Rbuildignore | 2 + .gitignore | 25 ++++- DESCRIPTION | 9 +- NAMESPACE | 2 - R/as.data.cube.R | 1 - R/as.dimension.R | 20 ++-- R/as.fact.R | 29 +----- R/data.cube.R | 98 +++++++++---------- R/data.table.R | 4 + R/dimension.R | 3 +- R/fact.R | 159 +++++++++---------------------- R/schema.R | 15 +-- man/as.fact.Rd | 7 -- tests/tests-big.data.cube.R | 183 ------------------------------------ tests/tests-data.cube.R | 3 +- tests/tests-method-query.R | 67 ------------- vignettes/big.data.cube.Rmd | 74 --------------- 17 files changed, 144 insertions(+), 557 deletions(-) delete mode 100644 tests/tests-big.data.cube.R delete mode 100644 tests/tests-method-query.R delete mode 100644 vignettes/big.data.cube.Rmd diff --git a/.Rbuildignore b/.Rbuildignore index f74e4dc..8e65cee 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -1,3 +1,5 @@ +^\.emacs\.desktop +^\.emacs\.desktop\.lock ^.*\.Rproj$ ^\.Rproj\.user$ ^\..+ diff --git a/.gitignore b/.gitignore index 15bbed8..9a33714 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,25 @@ -.Rproj.user -.Rhistory +# History files .RData +.Rhistory +.Rapp.history + +# Package build process +*-Ex.R +data.cube_*.tar.gz +data.cube.Rcheck + +# Emacs IDE files +.emacs.desktop +.emacs.desktop.lock + +# RStudio IDE files +.Rproj.user data.cube.Rproj + +# produced vignettes +vignettes/*.html +vignettes/*.pdf + +# object and shared objects +*.o +*.so diff --git a/DESCRIPTION b/DESCRIPTION index f530966..430a35b 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,14 +1,13 @@ Package: data.cube Type: Package Title: OLAP cube data type -Version: 0.3.0 -Date: 2016-04-27 +Version: 0.4.0 +Date: 2018-03-17 Author: Jan Gorecki Maintainer: Jan Gorecki Description: Extends array for OLAP operations on multidimensional hierarchical data powered by data.table. Depends: R (>= 3.1.0) -Imports: data.table (>= 1.9.7), R6 -Suggests: big.data.table (>= 0.3.4), RSclient, Rserve, logR (>= 2.1.4), knitr, rmarkdown +Imports: data.table (>= 1.9.8), R6 +Suggests: knitr License: GPL-3 -Additional_repositories: https://jangorecki.gitlab.io/big.data.table, https://jangorecki.gitlab.io/logR, https://Rdatatable.github.io/data.table VignetteBuilder: knitr diff --git a/NAMESPACE b/NAMESPACE index 03f29eb..5cbfe96 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -112,8 +112,6 @@ S3method(as.data.table, dimension) export(as.fact) S3method(as.fact, default) S3method(as.fact, data.table) -S3method(as.fact, list) -S3method(as.fact, big.data.table) # *.fact S3method(length, fact) diff --git a/R/as.data.cube.R b/R/as.data.cube.R index e9b1ac8..1622427 100644 --- a/R/as.data.cube.R +++ b/R/as.data.cube.R @@ -197,7 +197,6 @@ as.data.table.data.cube = function(x, na.fill = FALSE, dcast = FALSE, ...) { } as.array.data.cube = function(x, measure, na.fill = NA, ...) { - if (!x$fact$local) stop("Only local data.cube, not distributed ones, can be converted to array") if (missing(measure)) measure = x$fact$measure.vars[1L] if (length(measure) > 1L) stop("Your cube seems to have multiple measures, you must provide scalar column name as 'measure' argument to as.array.") dimcols = lapply(x$dimensions, function(x) x$id.vars) diff --git a/R/as.dimension.R b/R/as.dimension.R index 4ab13ef..0e60a3d 100644 --- a/R/as.dimension.R +++ b/R/as.dimension.R @@ -54,20 +54,20 @@ as.dimension.environment = function(x, ...){ } null.dimension = function(...){ - env = new.env() - env$data = data.table(NULL) - env$id.vars = character() - env$hierarchies = list() - env$levels = list() - env$fields = character() - as.dimension.environment(env) + ans = new.env() + ans$data = data.table(NULL) + ans$id.vars = character() + ans$hierarchies = list() + ans$levels = list() + ans$fields = character() + as.dimension.environment(ans) } # export as.data.table.dimension = function(x, lvls = names(x$levels), ...) { stopifnot(is.dimension(x)) - r = copy(x$data) - lookupv(dims = lapply(x$levels[lvls], as.data.table.level), r) - r[] + ans = copy(x$data) + lookupv(dims = lapply(x$levels[lvls], as.data.table.level), ans) + ans[] } diff --git a/R/as.fact.R b/R/as.fact.R index 13cdd86..759280b 100644 --- a/R/as.fact.R +++ b/R/as.fact.R @@ -35,31 +35,8 @@ as.fact.data.table = function(x, id.vars = as.character(key(x)), measure.vars = list(.fun.aggregate = sub.fun))) } -#' @rdname as.fact -#' @method as.fact list -as.fact.list = function(x, id.vars = as.character(key(x)), measure.vars = setdiff(names(x), id.vars), fun.aggregate = sum, ..., measures = NULL) { - sub.fun = substitute(fun.aggregate) - stopifnot(requireNamespace("big.data.table", quietly = TRUE), big.data.table::is.rscl(x)) - eval(substitute(fact$new(x, id.vars = id.vars, measure.vars = measure.vars, fun.aggregate = .fun.aggregate, ... = ..., measures = measures), - list(.fun.aggregate = sub.fun))) -} - -#' @rdname as.fact -#' @method as.fact big.data.table -as.fact.big.data.table = function(x, id.vars, measure.vars = setdiff(names(x), id.vars), fun.aggregate = sum, ..., measures = NULL) { - sub.fun = substitute(fun.aggregate) - stopifnot(requireNamespace("big.data.table", quietly = TRUE), big.data.table::is.big.data.table(x), is.character(id.vars)) - eval(substitute(fact$new(x, id.vars = id.vars, measure.vars = measure.vars, fun.aggregate = .fun.aggregate, ... = ..., measures = measures), - list(.fun.aggregate = sub.fun))) -} - -as.fact.environment = function(x, ...) { - fact$new(.env = x) -} - null.fact = function(...) { env = new.env() - env$local = TRUE env$id.vars = character() env$measure.vars = character() env$measures = list() @@ -72,4 +49,8 @@ null.fact = function(...) { as.data.table.fact = function(x, ...) { stopifnot(is.fact(x)) copy(x$data) -} \ No newline at end of file +} + +as.fact.environment = function(x, ...) { + fact$new(.env = x) +} diff --git a/R/data.cube.R b/R/data.cube.R index 3e43342..c3161cb 100644 --- a/R/data.cube.R +++ b/R/data.cube.R @@ -81,14 +81,14 @@ data.cube = R6Class( dict = self$schema() prnt = character() prnt["header"] = "" - #prnt["distributed"] = n.measures = length(self$fact$measure.vars) prnt["fact"] = dict[type=="fact", - sprintf("fact%s:\n %s rows x %s dimensions x %s measures (%.2f MB)", - if(!self$fact$local) sprintf(" (distributed on %s nodes)", length(attr(self$fact$data, "rscl"))) else "", + sprintf("fact:\n %s rows x %s dimensions x %s measures (%.2f MB)", nrow, ncol - n.measures, n.measures, mb)] if (length(self$dimensions)) { - dt = dict[type=="dimension", .(nrow = nrow[is.na(entity)], ncol = ncol[is.na(entity)], mb = sum(mb, na.rm = TRUE)), .(name)] + size = dict[type=="dimension", .(mb = sum(mb, na.rm = TRUE)), .(name)] + nrnc = dict[type=="dimension" & is.na(entity), .(nrow, ncol), .(name)] + dt = nrnc[size, on="name"] prnt["dims"] = paste0("dimensions:\n", paste(dt[, sprintf(" %s : %s entities x %s levels (%.2f MB)", name, nrow, ncol, mb)], collapse="\n")) } prnt["size"] = sprintf("total size: %.2f MB", dict[,sum(mb)]) @@ -96,17 +96,17 @@ data.cube = R6Class( invisible(self) }, denormalize = function(na.fill = FALSE, dims = names(self$dimensions)) { - r = as.data.table(self$fact) + ans = as.data.table.fact(self$fact) if (isTRUE(na.fill)) { # `nomatch` to be extended to raise error if fact has values not in dims, after data.table#857 resolved dn = dimnames(self) - r = r[i=do.call(CJ, c(dn, list(sorted=TRUE, unique=TRUE))), - nomatch=NA, - on=setNames(names(dn), nm=self$fact$id.vars)] + ans = ans[i=do.call(CJ, c(dn, list(sorted=TRUE, unique=TRUE))), + nomatch=NA, + on=setNames(names(dn), nm=self$fact$id.vars)] } - # lookup - lookupv(dims = lapply(self$dimensions[dims], as.data.table.dimension), r) - if (length(self$fact$id.vars)) setkeyv(r, self$fact$id.vars)[] else r[] + lookupv(dims = lapply(self$dimensions[dims], as.data.table.dimension), ans) + if (length(self$fact$id.vars)) setkeyv(ans, self$fact$id.vars) + ans[] }, schema = function() { rbindlist(list( @@ -246,32 +246,32 @@ data.cube = R6Class( } else self ) # returned object - r = new.env() + ans = new.env() # - [x] filter dimensions and levels while quering them to new environment - r$dimensions = sapply(names(self$dimensions), function(dim) { - if (dim %chin% names(i.sub)) self$dimensions[[dim]]$subset(i.sub = i.sub[[dim]]) - else if (dim %chin% names(i.grp)) self$dimensions[[dim]]$rollup(i.grp[[dim]]) + ans$dimensions = sapply(names(self$dimensions), function(dim) { + if (dim %chin% names(i.sub)) self$dimensions[[dim]]$subset(i.sub = i.sub[[dim]]) + else if (dim %chin% names(i.grp)) self$dimensions[[dim]]$rollup(i.grp[[dim]]) }, simplify=FALSE) - r$id.vars = self$id.vars + ans$id.vars = self$id.vars # - [x] filter fact - prepare index for subset fact filter.dims = sapply(i.sub, function(x) length(x) || is.null(x)) # NULL is valid empty subset notation, as in base R filter.dims = names(filter.dims)[as.logical(filter.dims)] # primary keys of dimensions after filtering - dimkeys = sapply(names(r$dimensions)[names(r$dimensions) %chin% filter.dims], function(dim) { - r$dimensions[[dim]]$data[[r$dimensions[[dim]]$id.vars]] + dimkeys = sapply(names(ans$dimensions)[names(ans$dimensions) %chin% filter.dims], function(dim) { + ans$dimensions[[dim]]$data[[ans$dimensions[[dim]]$id.vars]] }, simplify=FALSE) - stopifnot(names(dimkeys) %chin% names(r$dimensions)) # all names must match, before drop dims + stopifnot(names(dimkeys) %chin% names(ans$dimensions)) # all names must match, before drop dims # - [x] drop sliced dimensions if (drop) { len1.dims = names(dimkeys)[sapply(dimkeys, length)==1L] # if user provides multiple values to dimension filter key, it should not drop that dim even when only 1L was matched, base::array raises error on nomatch - filter.multkey = len1.dims[sapply(len1.dims, function(dim) length(i.sub[[dim]][[r$dimensions[[dim]]$id.vars]])) > 1L] - if.drop = names(r$dimensions) %chin% setdiff(len1.dims, filter.multkey) - r$dimensions[if.drop] = NULL - r$id.vars = r$id.vars[!if.drop] + filter.multkey = len1.dims[sapply(len1.dims, function(dim) length(i.sub[[dim]][[ans$dimensions[[dim]]$id.vars]])) > 1L] + if.drop = names(ans$dimensions) %chin% setdiff(len1.dims, filter.multkey) + ans$dimensions[if.drop] = NULL + ans$id.vars = ans$id.vars[!if.drop] } # - [x] subset fact - # - [ ] support for: filter `.`, collapse dim `-`, rollup `+`, cube `^` + # - [ ] support for:filter `.`, collapse dim `-`, rollup `+`, cube `^` dimcols = self$id.vars[names(self$dimensions) %chin% names(dimkeys)] stopifnot(length(dimcols) == length(dimkeys)) setattr(dimkeys, "names", dimcols) @@ -312,28 +312,23 @@ data.cube = R6Class( x = sapply(names(self$dimensions), function(x) self$dimensions[[x]]$rollup(i.ops = i.ops[[x]]), simplify=FALSE) # all fields used in grouping for each dimension new.fact = self$fact$rollup(x, collapse=collapse.cols, grouping.sets=groupingsets.cols, ops=i.ops, drop=drop) - # r$fact = new.fact + # ans$fact = new.fact } else { - r$fact = self$fact$subset(dimkeys, collapse=collapse.cols, drop=drop) + ans$fact = self$fact$subset(dimkeys, collapse=collapse.cols, drop=drop) } - stopifnot(ncol(r$fact$data) > 0L, length(collapse.cols)==length(collapse.dims)) + stopifnot(ncol(ans$fact$data) > 0L, length(collapse.cols)==length(collapse.dims)) if (length(collapse.dims)) { - r$dimensions[collapse.dims] = NULL - r$id.vars = setdiff(r$id.vars, collapse.cols) + ans$dimensions[collapse.dims] = NULL + ans$id.vars = setdiff(ans$id.vars, collapse.cols) } # - [x] return cube with all dimensions filtered and fact filtered - as.data.cube.environment(r) + as.data.cube.environment(ans) }, # setindex setindex = function(drop = FALSE) { - optional.logR = function(x, .log = getOption("datacube.log")) { - if(isTRUE(.log)) eval.parent(substitute(logR(x), list(x = substitute(x)))) else x - } - r = list( - fact = optional.logR(self$fact$setindex(drop=drop)), - dimensions = lapply(self$dimensions, function(x) optional.logR(x$setindex(drop=drop))) - ) # r - not used further but evaluated on lower classes + self$fact$setindex(drop=drop) + lapply(self$dimensions, function(x) x$setindex(drop=drop)) invisible(self) }, rollup = function(...) { @@ -390,6 +385,7 @@ is.data.cube = function(x) inherits(x, "data.cube") #' @param x data.cube object #' @param ... values to subset on corresponding dimensions, when wrapping in list it will refer to dimension hierarchies #' @param drop logical, default TRUE, drop redundant dimensions, same as \emph{drop} argument in \code{[.array}. +#' @details The following syntax has been propose to subset data.cube: TODO #6 #' @return data.cube class object "[.data.cube" = function(x, ..., drop = TRUE) { if (!is.logical(drop)) stop("`drop` argument to data.cube subset must be logical. If argument name conflicts with your dimension name then provide it without name, elements in ... are matched by positions - as in array method - not names.") @@ -408,8 +404,8 @@ is.data.cube = function(x) inherits(x, "data.cube") return(x) } # proceed subset, also proceed empty subset `dc[,]` or `dc[, drop=.]` - r = x$subset(.dots = .dots, drop = drop) - r + ans = x$subset(.dots = .dots, drop = drop) + ans } # @title Extract data.cube @@ -419,14 +415,14 @@ is.data.cube = function(x) inherits(x, "data.cube") # @param by expression/character vector to aggregate measures accroding to \emph{j} argument. # @return data.cube?? class object # "[[.data.cube" = function(x, i, j, by) { -# r = x$extract(by = by, .call = match.call()) -# r +# ans = x$extract(by = by, .call = match.call()) +# ans # } dimnames.data.cube = function(x) { - r = sapply(x$dimensions, dimnames, simplify=FALSE) - if (!length(r)) return(NULL) - r + ans = sapply(x$dimensions, dimnames, simplify=FALSE) + if (!length(ans)) return(NULL) + ans } str.data.cube = function(object, ...) { @@ -443,25 +439,25 @@ format.data.cube = function(x, na.fill = FALSE, measure.format = list(), dots.fo length(names(measure.format))==length(measure.format), names(measure.format) %in% measure.vars ) - r = x$denormalize(dims = character(0), na.fill = na.fill) - if (length(id.vars)) r = setorderv(r, cols = id.vars, order=1L, na.last=TRUE) + ans = x$denormalize(dims = character(0), na.fill = na.fill) + if (length(id.vars)) ans = setorderv(ans, cols = id.vars, order=1L, na.last=TRUE) if (!is.null(measure.format)) { # measure.format=NULL will stop any formatting for (mv in measure.vars) { if (mv %chin% names(measure.format)) { FUN = measure.format[[mv]] - set(r, i = NULL, j = mv, value = FUN(r[[mv]], ... = dots.format[[mv]])) + set(ans, i = NULL, j = mv, value = FUN(ans[[mv]], ... = dots.format[[mv]])) } else { if (!is.null(FUN <- x$fact$measures[[mv]]$fun.format)) { - set(r, i = NULL, j = mv, value = FUN(r[[mv]], ... = dots.format[[mv]])) + set(ans, i = NULL, j = mv, value = FUN(ans[[mv]], ... = dots.format[[mv]])) } } } } - if (isTRUE(dcast)) r = dcast.data.table(r, ...) - r[] + if (isTRUE(dcast)) ans = dcast.data.table(ans, ...) + ans[] } -head.data.cube = function(x, n = 6L, ...) x$head(n) +head.data.cube = function(x, n = 6L, ...) x$head(n = n) length.data.cube = function(x) as.integer(nrow(x$fact)) names.data.cube = function(x) as.character(names(x$fact)) diff --git a/R/data.table.R b/R/data.table.R index 174934b..1181483 100644 --- a/R/data.table.R +++ b/R/data.table.R @@ -1,3 +1,6 @@ + +## data.table helper to be not dependend on data.cube pkg + #' @title Convert array to data.table #' @param x array #' @param keep.rownames ignored @@ -105,3 +108,4 @@ lookupv = function(dims, fact) { lookup(fact, dim, setdiff(nd, nf)) }) } + diff --git a/R/dimension.R b/R/dimension.R index 94de2d2..22b1121 100644 --- a/R/dimension.R +++ b/R/dimension.R @@ -58,7 +58,7 @@ dimension = R6Class( rbindlist(list(dimension_data_schema, levels_schema)) }, head = function(n = 6L) { - list(base = head(self$data, n), levels = lapply(self$levels, function(x) x$head(n = n))) + list(base = head(self$data, n = n), levels = lapply(self$levels, function(x) x$head(n = n))) }, # subset subset = function(i.sub) { @@ -97,6 +97,7 @@ dimension = R6Class( invisible(self) }, rollup = function(x, i.ops) { + # TO DO reuse data.table browser() stopifnot(is.character(x), is.character(i.ops)) r = new.env() diff --git a/R/fact.R b/R/fact.R index b478a9c..9f881c9 100644 --- a/R/fact.R +++ b/R/fact.R @@ -1,12 +1,12 @@ #' @title Fact class #' @docType class #' @format An R6 class object. -#' @details Class stores fact table as local \code{data.table} or remote \code{big.data.table}. Measures can be provided manually to \code{measures} argument, useful for custom aggregate function per measure. +#' @details Class stores fact table as \code{data.table}. Measures can be provided manually to \code{measures} argument, useful for custom aggregate function per measure. #' @seealso \code{\link{as.fact}}, \code{\link{measure}}, \code{\link{dimension}}, \code{\link{data.cube}} fact = R6Class( classname = "fact", public = list( - local = logical(), + local = logical(), # TO REMOVE id.vars = character(), # foreign keys measure.vars = character(), measures = list(), @@ -14,7 +14,6 @@ fact = R6Class( initialize = function(x, id.vars = character(), measure.vars = character(), fun.aggregate = sum, ..., measures = NULL, .env) { if (!missing(.env)) { # skip heavy processing for env argument - self$local = .env$local self$id.vars = .env$id.vars self$measure.vars = .env$measure.vars self$measures = .env$measures @@ -36,22 +35,13 @@ fact = R6Class( } stopifnot( sapply(self$measures, inherits, "measure"), - TRUE - #unlist(lapply(self$measures, `[[`, "var")) %in% names(x) # 'x' not yet ready to use due to remote interface dev here + unlist(lapply(self$measures, `[[`, "var")) %in% names(x) ) # build `j` expression jj = self$build.j() # aggregate dtq = substitute(x <- x[, j = .jj,, keyby = .id.vars], list(.jj = jj, .id.vars = self$id.vars)) - self$local = is.data.table(x) - if (self$local) { - self$data = eval(dtq) - } else { - stopifnot(requireNamespace("big.data.table", quietly = TRUE), big.data.table::is.rscl(x) || big.data.table::is.big.data.table(x)) - if (big.data.table::is.rscl(x)) x = big.data.table::as.big.data.table(x) - x[[expr = dtq, lazy = FALSE, send = TRUE]] - self$data = x - } + self$data = eval(dtq) invisible(self) }, print = function() { @@ -69,10 +59,10 @@ fact = R6Class( jj }, schema = function() { - if (self$local) schema.data.table(self$data, empty = c("entity")) else schema.big.data.table(self$data, empty = c("entity")) + schema.data.table(self$data, empty = c("entity")) }, head = function(n = 6L) { - head(self$data, n) + head(self$data, n = n) }, subset = function(x, collapse=character(), drop=TRUE) { # if (!is.character(drop)) stop("Argument 'drop' to fact$subset must be character type, forein key column names.") @@ -81,121 +71,62 @@ fact = R6Class( stopifnot(is.list(x), is.character(collapse), names(x) %chin% self$id.vars, collapse %chin% self$id.vars) # must return fact, not a data.table r = new.env() - r$local = self$local r$id.vars = self$id.vars r$measure.vars = self$measure.vars r$measures = self$measures - if (self$local) { - anynull = any(sapply(x, is.null)) - dimlens = sapply(x, length) - II = integer() - if (!anynull) { # for any NULL there is no need to subset - for (dk in names(x)) { - ii = self$data[x[dk], nomatch=0L, on=dk, which=TRUE] - II = if (dk == names(x)[1L]) ii else intersect(II, ii) - if (!length(II)) break # skip further filters when already 0 rows - } + anynull = any(sapply(x, is.null)) + dimlens = sapply(x, length) + II = integer() + if (!anynull) { # for any NULL there is no need to subset + for (dk in names(x)) { + ii = self$data[x[dk], nomatch=0L, on=dk, which=TRUE] + II = if (dk == names(x)[1L]) ii else intersect(II, ii) + if (!length(II)) break # skip further filters when already 0 rows } - do.drop = (drop && any(dimlens==1L)) || length(collapse) - r$id.vars = if (do.drop) { - setdiff(self$id.vars, c(names(dimlens)[dimlens==1L], collapse)) - } else self$id.vars - # subset from fact - r$data = if (length(x)) { # `i` arg - if (length(r$id.vars)) self$data[II, eval(self$build.j()), by=c(r$id.vars)] # `by` arg - else self$data[II, eval(self$build.j())] # no `by` arg - } else { # no `i` arg - if (length(r$id.vars)) self$data[, eval(self$build.j()), by=c(r$id.vars)] # `by` arg - else self$data[, eval(self$build.j())] # no `by` arg - } - # sort data - if (length(r$id.vars)) setkeyv(r$data, r$id.vars) } - if (!self$local) { - stop("distributed processing for data.cube not yet implemented") + do.drop = (drop && any(dimlens==1L)) || length(collapse) + r$id.vars = if (do.drop) { + setdiff(self$id.vars, c(names(dimlens)[dimlens==1L], collapse)) + } else self$id.vars + # subset from fact + r$data = if (length(x)) { # `i` arg + if (length(r$id.vars)) self$data[II, eval(self$build.j()), by=c(r$id.vars)] # `by` arg + else self$data[II, eval(self$build.j())] # no `by` arg + } else { # no `i` arg + if (length(r$id.vars)) self$data[, eval(self$build.j()), by=c(r$id.vars)] # `by` arg + else self$data[, eval(self$build.j())] # no `by` arg } + # sort data + if (length(r$id.vars)) setkeyv(r$data, r$id.vars) as.fact.environment(r) }, setindex = function(drop = FALSE) { - if (self$local) { - setindexv(self$data, if (!drop) self$id.vars) - } else { - stop("TODO") - } + setindexv(self$data, if (!drop) self$id.vars) invisible(self) }, rollup = function(x, grouping.sets=list(), ...) { stopifnot(is.list(x), sapply(x, is.data.table), #if (length(x)) sapply(x, function(dt) names(dt)[1L]) %chin% names(self$id.vars) else TRUE, is.list(grouping.sets)) - if (self$local) { + do.create = !!length(grouping.sets) + if (length(x)) { # `i` arg browser() - # create new dimension with grouping sets or just query data - do.create = !!length(grouping.sets) - - if (length(x)) { # `i` arg - browser() - # lapply(grouping.sets, `[[`) - - self$data[II, eval(self$build.j())] - } else { # no `i` arg - rollup.dims = names(ops)[ops=="+"] - cube.dims = names(ops)[ops=="^"] - if (length(rollup.dims) && length(cube.dims)) stop("Cannot use both rollup and cube in single call.") - groupingcall = substitute( - data.table:::`.fun`(self$data, j=.j, by=.by), - list(.fun = as.name(if (length(rollup.dims)) "rollup.data.table" else if(length(cube.dims)) "cube.data.table"), - .j = self$build.j(), - .by = unique(unname(unlist(grouping.sets)))) - ) - browser() - # - [ ] TODO did not lookup higher attributes YET! - eval(groupingcall) - } - - } - if (!self$local) { - stop("distributed processing for data.cube not yet implemented") - # - [ ] must bind by grouping dimension when downloading from nodes - # - [ ] double check validation of totals re-calculated vs existing from nodes - } - }, - # fact$query method to be removed - currently used only in tests-big.data.cube.R and tests-method-query.R - see data.cube#12 - query = function(i, i.dt, by, measure.vars = self$measure.vars) { - - ii = substitute(i) - jj = self$build.j(measure.vars) - bb = substitute(by) - id = substitute(i.dt) - - l = list( - as.symbol("["), - x = call("$", as.name("self"), as.name("data")) # this can point to data.table or big.data.table - ) - stopifnot(!(!missing(i) & !missing(i.dt))) - if(!missing(i)){ - l[["i"]] = ii - } else if(!missing(i.dt)){ - l[["i"]] = id - } - l[["j"]] = jj - if(!missing(by)) l[["by"]] = bb - if(!missing(i.dt)){ - jn = copy(names(i.dt)) - l[["on"]] = setNames(nm = jn) - l[["nomatch"]] = 0L - } - dcq = as.call(l) - dt = eval(dcq) - if( !self$local ) { - # re-aggr - dcq["i"] = NULL - dcq["on"] = NULL - dcq["nomatch"] = 0L - dcq[["x"]] = as.name("dt") - dt = eval(dcq) + # lapply(grouping.sets, `[[`) + self$data[II, eval(self$build.j())] + } else { # no `i` arg + rollup.dims = names(ops)[ops=="+"] + cube.dims = names(ops)[ops=="^"] + if (length(rollup.dims) && length(cube.dims)) stop("Cannot use both rollup and cube in single call.") + groupingcall = substitute( + data.table:::`.fun`(self$data, j=.j, by=.by), + list(.fun = as.name(if (length(rollup.dims)) "rollup.data.table" else if(length(cube.dims)) "cube.data.table"), + .j = self$build.j(), + .by = unique(unname(unlist(grouping.sets)))) + ) + browser() + # - [ ] TODO did not lookup higher attributes YET! + eval(groupingcall) } - dt } ) ) diff --git a/R/schema.R b/R/schema.R index 6627f1c..7e24b81 100644 --- a/R/schema.R +++ b/R/schema.R @@ -1,17 +1,4 @@ -# stats collector for data.table and big.data.table -schema.big.data.table = function(x, empty) { - if (requireNamespace("big.data.table", quietly = TRUE)) { - rscl = attr(x, "rscl") - dm = dim(x) - mb = sum(big.data.table::rscl.eval(rscl, as.numeric(object.size(x))/(1024^2), simplify = TRUE)) - adr = NA_character_ - ky = big.data.table::rscl.eval(rscl[1L], if(haskey(x)) paste(key(x), collapse=", ") else NA_character_, simplify = TRUE) - dt = data.table(nrow = dm[1L], ncol = dm[2L], mb = mb, address = adr, sorted = ky) - nn = copy(names(dt)) - if (!missing(empty)) setcolorder(dt[, c(empty) := NA][], unique(c(empty, nn))) - dt - } else stop("schema.big.data.table can be only used with `big.data.table` package which is not installed.") -} +# stats collector for data.table schema.data.table = function(x, empty) { dm = dim(x) mb = as.numeric(object.size(x))/(1024^2) diff --git a/man/as.fact.Rd b/man/as.fact.Rd index e45f111..dc4a008 100644 --- a/man/as.fact.Rd +++ b/man/as.fact.Rd @@ -15,13 +15,6 @@ as.fact(x, ...) \method{as.fact}{data.table}(x, id.vars = as.character(key(x)), measure.vars = setdiff(names(x), id.vars), fun.aggregate = sum, ..., measures = NULL) - -\method{as.fact}{list}(x, id.vars = as.character(key(x)), - measure.vars = setdiff(names(x), id.vars), fun.aggregate = sum, ..., - measures = NULL) - -\method{as.fact}{big.data.table}(x, id.vars, measure.vars = setdiff(names(x), - id.vars), fun.aggregate = sum, ..., measures = NULL) } \arguments{ \item{x}{data.table build dimension based on that dataset.} diff --git a/tests/tests-big.data.cube.R b/tests/tests-big.data.cube.R deleted file mode 100644 index efbf5f0..0000000 --- a/tests/tests-big.data.cube.R +++ /dev/null @@ -1,183 +0,0 @@ -pkgs = c("Rserve","RSclient","big.data.table") -apkgs = sapply(pkgs, requireNamespace, quietly = TRUE) -print(apkgs) -if (!all(apkgs)) quit("no", status = 0) - -# start nodes ---- - -library(Rserve) - -port = 33311:33314 - -# shutdown any running nodes -rscl = lapply(setNames(port, port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w)) -invisible(lapply(rscl, function(rsc) if (inherits(rsc, "sockconn")) RSshutdown(rsc))) - -# start cluster -invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save")))) - -options("bigdatatable.log" = FALSE) # for interactive running tests -invisible(TRUE) -Sys.sleep(3) - -# fact ---- - -library(data.table) -library(big.data.table) -library(data.cube) - -rscl = big.data.table::rscl.connect(port = 33311:33314) -rscl.require(rscl, c("data.table","data.cube")) -X = populate_star(N = 1e5, surrogate.keys = FALSE) -bdt = as.big.data.table(X$fact$sales, rscl) -ff = as.fact(x = rscl, - id.vars = c("prod_name","cust_profile","curr_name","geog_abb","time_date"), - measure.vars = c("amount","value"), - fun.aggregate = sum, - na.rm = TRUE) -stopifnot( - is.big.data.table(ff$data), - inherits(ff, "fact"), - sapply(ff$measures, inherits, "measure"), - identical(ff$measure.vars, c("amount","value")) -) -print(ff$data) - -# fact$query ---- - -# by = geog_abb -r = ff$query(by = "geog_abb", measure.vars = "amount") -stopifnot( - is.data.table(r), - c("geog_abb","amount") %in% names(r), - nrow(r) == 50L -) - -# i %in% 1:2, by = geog_abb -r = ff$query(i = geog_abb %in% c("ND","TX"), by = "geog_abb", measure.vars = c("value")) -stopifnot( - is.data.table(r), - c("geog_abb","value") %in% names(r), - nrow(r) == 2L -) - -# i = CJ(1:2, 1:3), by = .(geog_abb, time_date) -r = ff$query(i.dt = CJ(geog_abb = c("ND","TX"), time_date = as.Date(c("2010-01-01","2010-01-02","2010-01-03")), unique = TRUE), by = .(geog_abb, time_date)) -stopifnot( - is.data.table(r), - c("geog_abb","time_date","amount","value") %in% names(r), - nrow(r) == 3L -) - -rscl.close(rscl) - -# data.cube ---- - -X = populate_star(N = 1e5, surrogate.keys = FALSE) -time = as.dimension(X$dims$time, - id.vars = "time_date", - hierarchies = list( - "monthly" = list( - "time_year" = character(), - "time_quarter" = c("time_quarter_name"), - "time_month" = c("time_month_name"), - "time_date" = c("time_month","time_quarter","time_year") - ), - "weekly" = list( - "time_year" = character(), - "time_week" = character(), - "time_date" = c("time_week","time_year") - ) - )) -geog = as.dimension(X$dims$geography, - id.vars = "geog_abb", - hierarchies = list( - list( - "geog_region_name" = character(), - "geog_division_name" = c("geog_region_name"), - "geog_abb" = c("geog_name","geog_division_name","geog_region_name") - ) - )) - -rscl = rscl.connect(port = 33311:33314) -rscl.require(rscl, c("data.table","data.cube")) -bdt = as.big.data.table(X$fact$sales, rscl) -ff = as.fact(x = rscl, - id.vars = c("geog_abb","time_date"), - measure.vars = c("amount","value"), - na.rm = TRUE) -rm(bdt) - -dc = as.data.cube(ff, list(time = time, geography = geog)) -stopifnot( - is.big.data.table(dc$fact$data), - # Normalization - identical(dim(dc$dimensions$geography$levels$geog_abb$data), c(50L, 4L)), - identical(dim(dc$dimensions$geography$levels$geog_region_name$data), c(4L, 1L)), - identical(dim(dc$dimensions$time$levels$time_month$data), c(12L, 2L)), - identical(dim(dc$dimensions$time$levels$time_year$data), c(5L, 1L)), - identical(dim(dc$dimensions$time$levels$time_date$data), c(1826L, 5L)), - all.equal(X$fact$sales[, .(value = sum(value))], dc$fact$data[, .(value = sum(value)), outer.aggregate = TRUE]) -) -rscl.close(rscl) - -# logR + data.cube ---- - -apkg = requireNamespace("logR", quietly = TRUE) -print(apkg) -if (apkg){ - - library(logR) - - ## for check on localhost use postgres service - # docker run --rm -p 127.0.0.1:5432:5432 -e POSTGRES_PASSWORD=postgres --name pg-data.cube postgres:9.5 - if (logR::logR_connect()){ - stopifnot(logR::logR_schema(drop = TRUE)) - on = options("bigdatatable.log" = TRUE, "logR.nano.debug" = TRUE) - - # connect - - X = populate_star(N = 1e5, surrogate.keys = FALSE) - rscl = big.data.table::rscl.connect(port = 33311:33314) - stopifnot(rscl.require(rscl, c("data.table", "logR"))) - stopifnot(rscl.eval(rscl, logR::logR_connect(quoted = TRUE), lazy = FALSE)) - bdt = as.big.data.table(X$fact$sales, rscl) - ff = as.fact(x = rscl, - id.vars = c("geog_abb","time_date"), - measure.vars = c("amount","value"), - na.rm = TRUE) - r = bdt[1L] - # TODO: add data.cube `[` queries when ready - options(on) - lr = logR::logR_dump() - stopifnot( - is.data.table(r), - nrow(r) == 4L, - is.big.data.table(ff$data), - nrow(lr) == 10L, - lr$status == "success", - all.equal(lr[, .(logr_id, parent_id)], data.table(logr_id = 1:10, parent_id = c(NA, rep(1L, 4L), NA, rep(6L, 4L)))), - lr[7:10, expr] == "x[1L]" - ) - rm(bdt) - # data.cube queries # TODO - - on = options("datatable.prettyprint.char" = 80L) - print(lr[]) - options(on) - - rscl.close(rscl) - } -} - -# shutdown nodes ---- - -library(RSclient) - -port = 33311:33314 - -# shutdown any running nodes -l = lapply(setNames(port, port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w)) -invisible(lapply(l, function(rsc) if (inherits(rsc, "sockconn")) RSshutdown(rsc))) - -invisible(TRUE) diff --git a/tests/tests-data.cube.R b/tests/tests-data.cube.R index 876c004..d4d8615 100644 --- a/tests/tests-data.cube.R +++ b/tests/tests-data.cube.R @@ -107,8 +107,7 @@ print.equal = function(x, dt) { x = lapply(strsplit(x, " +"), `[`, -1L) x[[2L]] = as.num(x[[2L]]) dt = lapply(strsplit(dt, " +"), `[`, -1L) - dt[[2L]] = as.num(dt[[2L]]) - + dt[[2L]] = as.num(dt[[2L]]) } else if (length(xdim) == 2L) { x = strsplit(x, " +")[-1L] x[[1L]] = c("", x[[1L]]) # match first line for below line processing diff --git a/tests/tests-method-query.R b/tests/tests-method-query.R deleted file mode 100644 index 7d10096..0000000 --- a/tests/tests-method-query.R +++ /dev/null @@ -1,67 +0,0 @@ -library(data.table) -library(data.cube) - -options("datacube.jj" = FALSE) - -# fact$initialize ---- - -X = populate_star(N = 1e3, surrogate.keys = FALSE) -ff = as.fact(x = X$fact$sales, - id.vars = c("prod_name","cust_profile","curr_name","geog_abb","time_date"), - measure.vars = c("amount","value"), - na.rm = TRUE) -stopifnot( - is.data.table(ff$data), - sapply(ff$measures, inherits, "measure") -) - -# fact$query ---- - -# by = geog_abb -r = ff$query(by = "geog_abb", measure.vars = c("amount")) -stopifnot( - is.data.table(r), - c("geog_abb","amount") %in% names(r), - nrow(r) == 50L -) - -# i %in% 1:2, by = geog_abb -r = ff$query(i = geog_abb %in% c("VT","AL"), by = "geog_abb", measure.vars = c("value")) -stopifnot( - is.data.table(r), - c("geog_abb","value") %in% names(r), - nrow(r) == 2L -) - -# i = CJ(1:3, 1:3), by = .(geog_abb, time_date) -r = ff$query(i.dt = CJ(geog_abb = c("VT","AL","TX"), time_date = as.Date(c("2010-01-01","2010-01-02","2010-01-03")), unique = TRUE), by = .(geog_abb, time_date)) -stopifnot( - is.data.table(r), - c("geog_abb","time_date","amount","value") %in% names(r), - nrow(r) == 3L -) - -# data.cube$initialize ---- - -X = populate_star(N = 1e3, surrogate.keys = FALSE, hierarchies = TRUE) - -dims = lapply(setNames(seq_along(X$dims), names(X$dims)), function(i){ - as.dimension(X$dims[[i]], - id.vars = key(X$dims[[i]]), - hierarchies = X$hierarchies[[i]]) -}) -ff = as.fact(x = X$fact$sales, - id.vars = sapply(dims, `[[`, "id.vars"), - measure.vars = c("amount","value"), - fun.aggregate = sum, - na.rm = TRUE) -dc = as.data.cube(ff, dims) - -# data.cube$subset ---- - -# dc[,,"BTC", c("TX","NY"), .(year = 2010L)] -# dc[,,"BTC", c("TX","NY")] # 4 args -# r = dc$subset() -# stopifnot( -# TRUE -# ) diff --git a/vignettes/big.data.cube.Rmd b/vignettes/big.data.cube.Rmd deleted file mode 100644 index 519a50d..0000000 --- a/vignettes/big.data.cube.Rmd +++ /dev/null @@ -1,74 +0,0 @@ ---- -vignette: > - %\VignetteIndexEntry{Distributed backend for data.cube} - %\VignetteEngine{knitr::rmarkdown} - %\VignetteEncoding{UTF-8} ---- - -# Distributed backend for data.cube -*Jan Gorecki, 2016-02-16* - -R *data.cube* class defined in [data.cube](https://gitlab.com/jangorecki/data.cube) package. - -```{r start_nodes, message=FALSE, echo=FALSE} -library(Rserve) - -port = 33311:33314 - -# shutdown any running nodes -rscl = lapply(setNames(port, port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w)) -invisible(lapply(rscl, function(rsc) if(inherits(rsc, "sockconn")) RSshutdown(rsc))) - -# start cluster -invisible(sapply(port, function(port) Rserve(debug = FALSE, port = port, args = c("--no-save")))) - -options("bigdatatable.log" = FALSE) # for interactive running tests -Sys.sleep(1) -``` - -# distributed fact table - -```{r fact, message=FALSE} -library(data.table) -library(big.data.table) -library(data.cube) - -rscl = rscl.connect(port = 33311:33314) -rscl.require(rscl, c("data.table","data.cube")) -X = populate_star(N = 1e5, surrogate.keys = FALSE) -bdt = as.big.data.table(X$fact$sales, rscl) -ff = as.fact(x = rscl, - id.vars = c("prod_name","cust_profile","curr_name","geog_abb","time_date"), - measure.vars = c("amount","value"), - na.rm = TRUE) -stopifnot( - is.big.data.table(ff$data), - inherits(ff, "fact"), - sapply(ff$measures, inherits, "measure"), - identical(ff$measure.vars, c("amount","value")) -) -invisible(rscl.close(rscl)) -``` - -```{r data.cube, message=FALSE} -# dc = as.data.cube(X) -# rscl.close(rscl) -``` - -```{r logr, message=FALSE} -# library(logR) - - -``` - -```{r shutdown_nodes, message=FALSE, echo=FALSE} -library(RSclient) - -port = 33311:33314 - -# shutdown any running nodes -l = lapply(setNames(port, port), function(port) tryCatch(RSconnect(port = port), error = function(e) e, warning = function(w) w)) -invisible(lapply(l, function(rsc) if(inherits(rsc, "sockconn")) RSshutdown(rsc))) -``` - -Any feedback/fixes/improvements submit to repo.