Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 202 additions & 32 deletions code/molecular_phenotypes/QC/bulk_expression_normalization.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 4,
"id": "fewer-niagara",
"metadata": {
"kernel": "Bash"
Expand All @@ -191,6 +191,8 @@
"name": "stdout",
"output_type": "stream",
"text": [
"/home/caox2/.pixi/envs/python/lib/python3.12/site-packages/sos/targets.py:22: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.\n",
" import pkg_resources\n",
"usage: sos run bulk_expression_normalization.ipynb\n",
" [workflow_name | -t targets] [options] [workflow_options]\n",
" workflow_name: Single or combined workflows defined in this script\n",
Expand Down Expand Up @@ -218,9 +220,11 @@
" --tpm-threshold 0.1 (as float)\n",
" --count-threshold 6 (as int)\n",
" --sample-frac-threshold 0.2 (as float)\n",
" --normalization-method tmm\n",
" Normalization method: TMM (tmm) or quantile\n",
" --normalization-method 'tmm_cpm_voom'\n",
" Normalization method: TMM + CPM(voom) (tmm_cpm_voom),\n",
" TMM + CPM(edgeR) (tmm_cpm_edger), or quantile\n",
" normalization (qn)\n",
" --[no-]quantile-normalize (default to True)\n",
" --job-size 1 (as int)\n",
" For cluster jobs, number commands to run per job\n",
" --walltime 5h\n",
Expand Down Expand Up @@ -265,8 +269,10 @@
"parameter: tpm_threshold = 0.1\n",
"parameter: count_threshold = 6\n",
"parameter: sample_frac_threshold = 0.2\n",
"# Normalization method: TMM (tmm) or quantile normalization (qn)\n",
"parameter: normalization_method = 'tmm'\n",
"# Normalization method: TMM + CPM(voom) (tmm_cpm_voom), TMM + CPM(edgeR) (tmm_cpm_edger), or quantile normalization (qn)\n",
"parameter: normalization_method = 'tmm_cpm_voom'\n",
"# Quantile Normalization after rescale\n",
"parameter: quantile_normalize = True\n",
"# For cluster jobs, number commands to run per job\n",
"parameter: job_size = 1\n",
"# Wall clock time expected\n",
Expand All @@ -282,8 +288,8 @@
},
{
"cell_type": "code",
"execution_count": 2,
"id": "front-jaguar",
"execution_count": null,
"id": "1b887bc9-1438-442a-9d5b-057975c930ac",
"metadata": {
"kernel": "SoS"
},
Expand All @@ -292,43 +298,207 @@
"[normalize]\n",
"# Path to the input molecular phenotype data, should be a processd and indexed bed.gz file, with tabix index.\n",
"input: tpm_gct, counts_gct, annotation_gtf, sample_participant_lookup\n",
"output: f'{cwd:a}/{_input[0]:bnnn}.{normalization_method}.expression.bed.gz'\n",
"output: f'{cwd:a}/{_input[0]:bnnn}.{normalization_method}.{\"no_qnorm.\" if not quantile_normalize else \"\"}expression.bed.gz'\n",
"task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output[0]:bn}' \n",
"python: expand = \"${ }\", stderr = f'{_output[0]:nn}.stderr', stdout = f'{_output[0]:nn}.stdout',container = container, entrypoint = entrypoint\n",
" import pandas as pd \n",
" #read the sample map file\n",
" import numpy as np\n",
" import pandas as pd\n",
" import os\n",
" import qtl.io\n",
" import qtl.norm\n",
" \n",
" # ============= Function definition (packaged from eqtl_prepare_expression.py) =============\n",
" # This function is adapted from the GTEx pipeline's eqtl_prepare_expression.py script\n",
" # Source: https://github.com/broadinstitute/gtex-pipeline/blob/master/qtl/src/eqtl_prepare_expression.py\n",
" # \n",
" # Modifications from the original script:\n",
" # 1. Converted command-line interface (argparse) to function parameters for direct Python calling\n",
" # 2. Encapsulated the main script logic into a callable function\n",
" # 3. Made all parameters explicit function arguments with appropriate defaults\n",
" # 4. Preserved all core processing logic and algorithms from the original implementation\n",
" # 5. Supports three normalization methods:\n",
" # - TMM + CPM with voom transformation (tmm_cpm_voom) [default]\n",
" # - TMM + CPM with edgeR (tmm_cpm_edger)\n",
" # - Quantile normalization (qn)\n",
" # 6. Added optional inverse normal transformation via the 'qnorm' parameter:\n",
" # - 'qnorm': Apply inverse normal transformation after primary normalization\n",
" # - 'none': Skip inverse normal transformation [default]\n",
" #\n",
" # This implementation allows direct function calls within the workflow without external script execution,\n",
" # maintaining compatibility with the original GTEx pipeline while providing better integration.\n",
" # =========================================================================================\n",
" \n",
" def eqtl_prepare_expression(tpm_gct, counts_gct, annotation_gtf, sample_to_participant,\n",
" prefix, output_dir='.', sample_ids=None, chrs=None,\n",
" convert_tpm=False, tpm_threshold=0.1, count_threshold=6,\n",
" sample_frac_threshold=0.2, normalization_method='tmm_cpm_voom',\n",
" qnorm='none', parquet=False):\n",
" \"\"\"\n",
" Generate normalized expression BED files for eQTL analyses\n",
" \n",
" This function packages the main logic from eqtl_prepare_expression.py\n",
" \"\"\"\n",
" \n",
" # First, define the prepare_expression function (from the original script)\n",
" def prepare_expression(counts_df, tpm_df, sample_frac_threshold=0.2,\n",
" count_threshold=6, tpm_threshold=0.1, mode='tmm_cpm_voom', qnorm='none'):\n",
" \"\"\"\n",
" Genes are filtered using the following expression thresholds:\n",
" TPM >= tpm_threshold in >= sample_frac_threshold * samples\n",
" read counts >= count_threshold in sample_frac_threshold * samples\n",
"\n",
" The filtered counts matrix is then normalized using:\n",
" TMM (mode='tmm'; default) or\n",
" quantile normalization (mode='qn')\n",
" \"\"\"\n",
" # expression thresholds\n",
" ns = tpm_df.shape[1]\n",
" mask = (\n",
" (np.sum(tpm_df >= tpm_threshold, axis=1) >= sample_frac_threshold * ns) &\n",
" (np.sum(counts_df >= count_threshold, axis=1) >= sample_frac_threshold * ns)\n",
" ).values\n",
"\n",
" # apply normalization\n",
" if mode.lower() == 'tmm_cpm_edger':\n",
" norm_counts_df = qtl.norm.edger_cpm(counts_df, normalized_lib_sizes=True)\n",
" norm_counts_df = norm_counts_df[mask]\n",
" elif mode.lower() == 'tmm_cpm_voom':\n",
" norm_counts_df = qtl.norm.voom_transform(counts_df)\n",
" norm_counts_df = norm_counts_df[mask]\n",
" elif mode.lower() == 'qn':\n",
" norm_counts_df = qtl.norm.normalize_quantiles(tpm_df.loc[mask])\n",
" else:\n",
" raise ValueError(f'Unsupported mode {mode}')\n",
"\n",
" # apply quantile normalization\n",
" if qnorm.lower() == 'qnorm':\n",
" result_df = qtl.norm.inverse_normal_transform(norm_counts_df)\n",
" elif qnorm.lower() == 'none':\n",
" result_df = norm_counts_df\n",
" else:\n",
" raise ValueError(f'Unsupported qnorm mode {qnorm}')\n",
" \n",
" return result_df\n",
" \n",
" # Main logic (from the original __main__ block)\n",
" print('Loading expression data', flush=True)\n",
" \n",
" # Handle sample_ids if provided\n",
" if sample_ids is not None:\n",
" if isinstance(sample_ids, str): # If it's a file path\n",
" with open(sample_ids) as f:\n",
" sample_ids = f.read().strip().split('\\n')\n",
" print(f' * Loading {len(sample_ids)} samples', flush=True)\n",
" \n",
" # Load expression data\n",
" counts_df = qtl.io.read_gct(counts_gct, sample_ids=sample_ids, load_description=False)\n",
" tpm_df = qtl.io.read_gct(tpm_gct, sample_ids=sample_ids, load_description=False)\n",
" \n",
" # Load sample to participant mapping\n",
" sample_to_participant_s = pd.read_csv(sample_to_participant, sep='\\t', index_col=0,\n",
" header=None, dtype=str).squeeze('columns')\n",
" \n",
" # Check inputs\n",
" if not counts_df.columns.equals(tpm_df.columns):\n",
" raise ValueError('Sample IDs in the TPM and read counts files must match.')\n",
" missing_ids = ~counts_df.columns.isin(sample_to_participant_s.index)\n",
" if missing_ids.any():\n",
" raise ValueError(f\"Sample IDs in expression files and participant lookup table must match ({missing_ids.sum()} sample IDs missing from {os.path.basename(sample_to_participant)}).\")\n",
" \n",
" # Convert to TPM if requested\n",
" if convert_tpm:\n",
" print(' * Converting to TPM', flush=True)\n",
" tpm_df = tpm_df / tpm_df.sum(0) * 1e6\n",
" \n",
" # Normalize data\n",
" print(f'Normalizing data ({normalization_method})', flush=True)\n",
" norm_df = prepare_expression(counts_df, tpm_df,\n",
" sample_frac_threshold=sample_frac_threshold,\n",
" count_threshold=count_threshold,\n",
" tpm_threshold=tpm_threshold,\n",
" mode=normalization_method,\n",
" qnorm=qnorm)\n",
" print(f' * {counts_df.shape[0]} genes in input tables', flush=True)\n",
" print(f' * {norm_df.shape[0]} genes remain after thresholding', flush=True)\n",
" \n",
" # Change sample IDs to participant IDs\n",
" norm_df.rename(columns=sample_to_participant_s, inplace=True)\n",
" \n",
" # Handle chromosome list\n",
" if chrs is not None:\n",
" if isinstance(chrs, str): # If it's a file path\n",
" with open(chrs) as f:\n",
" chrs = f.read().strip().split('\\n')\n",
" else:\n",
" chrs = [f'chr{i}' for i in range(1,23)] + ['chrX']\n",
" \n",
" # Prepare BED\n",
" bed_template_df = qtl.io.gtf_to_tss_bed(annotation_gtf, feature='transcript')\n",
" bed_template_df = bed_template_df[bed_template_df['chr'].isin(chrs)]\n",
" bed_df = pd.merge(bed_template_df, norm_df, left_index=True, right_index=True)\n",
" qtl.io.sort_bed(bed_df, inplace=True)\n",
" print(f' * {bed_df.shape[0]} genes remain after selecting chromosomes', flush=True)\n",
" \n",
" # Write output\n",
" print('Writing BED file', flush=True)\n",
" output_file = os.path.join(output_dir, f'{prefix}.expression.bed.gz')\n",
" if not parquet:\n",
" qtl.io.write_bed(bed_df, output_file)\n",
" else:\n",
" output_file = os.path.join(output_dir, f'{prefix}.expression.bed.parquet')\n",
" bed_df.to_parquet(output_file)\n",
" \n",
" return output_file\n",
" \n",
" # ============= Now use the function in your workflow =============\n",
" \n",
" # Check for duplicates first\n",
" sample_map = pd.read_table(\"${_input[3]}\")\n",
" duplicated = sample_map.loc[sample_map.duplicated(subset=['participant_id'])]\n",
"\n",
" \n",
" if duplicated.shape[0] > 0:\n",
" print(\"Duplicate samples found. Please remove duplicates from ${_input[3]} before normalizing.\")\n",
" print(\"Duplicates:\")\n",
" print(duplicated)\n",
" raise ValueError\n",
" else:\n",
" print(\"No duplicates found. Proceeding with normalization...\")\n",
"\n",
"bash: expand = \"${ }\", stderr = f'{_output[0]:nn}.stderr', stdout = f'{_output[0]:nn}.stdout',container = container, entrypoint = entrypoint\n",
" for i in {1..22} X Y MT; do echo chr$i; done > ${_output[0]:bnnn}.vcf_chr_list\n",
" eqtl_prepare_expression ${_input[0]} ${_input[1]} ${_input[2]} \\\n",
" ${_input[3]} ${_output[0]:nnn} \\\n",
" --chrs ${_output[0]:bnnn}.vcf_chr_list \\\n",
" --tpm_threshold ${tpm_threshold} \\\n",
" --count_threshold ${count_threshold} \\\n",
" --sample_frac_threshold ${sample_frac_threshold} \\\n",
" --normalization_method ${normalization_method} && \\\n",
" rm -f ${_output[0]:bnnn}.vcf_chr_list"
" \n",
" # Create chromosome list file\n",
" chr_file = \"${_output[0]:bnnn}.vcf_chr_list\"\n",
" chrs = [f'chr{i}' for i in range(1,23)] + ['chrX', 'chrY', 'chrMT']\n",
" with open(chr_file, 'w') as f:\n",
" for chr in chrs:\n",
" f.write(chr + '\\n')\n",
" \n",
" # Set qnorm parameter based on quantile_normalize flag\n",
" qnorm_param = 'qnorm' if ${quantile_normalize} else 'none'\n",
" \n",
" # Call the eqtl_prepare_expression function\n",
" output_file = eqtl_prepare_expression(\n",
" tpm_gct=\"${_input[0]}\",\n",
" counts_gct=\"${_input[1]}\",\n",
" annotation_gtf=\"${_input[2]}\",\n",
" sample_to_participant=\"${_input[3]}\",\n",
" prefix=\"${_output[0]:nnn}\",\n",
" output_dir=\".\",\n",
" sample_ids=None,\n",
" chrs=chr_file,\n",
" convert_tpm=False,\n",
" tpm_threshold=${tpm_threshold},\n",
" count_threshold=${count_threshold},\n",
" sample_frac_threshold=${sample_frac_threshold},\n",
" normalization_method=\"${normalization_method}\",\n",
" qnorm=qnorm_param,\n",
" parquet=False\n",
" )\n",
" \n",
" # Clean up temporary file\n",
" if os.path.exists(chr_file):\n",
" os.remove(chr_file)\n",
" \n",
" print(f\"Preparing expression completed. Output saved to: {output_file}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1b887bc9-1438-442a-9d5b-057975c930ac",
"metadata": {
"kernel": "SoS"
},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down