Skip to content
Snippets Groups Projects
Commit 58778aee authored by Todor Kondic's avatar Todor Kondic
Browse files

Parallel sw works now

The main issue is that I have removed type='FORK' everywhere it
clearly produced issues. I should probably do that in presc, too.
parent 6757657e
No related branches found
No related tags found
No related merge requests found
...@@ -101,8 +101,6 @@ gen_comp_list<-function(src_fn,dest_fn) { ...@@ -101,8 +101,6 @@ gen_comp_list<-function(src_fn,dest_fn) {
##' @return result of RMassBank::loadRmbSettings ##' @return result of RMassBank::loadRmbSettings
##' @author Todor Kondić ##' @author Todor Kondić
gen_stgs_and_load <- function(fn_data,stgs,wd) { gen_stgs_and_load <- function(fn_data,stgs,wd) {
wd <- normalizePath(wd)
fn_data <- normalizePath(fn_data)
stgs<-if (is.character(stgs)) yaml::yaml.load_file(stgs) else stgs stgs<-if (is.character(stgs)) yaml::yaml.load_file(stgs) else stgs
sfn<-file.path(wd,paste(basename(fn_data),".ini",sep='')) sfn<-file.path(wd,paste(basename(fn_data),".ini",sep=''))
mk_sett_file(stgs,sfn) mk_sett_file(stgs,sfn)
...@@ -201,22 +199,19 @@ presc.single <- function(fn_data,stgs_alist,wd,mode,fn_cmpd_l,ppm_lim_fine=10,EI ...@@ -201,22 +199,19 @@ presc.single <- function(fn_data,stgs_alist,wd,mode,fn_cmpd_l,ppm_lim_fine=10,EI
##' @param lastStep The last step in the workflow. Default is eight. ##' @param lastStep The last step in the workflow. Default is eight.
##' @return MsmsWorkspace object. ##' @return MsmsWorkspace object.
##' @author Todor Kondić ##' @author Todor Kondić
single.sw<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8) { sw.single<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8) {
## Generate settings file and load. ## Generate settings file and load.
stgs_alist<-if (is.character(stgs_alist)) yaml::yaml.load_file(stgs_alist) else stgs_alist no_drama_mkdir(wd)
sfn<-file.path(wd,paste(fn_data,".ini",sep='')) wd <- normalizePath(wd)
mk_sett_file(stgs_alist,sfn) gen_stgs_and_load(fn_data,stgs_alist,wd)
RMassBank::loadRmbSettings(sfn)
## Generate and load the compound list. ## Generate and load the compound list.
fn_comp<-file.path(wd,paste(fn_data,".comp.csv",sep='')) x <- gen_cmpdl_and_load(fn_data,wd,fn_cmpd_list)
n_cmpd<-gen_comp_list(fn_cmpd_list,fn_comp) fn_comp <- x$fn_cmpdl
RMassBank::loadList(fn_comp) n_cmpd <- x$n
## Generate file table. ## Generate file table.
df_table<-data.frame(Files=rep(fn_data,n_cmpd),ID=1:n_cmpd) fn_table <- gen_file_table(fn_data,n_cmpd,wd)
fn_table<-file.path(wd,paste("fn-table.",fn_data,".csv",sep=''))
write.csv(x=df_table,file=fn_table,row.names=F)
## Make empty workspace. ## Make empty workspace.
w <- RMassBank::newMsmsWorkspace() w <- RMassBank::newMsmsWorkspace()
...@@ -224,11 +219,70 @@ single.sw<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",arc ...@@ -224,11 +219,70 @@ single.sw<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",arc
message(paste("Reading in file:",fn_data)) message(paste("Reading in file:",fn_data))
w <-RMassBank::msmsRead(w,filetable=fn_table,readMethod="mzR",mode=mode) w <-RMassBank::msmsRead(w,filetable=fn_table,readMethod="mzR",mode=mode)
archdir<-file.path(wd,archdir) archdir<-file.path(wd,archdir)
if (!dir.exists(archdir)) dir.create(archdir) no_drama_mkdir(archdir)
fn_arch<-file.path(archdir,paste(fn_data,".archive",sep='')) fn_arch<-file.path(archdir,paste(basename(fn_data),".archive",sep=''))
RMassBank::msmsWorkflow(w, mode=mode, steps=2:lastStep,archivename=fn_arch) RMassBank::msmsWorkflow(w, mode=mode, steps=2:lastStep,archivename=fn_arch)
} }
##' Runs the first step of the compound mixture workflow on a single mzML file.
##'
##' @title RMassBank Spectral Workflow on a Single Compound Mixture (step 1)
##' @param fn_data A mzML data file.
##' @param stgs_alist RMassBank settings. It can either be a named
##' list of settings, or a filename of a YAML file.
##' @param wd The name of the work directory.
##' @param fn_cmpd_list The file name of he compound list
##' corresponding to `fn_data`.
##' @param mode Modes as described in the standard workflow vignette
##' of RMassBank.
##' @param readMethod Default read method is "mzR". Consult the
##' documentation of `msmsRead` for details.
##' @return MsmsWorkspace object.
##' @author Todor Kondić
sw.single.1 <- function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,
readMethod="mzR") {
## Generate settings file and load.
no_drama_mkdir(wd)
wd <- normalizePath(wd)
gen_stgs_and_load(fn_data,stgs_alist,wd)
## Generate and load the compound list.
x <- gen_cmpdl_and_load(fn_data,wd,fn_cmpd_list)
fn_comp <- x$fn_cmpdl
n_cmpd <- x$n
## Generate file table.
fn_table <- gen_file_table(fn_data,n_cmpd,wd)
## Make empty workspace.
w <- RMassBank::newMsmsWorkspace()
## Run the workflow.
message(paste("Reading in file:",fn_data))
RMassBank::msmsRead(w,filetable=fn_table,readMethod="mzR",mode=mode)}
##' Runs steps after sw.single.1 of compound mixture workflow on a
##' single mzML file.
##'
##' @title RMassBank Spectral Workflow on a Single Compound Mixture (after step 1)
##' @param w The msmsWorkspace object that was processed by
##' sw.single.1.
##' @param wd The current working dir.
##' @param archdir The directory to store R objects created during
##' workflow execution.
##' @param lastStep The last step in the workflow. Default is eight.
##' @return MsmsWorkspace object.
##' @author Todor Kondić
sw.single.next<-function(w,wd,archdir="archive",lastStep=8) {
archdir<-file.path(wd,archdir)
no_drama_mkdir(archdir)
fn_arch<-file.path(archdir,paste(wd,".archive",sep=''))
RMassBank::loadRmbSettings(file.path(wd,basename(wd),".mzML.ini"))
RMassBank::loadList(file.path(wd,basename(wd),".mzML.comp.csv"))
RMassBank::msmsWorkflow(w, mode=mode, steps=2:lastStep,archivename=fn_arch)
}
##' Prepare single mbWorkspace object based on the workspace, the ##' Prepare single mbWorkspace object based on the workspace, the
##' infolist name and RMassBank settings. ##' infolist name and RMassBank settings.
...@@ -327,7 +381,7 @@ presc.v<-function(fn_data,fn_cmpd_l,mode,ppm_lim_fine=10,EIC_limit=0.001) { ...@@ -327,7 +381,7 @@ presc.v<-function(fn_data,fn_cmpd_l,mode,ppm_lim_fine=10,EIC_limit=0.001) {
##' @author Todor Kondić ##' @author Todor Kondić
v<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8,combine=F) { v<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8,combine=F) {
idir<-function(n) file.path(".",stripext(n)) idir<-function(n) file.path(".",stripext(n))
f<-Vectorize(single.sw,vectorize.args=c("wd","fn_data","stgs_alist"),SIMPLIFY=F) f<-Vectorize(sw.single,vectorize.args=c("wd","fn_data","stgs_alist"),SIMPLIFY=F)
rootdir <- getwd() rootdir <- getwd()
if (combine) { if (combine) {
z<-f(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod=readMethod,archdir=archdir,lastStep=7) z<-f(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod=readMethod,archdir=archdir,lastStep=7)
...@@ -560,6 +614,7 @@ presc.plot <- function(wd,out="prescreen.pdf",pal="Dark2",cex=0.75,digits=6) { ...@@ -560,6 +614,7 @@ presc.plot <- function(wd,out="prescreen.pdf",pal="Dark2",cex=0.75,digits=6) {
##' ##'
##' ##'
##' @title Parallel Spectral Workflow. ##' @title Parallel Spectral Workflow.
##' @param cl Cluster.
##' @param fn_data A sequence of mzML input files. ##' @param fn_data A sequence of mzML input files.
##' @param stgs_alist A list of named list of settings, or a list of ##' @param stgs_alist A list of named list of settings, or a list of
##' filenames of YAML files containing the settings. ##' filenames of YAML files containing the settings.
...@@ -571,37 +626,33 @@ presc.plot <- function(wd,out="prescreen.pdf",pal="Dark2",cex=0.75,digits=6) { ...@@ -571,37 +626,33 @@ presc.plot <- function(wd,out="prescreen.pdf",pal="Dark2",cex=0.75,digits=6) {
##' @param lastStep The last step in spectral workflow. ##' @param lastStep The last step in spectral workflow.
##' @param combine If TRUE, use combineMultiplicies to merge ##' @param combine If TRUE, use combineMultiplicies to merge
##' workspaces corresponding to different collisional energies. ##' workspaces corresponding to different collisional energies.
##' @param cl Cluster. ##' @param combdest Combine destination directory.
##' @return A named list of spectral workspaces. The names are derived ##' @return A named list of spectral workspaces. The names are derived
##' from data filenames. ##' from data filenames.
##' @author Todor Kondić ##' @author Todor Kondić
p.sw<-function(fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8,combine=F,cl=NULL) { p.sw<-function(cl,fn_data,stgs_alist,wd,fn_cmpd_list,mode,readMethod="mzR",archdir="archive",lastStep=8,combine=F,combdest="combined") {
idir<-function(n) file.path(".",stripext(n)) idir<-function(n) file.path(".",stripext(n))
fnocomb<-function(fn,stgs,wd) { fnocomb<-function(fn,stgs,wd) {
single.sw(fn,stgs,wd,fn_cmpd_list,mode,readMethod,archdir,lastStep=lastStep) sw.single(fn,stgs,wd,fn_cmpd_list,mode,readMethod,archdir,lastStep=lastStep)
}
fcomb<-function(fn,stgs,wd) {
single.sw(fn,stgs,wd,fn_cmpd_list,mode,readMethod,archdir,lastStep=7)
} }
fcomb <- function(fn,stgs,wd) sw.single(fn,stgs,wd,fn_cmpd_list,mode,readMethod,archdir,lastStep=7)
if (combine) { if (combine) {
rootdir <- getwd()
z<-parallel::clusterMap(cl,fcomb,fn_data,stgs_alist,wd) z<-parallel::clusterMap(cl,fcomb,fn_data,stgs_alist,wd)
names(z)<-basename(fn_data) names(z)<-basename(fn_data)
zz<-RMassBank::combineMultiplicities(z) zz<-RMassBank::combineMultiplicities(z)
archdir<-file.path(combdest,archdir)
combdir<-"combined" no_drama_mkdir(combdest)
archdir<-file.path(rootdir,combdir,archdir)
no_drama_mkdir(combdir)
no_drama_mkdir(archdir) no_drama_mkdir(archdir)
fn_arch<-file.path(archdir,"archive") fn_arch<-file.path(archdir,"archive")
fn_comb_stgs <- file.path(rootdir,combdir,paste(combdir,".mzML.ini",sep='')) fn_comb_stgs <- file.path(combdest,paste(basename(combdest),".mzML.ini",sep=''))
ddirs <- sapply(names(z),idir) ## ddirs <- sapply(fn_data,idir)
stgs_fls <- sapply(ddirs,function(x) file.path(x,paste(x,".mzML.ini",sep=''))) ## stgs_fls <- sapply(ddirs,function(x) file.path(x,paste(basename(x),".mzML.ini",sep='')))
mk_combine_file(stgs_fls,fn_comb_stgs) mk_combine_file(stgs_alist,fn_comb_stgs)
anycpdlist <- file.path(wd[[1]],paste(basename(wd[[1]]),".mzML.comp.csv",sep=''))
RMassBank::loadRmbSettings(fn_comb_stgs)
RMassBank::loadList(anycpdlist)
res<-list(RMassBank::msmsWorkflow(zz, steps=8, mode=mode, archivename = fn_arch)) res<-list(RMassBank::msmsWorkflow(zz, steps=8, mode=mode, archivename = fn_arch))
names(res)<-paste(combdir,".yml",sep='') #Clearly a hack. names(res)<-paste(basename(combdest),".ini",sep='') #Clearly a hack.
res res
} else { } else {
z<-parallel::clusterMap(cl,fnocomb,fn_data,stgs_alist,wd) z<-parallel::clusterMap(cl,fnocomb,fn_data,stgs_alist,wd)
......
...@@ -53,7 +53,7 @@ presc.do<-function(fn_data,fn_cmpd_list,mode,proc=F) { ...@@ -53,7 +53,7 @@ presc.do<-function(fn_data,fn_cmpd_list,mode,proc=F) {
##' @param fn_data List of mzML data filenames to be processed. ##' @param fn_data List of mzML data filenames to be processed.
##' @param fn_cmpd_list Compound list. ##' @param fn_cmpd_list Compound list.
##' @param mode as in msmsRead. ##' @param mode as in msmsRead.
##' @param rdir The root data directory. ##' @param dest The destination data directory.
##' @param combine If TRUE, use combineMultiplicies to merge ##' @param combine If TRUE, use combineMultiplicies to merge
##' workspaces corresponding to different collisional energies. ##' workspaces corresponding to different collisional energies.
##' @param proc Split work between this amount of processes. If FALSE ##' @param proc Split work between this amount of processes. If FALSE
...@@ -61,15 +61,17 @@ presc.do<-function(fn_data,fn_cmpd_list,mode,proc=F) { ...@@ -61,15 +61,17 @@ presc.do<-function(fn_data,fn_cmpd_list,mode,proc=F) {
##' @return A named list of msmsWorkspace objects. ##' @return A named list of msmsWorkspace objects.
##' @author Todor Kondić ##' @author Todor Kondić
##' @export ##' @export
sw.do<-function(fn_data,fn_cmpd_list,mode,rdir=".",combine=F,proc=F) { sw.do<-function(fn_data,fn_cmpd_list,mode,dest=".",combine=F,proc=F) {
no_drama_mkdir(rdir) dest <- normalizePath(dest)
wdirs<-sapply(basename(fn_data),function(nm) file.path(rdir,stripext(nm))) no_drama_mkdir(dest)
sapply(wdirs,no_drama_mkdir) fn_data <- normalizePath(fn_data)
stgs<-sapply(basename(wdirs),function (nm) paste(nm,"yml",sep='.')) wdirs<-sapply(basename(fn_data),function(nm) file.path(dest,stripext(nm)))
stgs<-sapply(fn_data,function (nm) file.path(paste(stripext(nm),"ini",sep='.')))
if (proc) { if (proc) {
cl<-parallel::makeCluster(proc,type='FORK') cl<-parallel::makeCluster(proc)
p.sw(fn_data,stgs,wdirs,fn_cmpd_list,mode,combine=combine,cl=cl) parallel::clusterEvalQ(cl,library("rmbmix"))
p.sw(cl,fn_data,stgs,wdirs,fn_cmpd_list,mode,combine=combine)
} else { } else {
v(fn_data,stgs,wdirs,fn_cmpd_list,mode,combine=combine) v(fn_data,stgs,wdirs,fn_cmpd_list,mode,combine=combine)
} }
...@@ -116,7 +118,8 @@ mb.do<-function(mb,rdir=".",proc=F) { ...@@ -116,7 +118,8 @@ mb.do<-function(mb,rdir=".",proc=F) {
fn_stgs<-sapply(names(mb),function(n) file.path(idir(n),attch(n,'.ini'))) fn_stgs<-sapply(names(mb),function(n) file.path(idir(n),attch(n,'.ini')))
if (proc) { if (proc) {
cl<-parallel::makeCluster(proc,type='FORK') cl<-parallel::makeCluster(proc)
parallel::clusterEvalQ(cl,library("rmbmix"))
mb.p(mb,infodir,fn <- stgs,cl=cl) mb.p(mb,infodir,fn <- stgs,cl=cl)
} else { } else {
mb.v(mb,infodir,fn_stgs) mb.v(mb,infodir,fn_stgs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment