Commit a17cbe8b authored by Valentina Galata

changed structure to include metaT: updated prepare_input and preprocessing; other minor changes

parent d305f372
@@ -3,7 +3,7 @@
# Pipeline steps
# steps: ["assembly_annotation", "mapping", "metaT", "mmseq", "binning", "taxonomy", "analysis"]
steps: ["assembly"]
steps: ["preprocessing"]
# Analysis sub-steps
analysis_steps: ["cdhit", "mappability", "crispr", "plasmids", "amr"]
@@ -23,20 +23,23 @@ results_dir: "results"
# Data paths: Use absolute paths or paths relative to the working directory !!!
data:
-  # MetaT data: R1 and R2
+  # Meta-genomics
+  metag:
+    sr:
+      r1: "data/raw/short_reads/ONT3_MG_xx_Rashi_S11_R1_001.fastq.gz"
+      r2: "data/raw/short_reads/ONT3_MG_xx_Rashi_S11_R2_001.fastq.gz"
+    ont:
+      # List of directories containing FAST5 files
+      dirs: ["data/multifast5"]
+      # List of FAST5 files
+      files: []
+  # Meta-transcriptomics
  metat:
-    r1: "data/metaT/FastSelectFull1_MT_Rashi_S14_R1_001.fastq.gz"
-    r2: "data/metaT/FastSelectFull1_MT_Rashi_S14_R2_001.fastq.gz"
-  # Short reads data: R1 and R2
-  sr:
-    r1: "data/raw/short_reads/ONT3_MG_xx_Rashi_S11_R1_001.fastq.gz"
-    r2: "data/raw/short_reads/ONT3_MG_xx_Rashi_S11_R2_001.fastq.gz"
-  # ONT data
-  ont:
-    # List of directories containing FAST5 files
-    dirs: ["data/multifast5"]
-    # List of FAST5 files
-    files: []
+    sr:
+      r1: "data/metaT/FastSelectFull1_MT_Rashi_S14_R1_001.fastq.gz"
+      r2: "data/metaT/FastSelectFull1_MT_Rashi_S14_R2_001.fastq.gz"
+  # Meta-proteomics
+  metap:
# binning_samples: ["flye", "megahit", "bwa_sr_metaspades_hybrid", "bwa_lr_metaspades_hybrid", "bwa_merged_metaspades_hybrid", "mmi_sr_metaspades_hybrid", "mmi_lr_metaspades_hybrid", "mmi_merged_metaspades_hybrid"]
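# For illustration (not part of the commit): with this nested layout the Snakefile
# reads config["data"]["metag"]["sr"]["r1"] for the metaG R1 FASTQ and
# config["data"]["metat"]["sr"]["r1"] for the metaT R1 FASTQ; leaving both metaT
# entries empty skips the metaT branch.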
@@ -110,7 +113,7 @@ samtools:
# Polishing
medaka:
-  threads: 30
+  threads: 10
  model: r941_min_high # the MinION model, high accuracy
racon:
@@ -212,12 +215,14 @@ racon:
# rbh: "/home/users/sbusi/apps/mmseqs/bin/mmseqs rbh"
# convertalis: "/home/users/sbusi/apps/mmseqs/bin/mmseqs convertalis"
-# # CRISPR
-# CASC:
-# PATH: "$PATH:/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/bin"
-# PERL5LIB: "/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/lib/site_perl"
-# minced:
-# PATH: "$PATH:/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/minced/"
+# CRISPR
+casc:
+  threads: 10
+  path: "$PATH:/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/bin"
+  perl5lib: "/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/lib/site_perl"
+minced:
+  path: "$PATH:/mnt/lscratch/users/sbusi/ONT/cedric_ont_basecalling/2019_GDB/crispr/minced/"
# # Plasmid prediction
# plasflow:
......
@@ -19,7 +19,9 @@
ONTP_ENV="ONT_pilot"
# number of cores for snakemake
ONTP_CORES=30
-# IMP config file
+# snakemake file
+ONTP_SMK="workflow/Snakefile"
+# config file
ONTP_CONFIG="config/config.yaml" # USER INPUT REQUIRED
# slurm config file
ONTP_SLURM="config/slurm.yaml"
@@ -33,6 +35,6 @@ ONTP_CLUSTER="-p {cluster.partition} -q {cluster.qos} {cluster.explicit} -N {clu
conda activate ${ONTP_ENV}
# run the pipeline
-snakemake -s Snakefile -rp --cores ${ONTP_CORES} --configfile ${ONTP_CONFIG} \
+snakemake -s ${ONTP_SMK} -rp --cores ${ONTP_CORES} --configfile ${ONTP_CONFIG} \
--use-conda --conda-prefix ${CONDA_PREFIX}/pipeline \
--cluster-config ${ONTP_SLURM} --cluster "sbatch ${ONTP_CLUSTER}"
@@ -6,14 +6,16 @@ import os
from pathlib import Path
from tempfile import TemporaryDirectory
+from scripts.utils import find_fast5
##############################
# CONFIG
# can be overwritten by using --configfile <path to config> when calling snakemake
# configfile: "config/config.yaml"
# Paths
-SRC_DIR = srcdir("workflow/scripts")
-ENV_DIR = srcdir("workflow/envs")
+SRC_DIR = srcdir("scripts")
+ENV_DIR = srcdir("envs")
# default executable for snakemake
shell.executable("bash")
@@ -25,6 +27,8 @@ workdir:
##############################
# DATA & TOOLS
+# TODO: config validation
+# DATA_DIR = config["data_dir"]
RESULTS_DIR = config["results_dir"]
DB_DIR = config["db_dir"]
@@ -34,29 +38,14 @@ STEPS = config['steps']
ANALYSIS_STEPS = config["analysis_steps"]
# Input
-# ONT files (i.e. FAST5 files): from given directories and files
-INPUT_FAST5 = []
-INPUT_FAST5 += [os.path.abspath(f) for f in config["data"]["ont"]["files"]]
-for ont_d in config["data"]["ont"]["dirs"]:
-    INPUT_FAST5 += [os.path.abspath(f) for f in Path(ont_d).rglob('*.fast5')]
-# INPUT_FAST5 = INPUT_FAST5[0:10] # NOTE TEST
-# SR files
-INPUT_SR = config["data"]["sr"]
-# MetaT files
-INPUT_METAT = config["data"]["metat"]
-# Data from input
-DATA_FAST5 = [os.path.join(RESULTS_DIR, "input_ont", os.path.basename(f)) for f in INPUT_FAST5]
-assert len(DATA_FAST5) == len(set(DATA_FAST5)), "Created link names for FAST5 files are NOT unique: {}".format(DATA_FAST5)
-DATA_SR = {
-    "r1": os.path.join(RESULTS_DIR, "input_sr/R1.fq.gz"),
-    "r2": os.path.join(RESULTS_DIR, "input_sr/R2.fq.gz")
-}
-DATA_METAT = {
-    "r1": os.path.join(RESULTS_DIR, "input_metat/R1.fq.gz"),
-    "r2": os.path.join(RESULTS_DIR, "input_metat/R2.fq.gz")
-}
+INPUT_G_FAST5 = find_fast5(config["data"]["metag"]["ont"]["files"], config["data"]["metag"]["ont"]["dirs"]) # TODO: consider a different approach
+INPUT_G_SR = list(config["data"]["metag"]["sr"].values())
+INPUT_T_SR = []
+if config["data"]["metat"]["sr"]["r1"] and config["data"]["metat"]["sr"]["r2"]:
+    INPUT_T_SR = list(config["data"]["metat"]["sr"].values())
# Assemblers and read types
+META_TYPES = ["metag", "metat"] if INPUT_T_SR else ["metag"]
READ_TYPES = list(config["assemblers"].keys()) # list of read types
ASSEMBLERS = [y for x in config["assemblers"].values() for y in x] # list of all assemblers
READ_ASSEMBLERS = [y for x in [[(k, vv) for vv in v] for k, v in config["assemblers"].items()] for y in x] # list of (read type, assembler)
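# For illustration (assuming an "assemblers" config section like
# {"sr": ["megahit"], "lr": ["flye"]}), the nested comprehension above is equivalent to:
#
#   READ_ASSEMBLERS = []
#   for rtype, asms in config["assemblers"].items():
#       for asm in asms:
#           READ_ASSEMBLERS.append((rtype, asm))
#
# giving READ_TYPES == ["sr", "lr"], ASSEMBLERS == ["megahit", "flye"],
# and READ_ASSEMBLERS == [("sr", "megahit"), ("lr", "flye")].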
@@ -66,40 +55,52 @@ BWA_IDX_EXT = ["amb", "ann", "bwt", "pac", "sa"]
##############################
# TARGETS & RULES
-# List of targets to be created
+# Links to input files
+INPUT_LINKS = expand(
+    os.path.join(RESULTS_DIR, "input/metag/ont/{fast5}"),
+    fast5=[os.path.basename(f) for f in INPUT_G_FAST5]
+)
+INPUT_LINKS += expand(
+    os.path.join(RESULTS_DIR, "input/{mtype}/sr/{rtype}.fq.gz"),
+    mtype=META_TYPES,
+    rtype=["R1", "R2"]
+)
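# For illustration (assuming RESULTS_DIR == "results" and metaT reads are given),
# the second expand() yields:
#   results/input/metag/sr/R1.fq.gz, results/input/metag/sr/R2.fq.gz,
#   results/input/metat/sr/R1.fq.gz, results/input/metat/sr/R2.fq.gz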
+# List of (main) targets to be created
TARGETS = []
# Include rules and add targets based on the config file
# Prepare input files
include:
"workflow/steps/prepare_input.smk"
"steps/prepare_input.smk"
# TARGETS.append("status/prepare_input.done")
# Preprocessing
if "preprocessing" in STEPS:
    include:
-        "workflow/steps/preprocessing.smk"
+        "steps/preprocessing.smk"
    TARGETS += [
        "status/preprocessing_lr.done",
        "status/preprocessing_sr.done"
    ]
-# Assembly
-if "assembly" in STEPS:
-    include:
-        "workflow/steps/assembly.smk"
-    TARGETS += [
-        "status/assembly.done",
-        # "status/polishing.done",
-        "status/mapping.done",
-        "status/coverage.done"
-    ]
+# # Assembly
+# if "assembly" in STEPS:
+#     include:
+#         "steps/assembly.smk"
+#     TARGETS += [
+#         "status/assembly.done",
+#         # "status/polishing.done",
+#         "status/mapping.done",
+#         "status/coverage.done"
+#     ]
# # Assembly annotation
# if 'assembly_annotation' in STEPS:
# include:
# "workflow/steps/assembly_annotation.smk"
# "steps/assembly_annotation.smk"
# TARGETS += [
# "assemble_and_coverage.done",
# "annotate.done",
@@ -111,7 +112,7 @@ if "assembly" in STEPS:
# if 'checkpoint_assembly_annotation' in STEPS:
# include:
# "workflow/steps/checkpoint_assembly_annotation.smk"
# "steps/checkpoint_assembly_annotation.smk"
# TARGETS += [
# "assemble_and_coverage.done",
# "annotate.done",
@@ -126,7 +127,7 @@ if "assembly" in STEPS:
# # MMSEQ2
# if 'mmseq' in STEPS:
# include:
# "workflow/steps/mmseq.smk"
# "steps/mmseq.smk"
# TARGETS += [
# "mmseq_comparison_for_ont.done"
# ]
@@ -134,31 +135,31 @@ if "assembly" in STEPS:
# # MetaT
# if 'metaT' in STEPS:
# include:
# "workflow/steps/metat.smk"
# "steps/metat.smk"
# TARGETS += ["metaT_mapping_for_ONT.done"]
# # Mapping
# if 'mapping' in STEPS:
# include:
# "workflow/steps/mapping.smk"
# "steps/mapping.smk"
# TARGETS += ["mapping_for_binning.done"]
# # Binning
# if 'binning' in STEPS:
# include:
# "workflow/steps/binning.smk"
# "steps/binning.smk"
# TARGETS += ["binning_for_ont.done"]
# # Taxonomy
# if 'taxonomy' in STEPS:
# include:
# "workflow/steps/taxonomy.smk"
# "steps/taxonomy.smk"
# TARGETS += ["taxonomy_for_ont.done"]
# # Analysis
# if 'analysis' in STEPS:
# include:
# "workflow/steps/analysis.smk"
# "steps/analysis.smk"
# # CD-HIT
# if "cdhit" in ANALYSIS_STEPS:
# TARGETS.append("cdhit_analysis.done")
......
@@ -4,9 +4,9 @@ localrules: link_input_files
rule link_input_files:
    input:
-        INPUT_FAST5 + list(INPUT_SR.values()) + list(INPUT_METAT.values())
+        INPUT_G_FAST5 + INPUT_G_SR + INPUT_T_SR
    output:
-        DATA_FAST5 + list(DATA_SR.values()) + list(DATA_METAT.values())
+        INPUT_LINKS
    message:
        "Link input files"
    run:
......
# Preprocessing
##################################################
# Short reads

# Preprocess the short reads using fastp
rule fastp_sr:
    input:
        r1=os.path.join(RESULTS_DIR, "input/{mtype}/sr/R1.fq.gz"),
        r2=os.path.join(RESULTS_DIR, "input/{mtype}/sr/R2.fq.gz")
    output:
        r1=os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/R1.fastp.fastq.gz"),
        r2=os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/R2.fastp.fastq.gz"),
        html=os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/fastp.html"),
        json=os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/fastp.json")
    log:
        out="logs/fastp.{mtype}.sr.out.log",
        err="logs/fastp.{mtype}.sr.err.log"
    wildcard_constraints:
        mtype="metag|metat"
    threads:
        config["fastp"]["threads"]
    conda:
        os.path.join(ENV_DIR, "fastp.yaml")
    message:
        "Preprocessing short reads: FastP"
    shell:
        "(date && fastp -l {config[fastp][min_length]} -i {input.r1} -I {input.r2} -o {output.r1} -O {output.r2} -h {output.html} -j {output.json} -w {threads} && date) 2> {log.err} > {log.out}"
rule fastqc_fastp_sr:
    input:
        os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/{rid}.fastp.fastq.gz")
    output:
        html=os.path.join(RESULTS_DIR, "qc/{mtype}/sr/{rid}.fastp_fastqc.html"),
        zip=os.path.join(RESULTS_DIR, "qc/{mtype}/sr/{rid}.fastp_fastqc.zip")
    log:
        out="logs/fastqc.{mtype}.sr.{rid}.out.log",
        err="logs/fastqc.{mtype}.sr.{rid}.err.log"
    wildcard_constraints:
        mtype="metag|metat",
        rid="|".join(["R1", "R2"])
    threads:
        config["fastqc"]["threads"]
    conda:
        os.path.join(ENV_DIR, "fastqc.yaml")
    message:
        "Preprocessing short reads: FastQC"
    shell:
        "(date && fastqc {config[fastqc][params]} -t {threads} -o $(dirname {output.html}) {input} && date) 2> {log.err} > {log.out}"
##################################################
# Long reads

# Basecalling
checkpoint guppy_gpu_basecalling:
    input:
        expand(os.path.join(RESULTS_DIR, "input/metag/ont/{fast5}"), fast5=[os.path.basename(f) for f in INPUT_G_FAST5])
    output:
        directory(os.path.join(RESULTS_DIR, "preproc/metag/lr/checkpoints"))
    log:
        out="logs/guppy.metag.lr.out.log",
        err="logs/guppy.metag.lr.err.log"
    threads:
        config["guppy"]["gpu"]["threads"]
    message:
        "Preprocessing long reads: Basecalling w/ Guppy"
    shell:
        """
        (date && \
        {config[guppy][gpu][bin]} --input_path $(dirname {input[0]}) --save_path {output} \
        --config {config[guppy][config]} \
        --disable_pings --compress_fastq \
        --cpu_threads_per_caller {threads} \
        -x {config[guppy][gpu][gpu_device]} \
        --records_per_fastq {config[guppy][gpu][records_per_fastq]} \
        --chunk_size {config[guppy][gpu][chunk_size]} \
        --chunks_per_runner {config[guppy][gpu][chunks_per_runner]} \
        --gpu_runners_per_device {config[guppy][gpu][runners_per_device]} \
        --num_callers {config[guppy][gpu][num_callers]} && \
        date) 2>> {log.err} >> {log.out}
        """
def aggregate_guppy_basecalling(wildcards):
    checkpoint_output = checkpoints.guppy_gpu_basecalling.get(**wildcards).output[0]
    return expand(
        os.path.join(RESULTS_DIR, "preproc/metag/lr/checkpoints/fastq_runid_{runid_i_j}.fastq.gz"),
        runid_i_j=glob_wildcards(os.path.join(checkpoint_output, "fastq_runid_{runid_i_j}.fastq.gz")).runid_i_j,
    )
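# For illustration: once the checkpoint has run, glob_wildcards() scans the checkpoint
# output directory; e.g. a chunk named "fastq_runid_abc123_0_0.fastq.gz" yields
# runid_i_j == "abc123_0_0", so the merge rule below depends on exactly the FASTQ
# chunks Guppy actually produced.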
rule merge_guppy_basecalling:
    input:
        aggregate_guppy_basecalling
    output:
        os.path.join(RESULTS_DIR, "preproc/metag/lr/lr.fastq.gz")
    message:
        "Preprocessing long reads: Cat FASTQ"
    # split the space-separated input list into lines so `sort` orders the chunks
    shell:
        "cat $(echo {input} | tr ' ' '\\n' | sort) > {output}"
# QC
rule nanostat_guppy_basecalling:
    input:
        os.path.join(RESULTS_DIR, "preproc/metag/lr/lr.fastq.gz")
    output:
        os.path.join(RESULTS_DIR, "qc/metag/lr/NanoStats.txt")
    log:
        out="logs/nanostats.metag.lr.out.log",
        err="logs/nanostats.metag.lr.err.log"
    conda:
        os.path.join(ENV_DIR, "nanostat.yaml")
    message:
        "Preprocessing long reads: NanoStats"
    shell:
        "(date && NanoStat --fastq {input} --outdir $(dirname {output}) -n $(basename {output}) && date) 2> {log.err} > {log.out}"
#!/usr/bin/env python

def find_fast5(file_list=[], dir_list=[]):
    import os
    from pathlib import Path
    # list of files
    fast5_files = []
    # files from list of files
    fast5_files += [os.path.abspath(f) for f in file_list]
    # files from list of directories
    for ont_d in dir_list:
        fast5_files += [os.path.abspath(f) for f in Path(ont_d).rglob('*.fast5')]
    # basenames should be unique
    assert len(fast5_files) == len(set([os.path.basename(f) for f in fast5_files])), "Basenames of FAST5 files are NOT unique: {}".format(fast5_files)
    return fast5_files
\ No newline at end of file
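# Example usage (hypothetical paths):
#   find_fast5(file_list=["run1.fast5"], dir_list=["data/multifast5"])
# returns absolute paths for run1.fast5 plus every *.fast5 found recursively under
# data/multifast5/, and raises an AssertionError if any two files share a basename.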
@@ -5,7 +5,7 @@ include:
rule PREPARE_INPUT:
    input:
-        DATA_FAST5 + list(DATA_SR.values()) + list(DATA_METAT.values())
+        INPUT_LINKS
    output:
        "status/prepare_input.done"
    shell:
......
# Preprocessing reads before de novo assembly
include:
-    '../rules/preprocessing_lr.smk'
-include:
-    '../rules/preprocessing_sr.smk'
+    '../rules/preprocessing.smk'
# NOTE: Using "shell: touch ..." to prevent the rule from being autodetected as a `localrule`.
# This is needed so that an email can be sent when this rule's status changes.
rule PREPROCESSING_LR:
    input:
-        basecalling=os.path.join(RESULTS_DIR, "preproc/lr/lr.fastq.gz"),
-        nanostats=os.path.join(RESULTS_DIR, "qc/lr/NanoStats.txt")
+        basecalling=os.path.join(RESULTS_DIR, "preproc/metag/lr/lr.fastq.gz"),
+        nanostats=os.path.join(RESULTS_DIR, "qc/metag/lr/NanoStats.txt")
    output:
        "status/preprocessing_lr.done"
    shell:
@@ -19,8 +17,8 @@ rule PREPROCESSING_LR:
rule PREPROCESSING_SR:
    input:
-        fastp=expand(os.path.join(RESULTS_DIR, "preproc/sr/{rid}.fastp.fastq.gz"), rid=["R1", "R2"]),
-        fastqc=expand(os.path.join(RESULTS_DIR, "qc/sr/{rid}.fastp_{ext}"), rid=["R1", "R2"], ext=["fastqc.html", "fastqc.zip"])
+        fastp=expand(os.path.join(RESULTS_DIR, "preproc/{mtype}/sr/{rid}.fastp.fastq.gz"), mtype=META_TYPES, rid=["R1", "R2"]),
+        fastqc=expand(os.path.join(RESULTS_DIR, "qc/{mtype}/sr/{rid}.fastp_{ext}"), mtype=META_TYPES, rid=["R1", "R2"], ext=["fastqc.html", "fastqc.zip"])
    output:
        "status/preprocessing_sr.done"
    shell:
......