Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion R/DAVINCHI.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pkgs <- c("Rcpp",
"cluster",
"mclust",
"dendextend",
"BiocNeighbors")
"BiocNeighbors",
"future",
"future.apply",
"progressr",)


.onAttach <- function(libname, pkgname){
Expand Down
3 changes: 3 additions & 0 deletions R/import.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@
#' @importFrom stats wilcox.test
#' @importFrom utils head
#' @importFrom utils packageVersion
#' @importFrom future plan multicore
#' @importFrom future.apply future_lapply
#' @importFrom progressr handlers handler_txtprogressbar with_progress
## usethis namespace: end
NULL
219 changes: 127 additions & 92 deletions R/new.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
library(future)
library(future.apply)
library(progressr)

#' Adaptive Leiden/Louvain clustering
#'
#' When the number of clusters is set, run louvain/leiden clustering in an adaptive manner.
Expand Down Expand Up @@ -615,72 +619,107 @@ Horizontal.Integration.Assemble <- function(
mod.opt = "all", #all, common
L2.in = "default", #default, L2norm, L2norm.joint
random.seed = 123,
smooth = F
){
smooth = F,
workers = 2, # number of cores/workers for parallelization
ram = 15, # in GB max RAM usage
sequential = F,
multi = "multicore" # multicore (mac and linux), multisession (for windows)
){


#sanity check
#########################################################
if (is.null(dataset.opts)){
stop("Please include a vector of character names corresponding to the slices.")
}#if



if (sequential){
#sequential execution
future::plan(future::sequential)
}else{
#parallel execution
if (multi == "multisession"){
future::plan(future::multisession, workers = workers)
}else{
future::plan(future::multicore, workers = workers)
}
pkgs_to_load <- if (multi == "multisession") c("DaVinci", "progressr") else NULL
}

options(future.globals.maxSize = ram * 1024^3)

# progress bar settings
progressr::handlers("progress")

options(
progressr.enable = TRUE,
progressr.enable_after = 0,
progressr.delay_stdout = FALSE
)

progressr::handlers(progressr::handler_txtprogressbar(
file = stdout(),
enable_after = 0
))

#self-contrastive learning step
#########################################################
exhaustive.list <- list()

#ptm <- proc.time()
for (ii in 1:length(Y.list)){

message(paste0("Working on Sample ", ii, " / ", length(Y.list)))
proj <- list()
s0 <- NULL


exhaustive.list <- progressr::with_progress({

count <- 0
for (k.arg in k.arg.list){
message("Working on k=", k.arg)

if (is.null(s0)){

ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = k.arg,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
verbose = F,
random.seed = random.seed)
}else{
p <- progressr::progressor(steps = length(Y.list) * length(k.arg.list))

ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = k.arg,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
shur0 = s0,
verbose = F,
random.seed = random.seed)
}#else
future.apply::future_lapply(seq_along(Y.list), function(ii) {

#saveRDS(ICAp.res, file = paste0(output.folder, "/", save.label, "@", normalize.version, "@k=", k.arg,".RDS") )
count <- count+1
proj[[count]] <- ICAp.res
message(paste0("Working on Sample ", ii, " / ", length(Y.list)))
proj <- list()
s0 <- NULL

if (is.null(s0)){
s0 <- ICAp.res$shur0
}#if

}#for k.arg
count <- 0
for (k.arg in k.arg.list){
message("Working on k=", k.arg)

if (is.null(s0)){

ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = k.arg,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
verbose = F,
random.seed = random.seed)
}else{

exhaustive.list[[ii]] <- proj
}#for ii
#print(proc.time()-ptm)
ICAp.res <- manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = k.arg,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
shur0 = s0,
verbose = F,
random.seed = random.seed)
}#else

count <- count+1
proj[[count]] <- ICAp.res

p(sprintf("Sample %d: k=%d", ii, k.arg))

if (is.null(s0)){
s0 <- ICAp.res$shur0
}#if

}#for k.arg

return(proj)
}, future.seed = TRUE, future.packages = pkgs_to_load)

})


#self contrastive learning step
Expand All @@ -702,55 +741,46 @@ Horizontal.Integration.Assemble <- function(

#self contrastive learning
#############################
embed.list <- list()
for (ii in 1:length(exhaustive.list)){

embed.list[[ii]] <- self_deco(exhaustive.list[[ii]],
LVs.filter.thr = 0.9,
freq = 1,
opt = "B")
}#for ii

embed.list <- future.apply::future_lapply(seq_along(exhaustive.list), function(ii) {
self_deco(exhaustive.list[[ii]],
LVs.filter.thr = 0.9,
freq = 1,
opt = "B")
}, future.seed = TRUE, future.packages = pkgs_to_load)


#run again to finetune
#############################

embed.list.finetune <- list()
for (ii in 1:length(embed.list)){

embed.list.finetune[[ii]] <- manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = nrow(embed.list[[ii]]$LVs),
B = embed.list[[ii]]$LVs,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
shur0 = exhaustive.list[[ii]][[1]]$shur0,
verbose = F,
random.seed = random.seed)

}#for ii
embed.list.finetune <- future.apply::future_lapply(seq_along(embed.list), function(ii) {
manifoldDecomp_adaptive(Y.list[[ii]],
L.list[[ii]],
k = nrow(embed.list[[ii]]$LVs),
B = embed.list[[ii]]$LVs,
L4 = L4.arg,
L4_adaptive = 2,
to_drop = T,
save.complete = T,
shur0 = exhaustive.list[[ii]][[1]]$shur0,
verbose = F,
random.seed = random.seed)
}, future.seed = TRUE, future.packages = pkgs_to_load)


#Finetune after self-contrastive learning
if (input.opt == "OnlyDeco"){
dav.res.list <- list()

for (ii in 1:length(embed.list.finetune)){

dav.res.list <- future.apply::future_lapply(seq_along(embed.list.finetune), function(ii) {
ICAp.res <- embed.list.finetune[[ii]]
#construct the pseudo ICAp.res objects
tmp <- list(Z = t(embed$LVs.pair),
B = embed$LVs,
L4 = ICAp.res$L4,
shur0 = ICAp.res$shur0,
L1 = ICAp.res$L1,
L2 = ICAp.res$L2)
#cor.res <- cor(t(embed$LVs), t(ICAp.res$B))

dav.res.list[[ii]] <- tmp
}#for ii
list(Z = t(embed$LVs.pair),
B = embed$LVs,
L4 = ICAp.res$L4,
shur0 = ICAp.res$shur0,
L1 = ICAp.res$L1,
L2 = ICAp.res$L2)
}, future.seed = TRUE, future.packages = pkgs_to_load)

}else if (input.opt == "FineTune"){

Expand Down Expand Up @@ -836,21 +866,24 @@ Horizontal.Integration.Assemble <- function(
}else{
mat.smooth <- mat

for (ii in 1:length(dataset.opts)){
smooth_updates <- future.apply::future_lapply(seq_along(dataset.opts), function(ii) {
print(ii)

subpart.index <- which(mat.slice.id==dataset.opts[ii])

subpart <- mat[subpart.index,]
subpart.smooth <- refinement.batch(subpart,
as.matrix(coor.list[[ii]])[rownames(subpart),],
neighbor.option = "KNN",
neighbor.arg = 8,
tasks = "continuous")

mat.smooth[subpart.index,] <- (subpart+t(subpart.smooth))/2

}#for ii
return(list(idx = subpart.index,
val = (subpart+t(subpart.smooth))/2))
}, future.seed = TRUE, future.packages = pkgs_to_load)

for (update in smooth_updates) {
mat.smooth[update$idx, ] <- update$val
}

return(list(Y.list = Y.list,
coor.list = coor.list,
Expand All @@ -868,6 +901,8 @@ Horizontal.Integration.Assemble <- function(

}#else

future::plan(sequential) # Explicitly close multisession workers by switching plan

}#Horizontal.Integration.Assemble


Expand Down