From da3af9f680b398b97117a5bcb4a59456425168d7 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Fri, 9 Aug 2024 13:09:54 +0200 Subject: [PATCH 01/17] support for threading --- CADD.sh | 2 + Snakefile | 177 +++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 131 insertions(+), 48 deletions(-) diff --git a/CADD.sh b/CADD.sh index 5401eed..3cafc0a 100755 --- a/CADD.sh +++ b/CADD.sh @@ -145,7 +145,9 @@ fi echo "Running snakemake pipeline:" +# resources is added to divide total load wrt number of cores command="snakemake $TMP_OUTFILE \ + --resources load=100 \ --sdm conda $SIGNULARITYARGS --conda-prefix $CADD/envs/conda \ --cores $CORES --configfile $CONFIG \ --snakefile $CADD/Snakefile $VERBOSE" diff --git a/Snakefile b/Snakefile index d53587e..88dd38c 100644 --- a/Snakefile +++ b/Snakefile @@ -6,6 +6,8 @@ Note that we are declaring many files temporary here that should be located in a # container with conda environments containerized: "docker://visze/cadd-scripts-v1_7:0.1.0" +# need glob to get chunked files +import psutil # Min version of snakemake from snakemake.utils import min_version @@ -21,12 +23,26 @@ validate(config, schema="schemas/config_schema.yaml") import os + +# esm takes ~16 Gb of memory per tread. limit threading to 0.8*RAM / 16 threads ; with a minimum of 1 thread. +config['esm_slots'] = int((0.8 * (psutil.virtual_memory().total / (1024 ** 3)))/16) +if config['esm_slots'] < 1: + config['esm_slots'] = 1 +config['esm_load'] = max(20,int(100/config['esm_slots'])) # 4GB of GPU ram per thread.. +config['vep_load'] = int(config['esm_load']/5) # up to 4Gb/ram +config['regseq_load'] = int(config['esm_load']/4) # +config['mms_load'] = int(config['esm_load']) # up to 16Gb/ram +config['anno_load'] = int(config['esm_load']/10) # disk IO intensive +## allowed scattering +scattergather: + split=workflow.cores, + envvars: "CADD", -# wildcard_constraints: -# file=".*(?> {log} + cat {input.vcf} \ | python {params.cadd}/src/scripts/VCF2vepVCF.py \ + | grep -v '^#' \ + | sed 's/^chr//' \ | sort -k1,1 -k2,2n -k4,4 -k5,5 \ - | uniq > {output} 2> {log} + | uniq > {wildcards.file}_splits/full.vcf 2> {log} + + # split + LC=$(wc -l {wildcards.file}_splits/full.vcf | cut -f1 -d' ') + LC=$(((LC / {params.threads})+1)) + + split -l $LC --numeric-suffixes=1 --additional-suffix="-of-{params.threads}.prepared.vcf" {wildcards.file}_splits/full.vcf {wildcards.file}_splits/chunk_ 2>> {log} + + rm -f {wildcards.file}_splits/full.vcf + + # strip padding zeros in the file names + for f in {wildcards.file}_splits/chunk_*.prepared.vcf + do + mv -n "$f" "$(echo "$f" | sed -E 's/(chunk_)0*([1-9][0-9]*)(-of-{params.threads}\\.prepared\\.vcf)/\\1\\2\\3/')" + done """ @@ -70,15 +110,18 @@ checkpoint prescore: conda: "envs/environment_minimal.yml" input: - vcf="{file}.prepared.vcf", + vcf="{file}_splits/chunk_{chunk}.prepared.vcf", prescored="%s/%s" % (os.environ["CADD"], config["PrescoredFolder"]), output: - novel=temp("{file}.novel.vcf"), - prescored=temp("{file}.pre.tsv"), + novel="{file}_splits/chunk_{chunk}.novel.vcf", + prescored="{file}_splits/chunk_{chunk}.pre.tsv", log: - "{file}.prescore.log", + "{file}.chunk_{chunk}.prescore.log", params: cadd=os.environ["CADD"], + resources: + # < 1GB of memory + load=1, shell: """ # Prescoring @@ -106,16 +149,19 @@ rule annotation_vep: conda: "envs/vep.yml" input: - vcf="{file}.novel.vcf", + vcf="{file}_splits/chunk_{chunk}.novel.vcf", veppath="%s/%s" % (os.environ["CADD"], config["VEPpath"]), output: - temp("{file}.vep.vcf.gz"), + "{file}_splits/chunk_{chunk}.vep.vcf.gz", log: - "{file}.annotation_vep.log", + "{file}.chunk_{chunk}.annotation_vep.log", params: cadd=os.environ["CADD"], genome_build=config["GenomeBuild"], ensembl_db=config["EnsemblDB"], + resources: + # < 1GB of memory + load=int(config['vep_load']), shell: """ cat {input.vcf} \ @@ -132,7 +178,8 @@ rule annotate_esm: conda: "envs/esm.yml" input: - vcf="{file}.vep.vcf.gz", + #vcf="{file}_splits/chunk_{chunk}.vep.vcf.gz", + vcf="{file}_splits/chunk_{chunk}.vep.vcf.gz", models=expand( "{path}/{model}.pt", path=config["ESMpath"], @@ -140,20 +187,25 @@ rule annotate_esm: ), transcripts="%s/pep.%s.fa" % (config["ESMpath"], config["EnsemblDB"]), output: - missens=temp("{file}.esm_missens.vcf.gz"), - frameshift=temp("{file}.esm_frameshift.vcf.gz"), - final=temp("{file}.esm.vcf.gz"), + missens="{file}_splits/chunk_{chunk}.esm_missens.vcf.gz", + frameshift="{file}_splits/chunk_{chunk}.esm_frameshift.vcf.gz", + final="{file}_splits/chunk_{chunk}.esm.vcf.gz", log: - "{file}.annotate_esm.log", + "{file}.chunk_{chunk}.annotate_esm.log", + resources: + load=int(config['esm_load']), params: cadd=os.environ["CADD"], models=["--model %s " % model for model in config["ESMmodels"]], batch_size=config["ESMbatchsize"], + #header=config["Header"], + shell: """ model_directory=`dirname {input.models[0]}`; model_directory=`dirname $model_directory`; + python {params.cadd}/src/scripts/lib/tools/esmScore/esmScore_missense_av_fast.py \ --input {input.vcf} \ --transcripts {input.transcripts} \ @@ -171,6 +223,8 @@ rule annotate_esm: --transcripts {input.transcripts} \ --model-directory $model_directory {params.models} \ --output {output.final} --batch-size {params.batch_size} &>> {log} + + #rm -f {wildcards.file}.esm_in.vcf.gz """ @@ -178,17 +232,20 @@ rule annotate_regseq: conda: "envs/regulatorySequence.yml" input: - vcf="{file}.esm.vcf.gz", + vcf="{file}_splits/chunk_{chunk}.esm.vcf.gz", reference="%s/%s" % (config["REGSEQpath"], "reference.fa"), genome="%s/%s" % (config["REGSEQpath"], "reference.fa.genome"), model="%s/%s" % (config["REGSEQpath"], "Hyperopt400InclNegatives.json"), weights="%s/%s" % (config["REGSEQpath"], "Hyperopt400InclNegatives.h5"), output: - temp("{file}.regseq.vcf.gz"), + "{file}_splits/chunk_{chunk}.regseq.vcf.gz", log: - "{file}.annotate_regseq.log", + "{file}.chunk_{chunk}.annotate_regseq.log", params: cadd=os.environ["CADD"], + resources: + # roughly 4GB of memory + load=int(config['regseq_load']), shell: """ python {params.cadd}/src/scripts/lib/tools/regulatorySequence/predictVariants.py \ @@ -205,16 +262,18 @@ rule annotate_mmsplice: conda: "envs/mmsplice.yml" input: - vcf="{file}.regseq.vcf.gz", + vcf="{file}_splits/chunk_{chunk}.regseq.vcf.gz", transcripts="%s/homo_sapiens.110.gtf" % config.get("MMSPLICEpath", ""), reference="%s/reference.fa" % config.get("REFERENCEpath", ""), output: - mmsplice=temp("{file}.mmsplice.vcf.gz"), - idx=temp("{file}.regseq.vcf.gz.tbi"), + mmsplice="{file}_splits/chunk_{chunk}.mmsplice.vcf.gz", + idx="{file}_splits/chunk_{chunk}.regseq.vcf.gz.tbi", log: - "{file}.annotate_mmsplice.log", + "{file}.chunk_{chunk}.annotate_mmsplice.log", params: cadd=os.environ["CADD"], + resources: + load=int(config['mms_load']), shell: """ tabix -p vcf {input.vcf} &> {log}; @@ -230,15 +289,17 @@ rule annotation: conda: "envs/environment_minimal.yml" input: - vcf=lambda wc: "{file}.%s.vcf.gz" + vcf=lambda wc: "{file}_splits/chunk_{chunk}.%s.vcf.gz" % ("mmsplice" if config["GenomeBuild"] == "GRCh38" else "regseq"), reference_cfg="%s/%s" % (os.environ["CADD"], config["ReferenceConfig"]), output: - temp("{file}.anno.tsv.gz"), + "{file}_splits/chunk_{chunk}.anno.tsv.gz", log: - "{file}.annotation.log", + "{file}.chunk_{chunk}.annotation.log", params: cadd=os.environ["CADD"], + resources: + load=int(config['anno_load']), shell: """ zcat {input.vcf} \ @@ -252,14 +313,15 @@ rule imputation: conda: "envs/environment_minimal.yml" input: - tsv="{file}.anno.tsv.gz", + tsv="{file}_splits/chunk_{chunk}.anno.tsv.gz", impute_cfg="%s/%s" % (os.environ["CADD"], config["ImputeConfig"]), output: - temp("{file}.csv.gz"), + "{file}_splits/chunk_{chunk}.csv.gz", log: - "{file}.imputation.log", + "{file}.chunk_{chunk}.imputation.log", params: cadd=os.environ["CADD"], + shell: """ zcat {input.tsv} \ @@ -272,18 +334,19 @@ rule score: conda: "envs/environment_minimal.yml" input: - impute="{file}.csv.gz", - anno="{file}.anno.tsv.gz", + impute="{file}_splits/chunk_{chunk}.csv.gz", + anno="{file}_splits/chunk_{chunk}.anno.tsv.gz", conversion_table="%s/%s" % (os.environ["CADD"], config["ConversionTable"]), model_file="%s/%s" % (os.environ["CADD"], config["Model"]), output: - temp("{file}.novel.tsv"), + "{file}_splits/chunk_{chunk}.novel.tsv", log: - "{file}.score.log", + "{file}.chunk_{chunk}.score.log", params: cadd=os.environ["CADD"], use_anno=config["Annotation"], columns=config["Columns"], + shell: """ python {params.cadd}/src/scripts/predictSKmodel.py \ @@ -300,21 +363,39 @@ rule score: """ -def aggregate_input(wildcards): - with checkpoints.prescore.get(file=wildcards.file).output["novel"].open() as f: - output = ["{file}.pre.tsv"] - for line in f: - if line.strip() != "": - output.append("{file}.novel.tsv") - break - return output +# def aggregate_input(wildcards): +# # Find all chunk files for the given wildcard +# chunk_files = glob.glob(f"{wildcards.file}_splits/{wildcards.file}.chunk_*.novel.vcf") +# pre_files = glob.glob(f"{wildcards.file}_splits/{wildcards.file}.chunk_*.pre.tsv") +# +# # Combine the novel and prescore chunk files if not empty +# output = [f for f in chunk_files + pre_files if os.path.getsize(f) > 0] +# if not output: +# # no output : make empty file +# open(f"{wildcards.file}.empty", "w").close() +# output = [f"{wildcards.file}.empty"] +# +# return output + + + +#def aggregate_input(wildcards): +# with checkpoints.prescore.get(file=wildcards.file).output["novel"].open() as f: +# output = ["{file}.pre.tsv"] +# for line in f: +# if line.strip() != "": +# output.append("{file}.novel.tsv") +# break +# return output rule join: conda: "envs/environment_minimal.yml" input: - aggregate_input, + #aggregate_input, + pre=gather.split("{{file}}_splits/chunk_{scatteritem}.pre.tsv"), + scored=gather.split("{{file}}_splits/chunk_{scatteritem}.novel.tsv"), output: "{file,.+(? {output} 2>> {log}; """ -# END Rules +# END Rules \ No newline at end of file From cdd125b917876505f8f275a6000596d7a9fdc6d9 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Fri, 9 Aug 2024 14:15:48 +0200 Subject: [PATCH 02/17] tuning the threading load --- Snakefile | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/Snakefile b/Snakefile index 88dd38c..c91b61a 100644 --- a/Snakefile +++ b/Snakefile @@ -22,17 +22,31 @@ validate(config, schema="schemas/config_schema.yaml") # CADD environment variable import os - - -# esm takes ~16 Gb of memory per tread. limit threading to 0.8*RAM / 16 threads ; with a minimum of 1 thread. -config['esm_slots'] = int((0.8 * (psutil.virtual_memory().total / (1024 ** 3)))/16) +################################### +## RULE SPECFIC THREADING LIMITS ## +################################### +system_memory = psutil.virtual_memory().total / (1024 ** 3) +gpu_memory = 0 +try: + lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits", shell=True).decode('utf-8').splitlines() + # sum over gpu(s) + gpu_memory = int(sum([int(x) for x in lines])/1024) +except Exception as e: + pass + +# esm takes ~16 Gb of system memory per tread & 4 Gb of GPU ram (if available). +config['esm_slots'] = int(min((0.9*system_memory)/16, 0.95*gpu_memory/4)) if config['esm_slots'] < 1: config['esm_slots'] = 1 -config['esm_load'] = max(20,int(100/config['esm_slots'])) # 4GB of GPU ram per thread.. -config['vep_load'] = int(config['esm_load']/5) # up to 4Gb/ram -config['regseq_load'] = int(config['esm_load']/4) # -config['mms_load'] = int(config['esm_load']) # up to 16Gb/ram -config['anno_load'] = int(config['esm_load']/10) # disk IO intensive +config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) +config['vep_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram +config['regseq_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram +config['mms_load'] = if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram +config['anno_load'] = 1 # disk IO intensive +config['impute_load'] = 1 +config['prescore_load'] = 1 +config['score_load'] = 1 + ## allowed scattering scattergather: split=workflow.cores, @@ -121,7 +135,7 @@ checkpoint prescore: cadd=os.environ["CADD"], resources: # < 1GB of memory - load=1, + load=int(config['prescore_load']), shell: """ # Prescoring @@ -321,7 +335,8 @@ rule imputation: "{file}.chunk_{chunk}.imputation.log", params: cadd=os.environ["CADD"], - + resources: + load=int(config['impute_load']), shell: """ zcat {input.tsv} \ @@ -346,7 +361,8 @@ rule score: cadd=os.environ["CADD"], use_anno=config["Annotation"], columns=config["Columns"], - + resources: + load=config['score_load'], shell: """ python {params.cadd}/src/scripts/predictSKmodel.py \ From 1419ef1ff3f236d09ba22195df9cbae7db075c94 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Fri, 9 Aug 2024 14:19:40 +0200 Subject: [PATCH 03/17] tuning the threading load --- Snakefile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Snakefile b/Snakefile index c91b61a..673c52f 100644 --- a/Snakefile +++ b/Snakefile @@ -35,7 +35,10 @@ except Exception as e: pass # esm takes ~16 Gb of system memory per tread & 4 Gb of GPU ram (if available). -config['esm_slots'] = int(min((0.9*system_memory)/16, 0.95*gpu_memory/4)) +if gpu_memory > 0: + config['esm_slots'] = int(min((0.9*system_memory)/16, 0.95*gpu_memory/4)) +else: + config['esm_slots'] = int(0.9*system_memory / 16) if config['esm_slots'] < 1: config['esm_slots'] = 1 config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) From 50cadc19eeeb1a6738488a3f086bb4bc3bd318e6 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Fri, 9 Aug 2024 14:43:53 +0200 Subject: [PATCH 04/17] Print Parallelization summary on launch --- Snakefile | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/Snakefile b/Snakefile index 673c52f..baad665 100644 --- a/Snakefile +++ b/Snakefile @@ -44,12 +44,32 @@ if config['esm_slots'] < 1: config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) config['vep_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram config['regseq_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram -config['mms_load'] = if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram +config['mms_load'] = 100 if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram config['anno_load'] = 1 # disk IO intensive config['impute_load'] = 1 config['prescore_load'] = 1 config['score_load'] = 1 +print("Threading Overview",flush=True) +print("##################",flush=True) +print("Assigned cores: {}".format(workflow.cores),flush=True) +print("Available system memory: {}GB".format(int(system_memory)),flush=True) +if gpu_memory > 0: + print("Total GPU memory: {}GB".format(gpu_memory),flush=True) +else: + print("No gpu found",flush=True) +print("Task Parallelization: ",flush=True) +print(" PreScore : {}x".format(min(workflow.cores,int(100/config['prescore_load'])))) +print(" VEP : {}x".format(min(workflow.cores,int(100/config['vep_load'])))) +print(" ESM : {}x (memory/gpu constraints)".format(min(workflow.cores,config['esm_slots']))) +print(" RegSeq : {}x".format(min(workflow.cores,int(100/config['regseq_load'])))) +print(" MMsplice : {}x (memory constraints)".format(min(workflow.cores,int(100/config['mms_load'])))) +print(" Annotate : {}x".format(min(workflow.cores,int(100/config['anno_load'])))) +print(" Impute : {}x".format(min(workflow.cores,int(100/config['impute_load'])))) + + + + ## allowed scattering scattergather: split=workflow.cores, From a9c498794c4e864e71c008a911cc38e55cc9d68d Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Fri, 30 Aug 2024 14:49:20 +0200 Subject: [PATCH 05/17] Provide options to specify the temp folder to use --- CADD.sh | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/CADD.sh b/CADD.sh index 3cafc0a..7b4cea6 100755 --- a/CADD.sh +++ b/CADD.sh @@ -14,6 +14,7 @@ where: -q print basic information about snakemake run -p print full information about the snakemake run -d do not remove temporary directory for debug puroposes + -t specify temporary directory [default: mktemp -d] -c number of cores that snakemake is allowed to use [default: 1] " @@ -30,7 +31,8 @@ SIGNULARITYARGS="" VERBOSE="-q" CORES="1" RM_TMP_DIR=true -while getopts ':ho:g:v:c:amr:qpd' option; do +TMP_FOLDER=$(mktemp -d) +while getopts ':ho:g:v:c:amr:qpdt:' option; do case "$option" in h) echo "$usage" exit @@ -55,6 +57,9 @@ while getopts ':ho:g:v:c:amr:qpd' option; do ;; d) RM_TMP_DIR=false ;; + t) TMP_FOLDER=$OPTARG + RM_TMP_DIR=false + ;; \?) printf "illegal option: -%s\n" "$OPTARG" >&2 echo "$usage" >&2 exit 1 @@ -80,6 +85,7 @@ then exit 1 fi +## check if temp folder was specified with ### Configuring all the paths FILENAME=$(basename $INFILE) @@ -123,7 +129,6 @@ fi # Setup temporary folder that is removed reliably on exit and is outside of # the CADD-scripts directory. -TMP_FOLDER=$(mktemp -d) if [ "$RM_TMP_DIR" = 'true' ] then trap "rm -rf $TMP_FOLDER" ERR EXIT From cc558a6927086242d4594959cb00e7a8aadc4b86 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Sat, 31 Aug 2024 10:40:00 +0200 Subject: [PATCH 06/17] correction to tmp setup --- CADD.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CADD.sh b/CADD.sh index 7b4cea6..4ad3eee 100755 --- a/CADD.sh +++ b/CADD.sh @@ -14,7 +14,7 @@ where: -q print basic information about snakemake run -p print full information about the snakemake run -d do not remove temporary directory for debug puroposes - -t specify temporary directory [default: mktemp -d] + -t specify location for temporary directory [default: /tmp/] -c number of cores that snakemake is allowed to use [default: 1] " @@ -31,7 +31,7 @@ SIGNULARITYARGS="" VERBOSE="-q" CORES="1" RM_TMP_DIR=true -TMP_FOLDER=$(mktemp -d) +TMP_PREFIX="/tmp" while getopts ':ho:g:v:c:amr:qpdt:' option; do case "$option" in h) echo "$usage" @@ -57,8 +57,7 @@ while getopts ':ho:g:v:c:amr:qpdt:' option; do ;; d) RM_TMP_DIR=false ;; - t) TMP_FOLDER=$OPTARG - RM_TMP_DIR=false + t) TMP_PREFIX=$OPTARG ;; \?) printf "illegal option: -%s\n" "$OPTARG" >&2 echo "$usage" >&2 @@ -129,6 +128,7 @@ fi # Setup temporary folder that is removed reliably on exit and is outside of # the CADD-scripts directory. +TMP_FOLDER=$(mktemp -d -p $TMP_PREFIX) if [ "$RM_TMP_DIR" = 'true' ] then trap "rm -rf $TMP_FOLDER" ERR EXIT From 7dcf09ae9281ea1955751051d0362fe55ccb1075 Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Mon, 2 Sep 2024 13:32:55 +0200 Subject: [PATCH 07/17] numpy 2+ gave errors in esm env --- envs/esm.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/envs/esm.yml b/envs/esm.yml index a364f54..ad6ea64 100644 --- a/envs/esm.yml +++ b/envs/esm.yml @@ -6,7 +6,7 @@ channels: - conda-forge - bioconda dependencies: - - numpy + - numpy<2 - python - click - biopython From 8e089ff515c867757d0ab12b482bbbae084ab7ee Mon Sep 17 00:00:00 2001 From: geertvandeweyer Date: Wed, 4 Sep 2024 20:23:37 +0200 Subject: [PATCH 08/17] assign scaled threads to high esm and mms rules to enable multi-cpu usage --- Snakefile | 18 ++++++++++++------ .../tools/esmScore/esmScore_frameshift_av.py | 14 +++++++++++--- .../lib/tools/esmScore/esmScore_inFrame_av.py | 8 +++++++- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/Snakefile b/Snakefile index baad665..e4a6b34 100644 --- a/Snakefile +++ b/Snakefile @@ -28,7 +28,7 @@ import os system_memory = psutil.virtual_memory().total / (1024 ** 3) gpu_memory = 0 try: - lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits", shell=True).decode('utf-8').splitlines() + lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits 2>/dev/null", shell=True).decode('utf-8').splitlines() # sum over gpu(s) gpu_memory = int(sum([int(x) for x in lines])/1024) except Exception as e: @@ -41,10 +41,13 @@ else: config['esm_slots'] = int(0.9*system_memory / 16) if config['esm_slots'] < 1: config['esm_slots'] = 1 -config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) +# then assign other resources +config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) +config['esm_threads'] = int(workflow.cores / config['esm_slots']) config['vep_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram config['regseq_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram config['mms_load'] = 100 if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram +config['mms_threads'] = int(workflow.cores / (100/config['mms_load'])) config['anno_load'] = 1 # disk IO intensive config['impute_load'] = 1 config['prescore_load'] = 1 @@ -61,9 +64,9 @@ else: print("Task Parallelization: ",flush=True) print(" PreScore : {}x".format(min(workflow.cores,int(100/config['prescore_load'])))) print(" VEP : {}x".format(min(workflow.cores,int(100/config['vep_load'])))) -print(" ESM : {}x (memory/gpu constraints)".format(min(workflow.cores,config['esm_slots']))) +print(" ESM : {}x with {} threads each (memory/gpu constraints)".format(min(workflow.cores,config['esm_slots']),config['esm_threads'])) print(" RegSeq : {}x".format(min(workflow.cores,int(100/config['regseq_load'])))) -print(" MMsplice : {}x (memory constraints)".format(min(workflow.cores,int(100/config['mms_load'])))) +print(" MMsplice : {}x with {} threads each (memory constraints)".format(min(workflow.cores,int(100/config['mms_load'])),config['mms_threads'])) print(" Annotate : {}x".format(min(workflow.cores,int(100/config['anno_load'])))) print(" Impute : {}x".format(min(workflow.cores,int(100/config['impute_load'])))) @@ -83,7 +86,6 @@ envvars: # START Rules - rule decompress: conda: "envs/environment_minimal.yml" @@ -231,6 +233,8 @@ rule annotate_esm: "{file}.chunk_{chunk}.annotate_esm.log", resources: load=int(config['esm_load']), + threads: + config['esm_threads'], params: cadd=os.environ["CADD"], models=["--model %s " % model for model in config["ESMmodels"]], @@ -311,6 +315,8 @@ rule annotate_mmsplice: cadd=os.environ["CADD"], resources: load=int(config['mms_load']), + threads: + config['mms_threads'], shell: """ tabix -p vcf {input.vcf} &> {log}; @@ -453,4 +459,4 @@ rule join: """ -# END Rules \ No newline at end of file +# END Rules diff --git a/src/scripts/lib/tools/esmScore/esmScore_frameshift_av.py b/src/scripts/lib/tools/esmScore/esmScore_frameshift_av.py index f6ba8bf..b46a6e2 100644 --- a/src/scripts/lib/tools/esmScore/esmScore_frameshift_av.py +++ b/src/scripts/lib/tools/esmScore/esmScore_frameshift_av.py @@ -368,6 +368,12 @@ def cli( data_alt.append((transcript_id[i], aa_seq_alt[i][-window:])) ref_alt_scores = [] + # preloading reduces disk I/O time and bottlenecks + preloaded_models = list() + for k in range(0,len(modelsToUse),1): + # print("Preloading model {}".format(k)) + model, alphabet = pretrained.load_model_and_alphabet(modelsToUse[k]) + preloaded_models.append([model,alphabet]) # load esm model(s) for o in range(0, len([data_ref, data_alt]), 1): data = [data_ref, data_alt][o] @@ -375,7 +381,8 @@ def cli( if len(data) >= 1: for k in range(0, len(modelsToUse), 1): torch.cuda.empty_cache() - model, alphabet = pretrained.load_model_and_alphabet(modelsToUse[k]) + # print("fetch model {} from preloaded models".format(k)) + model, alphabet = preloaded_models[k] # pretrained.load_model_and_alphabet(modelsToUse[k]) model.eval() # disables dropout for deterministic results batch_converter = alphabet.get_batch_converter() @@ -386,6 +393,7 @@ def cli( # apply es model to sequence, tokenProbs hat probs von allen aa an jeder pos basierend auf der seq in "data" seq_scores = [] for t in range(0, len(data), batch_size): + # print("modelling allele {}/2 : model {}/{} : {}/{}".format(o+1,k+1,len(modelsToUse),t+1,len(data))) if t + batch_size > len(data): batch_data = data[t:] else: @@ -423,9 +431,9 @@ def cli( else: # calc mean of all possible aa at this position aa_scores = [] - for k in range(4, 24, 1): + for l in range(4, 24, 1): aa_scores.append( - token_probs[i, y + 1, k] + token_probs[i, y + 1, l] ) # for all aa (except selenocystein) aa_scores.append( token_probs[i, y + 1, 26] diff --git a/src/scripts/lib/tools/esmScore/esmScore_inFrame_av.py b/src/scripts/lib/tools/esmScore/esmScore_inFrame_av.py index a8ca043..8a1b0af 100644 --- a/src/scripts/lib/tools/esmScore/esmScore_inFrame_av.py +++ b/src/scripts/lib/tools/esmScore/esmScore_inFrame_av.py @@ -464,6 +464,11 @@ def cli(input_file, transcript_file, model_directory, modelsToUse, output_file, data_alt.append((transcript_id[i], aa_seq_alt[i][-window:])) ref_alt_scores = [] + preloaded_models = list() + for k in range(0,len(modelsToUse),1): + # print("Preloading model {}".format(k)) + model, alphabet = pretrained.load_model_and_alphabet(modelsToUse[k]) + preloaded_models.append([model,alphabet]) # load esm model(s) for o in range(0, len([data_ref, data_alt]), 1): data = [data_ref, data_alt][o] @@ -471,7 +476,8 @@ def cli(input_file, transcript_file, model_directory, modelsToUse, output_file, if len(data) >= 1: for k in range(0, len(modelsToUse), 1): torch.cuda.empty_cache() - model, alphabet = pretrained.load_model_and_alphabet(modelsToUse[k]) + # print("fetch model {} from preloaded models".format(k)) + model, alphabet = preloaded_models[k] # pretrained.load_model_and_alphabet(modelsToUse[k]) model.eval() # disables dropout for deterministic results batch_converter = alphabet.get_batch_converter() From 7399d98c3f4e6ea791e4ea10c39810ee6bd1e145 Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Fri, 7 Mar 2025 06:17:33 +0000 Subject: [PATCH 09/17] working on dockerfile --- CADD.sh | 17 ++++++++- Dockerfile | 87 ++++++++++++++++++++++++++++-------------- Install_Annotations.sh | 63 ++++++++++++++++++++++++++++++ Snakefile | 11 ++++-- 4 files changed, 143 insertions(+), 35 deletions(-) create mode 100644 Install_Annotations.sh diff --git a/CADD.sh b/CADD.sh index 4ad3eee..151c252 100755 --- a/CADD.sh +++ b/CADD.sh @@ -31,7 +31,18 @@ SIGNULARITYARGS="" VERBOSE="-q" CORES="1" RM_TMP_DIR=true -TMP_PREFIX="/tmp" +# set memory to available memory by default (can be lower than system memory in docker containers): +if [[ -f "/sys/fs/cgroup/memory/memory.limit_in_bytes" ]]; then + MEMORY=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes) + MEMORY=$((MEMORY / 1024 / 1024 / 1024)) +elif [[ -f "/sys/fs/cgroup/memory.max" ]]; then + MEMORY=$(cat /sys/fs/cgroup/memory.max) + MEMORY=$((MEMORY / 1024 / 1024 / 1024)) +else + # if files are not available, determine in snakemake + MEMORY="0" +fi + while getopts ':ho:g:v:c:amr:qpdt:' option; do case "$option" in h) echo "$usage" @@ -45,6 +56,8 @@ while getopts ':ho:g:v:c:amr:qpdt:' option; do ;; c) CORES=$OPTARG ;; + M) MAXMEMORY=$OPTARG + ;; a) ANNOTATION=true ;; m) MAMBAONLY=true @@ -154,7 +167,7 @@ echo "Running snakemake pipeline:" command="snakemake $TMP_OUTFILE \ --resources load=100 \ --sdm conda $SIGNULARITYARGS --conda-prefix $CADD/envs/conda \ - --cores $CORES --configfile $CONFIG \ + --cores $CORES --memory $MEMORY --configfile $CONFIG \ --snakefile $CADD/Snakefile $VERBOSE" echo -e $command diff --git a/Dockerfile b/Dockerfile index e40171f..358a4b0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,29 +1,58 @@ -FROM condaforge/mambaforge:latest -LABEL io.github.snakemake.containerized="true" -LABEL io.github.snakemake.conda_env_hash="cb2c51dd0ad3df620c4914840c5ef6f5570a5ffd8cfd54cec57d2ffef0a76b08" - -# Step 1: Retrieve conda environments - -RUN mkdir -p /conda-envs/a4fcaaffb623ea8aef412c66280bd623 -COPY envs/environment_minimal.yml /conda-envs/a4fcaaffb623ea8aef412c66280bd623/environment.yaml - -RUN mkdir -p /conda-envs/ef25c8d726aebbe9e0ee64fee6c3caa9 -COPY envs/esm.yml /conda-envs/ef25c8d726aebbe9e0ee64fee6c3caa9/environment.yaml - -RUN mkdir -p /conda-envs/7f88b844a05ae487b7bb6530b5e6a90c -COPY envs/mmsplice.yml /conda-envs/7f88b844a05ae487b7bb6530b5e6a90c/environment.yaml - -RUN mkdir -p /conda-envs/dfc51ced08aaeb4cbd3dcd509dec0fc5 -COPY envs/regulatorySequence.yml /conda-envs/dfc51ced08aaeb4cbd3dcd509dec0fc5/environment.yaml - -RUN mkdir -p /conda-envs/89fe1049cc18768b984c476c399b7989 -COPY envs/vep.yml /conda-envs/89fe1049cc18768b984c476c399b7989/environment.yaml - -# Step 2: Generate conda environments - -RUN mamba env create --prefix /conda-envs/a4fcaaffb623ea8aef412c66280bd623 --file /conda-envs/a4fcaaffb623ea8aef412c66280bd623/environment.yaml && \ - mamba env create --prefix /conda-envs/ef25c8d726aebbe9e0ee64fee6c3caa9 --file /conda-envs/ef25c8d726aebbe9e0ee64fee6c3caa9/environment.yaml && \ - mamba env create --prefix /conda-envs/7f88b844a05ae487b7bb6530b5e6a90c --file /conda-envs/7f88b844a05ae487b7bb6530b5e6a90c/environment.yaml && \ - mamba env create --prefix /conda-envs/dfc51ced08aaeb4cbd3dcd509dec0fc5 --file /conda-envs/dfc51ced08aaeb4cbd3dcd509dec0fc5/environment.yaml && \ - mamba env create --prefix /conda-envs/89fe1049cc18768b984c476c399b7989 --file /conda-envs/89fe1049cc18768b984c476c399b7989/environment.yaml && \ - mamba clean --all -y +###################### +# aws output handler # +###################### + +# includes: +# - the cmg modules +# - dependencies + +FROM ubuntu:24.04 + +## needed apt packages +ARG BUILD_PACKAGES="wget git ssh bzip2 curl axel" +# needed conda packages (only packages not in the requirements of cmg-package) +ARG CONDA_PACKAGES="python==3.12.3 snakemake==8.16.0" +ENV MAMBA_ROOT_PREFIX=/opt/conda/ +ENV PATH /opt/micromamba/bin:/opt/conda/bin:$PATH +# ADD credentials on build +ARG SSH_PRIVATE_KEY +## ENV SETTINGS during runtime +ENV LANG=C.UTF-8 LC_ALL=C.UTF-8 +ENV PATH=/opt/conda/bin:/opt/CADD-scripts/:$PATH +ENV DEBIAN_FRONTEND noninteractive +ENV CADD=/opt/CADD-scripts +SHELL ["/bin/bash", "-l", "-c"] + +# install base packages +RUN echo "Acquire::http::Pipeline-Depth 0;" > /etc/apt/apt.conf.d/99fixbadproxy && \ + echo "Acquire::http::No-Cache true;" >> /etc/apt/apt.conf.d/99fixbadproxy && \ + echo "Acquire::BrokenProxy true;" >> /etc/apt/apt.conf.d/99fixbadproxy && \ + apt-get -y update && \ + apt-get -y upgrade && \ + apt-get install -y $BUILD_PACKAGES && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Install conda/miniforge3 +RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" && \ + /bin/bash Miniforge3-$(uname)-$(uname -m).sh -b -p /opt/conda && \ + rm Miniforge3-$(uname)-$(uname -m).sh && \ + mamba install -y -c conda-forge -c bioconda $CONDA_PACKAGES && \ + conda clean --tarballs --index-cache --packages --yes && \ + conda config --set channel_priority strict && \ + echo ". /opt/conda/etc/profile.d/conda.sh && conda activate base" >> /etc/skel/.bashrc && \ + echo ". /opt/conda/etc/profile.d/conda.sh && conda activate base" >> ~/.bashrc + +# install cadd +RUN cd /opt && \ + git clone https://github.com/geertvandeweyer/CADD-scripts.git && \ + cd CADD-scripts && \ + snakemake test/input.tsv.gz \ + --software-deployment-method conda \ + --conda-create-envs-only \ + --conda-prefix envs/conda \ + --configfile config/config_GRCh38_v1.7.yml \ + --snakefile Snakefile -c 1 + +COPY Install_Annotations.sh /opt/CADD-scripts/Install_Annotations.sh +RUN chmod a+x /opt/CADD-scripts/Install_Annotations.sh \ No newline at end of file diff --git a/Install_Annotations.sh b/Install_Annotations.sh new file mode 100644 index 0000000..081dd38 --- /dev/null +++ b/Install_Annotations.sh @@ -0,0 +1,63 @@ +#!/usr/bin/env bash + +set -euo pipefail + +# need two arguments: +# 1. target +# 2. build +if [ "$#" -ne 2 ]; then + echo "Usage: $0 " + exit 1 +fi +TARGET=$1 +BUILD=$2 + +# LOCATIONS: +DOWNLOAD_LOCATION=https://kircherlab.bihealth.org/download/CADD + + + +# supported builds: GRCh37, GRCh38 +if [ "$BUILD" != "GRCh37" ] && [ "$BUILD" != "GRCh38" ]; then + echo "Usage: $0 " + echo "Supported builds: GRCh37, GRCh38" + exit 1 +fi + +## ANNOTATIONS +echo "1. ANNOTATIONS" +mkdir -p $TARGET/annotations/ +cd $TARGET/annotations/ +URL="$DOWNLOAD_LOCATION/v1.7/$BUILD/${BUILD}_v1.7.tar.gz" +echo " - download" +axel -a "$URL" +axel -a "$URL.md5" +echo " - md5sum" +md5sum -c ${BUILD}_v1.7.tar.gz.md5 +echo " - untar" +tar -xzvf ${BUILD}_v1.7.tar.gz +rm ${BUILD}_v1.7.tar.gz +rm ${BUILD}_v1.7.tar.gz.md5 + +## PRESCORED +echo "2. PRESCORED" +mkdir -p $TARGET/prescored/${BUILD}_v1.7/noanno/ +cd $TARGET/prescored/${BUILD}_v1.7/noanno/ +URL="$DOWNLOAD_LOCATION/v1.7/$BUILD/whole_genome_SNVs.tsv.gz" +echo " - download" +axel -a "$URL" +axel -a "$URL.md5" +axel -a "$URL.tbi" +axel -a "$URL.tbi.md5" +URL="$DOWNLOAD_LOCATION/v1.7/$BUILD/gnomad.genomes.r4.0.indel.tsv.gz" +axel -a "$URL" +axel -a "$URL.md5" +axel -a "$URL.tbi" +axel -a "$URL.tbi.md5" +echo " - md5sum" +md5sum -c *.md5 +rm *.md5 + + + + diff --git a/Snakefile b/Snakefile index e4a6b34..1b158f7 100644 --- a/Snakefile +++ b/Snakefile @@ -25,13 +25,16 @@ import os ################################### ## RULE SPECFIC THREADING LIMITS ## ################################### -system_memory = psutil.virtual_memory().total / (1024 ** 3) -gpu_memory = 0 +if workflow.memory == 0: + system_memory = psutil.virtual_memory().total / (1024 ** 3) +else: + system_memory = workflow.memory try: lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits 2>/dev/null", shell=True).decode('utf-8').splitlines() # sum over gpu(s) gpu_memory = int(sum([int(x) for x in lines])/1024) except Exception as e: + gpu_memory = 0 pass # esm takes ~16 Gb of system memory per tread & 4 Gb of GPU ram (if available). @@ -43,11 +46,11 @@ if config['esm_slots'] < 1: config['esm_slots'] = 1 # then assign other resources config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) -config['esm_threads'] = int(workflow.cores / config['esm_slots']) +config['esm_threads'] = max(1, int(workflow.cores / config['esm_slots'])) config['vep_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram config['regseq_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram config['mms_load'] = 100 if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram -config['mms_threads'] = int(workflow.cores / (100/config['mms_load'])) +config['mms_threads'] = max(1, int(workflow.cores / (100/config['mms_load']))) config['anno_load'] = 1 # disk IO intensive config['impute_load'] = 1 config['prescore_load'] = 1 From c7fc5892417862df7cbdd1a4150ec438eb20c3eb Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Fri, 7 Mar 2025 06:38:15 +0000 Subject: [PATCH 10/17] expose memory --- CADD.sh | 1 + Snakefile | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CADD.sh b/CADD.sh index 151c252..8d0aef2 100755 --- a/CADD.sh +++ b/CADD.sh @@ -168,6 +168,7 @@ command="snakemake $TMP_OUTFILE \ --resources load=100 \ --sdm conda $SIGNULARITYARGS --conda-prefix $CADD/envs/conda \ --cores $CORES --memory $MEMORY --configfile $CONFIG \ + --config mem_gb=$MEMORY \ --snakefile $CADD/Snakefile $VERBOSE" echo -e $command diff --git a/Snakefile b/Snakefile index 1b158f7..f7cf553 100644 --- a/Snakefile +++ b/Snakefile @@ -25,10 +25,10 @@ import os ################################### ## RULE SPECFIC THREADING LIMITS ## ################################### -if workflow.memory == 0: +if int(config.get("mem_gb",0)) == 0: system_memory = psutil.virtual_memory().total / (1024 ** 3) else: - system_memory = workflow.memory + system_memory = int(config["mem_gb"]) try: lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits 2>/dev/null", shell=True).decode('utf-8').splitlines() # sum over gpu(s) From fe3f90b203d2ca461a203b1d753593eb43729f34 Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Fri, 7 Mar 2025 06:53:06 +0000 Subject: [PATCH 11/17] fixed missing arg --- CADD.sh | 3 ++- Dockerfile | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CADD.sh b/CADD.sh index 8d0aef2..30ce811 100755 --- a/CADD.sh +++ b/CADD.sh @@ -16,6 +16,7 @@ where: -d do not remove temporary directory for debug puroposes -t specify location for temporary directory [default: /tmp/] -c number of cores that snakemake is allowed to use [default: 1] + -M maximum memory that snakemake is allowed to use [default: available memory] " unset OPTARG @@ -43,7 +44,7 @@ else MEMORY="0" fi -while getopts ':ho:g:v:c:amr:qpdt:' option; do +while getopts ':ho:g:v:c:M:amr:qpdt:' option; do case "$option" in h) echo "$usage" exit diff --git a/Dockerfile b/Dockerfile index 358a4b0..8826b04 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,9 +45,9 @@ RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/downloa # install cadd RUN cd /opt && \ - git clone https://github.com/geertvandeweyer/CADD-scripts.git && \ + git clone --branch Fix/max_memory https://github.com/geertvandeweyer/CADD-scripts.git && \ cd CADD-scripts && \ - snakemake test/input.tsv.gz \ + snakemake test/input.vcf \ --software-deployment-method conda \ --conda-create-envs-only \ --conda-prefix envs/conda \ From da98ffc9c05e8861f91e56b612e506e1be232c5e Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Fri, 7 Mar 2025 11:17:05 +0000 Subject: [PATCH 12/17] more fixes --- CADD.sh | 2 +- Install_Annotations.sh | 2 +- schemas/config_schema.yaml | 4 ++++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CADD.sh b/CADD.sh index 30ce811..effe3d0 100755 --- a/CADD.sh +++ b/CADD.sh @@ -168,7 +168,7 @@ echo "Running snakemake pipeline:" command="snakemake $TMP_OUTFILE \ --resources load=100 \ --sdm conda $SIGNULARITYARGS --conda-prefix $CADD/envs/conda \ - --cores $CORES --memory $MEMORY --configfile $CONFIG \ + --cores $CORES --configfile $CONFIG \ --config mem_gb=$MEMORY \ --snakefile $CADD/Snakefile $VERBOSE" diff --git a/Install_Annotations.sh b/Install_Annotations.sh index 081dd38..01dee7f 100644 --- a/Install_Annotations.sh +++ b/Install_Annotations.sh @@ -13,7 +13,7 @@ TARGET=$1 BUILD=$2 # LOCATIONS: -DOWNLOAD_LOCATION=https://kircherlab.bihealth.org/download/CADD +DOWNLOAD_LOCATION=https://krishna.gs.washington.edu/download/CADD diff --git a/schemas/config_schema.yaml b/schemas/config_schema.yaml index 3f77190..b89bd27 100644 --- a/schemas/config_schema.yaml +++ b/schemas/config_schema.yaml @@ -77,6 +77,10 @@ properties: Columns: type: string pattern: "^[0-9]+(-[0-9]+)?(,[0-9]+(-[0-9]+)?)*$" + mem_gb: + type: integer + minimum: 0 + description: Available memory in GB, to scale max thread counts additionalProperties: false From 77b56b02f63f6bf664a4641a71040407f6de2f64 Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Sat, 8 Mar 2025 20:36:03 +0100 Subject: [PATCH 13/17] fix for empty files in MMsplice --- Snakefile | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/Snakefile b/Snakefile index f7cf553..0ba6558 100644 --- a/Snakefile +++ b/Snakefile @@ -168,17 +168,22 @@ checkpoint prescore: """ # Prescoring echo '## Prescored variant file' > {output.prescored} 2> {log}; + # are prescored files available? PRESCORED_FILES=`find -L {input.prescored} -maxdepth 1 -type f -name \\*.tsv.gz | wc -l` cp {input.vcf} {input.vcf}.new if [ ${{PRESCORED_FILES}} -gt 0 ]; then + # loop over all prescored files: snv , indel, ... for PRESCORED in $(ls {input.prescored}/*.tsv.gz) do + # extract writes found to outfile, not-found to stdout cat {input.vcf}.new \ | python {params.cadd}/src/scripts/extract_scored.py --header \ -p $PRESCORED --found_out={output.prescored}.tmp \ > {input.vcf}.tmp 2>> {log}; + # get prescored to the outfile cat {output.prescored}.tmp >> {output.prescored} + # put not-found back to the input mv {input.vcf}.tmp {input.vcf}.new &> {log}; done; rm {output.prescored}.tmp &>> {log} @@ -311,23 +316,40 @@ rule annotate_mmsplice: reference="%s/reference.fa" % config.get("REFERENCEpath", ""), output: mmsplice="{file}_splits/chunk_{chunk}.mmsplice.vcf.gz", + # needed ? idx="{file}_splits/chunk_{chunk}.regseq.vcf.gz.tbi", log: "{file}.chunk_{chunk}.annotate_mmsplice.log", params: cadd=os.environ["CADD"], + mms_threads=config['mms_threads'] # Assigning the number of threads for mmsplice resources: load=int(config['mms_load']), threads: config['mms_threads'], shell: """ - tabix -p vcf {input.vcf} &> {log}; - KERAS_BACKEND=tensorflow python {params.cadd}/src/scripts/lib/tools/MMSplice.py -i {input.vcf} \ - -g {input.transcripts} \ - -f {input.reference} | \ - grep -v '^Variant(CHROM=' | \ - bgzip -c > {output.mmsplice} 2>> {log} + # mmsplice crashes on empty vcf files: evaluate nr of variants left + LC=$(bgzip -dc {input.vcf} | grep -c -v '#' || true) + bgzip -dc {input.vcf} > /dev/null + echo "nr of lines: " + if [[ "$LC" -eq 0 ]]; then + echo "Empty VCF file, skipping mmsplice annotation." >> {log} + tabix -p vcf {input.vcf} &>> {log}; + cp {input.vcf} {output.mmsplice} + else + # set parallelism for tensorflow : + export OMP_NUM_THREADS={params.mms_threads} + export TF_NUM_INTRAOP_THREADS={params.mms_threads} + export TF_NUM_INTEROP_THREADS=1 + # annotate + tabix -p vcf {input.vcf} &> {log}; + KERAS_BACKEND=tensorflow python {params.cadd}/src/scripts/lib/tools/MMSplice.py -i {input.vcf} \ + -g {input.transcripts} \ + -f {input.reference} | \ + grep -v '^Variant(CHROM=' | \ + bgzip -c > {output.mmsplice} 2>> {log} + fi """ From 1a4a8f8dd38e880a45e6ff553894a87a7783ac3b Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Mon, 17 Mar 2025 10:30:27 +0100 Subject: [PATCH 14/17] revise slots for cpu/gpu separately --- CADD.sh | 4 +- Dockerfile | 33 +++++++++------ Snakefile | 121 ++++++++++++++++++++++++++++++++--------------------- 3 files changed, 96 insertions(+), 62 deletions(-) diff --git a/CADD.sh b/CADD.sh index effe3d0..05e5fd4 100755 --- a/CADD.sh +++ b/CADD.sh @@ -164,9 +164,9 @@ fi echo "Running snakemake pipeline:" -# resources is added to divide total load wrt number of cores +# resources is added to divide total load wrt number of cores & gpus command="snakemake $TMP_OUTFILE \ - --resources load=100 \ + --resources cpu_load=100 --resources gpu_load=100 \ --sdm conda $SIGNULARITYARGS --conda-prefix $CADD/envs/conda \ --cores $CORES --configfile $CONFIG \ --config mem_gb=$MEMORY \ diff --git a/Dockerfile b/Dockerfile index 8826b04..ce7e16a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,16 +43,25 @@ RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/downloa echo ". /opt/conda/etc/profile.d/conda.sh && conda activate base" >> /etc/skel/.bashrc && \ echo ". /opt/conda/etc/profile.d/conda.sh && conda activate base" >> ~/.bashrc -# install cadd +# install cadd & run test file to generate all envs RUN cd /opt && \ - git clone --branch Fix/max_memory https://github.com/geertvandeweyer/CADD-scripts.git && \ - cd CADD-scripts && \ - snakemake test/input.vcf \ - --software-deployment-method conda \ - --conda-create-envs-only \ - --conda-prefix envs/conda \ - --configfile config/config_GRCh38_v1.7.yml \ - --snakefile Snakefile -c 1 - -COPY Install_Annotations.sh /opt/CADD-scripts/Install_Annotations.sh -RUN chmod a+x /opt/CADD-scripts/Install_Annotations.sh \ No newline at end of file + git clone --branch Fix/max_memory https://github.com/geertvandeweyer/CADD-scripts.git + #cd CADD-scripts && \ + #snakemake test/input.vcf \ + # --software-deployment-method conda \ + ## --conda-create-envs-only \ + # --conda-prefix envs/conda \ + # --configfile config/config_GRCh38_v1.7.yml \ + # --snakefile Snakefile -c 1 + +#COPY Install_Annotations.sh /opt/CADD-scripts/Install_Annotations.sh +RUN chmod a+x /opt/CADD-scripts/Install_Annotations.sh + +## some follow up instructions are needed: +RUN echo "WARNING: CADD-scripts installed. To use the container, the following commands are needed: " +RUN echo "# download the annotations sources" +RUN echo "docker run -v /mnt/CADD_data:/opt/CADD-scripts/data my-cadd-scripts:my_version /opt/CADD-Scripts/Install_Annotations.sh /opt/CADD-scripts/data GRCh38" +RUN echo "# run the script on the test data to prepare all conda envs" +RUN echo "docker run --name prep-container -w /opt/CADD-scripts -v /mnt/CADD_data/annotations:/opt/CADD-scripts/data/annotations -v /mnt/CADD_data/prescored:/opt/CADD-scripts/data/prescored my-cadd-scripts:my_version bash -c 'snakemake test/input.tsv.gz --resources load=100 --sdm conda --conda-prefix /opt/CADD-scripts/envs/conda --configfile /opt/CADD-scripts/config/config_GRCh38_v1.7_noanno.yml --snakefile /opt/CADD-scripts/Snakefile -c 1 ; rm -Rf /opt/CADD-scripts/test/input_splits /opt/CADD-scripts/test/input.chunk* /opt/CADD-scripts/test/input.*.log /opt/conda/pkgs/*' " +RUN echo "# commit the changes to the image" +RUN echo "docker commit prep-container my-cadd-scripts:my_version" \ No newline at end of file diff --git a/Snakefile b/Snakefile index 0ba6558..2764e62 100644 --- a/Snakefile +++ b/Snakefile @@ -33,47 +33,64 @@ try: lines = subprocess.check_output("nvidia-smi --query-gpu=memory.total --format=csv,noheader,nounits 2>/dev/null", shell=True).decode('utf-8').splitlines() # sum over gpu(s) gpu_memory = int(sum([int(x) for x in lines])/1024) + # nr of gpus + gpu_count = len(lines) except Exception as e: gpu_memory = 0 + gpu_count = 0 pass - -# esm takes ~16 Gb of system memory per tread & 4 Gb of GPU ram (if available). -if gpu_memory > 0: - config['esm_slots'] = int(min((0.9*system_memory)/16, 0.95*gpu_memory/4)) + +############### +## GPU TASKS ## +############### +# esm takes ~16 Gb of system memory per tread & 14 Gb of GPU ram (if available). +# if gpu is available, we can use it for esm : one thread per gpu. +if gpu_count > 0: + config['esm_cpu_slots'] = int(max(1,(0.9*system_memory)/16)) + config['esm_gpu_slots'] = int(0.9*gpu_memory/14) else: - config['esm_slots'] = int(0.9*system_memory / 16) -if config['esm_slots'] < 1: - config['esm_slots'] = 1 + config['esm_cpu_slots'] = int(max(1,0.9*system_memory / 16)) + config['esm_gpu_slots'] = 0 +config['esm_cpu_load'] = int(100/config['esm_cpu_slots']) +config['esm_gpu_load'] = 100 if config['esm_gpu_slots'] < 1 else int(100/config['esm_gpu_slots']) +config['esm_cpu_threads'] = max(1, int(workflow.cores / config['esm_cpu_slots'])) + +############### +## CPU TASKS ## +############### # then assign other resources -config['esm_load'] = 100 if config['esm_slots'] < 1 else int(100/config['esm_slots']) -config['esm_threads'] = max(1, int(workflow.cores / config['esm_slots'])) -config['vep_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram -config['regseq_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram -config['mms_load'] = 100 if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram -config['mms_threads'] = max(1, int(workflow.cores / (100/config['mms_load']))) -config['anno_load'] = 1 # disk IO intensive -config['impute_load'] = 1 -config['prescore_load'] = 1 -config['score_load'] = 1 - -print("Threading Overview",flush=True) -print("##################",flush=True) -print("Assigned cores: {}".format(workflow.cores),flush=True) -print("Available system memory: {}GB".format(int(system_memory)),flush=True) +config['vep_cpu_load'] = 100 if system_memory < 4 else int(100/int(0.9*system_memory / 4 )) # up to 4Gb/ram +config['vep_cpu_threads'] = max(1, int(workflow.cores / (100/config['vep_cpu_load']))) +config['regseq_cpu_load'] = 100 if system_memory < 2 else int(100/int(0.9*system_memory / 2 )) # up to 2Gb of ram +config['mms_cpu_load'] = 100 if system_memory < 16 else int(100/int(0.9*system_memory / 16 )) # up to 16Gb/ram +config['mms_cpu_threads'] = max(1, int(workflow.cores / (100/config['mms_cpu_load']))) +config['anno_cpu_load'] = 1 # disk IO intensive +config['impute_cpu_load'] = 1 +config['prescore_cpu_load'] = 10 # disk IO intensive +config['score_cpu_load'] = 1 + +print("Threading Overview") +print("##################") +print("Assigned cores: {}".format(workflow.cores)) +print("Available system memory: {}GB".format(int(system_memory))) if gpu_memory > 0: - print("Total GPU memory: {}GB".format(gpu_memory),flush=True) + print("Total GPU memory: {}GB".format(gpu_memory)) else: - print("No gpu found",flush=True) -print("Task Parallelization: ",flush=True) -print(" PreScore : {}x".format(min(workflow.cores,int(100/config['prescore_load'])))) -print(" VEP : {}x".format(min(workflow.cores,int(100/config['vep_load'])))) -print(" ESM : {}x with {} threads each (memory/gpu constraints)".format(min(workflow.cores,config['esm_slots']),config['esm_threads'])) -print(" RegSeq : {}x".format(min(workflow.cores,int(100/config['regseq_load'])))) -print(" MMsplice : {}x with {} threads each (memory constraints)".format(min(workflow.cores,int(100/config['mms_load'])),config['mms_threads'])) -print(" Annotate : {}x".format(min(workflow.cores,int(100/config['anno_load'])))) -print(" Impute : {}x".format(min(workflow.cores,int(100/config['impute_load'])))) + print("No gpu found") + +# print threading limits +print("Task Parallelization: ") +print(" PreScore : {}x".format(min(workflow.cores,int(100/config['prescore_cpu_load'])))) +print(" VEP : {}x with {} threads each".format(min(workflow.cores,int(100/config['vep_cpu_load'])), config['vep_cpu_threads'])) +print(" ESM : CPU : {}x with {} threads each (memory constraints)".format(min(workflow.cores,config['esm_cpu_slots']),config['esm_cpu_threads'])) +print(" ESM : GPU : {}x (gpu memory constraints)".format(config['esm_gpu_slots'])) +print(" RegSeq : {}x".format(min(workflow.cores,int(100/config['regseq_cpu_load'])))) +print(" MMsplice : {}x with {} threads each (memory constraints)".format(min(workflow.cores,int(100/config['mms_cpu_load'])),config['mms_cpu_threads'])) +print(" Annotate : {}x".format(min(workflow.cores,int(100/config['anno_cpu_load'])))) +print(" Impute : {}x".format(min(workflow.cores,int(100/config['impute_cpu_load'])))) - +# flush output before starting workflow +print("Starting workflow",flush=True) ## allowed scattering @@ -121,7 +138,7 @@ rule prepare: threads=workflow.cores, resources: # < 1GB of memory - load=1, + cpu_load=1, shell: """ mkdir -p {wildcards.file}_splits/ 2>> {log} @@ -143,7 +160,11 @@ rule prepare: # strip padding zeros in the file names for f in {wildcards.file}_splits/chunk_*.prepared.vcf do - mv -n "$f" "$(echo "$f" | sed -E 's/(chunk_)0*([1-9][0-9]*)(-of-{params.threads}\\.prepared\\.vcf)/\\1\\2\\3/')" + target=$(echo "$f" | sed -E 's/(chunk_)0*([1-9][0-9]*)(-of-{params.threads}\\.prepared\\.vcf)/\\1\\2\\3/') + if [ "$f" != "$target" ]; then + mv -n "$f" "$target" + fi + done """ @@ -163,7 +184,7 @@ checkpoint prescore: cadd=os.environ["CADD"], resources: # < 1GB of memory - load=int(config['prescore_load']), + cpu_load=int(config['prescore_cpu_load']), shell: """ # Prescoring @@ -206,9 +227,12 @@ rule annotation_vep: cadd=os.environ["CADD"], genome_build=config["GenomeBuild"], ensembl_db=config["EnsemblDB"], + threads=config['vep_cpu_threads'], resources: # < 1GB of memory - load=int(config['vep_load']), + cpu_load=int(config['vep_cpu_load']), + threads: + config['vep_cpu_threads'], shell: """ cat {input.vcf} \ @@ -217,6 +241,7 @@ rule annotation_vep: --db_version={params.ensembl_db} --assembly {params.genome_build} \ --format vcf --regulatory --sift b --polyphen b --per_gene --ccds --domains \ --numbers --canonical --total_length --vcf --force_overwrite --output_file STDOUT \ + --fork {params.threads} \ | bgzip -c > {output} 2> {log} """ @@ -240,9 +265,10 @@ rule annotate_esm: log: "{file}.chunk_{chunk}.annotate_esm.log", resources: - load=int(config['esm_load']), + cpu_load=int(config['esm_cpu_load']), + gpu_load=int(config['esm_gpu_load']), threads: - config['esm_threads'], + config['esm_cpu_threads'], params: cadd=os.environ["CADD"], models=["--model %s " % model for model in config["ESMmodels"]], @@ -254,7 +280,6 @@ rule annotate_esm: model_directory=`dirname {input.models[0]}`; model_directory=`dirname $model_directory`; - python {params.cadd}/src/scripts/lib/tools/esmScore/esmScore_missense_av_fast.py \ --input {input.vcf} \ --transcripts {input.transcripts} \ @@ -294,7 +319,7 @@ rule annotate_regseq: cadd=os.environ["CADD"], resources: # roughly 4GB of memory - load=int(config['regseq_load']), + cpu_load=int(config['regseq_cpu_load']), shell: """ python {params.cadd}/src/scripts/lib/tools/regulatorySequence/predictVariants.py \ @@ -322,11 +347,11 @@ rule annotate_mmsplice: "{file}.chunk_{chunk}.annotate_mmsplice.log", params: cadd=os.environ["CADD"], - mms_threads=config['mms_threads'] # Assigning the number of threads for mmsplice + mms_threads=config['mms_cpu_threads'] # Assigning the number of threads for mmsplice resources: - load=int(config['mms_load']), + cpu_load=int(config['mms_cpu_load']), threads: - config['mms_threads'], + config['mms_cpu_threads'], shell: """ # mmsplice crashes on empty vcf files: evaluate nr of variants left @@ -341,7 +366,7 @@ rule annotate_mmsplice: # set parallelism for tensorflow : export OMP_NUM_THREADS={params.mms_threads} export TF_NUM_INTRAOP_THREADS={params.mms_threads} - export TF_NUM_INTEROP_THREADS=1 + export TF_NUM_INTEROP_THREADS={params.mms_threads} # annotate tabix -p vcf {input.vcf} &> {log}; KERAS_BACKEND=tensorflow python {params.cadd}/src/scripts/lib/tools/MMSplice.py -i {input.vcf} \ @@ -367,7 +392,7 @@ rule annotation: params: cadd=os.environ["CADD"], resources: - load=int(config['anno_load']), + cpu_load=int(config['anno_cpu_load']), shell: """ zcat {input.vcf} \ @@ -390,7 +415,7 @@ rule imputation: params: cadd=os.environ["CADD"], resources: - load=int(config['impute_load']), + cpu_load=int(config['impute_cpu_load']), shell: """ zcat {input.tsv} \ @@ -416,7 +441,7 @@ rule score: use_anno=config["Annotation"], columns=config["Columns"], resources: - load=config['score_load'], + cpu_load=config['score_cpu_load'], shell: """ python {params.cadd}/src/scripts/predictSKmodel.py \ From 38572b5b6644909cb6ce42d8c66b54ef38dfcd5e Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Tue, 4 Nov 2025 18:14:17 +0100 Subject: [PATCH 15/17] fix for input file with only invalid variants --- Snakefile | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Snakefile b/Snakefile index 2764e62..6c68be7 100644 --- a/Snakefile +++ b/Snakefile @@ -151,6 +151,14 @@ rule prepare: # split LC=$(wc -l {wildcards.file}_splits/full.vcf | cut -f1 -d' ') + ## if zero, stop the workflow here (as success) + if [[ "$LC" -eq 0 ]]; then + echo "No variants to process. Exiting." >> {log} + # put expected files in place for this step (generated by split and the for loop below) + touch {wildcards.file}_splits/chunk_1-of-{params.threads}.prepared.vcf + exit 0 + fi + LC=$(((LC / {params.threads})+1)) split -l $LC --numeric-suffixes=1 --additional-suffix="-of-{params.threads}.prepared.vcf" {wildcards.file}_splits/full.vcf {wildcards.file}_splits/chunk_ 2>> {log} From 7b498125ec7254968d507e0c62549529ff7157e2 Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Tue, 4 Nov 2025 19:45:22 +0100 Subject: [PATCH 16/17] fix error when no valid varaints in infile --- Snakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Snakefile b/Snakefile index 6c68be7..831c265 100644 --- a/Snakefile +++ b/Snakefile @@ -144,7 +144,7 @@ rule prepare: mkdir -p {wildcards.file}_splits/ 2>> {log} cat {input.vcf} \ | python {params.cadd}/src/scripts/VCF2vepVCF.py \ - | grep -v '^#' \ + | grep -v '^#' || true \ | sed 's/^chr//' \ | sort -k1,1 -k2,2n -k4,4 -k5,5 \ | uniq > {wildcards.file}_splits/full.vcf 2> {log} From cb68b1e46a4785c2374d56f41127ef568de63a03 Mon Sep 17 00:00:00 2001 From: Geert Vandeweyer Date: Fri, 7 Nov 2025 10:22:41 +0100 Subject: [PATCH 17/17] fixes to no or bad input --- Snakefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Snakefile b/Snakefile index 831c265..b22bc65 100644 --- a/Snakefile +++ b/Snakefile @@ -144,7 +144,7 @@ rule prepare: mkdir -p {wildcards.file}_splits/ 2>> {log} cat {input.vcf} \ | python {params.cadd}/src/scripts/VCF2vepVCF.py \ - | grep -v '^#' || true \ + | (grep -v '^#' || true) \ | sed 's/^chr//' \ | sort -k1,1 -k2,2n -k4,4 -k5,5 \ | uniq > {wildcards.file}_splits/full.vcf 2> {log}