From 40f22ffe646cab275d5c80e9470276dd8100d4d7 Mon Sep 17 00:00:00 2001 From: Jeffrey Zhong Date: Fri, 27 Feb 2026 11:28:45 -0500 Subject: [PATCH 1/4] Parallelization in DaVinci with future first commit --- R/DAVINCHI.R | 5 +- R/new.R | 201 ++++++++++++++++++++++++++++----------------------- 2 files changed, 113 insertions(+), 93 deletions(-) diff --git a/R/DAVINCHI.R b/R/DAVINCHI.R index 8938461..9c89a28 100644 --- a/R/DAVINCHI.R +++ b/R/DAVINCHI.R @@ -14,7 +14,10 @@ pkgs <- c("Rcpp", "cluster", "mclust", "dendextend", -"BiocNeighbors") +"BiocNeighbors", +"future", +"future.apply", +"progressr",) .onAttach <- function(libname, pkgname){ diff --git a/R/new.R b/R/new.R index 452fb6c..2553578 100644 --- a/R/new.R +++ b/R/new.R @@ -615,8 +615,10 @@ Horizontal.Integration.Assemble <- function( mod.opt = "all", #all, common L2.in = "default", #default, L2norm, L2norm.joint random.seed = 123, - smooth = F - ){ + smooth = F, + cores = 1, # number of cores for parallelization + ram = 1 # in GB max RAM usage + ){ #sanity check @@ -624,63 +626,84 @@ Horizontal.Integration.Assemble <- function( if (is.null(dataset.opts)){ stop("Please include a vector of character names corresponding to the slices.") }#if - - + + + plan(multicore, workers = cores) + options(future.globals.maxSize = ram * 1024^3) + + # progress bar settings + handlers("progress") + + + options( + progressr.enable = TRUE, + progressr.enable_after = 0, + progressr.delay_stdout = FALSE + ) + + progressr::handlers(progressr::handler_txtprogressbar( + file = stdout(), + enable_after = 0 + )) + #self-contrastive learning step ######################################################### - exhaustive.list <- list() - - #ptm <- proc.time() - for (ii in 1:length(Y.list)){ - - message(paste0("Working on Sample ", ii, " / ", length(Y.list))) - proj <- list() - s0 <- NULL - + + exhaustive.list <- with_progress({ - count <- 0 - for (k.arg in k.arg.list){ - message("Working on k=", k.arg) - - if (is.null(s0)){ - - ICAp.res <- 
manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = k.arg, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - verbose = F, - random.seed = random.seed) - }else{ + p <- progressor(steps = length(Y.list) * length(k.arg.list)) - ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = k.arg, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - shur0 = s0, - verbose = F, - random.seed = random.seed) - }#else + future_lapply(seq_along(Y.list), function(ii) { - #saveRDS(ICAp.res, file = paste0(output.folder, "/", save.label, "@", normalize.version, "@k=", k.arg,".RDS") ) - count <- count+1 - proj[[count]] <- ICAp.res + message(paste0("Working on Sample ", ii, " / ", length(Y.list))) + proj <- list() + s0 <- NULL - if (is.null(s0)){ - s0 <- ICAp.res$shur0 - }#if - }#for k.arg + count <- 0 + for (k.arg in k.arg.list){ + message("Working on k=", k.arg) + + if (is.null(s0)){ + + ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = k.arg, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + verbose = F, + random.seed = random.seed) + }else{ - exhaustive.list[[ii]] <- proj - }#for ii - #print(proc.time()-ptm) + ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = k.arg, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + shur0 = s0, + verbose = F, + random.seed = random.seed) + }#else + + count <- count+1 + proj[[count]] <- ICAp.res + + p(sprintf("Sample %d: k=%d", ii, k.arg)) + + if (is.null(s0)){ + s0 <- ICAp.res$shur0 + }#if + + }#for k.arg + + return(proj) + }, future.seed = TRUE) + }) #self contrastive learning step @@ -702,55 +725,46 @@ Horizontal.Integration.Assemble <- function( #self contrastive learning ############################# - embed.list <- list() - for (ii in 1:length(exhaustive.list)){ - - embed.list[[ii]] <- self_deco(exhaustive.list[[ii]], - LVs.filter.thr = 0.9, - freq = 1, - opt = "B") - }#for ii + + embed.list <- 
future_lapply(seq_along(exhaustive.list), function(ii) { + self_deco(exhaustive.list[[ii]], + LVs.filter.thr = 0.9, + freq = 1, + opt = "B") + }, future.seed = TRUE) #run again to finetune ############################# - embed.list.finetune <- list() - for (ii in 1:length(embed.list)){ - - embed.list.finetune[[ii]] <- manifoldDecomp_adaptive(Y.list[[ii]], - L.list[[ii]], - k = nrow(embed.list[[ii]]$LVs), - B = embed.list[[ii]]$LVs, - L4 = L4.arg, - L4_adaptive = 2, - to_drop = T, - save.complete = T, - shur0 = exhaustive.list[[ii]][[1]]$shur0, - verbose = F, - random.seed = random.seed) - - }#for ii + embed.list.finetune <- future_lapply(seq_along(embed.list), function(ii) { + manifoldDecomp_adaptive(Y.list[[ii]], + L.list[[ii]], + k = nrow(embed.list[[ii]]$LVs), + B = embed.list[[ii]]$LVs, + L4 = L4.arg, + L4_adaptive = 2, + to_drop = T, + save.complete = T, + shur0 = exhaustive.list[[ii]][[1]]$shur0, + verbose = F, + random.seed = random.seed) + }, future.seed = TRUE) #Finetune after self-contrastive learning if (input.opt == "OnlyDeco"){ - dav.res.list <- list() - for (ii in 1:length(embed.list.finetune)){ - + dav.res.list <- future_lapply(seq_along(embed.list.finetune), function(ii) { ICAp.res <- embed.list.finetune[[ii]] #construct the pseudo ICAp.res objects - tmp <- list(Z = t(embed$LVs.pair), - B = embed$LVs, - L4 = ICAp.res$L4, - shur0 = ICAp.res$shur0, - L1 = ICAp.res$L1, - L2 = ICAp.res$L2) - #cor.res <- cor(t(embed$LVs), t(ICAp.res$B)) - - dav.res.list[[ii]] <- tmp - }#for ii + list(Z = t(embed$LVs.pair), + B = embed$LVs, + L4 = ICAp.res$L4, + shur0 = ICAp.res$shur0, + L1 = ICAp.res$L1, + L2 = ICAp.res$L2) + }, future.seed = TRUE) }else if (input.opt == "FineTune"){ @@ -836,11 +850,10 @@ Horizontal.Integration.Assemble <- function( }else{ mat.smooth <- mat - for (ii in 1:length(dataset.opts)){ + smooth_updates <- future_lapply(seq_along(dataset.opts), function(ii) { print(ii) subpart.index <- which(mat.slice.id==dataset.opts[ii]) - subpart <- 
mat[subpart.index,] subpart.smooth <- refinement.batch(subpart, as.matrix(coor.list[[ii]])[rownames(subpart),], @@ -848,9 +861,13 @@ Horizontal.Integration.Assemble <- function( neighbor.arg = 8, tasks = "continuous") - mat.smooth[subpart.index,] <- (subpart+t(subpart.smooth))/2 - - }#for ii + return(list(idx = subpart.index, + val = (subpart+t(subpart.smooth))/2)) + }, future.seed = TRUE) + + for (update in smooth_updates) { + mat.smooth[update$idx, ] <- update$val + } return(list(Y.list = Y.list, coor.list = coor.list, From 581ff799cae76839cc9a48439ac7e4dba2316f5c Mon Sep 17 00:00:00 2001 From: Jeffrey Zhong Date: Fri, 27 Feb 2026 15:37:26 -0500 Subject: [PATCH 2/4] fix import errors --- R/import.R | 3 +++ R/new.R | 34 ++++++++++++++++++++++------------ 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/R/import.R b/R/import.R index ed834e7..6da5b48 100644 --- a/R/import.R +++ b/R/import.R @@ -16,5 +16,8 @@ #' @importFrom stats wilcox.test #' @importFrom utils head #' @importFrom utils packageVersion +#' @importFrom future plan multicore +#' @importFrom future.apply future_lapply +#' @importFrom progressr handlers handler_txtprogressbar with_progress ## usethis namespace: end NULL \ No newline at end of file diff --git a/R/new.R b/R/new.R index 2553578..2424698 100644 --- a/R/new.R +++ b/R/new.R @@ -1,3 +1,7 @@ +library(future) +library(future.apply) +library(progressr) + #' Adaptive Leiden/Louvain clustering #' #' When the number of clusters is set, run louvain/leiden clustering in an adaptive manner. 
@@ -616,8 +620,9 @@ Horizontal.Integration.Assemble <- function( L2.in = "default", #default, L2norm, L2norm.joint random.seed = 123, smooth = F, - cores = 1, # number of cores for parallelization - ram = 1 # in GB max RAM usage + cores = 2, # number of cores for parallelization + ram = 1, # in GB max RAM usage + sequential = F ){ @@ -627,13 +632,18 @@ Horizontal.Integration.Assemble <- function( stop("Please include a vector of character names corresponding to the slices.") }#if + if (sequential){ + #sequential execution + future::plan(future::sequential) + }else{ + #parallel execution + future::plan(future::multicore, workers = cores) + } - plan(multicore, workers = cores) options(future.globals.maxSize = ram * 1024^3) # progress bar settings - handlers("progress") - + progressr::handlers("progress") options( progressr.enable = TRUE, @@ -649,11 +659,11 @@ Horizontal.Integration.Assemble <- function( #self-contrastive learning step ######################################################### - exhaustive.list <- with_progress({ + exhaustive.list <- progressr::with_progress({ - p <- progressor(steps = length(Y.list) * length(k.arg.list)) + p <- progressr::progressor(steps = length(Y.list) * length(k.arg.list)) - future_lapply(seq_along(Y.list), function(ii) { + future.apply::future_lapply(seq_along(Y.list), function(ii) { message(paste0("Working on Sample ", ii, " / ", length(Y.list))) proj <- list() @@ -726,7 +736,7 @@ Horizontal.Integration.Assemble <- function( #self contrastive learning ############################# - embed.list <- future_lapply(seq_along(exhaustive.list), function(ii) { + embed.list <- future.apply::future_lapply(seq_along(exhaustive.list), function(ii) { self_deco(exhaustive.list[[ii]], LVs.filter.thr = 0.9, freq = 1, @@ -737,7 +747,7 @@ Horizontal.Integration.Assemble <- function( #run again to finetune ############################# - embed.list.finetune <- future_lapply(seq_along(embed.list), function(ii) { + embed.list.finetune <- 
future.apply::future_lapply(seq_along(embed.list), function(ii) { manifoldDecomp_adaptive(Y.list[[ii]], L.list[[ii]], k = nrow(embed.list[[ii]]$LVs), @@ -755,7 +765,7 @@ Horizontal.Integration.Assemble <- function( #Finetune after self-contrastive learning if (input.opt == "OnlyDeco"){ - dav.res.list <- future_lapply(seq_along(embed.list.finetune), function(ii) { + dav.res.list <- future.apply::future_lapply(seq_along(embed.list.finetune), function(ii) { ICAp.res <- embed.list.finetune[[ii]] #construct the pseudo ICAp.res objects list(Z = t(embed$LVs.pair), @@ -850,7 +860,7 @@ Horizontal.Integration.Assemble <- function( }else{ mat.smooth <- mat - smooth_updates <- future_lapply(seq_along(dataset.opts), function(ii) { + smooth_updates <- future.apply::future_lapply(seq_along(dataset.opts), function(ii) { print(ii) subpart.index <- which(mat.slice.id==dataset.opts[ii]) From 84df557cd1a7a95b4f8a0fd2e24238aba337d5f1 Mon Sep 17 00:00:00 2001 From: Jeffrey Zhong Date: Fri, 27 Feb 2026 16:45:44 -0500 Subject: [PATCH 3/4] fixed Windows compatibility --- R/new.R | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/R/new.R b/R/new.R index 2424698..5dc0555 100644 --- a/R/new.R +++ b/R/new.R @@ -620,9 +620,10 @@ Horizontal.Integration.Assemble <- function( L2.in = "default", #default, L2norm, L2norm.joint random.seed = 123, smooth = F, - cores = 2, # number of cores for parallelization + workers = 2, # number of cores/workers for parallelization ram = 1, # in GB max RAM usage - sequential = F + sequential = F, + multi = "multicore" # multicore (mac and linux), multisession (for windows) ){ @@ -637,7 +638,12 @@ Horizontal.Integration.Assemble <- function( future::plan(future::sequential) }else{ #parallel execution - future::plan(future::multicore, workers = cores) + if (multi == "multisession"){ + future::plan(future::multisession, workers = workers) + }else{ + future::plan(future::multicore, workers = workers) + } + pkgs_to_load <- if 
(multi == "multisession") c("DaVinci", "progressr") else NULL } options(future.globals.maxSize = ram * 1024^3) @@ -711,7 +717,7 @@ Horizontal.Integration.Assemble <- function( }#for k.arg return(proj) - }, future.seed = TRUE) + }, future.seed = TRUE, future.packages = pkgs_to_load) }) @@ -741,7 +747,7 @@ Horizontal.Integration.Assemble <- function( LVs.filter.thr = 0.9, freq = 1, opt = "B") - }, future.seed = TRUE) + }, future.seed = TRUE, future.packages = pkgs_to_load) #run again to finetune @@ -759,7 +765,7 @@ Horizontal.Integration.Assemble <- function( shur0 = exhaustive.list[[ii]][[1]]$shur0, verbose = F, random.seed = random.seed) - }, future.seed = TRUE) + }, future.seed = TRUE, future.packages = pkgs_to_load) #Finetune after self-contrastive learning @@ -774,7 +780,7 @@ Horizontal.Integration.Assemble <- function( shur0 = ICAp.res$shur0, L1 = ICAp.res$L1, L2 = ICAp.res$L2) - }, future.seed = TRUE) + }, future.seed = TRUE, future.packages = pkgs_to_load) }else if (input.opt == "FineTune"){ @@ -873,7 +879,7 @@ Horizontal.Integration.Assemble <- function( return(list(idx = subpart.index, val = (subpart+t(subpart.smooth))/2)) - }, future.seed = TRUE) + }, future.seed = TRUE, future.packages = pkgs_to_load) for (update in smooth_updates) { mat.smooth[update$idx, ] <- update$val @@ -895,6 +901,8 @@ Horizontal.Integration.Assemble <- function( }#else + future::plan(sequential) # Explicitly close multisession workers by switching plan + }#Horizontal.Integration.Assemble From 9a8ea74872ddb1c55c220c62534ac596538fe570 Mon Sep 17 00:00:00 2001 From: Jeffrey Zhong Date: Fri, 27 Feb 2026 17:24:25 -0500 Subject: [PATCH 4/4] increase default ram size --- R/new.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/new.R b/R/new.R index 5dc0555..1361986 100644 --- a/R/new.R +++ b/R/new.R @@ -621,7 +621,7 @@ Horizontal.Integration.Assemble <- function( random.seed = 123, smooth = F, workers = 2, # number of cores/workers for parallelization - ram = 1, # in GB 
max RAM usage + ram = 15, # in GB max RAM usage sequential = F, multi = "multicore" # multicore (mac and linux), multisession (for windows) ){