diff --git a/R/DAVINCHI.R b/R/DAVINCHI.R index 8938461..9c89a28 100644 --- a/R/DAVINCHI.R +++ b/R/DAVINCHI.R @@ -14,7 +14,10 @@ pkgs <- c("Rcpp", "cluster", "mclust", "dendextend", -"BiocNeighbors") +"BiocNeighbors", +"future", +"future.apply", +"progressr") .onAttach <- function(libname, pkgname){ diff --git a/R/import.R b/R/import.R index ed834e7..6da5b48 100644 --- a/R/import.R +++ b/R/import.R @@ -16,5 +16,8 @@ #' @importFrom stats wilcox.test #' @importFrom utils head #' @importFrom utils packageVersion +#' @importFrom future plan multicore +#' @importFrom future.apply future_lapply +#' @importFrom progressr handlers handler_txtprogressbar with_progress ## usethis namespace: end NULL \ No newline at end of file diff --git a/R/new.R b/R/new.R index 452fb6c..1361986 100644 --- a/R/new.R +++ b/R/new.R @@ -1,3 +1,7 @@ +# future, future.apply and progressr are declared via @importFrom in R/import.R +# and are called with explicit pkg:: prefixes in this file, so they are not +# attached here (library() calls do not belong in package code under R/). + #' Adaptive Leiden/Louvain clustering #' #' When the number of clusters is set, run louvain/leiden clustering in an adaptive manner. 
@@ -615,8 +619,12 @@ Horizontal.Integration.Assemble <- function( mod.opt = "all", #all, common L2.in = "default", #default, L2norm, L2norm.joint random.seed = 123, - smooth = F - ){ + smooth = F, + workers = 2, # number of cores/workers for parallelization + ram = 15, # in GB max RAM usage + sequential = F, # TRUE: run on the sequential plan (no parallel workers) + multi = "multicore" # multicore (mac and linux), multisession (for windows) + ){ #sanity check @@ -624,63 +632,94 @@ Horizontal.Integration.Assemble <- function( if (is.null(dataset.opts)){ stop("Please include a vector of character names corresponding to the slices.") }#if - - + + if (sequential){ + #sequential execution + old.plan <- future::plan(future::sequential) + }else{ + #parallel execution + if (multi == "multisession"){ + old.plan <- future::plan(future::multisession, workers = workers) + }else{ + old.plan <- future::plan(future::multicore, workers = workers) + } + } + pkgs_to_load <- if (!sequential && multi == "multisession") c("DaVinci", "progressr") else NULL #assigned on every path: each future_lapply() below reads it, also when sequential = TRUE + on.exit(future::plan(old.plan), add = TRUE) #plan() returned the caller's previous plan; restore it on every exit path (early return or error) + options(future.globals.maxSize = ram * 1024^3) + + # progress bar settings + progressr::handlers("progress") + + options( + progressr.enable = TRUE, + progressr.enable_after = 0, + progressr.delay_stdout = FALSE + ) + + progressr::handlers(progressr::handler_txtprogressbar( + file = stdout(), + enable_after = 0 + )) + #self-contrastive learning step ######################################################### - exhaustive.list <- list() - - #ptm <- proc.time() - for (ii in 1:length(Y.list)){ - - message(paste0("Working on Sample ", ii, " / ", length(Y.list))) - proj <- list() - s0 <- NULL - + + exhaustive.list <- progressr::with_progress({ - count <- 0 - for (k.arg in k.arg.list){ - message("Working on k=", k.arg) - - if (is.null(s0)){ - - ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = k.arg, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - verbose = F, - random.seed = random.seed) - }else{ + p <- progressr::progressor(steps = length(Y.list) * length(k.arg.list)) - ICAp.res <- 
manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = k.arg, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - shur0 = s0, - verbose = F, - random.seed = random.seed) - }#else + future.apply::future_lapply(seq_along(Y.list), function(ii) { - #saveRDS(ICAp.res, file = paste0(output.folder, "/", save.label, "@", normalize.version, "@k=", k.arg,".RDS") ) - count <- count+1 - proj[[count]] <- ICAp.res + message(paste0("Working on Sample ", ii, " / ", length(Y.list))) + proj <- list() + s0 <- NULL - if (is.null(s0)){ - s0 <- ICAp.res$shur0 - }#if - }#for k.arg + count <- 0 + for (k.arg in k.arg.list){ + message("Working on k=", k.arg) + + if (is.null(s0)){ + + ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = k.arg, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + verbose = F, + random.seed = random.seed) + }else{ - exhaustive.list[[ii]] <- proj - }#for ii - #print(proc.time()-ptm) + ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = k.arg, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + shur0 = s0, + verbose = F, + random.seed = random.seed) + }#else + + count <- count+1 + proj[[count]] <- ICAp.res + p(sprintf("Sample %d: k=%d", ii, k.arg)) + + if (is.null(s0)){ + s0 <- ICAp.res$shur0 + }#if + + }#for k.arg + + return(proj) + }, future.seed = TRUE, future.packages = pkgs_to_load) + + }) #self contrastive learning step @@ -702,55 +741,46 @@ Horizontal.Integration.Assemble <- function( #self contrastive learning ############################# - embed.list <- list() - for (ii in 1:length(exhaustive.list)){ - - embed.list[[ii]] <- self_deco(exhaustive.list[[ii]], - LVs.filter.thr = 0.9, - freq = 1, - opt = "B") - }#for ii + + embed.list <- future.apply::future_lapply(seq_along(exhaustive.list), function(ii) { + self_deco(exhaustive.list[[ii]], + LVs.filter.thr = 0.9, + freq = 1, + opt = "B") + }, future.seed = TRUE, future.packages = pkgs_to_load) #run again 
to finetune ############################# - embed.list.finetune <- list() - for (ii in 1:length(embed.list)){ - - embed.list.finetune[[ii]] <- manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = nrow(embed.list[[ii]]$LVs), - B = embed.list[[ii]]$LVs, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - shur0 = exhaustive.list[[ii]][[1]]$shur0, - verbose = F, - random.seed = random.seed) - - }#for ii + embed.list.finetune <- future.apply::future_lapply(seq_along(embed.list), function(ii) { + manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = nrow(embed.list[[ii]]$LVs), + B = embed.list[[ii]]$LVs, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + shur0 = exhaustive.list[[ii]][[1]]$shur0, + verbose = F, + random.seed = random.seed) + }, future.seed = TRUE, future.packages = pkgs_to_load) #Finetune after self-contrastive learning if (input.opt == "OnlyDeco"){ - dav.res.list <- list() - for (ii in 1:length(embed.list.finetune)){ - + dav.res.list <- future.apply::future_lapply(seq_along(embed.list.finetune), function(ii) { ICAp.res <- embed.list.finetune[[ii]] #construct the pseudo ICAp.res objects - tmp <- list(Z = t(embed$LVs.pair), - B = embed$LVs, - L4 = ICAp.res$L4, - shur0 = ICAp.res$shur0, - L1 = ICAp.res$L1, - L2 = ICAp.res$L2) - #cor.res <- cor(t(embed$LVs), t(ICAp.res$B)) - - dav.res.list[[ii]] <- tmp - }#for ii + list(Z = t(embed$LVs.pair), # NOTE(review): 'embed' is never assigned in the visible code (same in the pre-patch loop); presumably embed.list[[ii]] was intended - confirm + B = embed$LVs, + L4 = ICAp.res$L4, + shur0 = ICAp.res$shur0, + L1 = ICAp.res$L1, + L2 = ICAp.res$L2) + }, future.seed = TRUE, future.packages = pkgs_to_load) }else if (input.opt == "FineTune"){ @@ -836,11 +866,10 @@ Horizontal.Integration.Assemble <- function( }else{ mat.smooth <- mat - for (ii in 1:length(dataset.opts)){ + smooth_updates <- future.apply::future_lapply(seq_along(dataset.opts), function(ii) { print(ii) subpart.index <- which(mat.slice.id==dataset.opts[ii]) - subpart <- mat[subpart.index,] subpart.smooth <- refinement.batch(subpart, 
as.matrix(coor.list[[ii]])[rownames(subpart),], @@ -848,9 +877,13 @@ Horizontal.Integration.Assemble <- function( neighbor.arg = 8, tasks = "continuous") - mat.smooth[subpart.index,] <- (subpart+t(subpart.smooth))/2 - - }#for ii + return(list(idx = subpart.index, + val = (subpart+t(subpart.smooth))/2)) + }, future.seed = TRUE, future.packages = pkgs_to_load) + + for (update in smooth_updates) { + mat.smooth[update$idx, ] <- update$val + } return(list(Y.list = Y.list, coor.list = coor.list, @@ -868,6 +901,8 @@ Horizontal.Integration.Assemble <- function( }#else + #no plan reset here: the on.exit() handler registered right after plan setup restores the previous plan, and a trailing plan() call would become the function's value + }#Horizontal.Integration.Assemble