Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
8276e71
Bug fixes for the missing ensemble.size error.
Oct 3, 2025
05f82da
Implement joint input sampling in the SDA workflows.
Oct 3, 2025
884a69d
Update the joint inputs sampling function.
Oct 3, 2025
f7f7736
Merge branch 'PecanProject:develop' into SDA_data
DongchenZ Oct 5, 2025
447b08d
Update the ensemble configs function.
Oct 5, 2025
92cd3f1
Clean the multisite sda function.
Oct 5, 2025
f2a96d1
Revert the changes.
Oct 5, 2025
40701fe
Revert changes.
Oct 5, 2025
4893c11
revert changes.
Oct 5, 2025
f6d473a
Revert changes.
Oct 5, 2025
f787a17
Add documentation and bug fixes.
Oct 5, 2025
c82acac
Update document.
Oct 5, 2025
f6f2232
Update the function to work with the newest functions.
Oct 5, 2025
a30c9e8
Remove the samples argument from the function.
Oct 5, 2025
74d5a1e
Add changes to the sda workflow.
Oct 6, 2025
70cfcb1
Add namespace.
Oct 7, 2025
1f87740
Bug fix.
Oct 7, 2025
8b0a640
Apply Mike's comments.
Oct 8, 2025
fe6bdca
Add the merge nc feature.
Oct 8, 2025
660186b
Improve scripts.
Oct 8, 2025
dd0ee2e
Bug fix.
Oct 8, 2025
5ddcaa7
Update the parallel registration.
Oct 15, 2025
ea8332a
Fix the dimension error when YN equals one.
Oct 15, 2025
5cb5a38
Update the parallel registration. And add the else control when it's …
Oct 15, 2025
fdacd5f
Convert from SoilMoist to SoilMoistFrac.
Oct 15, 2025
abecd42
Add the recursive file deletion.
Oct 15, 2025
fa18299
Revert changes to resolve conflicts.
Oct 15, 2025
81f035b
revert change.
Oct 15, 2025
3fc6907
Pull from pecan develop.
Oct 15, 2025
6fba821
change by make.
Oct 15, 2025
02ca610
Combine previous changes with the debias implementation.
Oct 15, 2025
3575463
Merge the debiasing feature into the parallel SDA workflow.
Oct 15, 2025
bebf893
Update document.
Oct 15, 2025
1409564
Update document.
Oct 15, 2025
870593f
Add the debiasing feature to the parallel job submission.
Oct 15, 2025
10310bc
map forecast results to the variable boundary.
Oct 15, 2025
300a900
Update comments.
Oct 15, 2025
408a390
update observation paths.
Oct 15, 2025
5b3cc63
Update computation configuration.
Oct 15, 2025
393c16e
Bug fix.
Oct 15, 2025
4ac6250
Bug fix.
Oct 16, 2025
50c2324
Revert change.
Oct 16, 2025
993e98b
Revert change.
Oct 16, 2025
78e4702
Add debias code.
Oct 16, 2025
d6d06f3
Move the previous debias script to the inst folder.
Oct 18, 2025
460e0ee
remove previous debias functions.
Oct 18, 2025
6025d83
Add the main SDA debias workflow.
Oct 18, 2025
85c59fe
Add the new debias workflow to the multi-site version.
Oct 18, 2025
ec38802
Add the new bias correction workflow to the sda_local script.
Oct 18, 2025
f1d10a1
Add the new SDA debiasing workflow to the SDA job submission function.
Oct 18, 2025
77bda48
Move out of the if control.
Oct 19, 2025
9c3fd8e
Bug fix.
Oct 19, 2025
e6ef100
Bug fix
Oct 19, 2025
087f18a
Remove the condition for the wrapping where we have more than 20 char…
Oct 19, 2025
5a04563
Update the runner script.
Oct 19, 2025
7ba598e
remove arg.
Oct 20, 2025
19db026
Update script.
Oct 20, 2025
2f14acc
Revert "Move out of the if control."
Oct 20, 2025
caefddf
Resolve conflicts.
Oct 20, 2025
6da0cd6
Revert back.
Oct 20, 2025
cb2379a
Revert to using the new debias workflow.
Oct 21, 2025
668c4b1
Update document.
Oct 21, 2025
e543c23
Update document.
Oct 21, 2025
a44399d
Update the runner script.
Oct 21, 2025
4701b41
Update the bias correction function.
Oct 21, 2025
2754b91
Update the usage of the bias correction workflow.
Oct 21, 2025
459c945
Merge branch 'PecanProject:develop' into SDA_data
DongchenZ Oct 21, 2025
0e08b79
Update observation paths.
Oct 21, 2025
af5c081
Update namespace.
Oct 21, 2025
300dfae
Merge branch 'SDA_data' of https://github.com/DongchenZ/pecan into SD…
Oct 21, 2025
c8a8206
Modify the test to match the change in the logger.message function.
Oct 21, 2025
441f732
Change the default value of wrap in the logger message function.
Oct 21, 2025
9099aa0
Revert back.
Oct 21, 2025
986930b
Merge branch 'PecanProject:develop' into SDA_data
DongchenZ Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions modules/assim.sequential/R/Analysis_sda_block.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,
# grab cores from settings.
cores <- as.numeric(settings$state.data.assimilation$batch.settings$general.job$cores)
# if we didn't assign number of CPUs in the settings.
if (is.null(cores)) {
cores <- parallel::detectCores() - 1
# if we only have one CPU.
if (cores < 1) cores <- 1
if (length(cores) == 0 | is.null(cores)) {
cores <- parallel::detectCores()
}
cores <- cores - 1
# if we only have one CPU.
if (cores < 1) cores <- 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the parallel registration.

#convert from vector values to block lists.
if ("try-error" %in% class(try(block.results <- build.block.xy(settings = settings,
block.list.all = block.list.all,
Expand Down Expand Up @@ -570,6 +571,7 @@ update_q <- function (block.list.all, t, nt, aqq.Init = NULL, bqq.Init = NULL, M
#if it's an update.
if (is.null(MCMC_dat)) {
#loop over blocks
#if t=1 or if it's a fresh run.
if (t == 1) {
for (i in seq_along(block.list)) {
nvar <- length(block.list[[i]]$data$muf)
Expand Down
1,079 changes: 432 additions & 647 deletions modules/assim.sequential/R/sda.enkf_MultiSite.R

Large diffs are not rendered by default.

200 changes: 142 additions & 58 deletions modules/assim.sequential/R/sda.enkf_parallel.R

Large diffs are not rendered by default.

143 changes: 143 additions & 0 deletions modules/assim.sequential/R/sda_bias_correction.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#' @description
#' This function helps to correct the forecasts' biases based on
#' ML (random forest) training on the previous time point.
#' @title sda_bias_correction
#'
#' @param site.locs data.frame: data.frame that contains longitude and latitude in its first and second column.
#' @param t numeric: the current number of time points (e.g., t=1 for the beginning time point).
#' @param pre.X data.frame: data frame of model forecast at the previous time point
#' that has n (ensemble size) rows and n.var (number of variables) times n.site (number of locations) columns.
#' (e.g., 100 ensembles, 4 variables, and 8,000 locations will end up with data.frame of 100 rows and 32,000 columns)
#' @param X data.frame: data frame of model forecast at the current time point.
#' @param obs.mean List: lists of date times named by time points, which contains lists of sites named by site ids,
#' which contains observation means for each state variables of each site for each time point.
#' @param state.interval matrix: containing the upper and lower boundaries for each state variable.
#' @param cov.dir character: physical path to the directory contains the time series covariate maps.
#' @param py.init R function: R function to initialize the python functions. Default is NULL.
#' the default random forest will be used if `py.init` is NULL.
#' @param pre.states list: containing previous covariates for each location.
#'
#' @return list: the current X after the bias-correction; the ML model for each variable; predicted residuals.
#'
#' @author Dongchen Zhang
#' @importFrom dplyr %>%

sda_bias_correction <- function (site.locs,
t, pre.X, X,
obs.mean,
state.interval,
cov.dir,
pre.states,
py.init = NULL) {
# if we have prescribed python script to use.
if (!is.null(py.init)) {
# load python functions.
py <- py.init()
}
# grab variable names.
var.names <- rownames(state.interval)
# create terra spatial points.
pts <- terra::vect(cbind(site.locs$Lon, site.locs$Lat), crs = "epsg:4326")
# grab the current year.
y <- lubridate::year(names(obs.mean))[t]
# if we don't have previous extracted information.
# grab the covariate file path.
cov.file.pre <- list.files(cov.dir, full.names = T)[which(grepl(y-1, list.files(cov.dir)))] # previous covaraites.
# extract covariates for the previous time point.
cov.pre <- terra::extract(x = terra::rast(cov.file.pre), y = pts)[,-1] # remove the first ID column.
# factorize land cover band.
if ("LC" %in% colnames(cov.pre)) {
cov.pre[,"LC"] <- factor(cov.pre[,"LC"])
}
# extract covariates for the current time point.
cov.file <- list.files(cov.dir, full.names = T)[which(grepl(y, list.files(cov.dir)))] # current covaraites.
cov.current <- terra::extract(x = terra::rast(cov.file), y = pts)[,-1] # remove the first ID column.
complete.inds <- which(stats::complete.cases(cov.current))
cov.current <- cov.current[complete.inds,]
# factorize land cover band.
if ("LC" %in% colnames(cov.current)) {
cov.current[,"LC"] <- factor(cov.current[,"LC"])
}
cov.names <- colnames(cov.current) # grab band names for the covariate map.
# loop over variables.
# initialize model list for each variable.
models <- res.vars <- vector("list", length = length(var.names)) %>% purrr::set_names(var.names)
for (v in var.names) {
message(paste("processing", v))
# train residuals on the previous time point.
# grab column index for the current variable.
inds <- which(grepl(v, colnames(pre.X)))
# grab observations for the current variable.
obs.v <- obs.mean[[t-1]] %>% purrr::map(function(obs){
if (is.null(obs[[v]])) {
return(NA)
} else {
return(obs[[v]])
}
}) %>% unlist
# calculate residuals for the previous time point.
res.pre <- colMeans(pre.X[,inds]) - obs.v
# prepare training data set.
ml.df <- cbind(cov.pre, colMeans(pre.X)[inds], res.pre)
colnames(ml.df)[length(ml.df)-1] <- "raw_dat" # rename the column name.
ml.df <- rbind(pre.states[[v]], ml.df) # grab previous covariates.
ml.df <- ml.df[which(stats::complete.cases(ml.df)),]
pre.states[[v]] <- ml.df # store the historical covariates for future use.
# prepare predicting covariates.
cov.df <- cbind(cov.current, colMeans(X)[inds[complete.inds]])
colnames(cov.df)[length(cov.df)] <- "raw_dat"
if (nrow(ml.df) == 0) next # jump to the next loop if we have zero records.
if (is.null(py.init)) {
# random forest training.
formula <- stats::as.formula(paste("res.pre", "~", paste(cov.names, collapse = " + ")))
model <- randomForest::randomForest(formula,
data = ml.df,
ntree = 1000,
na.action = stats::na.omit,
keep.forest = TRUE,
importance = TRUE)
var.imp <- randomForest::importance(model)
models[[v]] <- var.imp # store the variable importance.
# predict residuals for the current time point.
res.current <- stats::predict(model, cov.df)
} else {
# using functions from the python script.
# training.
fi_ret <- py$train_full_model(
name = as.character(v), # current variable name.
X = as.matrix(ml.df[,-length(ml.df)]), # covariates + previous forecast means.
y = as.numeric(ml.df[["res.pre"]]), # residuals.
feature_names = colnames(ml.df[,-length(ml.df)])
)
# predicting.
res.current <- py$predict_residual(as.character(v), as.matrix(cov.df))
# store model outputs.
# weights.
w_now <- try(py$get_model_weights(as.character(v)), silent = TRUE)
w_now <- min(max(as.numeric(w_now), 0), 1)
w_named <- c(KNN = w_now, TREE = 1 - w_now)
# var importance.
fi_ret <- tryCatch(reticulate::py_to_r(fi_ret), error = function(e) fi_ret)
fn <- as.character(unlist(fi_ret[["names"]], use.names = FALSE))
fv <- as.numeric(unlist(fi_ret[["importances"]], use.names = FALSE)) %>% purrr::set_names(fn)
models[[v]] <- list(weights = w_named, var.imp = fv) # store the variable importance.
}
# assign NAs to places with no observations in the previous time point.
res <- rep(NA, length(obs.v)) %>% purrr::set_names(unique(attributes(X)$Site))
res[complete.inds] <- res.current
res[which(is.na(obs.v))] <- NA
res.vars[[v]] <- res
# correct the current forecasts.
for (i in seq_along(inds)) {
if (is.na(res[i])) next
X[,inds[i]] <- X[,inds[i]] - res[i]
}
}
# map forecasts towards the prescribed variable boundaries.
for(i in 1:ncol(X)){
int.save <- state.interval[which(startsWith(colnames(X)[i], var.names)),]
X[X[,i] < int.save[1],i] <- int.save[1]
X[X[,i] > int.save[2],i] <- int.save[2]
}
return(list(X = X, models = models, res = res.vars, pre.states = pre.states))
}
23 changes: 15 additions & 8 deletions modules/assim.sequential/inst/anchor/SDA_NA_runner.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,23 @@ setwd("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/")
settings_dir <- "/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/pecan.xml"
settings <- PEcAn.settings::read.settings(settings_dir)

# update settings with the actual PFTs.
settings <- PEcAn.settings::prepare.settings(settings)

# setup the batch job settings.
general.job <- list(cores = 28, folder.num = 80)
general.job <- list(cores = 28, folder.num = 35)
batch.settings = structure(list(
general.job = general.job,
qsub.cmd = "qsub -l h_rt=24:00:00 -l mem_per_core=4G -l buyin -pe omp @CORES@ -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"
))
settings$state.data.assimilation$batch.settings <- batch.settings

# alter the ensemble size.
settings$ensemble$size <- 10

# update settings with the actual PFTs.
settings <- PEcAn.settings::prepare.settings(settings)
settings$ensemble$size <- 100

# load observations.
load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs_mean.Rdata")
load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs_cov.Rdata")
load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs.mean.Rdata")
load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs.cov.Rdata")

# replace zero observations and variances with small numbers.
for (i in 1:length(obs.mean)) {
Expand Down Expand Up @@ -83,4 +83,11 @@ PEcAnAssimSequential::qsub_sda(settings = settings,
send_email = NULL,
keepNC = FALSE,
forceRun = TRUE,
MCMC.args = NULL))
MCMC.args = NULL,
merge_nc = TRUE),
block.index = NULL,
debias = list(cov.dir = "/projectnb/dietzelab/dongchen/anchorSites/NA_runs/covariates_lc_ts/covariates_nolatlon/", start.year = 2014))

# export sda output.
PEcAnAssimSequential::sda_assemble("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/batch",
"/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site")
42 changes: 0 additions & 42 deletions modules/assim.sequential/man/debias_cov_by_columns.Rd

This file was deleted.

52 changes: 0 additions & 52 deletions modules/assim.sequential/man/debias_get_covariates_for_date.Rd

This file was deleted.

Loading
Loading