diff --git a/code/molecular_phenotypes/QC/bulk_expression_normalization.ipynb b/code/molecular_phenotypes/QC/bulk_expression_normalization.ipynb index 18f8517cd..282632d37 100644 --- a/code/molecular_phenotypes/QC/bulk_expression_normalization.ipynb +++ b/code/molecular_phenotypes/QC/bulk_expression_normalization.ipynb @@ -181,7 +181,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 4, "id": "fewer-niagara", "metadata": { "kernel": "Bash" @@ -191,6 +191,8 @@ "name": "stdout", "output_type": "stream", "text": [ + "/home/caox2/.pixi/envs/python/lib/python3.12/site-packages/sos/targets.py:22: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.\n", + " import pkg_resources\n", "usage: sos run bulk_expression_normalization.ipynb\n", " [workflow_name | -t targets] [options] [workflow_options]\n", " workflow_name: Single or combined workflows defined in this script\n", @@ -218,9 +220,11 @@ " --tpm-threshold 0.1 (as float)\n", " --count-threshold 6 (as int)\n", " --sample-frac-threshold 0.2 (as float)\n", - " --normalization-method tmm\n", - " Normalization method: TMM (tmm) or quantile\n", + " --normalization-method 'tmm_cpm_voom'\n", + " Normalization method: TMM + CPM(voom) (tmm_cpm_voom),\n", + " TMM + CPM(edgeR) (tmm_cpm_edger), or quantile\n", " normalization (qn)\n", + " --[no-]quantile-normalize (default to True)\n", " --job-size 1 (as int)\n", " For cluster jobs, number commands to run per job\n", " --walltime 5h\n", @@ -265,8 +269,10 @@ "parameter: tpm_threshold = 0.1\n", "parameter: count_threshold = 6\n", "parameter: sample_frac_threshold = 0.2\n", - "# Normalization method: TMM (tmm) or quantile normalization (qn)\n", - "parameter: normalization_method = 'tmm'\n", + "# Normalization method: TMM + CPM(voom) (tmm_cpm_voom), TMM + CPM(edgeR) (tmm_cpm_edger), or quantile normalization (qn)\n", + "parameter: normalization_method = 'tmm_cpm_voom'\n", + "# Quantile Normalization after rescale\n", + "parameter: quantile_normalize = True\n", "# For cluster jobs, number commands to run per job\n", "parameter: job_size = 1\n", "# Wall clock time expected\n", @@ -282,8 +288,8 @@ }, { "cell_type": "code", - "execution_count": 2, - "id": "front-jaguar", + "execution_count": null, + "id": "1b887bc9-1438-442a-9d5b-057975c930ac", "metadata": { "kernel": "SoS" }, @@ -292,14 +298,164 @@ "[normalize]\n", "# Path to the input molecular phenotype data, should be a processd and indexed bed.gz file, with tabix index.\n", "input: tpm_gct, counts_gct, annotation_gtf, sample_participant_lookup\n", - "output: f'{cwd:a}/{_input[0]:bnnn}.{normalization_method}.expression.bed.gz'\n", + "output: f'{cwd:a}/{_input[0]:bnnn}.{normalization_method}.{\"no_qnorm.\" if not quantile_normalize else \"\"}expression.bed.gz'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output[0]:bn}' \n", "python: expand = \"${ }\", stderr = f'{_output[0]:nn}.stderr', stdout = f'{_output[0]:nn}.stdout',container = container, entrypoint = entrypoint\n", - " import pandas as pd \n", - " #read the sample map file\n", + " import numpy as np\n", + " import pandas as pd\n", + " import os\n", + " import qtl.io\n", + " import qtl.norm\n", + " \n", + " # ============= Function definition (packaged from eqtl_prepare_expression.py) =============\n", + " # This function is adapted from the GTEx pipeline's eqtl_prepare_expression.py script\n", + " # Source: https://github.com/broadinstitute/gtex-pipeline/blob/master/qtl/src/eqtl_prepare_expression.py\n", + " # \n", + " # Modifications from the original script:\n", + " # 1. Converted command-line interface (argparse) to function parameters for direct Python calling\n", + " # 2. Encapsulated the main script logic into a callable function\n", + " # 3. Made all parameters explicit function arguments with appropriate defaults\n", + " # 4. Preserved all core processing logic and algorithms from the original implementation\n", + " # 5. Supports three normalization methods:\n", + " # - TMM + CPM with voom transformation (tmm_cpm_voom) [default]\n", + " # - TMM + CPM with edgeR (tmm_cpm_edger)\n", + " # - Quantile normalization (qn)\n", + " # 6. Added optional inverse normal transformation via the 'qnorm' parameter:\n", + " # - 'qnorm': Apply inverse normal transformation after primary normalization\n", + " # - 'none': Skip inverse normal transformation [default]\n", + " #\n", + " # This implementation allows direct function calls within the workflow without external script execution,\n", + " # maintaining compatibility with the original GTEx pipeline while providing better integration.\n", + " # =========================================================================================\n", + " \n", + " def eqtl_prepare_expression(tpm_gct, counts_gct, annotation_gtf, sample_to_participant,\n", + " prefix, output_dir='.', sample_ids=None, chrs=None,\n", + " convert_tpm=False, tpm_threshold=0.1, count_threshold=6,\n", + " sample_frac_threshold=0.2, normalization_method='tmm_cpm_voom',\n", + " qnorm='none', parquet=False):\n", + " \"\"\"\n", + " Generate normalized expression BED files for eQTL analyses\n", + " \n", + " This function packages the main logic from eqtl_prepare_expression.py\n", + " \"\"\"\n", + " \n", + " # First, define the prepare_expression function (from the original script)\n", + " def prepare_expression(counts_df, tpm_df, sample_frac_threshold=0.2,\n", + " count_threshold=6, tpm_threshold=0.1, mode='tmm_cpm_voom', qnorm='none'):\n", + " \"\"\"\n", + " Genes are filtered using the following expression thresholds:\n", + " TPM >= tpm_threshold in >= sample_frac_threshold * samples\n", + " read counts >= count_threshold in sample_frac_threshold * samples\n", + "\n", + " The filtered counts matrix is then normalized using:\n", + " TMM (mode='tmm'; default) or\n", + " quantile normalization (mode='qn')\n", + " \"\"\"\n", + " # expression thresholds\n", + " ns = tpm_df.shape[1]\n", + " mask = (\n", + " (np.sum(tpm_df >= tpm_threshold, axis=1) >= sample_frac_threshold * ns) &\n", + " (np.sum(counts_df >= count_threshold, axis=1) >= sample_frac_threshold * ns)\n", + " ).values\n", + "\n", + " # apply normalization\n", + " if mode.lower() == 'tmm_cpm_edger':\n", + " norm_counts_df = qtl.norm.edger_cpm(counts_df, normalized_lib_sizes=True)\n", + " norm_counts_df = norm_counts_df[mask]\n", + " elif mode.lower() == 'tmm_cpm_voom':\n", + " norm_counts_df = qtl.norm.voom_transform(counts_df)\n", + " norm_counts_df = norm_counts_df[mask]\n", + " elif mode.lower() == 'qn':\n", + " norm_counts_df = qtl.norm.normalize_quantiles(tpm_df.loc[mask])\n", + " else:\n", + " raise ValueError(f'Unsupported mode {mode}')\n", + "\n", + " # apply quantile normalization\n", + " if qnorm.lower() == 'qnorm':\n", + " result_df = qtl.norm.inverse_normal_transform(norm_counts_df)\n", + " elif qnorm.lower() == 'none':\n", + " result_df = norm_counts_df\n", + " else:\n", + " raise ValueError(f'Unsupported qnorm mode {qnorm}')\n", + " \n", + " return result_df\n", + " \n", + " # Main logic (from the original __main__ block)\n", + " print('Loading expression data', flush=True)\n", + " \n", + " # Handle sample_ids if provided\n", + " if sample_ids is not None:\n", + " if isinstance(sample_ids, str): # If it's a file path\n", + " with open(sample_ids) as f:\n", + " sample_ids = f.read().strip().split('\\n')\n", + " print(f' * Loading {len(sample_ids)} samples', flush=True)\n", + " \n", + " # Load expression data\n", + " counts_df = qtl.io.read_gct(counts_gct, sample_ids=sample_ids, load_description=False)\n", + " tpm_df = qtl.io.read_gct(tpm_gct, sample_ids=sample_ids, load_description=False)\n", + " \n", + " # Load sample to participant mapping\n", + " sample_to_participant_s = pd.read_csv(sample_to_participant, sep='\\t', index_col=0,\n", + " header=None, dtype=str).squeeze('columns')\n", + " \n", + " # Check inputs\n", + " if not counts_df.columns.equals(tpm_df.columns):\n", + " raise ValueError('Sample IDs in the TPM and read counts files must match.')\n", + " missing_ids = ~counts_df.columns.isin(sample_to_participant_s.index)\n", + " if missing_ids.any():\n", + " raise ValueError(f\"Sample IDs in expression files and participant lookup table must match ({missing_ids.sum()} sample IDs missing from {os.path.basename(sample_to_participant)}).\")\n", + " \n", + " # Convert to TPM if requested\n", + " if convert_tpm:\n", + " print(' * Converting to TPM', flush=True)\n", + " tpm_df = tpm_df / tpm_df.sum(0) * 1e6\n", + " \n", + " # Normalize data\n", + " print(f'Normalizing data ({normalization_method})', flush=True)\n", + " norm_df = prepare_expression(counts_df, tpm_df,\n", + " sample_frac_threshold=sample_frac_threshold,\n", + " count_threshold=count_threshold,\n", + " tpm_threshold=tpm_threshold,\n", + " mode=normalization_method,\n", + " qnorm=qnorm)\n", + " print(f' * {counts_df.shape[0]} genes in input tables', flush=True)\n", + " print(f' * {norm_df.shape[0]} genes remain after thresholding', flush=True)\n", + " \n", + " # Change sample IDs to participant IDs\n", + " norm_df.rename(columns=sample_to_participant_s, inplace=True)\n", + " \n", + " # Handle chromosome list\n", + " if chrs is not None:\n", + " if isinstance(chrs, str): # If it's a file path\n", + " with open(chrs) as f:\n", + " chrs = f.read().strip().split('\\n')\n", + " else:\n", + " chrs = [f'chr{i}' for i in range(1,23)] + ['chrX']\n", + " \n", + " # Prepare BED\n", + " bed_template_df = qtl.io.gtf_to_tss_bed(annotation_gtf, feature='transcript')\n", + " bed_template_df = bed_template_df[bed_template_df['chr'].isin(chrs)]\n", + " bed_df = pd.merge(bed_template_df, norm_df, left_index=True, right_index=True)\n", + " qtl.io.sort_bed(bed_df, inplace=True)\n", + " print(f' * {bed_df.shape[0]} genes remain after selecting chromosomes', flush=True)\n", + " \n", + " # Write output\n", + " print('Writing BED file', flush=True)\n", + " output_file = os.path.join(output_dir, f'{prefix}.expression.bed.gz')\n", + " if not parquet:\n", + " qtl.io.write_bed(bed_df, output_file)\n", + " else:\n", + " output_file = os.path.join(output_dir, f'{prefix}.expression.bed.parquet')\n", + " bed_df.to_parquet(output_file)\n", + " \n", + " return output_file\n", + " \n", + " # ============= Now use the function in your workflow =============\n", + " \n", + " # Check for duplicates first\n", " sample_map = pd.read_table(\"${_input[3]}\")\n", " duplicated = sample_map.loc[sample_map.duplicated(subset=['participant_id'])]\n", - "\n", + " \n", " if duplicated.shape[0] > 0:\n", " print(\"Duplicate samples found. Please remove duplicates from ${_input[3]} before normalizing.\")\n", " print(\"Duplicates:\")\n", @@ -307,28 +463,42 @@ " raise ValueError\n", " else:\n", " print(\"No duplicates found. Proceeding with normalization...\")\n", - "\n", - "bash: expand = \"${ }\", stderr = f'{_output[0]:nn}.stderr', stdout = f'{_output[0]:nn}.stdout',container = container, entrypoint = entrypoint\n", - " for i in {1..22} X Y MT; do echo chr$i; done > ${_output[0]:bnnn}.vcf_chr_list\n", - " eqtl_prepare_expression ${_input[0]} ${_input[1]} ${_input[2]} \\\n", - " ${_input[3]} ${_output[0]:nnn} \\\n", - " --chrs ${_output[0]:bnnn}.vcf_chr_list \\\n", - " --tpm_threshold ${tpm_threshold} \\\n", - " --count_threshold ${count_threshold} \\\n", - " --sample_frac_threshold ${sample_frac_threshold} \\\n", - " --normalization_method ${normalization_method} && \\\n", - " rm -f ${_output[0]:bnnn}.vcf_chr_list" + " \n", + " # Create chromosome list file\n", + " chr_file = \"${_output[0]:bnnn}.vcf_chr_list\"\n", + " chrs = [f'chr{i}' for i in range(1,23)] + ['chrX', 'chrY', 'chrMT']\n", + " with open(chr_file, 'w') as f:\n", + " for chr in chrs:\n", + " f.write(chr + '\\n')\n", + " \n", + " # Set qnorm parameter based on quantile_normalize flag\n", + " qnorm_param = 'qnorm' if ${quantile_normalize} else 'none'\n", + " \n", + " # Call the eqtl_prepare_expression function\n", + " output_file = eqtl_prepare_expression(\n", + " tpm_gct=\"${_input[0]}\",\n", + " counts_gct=\"${_input[1]}\",\n", + " annotation_gtf=\"${_input[2]}\",\n", + " sample_to_participant=\"${_input[3]}\",\n", + " prefix=\"${_output[0]:nnn}\",\n", + " output_dir=\".\",\n", + " sample_ids=None,\n", + " chrs=chr_file,\n", + " convert_tpm=False,\n", + " tpm_threshold=${tpm_threshold},\n", + " count_threshold=${count_threshold},\n", + " sample_frac_threshold=${sample_frac_threshold},\n", + " normalization_method=\"${normalization_method}\",\n", + " qnorm=qnorm_param,\n", + " parquet=False\n", + " )\n", + " \n", + " # Clean up temporary file\n", + " if os.path.exists(chr_file):\n", + " os.remove(chr_file)\n", + " \n", + " print(f\"Preparing expression completed. Output saved to: {output_file}\")" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "1b887bc9-1438-442a-9d5b-057975c930ac", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [] } ], "metadata": {