Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 75 additions & 42 deletions src/biocatalyzer/bioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import logging
import multiprocessing
import os
import tempfile
import time
import uuid
from typing import Union
from pathlib import Path

import pandas as pd
from tqdm import tqdm
Expand All @@ -13,7 +15,7 @@
from biocatalyzer.chem import ChemUtils
from biocatalyzer.io_utils import Loaders

DATA_FILES = os.path.dirname(__file__)
DATA_FILES = Path(__file__).resolve().parent


class BioReactor:
Expand Down Expand Up @@ -59,7 +61,6 @@ def __init__(self,
# silence RDKit logger
ChemUtils.rdkit_logs(False)
self._compounds_path = compounds_path
self._output_path = output_path
self._neutralize = neutralize_compounds
self._organisms_path = organisms_path
self._reaction_rules_path = reaction_rules_path
Expand All @@ -68,7 +69,7 @@ def __init__(self,
self._set_up_files()
self._orgs = Loaders.load_organisms(self._organisms_path)
self._reaction_rules = Loaders.load_reaction_rules(self._reaction_rules_path, orgs=self._orgs)
self._set_output_path(self._output_path)
self._set_output_path(output_path)
self._compounds = Loaders.load_compounds(self._compounds_path, self._neutralize)
self._molecules_to_remove = Loaders.load_byproducts_to_remove(self._molecules_to_remove_path)
self._patterns_to_remove = Loaders.load_patterns_to_remove(self._patterns_to_remove_path)
Expand All @@ -77,7 +78,7 @@ def __init__(self,
self._n_jobs = multiprocessing.cpu_count()
else:
self._n_jobs = n_jobs
self._new_compounds_path = os.path.join(self._output_path, 'new_compounds.tsv')
self._new_compounds_path = Path(self._output_path) / 'new_compounds.tsv'
self._new_compounds = None

@property
Expand Down Expand Up @@ -393,15 +394,13 @@ def n_jobs(self, n_jobs: int):

def _set_up_files(self):
if self._reaction_rules_path == 'default':
self._reaction_rules_path = os.path.join(
DATA_FILES, 'data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2')
self._reaction_rules_path = DATA_FILES / 'data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2'
if self._molecules_to_remove_path == 'default':
self._molecules_to_remove_path = os.path.join(DATA_FILES, 'data/byproducts_to_remove/byproducts.tsv')
self._molecules_to_remove_path = DATA_FILES / 'data/byproducts_to_remove/byproducts.tsv'
if self._patterns_to_remove_path == 'default':
self._patterns_to_remove_path = os.path.join(DATA_FILES, 'data/patterns_to_remove/patterns.tsv')
self._patterns_to_remove_path = DATA_FILES / 'data/patterns_to_remove/patterns.tsv'

@staticmethod
def _set_output_path(output_path: str):
def _set_output_path(self, output_path: str):
"""
Make the output directory if it does not exist.

Expand All @@ -410,12 +409,15 @@ def _set_output_path(output_path: str):
output_path: str
The path to the output directory.
"""
if not os.path.exists(output_path):
os.makedirs(output_path)
output_path = Path(output_path)
if not output_path.exists():
output_path.mkdir(parents=True)
else:
if os.path.exists(output_path + '/results.tsv') or os.path.exists(output_path + '/new_compounds.tsv'):
raise FileExistsError(f"Results in {output_path} already exists. Define a different output path so "
f"that previous results are not overwritten.")
if (output_path / "results.tsv").exists() or (output_path / "new_compounds.tsv").exists():
raise FileExistsError(
f"Results in {output_path} already exist. Define a different output path so that previous results are not overwritten."
)
self._output_path = output_path

def _match_patterns(self, smiles: str):
"""
Expand Down Expand Up @@ -498,7 +500,7 @@ def _match_conditions(self, smiles: str):
bool
True if mol matches conditions to remove, False otherwise.
"""
if not smiles:
if smiles is None:
return False
if '*' in smiles:
return False
Expand Down Expand Up @@ -563,16 +565,16 @@ def process_results(self, save: bool = True, overwrite: bool = True):
results['EC_Numbers'] = results['EC_Numbers'].apply(lambda x: _merge_fields(x))
if save:
if overwrite:
results_file_proc = os.path.join(self._output_path, 'new_compounds.tsv')
results_file_proc = self._output_path / 'new_compounds.tsv'
results.to_csv(results_file_proc, sep='\t', index=False)
else:
results_file_proc = os.path.join(self._output_path, 'new_compounds_processed.tsv')
results_file_proc = self._output_path / 'new_compounds_processed.tsv'
results.to_csv(results_file_proc, sep='\t', index=False)
else:
results_file_proc = self._new_compounds_path
return results, results_file_proc

def _react_single(self, smiles: str, smarts: str):
def _react_single(self, smiles: str, smarts: str, result_queue: multiprocessing.Queue):
"""
React a single compound with a single reaction rule.
Writes the results to the output files.
Expand All @@ -583,40 +585,71 @@ def _react_single(self, smiles: str, smarts: str):
The smiles of the reactant.
smarts: str
The SMARTS string of the reaction.
result_queue: multiprocessing.Queue
The queue to store the results.
"""
reactants = self._reaction_rules[self._reaction_rules.SMARTS == smarts].Reactants.values[0]
reactants = reactants.replace("Any", smiles).split(';')
results = ChemUtils.react(reactants, smarts)
if len(results) > 0:
smiles_id = self._compounds[self._compounds.smiles == smiles].compound_id.values[0]
smarts_id = self._reaction_rules[self._reaction_rules.SMARTS == smarts].InternalID.values[0]
most_similar_products_set = set()
for i, result in enumerate(results):
products = result.split('>')[-1].split('.')
# keep only the most similar compound to the input compound
most_similar_product = ChemUtils.most_similar_compound(smiles, products)
most_similar_product = ChemUtils.smiles_to_isomerical_smiles(most_similar_product)
if most_similar_product not in most_similar_products_set:
most_similar_products_set.add(most_similar_product)
if self._match_conditions(most_similar_product):
if self._neutralize:
most_similar_product = ChemUtils.uncharge_smiles(most_similar_product)
ecs = self._get_ec_numbers(smarts_id)
with open(self._new_compounds_path, 'a') as f:
f.write(f"{smiles_id}\t{smiles}\t{smarts_id}\t{smiles_id}_{uuid.uuid4()}\t"
f"{most_similar_product}\t{result}\t{ecs}\n")
if len(results) == 0:
return
smiles_id = self._compounds[self._compounds.smiles == smiles].compound_id.values[0]
smarts_id = self._reaction_rules[self._reaction_rules.SMARTS == smarts].InternalID.values[0]
most_similar_products_set = set()
# Collect results in a list
output_rows = []
for result in results:
products = result.split('>')[-1].split('.')
most_similar_product = ChemUtils.most_similar_compound(smiles, products)
most_similar_product = ChemUtils.smiles_to_isomerical_smiles(most_similar_product)

if most_similar_product not in most_similar_products_set:
most_similar_products_set.add(most_similar_product)
if self._match_conditions(most_similar_product):
if self._neutralize:
most_similar_product = ChemUtils.uncharge_smiles(most_similar_product)
ecs = self._get_ec_numbers(smarts_id)
output_rows.append(f"{smiles_id}\t{smiles}\t{smarts_id}\t{smiles_id}_{uuid.uuid4()}\t"
f"{most_similar_product}\t{result}\t{ecs}\n")

# Write output to a temporary file, then add the filename to the result queue
if output_rows:
temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='\n')
with open(temp_file.name, 'w') as f:
f.writelines(output_rows)
result_queue.put(temp_file.name)

def react(self):
"""
Transform reactants into products using the reaction rules.
Writes results incrementally and handles large files.
"""
t0 = time.time()
with open(self._new_compounds_path, 'w') as f:
f.write('OriginalCompoundID\tOriginalCompoundSmiles\tOriginalReactionRuleID\tNewCompoundID\t'
'NewCompoundSmiles\tNewReactionSmiles\tEC_Numbers\n')
header = (
'OriginalCompoundID\tOriginalCompoundSmiles\tOriginalReactionRuleID\tNewCompoundID\t'
'NewCompoundSmiles\tNewReactionSmiles\tEC_Numbers\n'
)
# Ensure header is written to the final output file
with open(self._new_compounds_path, 'w', newline='\n') as f:
f.write(header)

params = list(itertools.product(self._compounds.smiles, self._reaction_rules.SMARTS))
with multiprocessing.Pool(self._n_jobs) as pool:
pool.starmap(self._react_single, tqdm(params, total=len(params)))
# Create a multiprocessing Manager to hold the result queue
with multiprocessing.Manager() as manager:
result_queue = manager.Queue()

# Start the multiprocessing pool
with multiprocessing.Pool(self._n_jobs) as pool:
pool.starmap(self._react_single, [(smiles, smarts, result_queue) for smiles, smarts in params])

# Once all processes are done, write the results from all temporary files
with open(self._new_compounds_path, 'a', newline='\n') as f:
while not result_queue.empty():
temp_file = result_queue.get()
with open(temp_file, 'r') as temp_f:
f.write(temp_f.read())
os.remove(temp_file) # Clean up the temporary file

self._new_compounds = f"New products saved to {self._new_compounds_path}"
t1 = time.time()
logging.info(f"Time elapsed: {t1 - t0} seconds")
Expand Down
8 changes: 4 additions & 4 deletions src/biocatalyzer/clis/cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
import os
from pathlib import Path

import click

from biocatalyzer.bioreactor import BioReactor
from biocatalyzer.matcher import MSDataMatcher

DATA_FILES = os.path.dirname(__file__)
DATA_FILES = Path(__file__).resolve().parent


@click.command()
Expand Down Expand Up @@ -105,8 +105,8 @@ def biocatalyzer_cli(compounds,
logging.basicConfig(filename=f'{output_path}logging.log', level=logging.DEBUG)
if reaction_rules is None:
logging.info(f"Using default reaction rules file.")
reaction_rules = os.path.join(
DATA_FILES, '../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2')
reaction_rules = DATA_FILES / "../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2"
reaction_rules = reaction_rules.resolve()
br = BioReactor(compounds_path=compounds,
output_path=output_path,
reaction_rules_path=reaction_rules,
Expand Down
8 changes: 4 additions & 4 deletions src/biocatalyzer/clis/cli_bioreactor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
import os
from pathlib import Path

import click

from biocatalyzer import BioReactor

DATA_FILES = os.path.dirname(__file__)
DATA_FILES = Path(__file__).resolve().parent


@click.command()
Expand Down Expand Up @@ -82,8 +82,8 @@ def bioreactor_cli(compounds,
output_path: Path to the output directory.
"""
if reaction_rules is None:
reaction_rules = os.path.join(
DATA_FILES, '../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2')
reaction_rules = DATA_FILES / "../data/reactionrules/reaction_rules_biocatalyzer.tsv.bz2"
reaction_rules = reaction_rules.resolve()
br = BioReactor(compounds_path=compounds,
output_path=output_path,
reaction_rules_path=reaction_rules,
Expand Down
24 changes: 15 additions & 9 deletions src/biocatalyzer/io_utils/loaders.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import logging
import os
from pathlib import Path
from typing import Union, List

import pandas as pd
from rdkit.Chem import MolFromSmarts, MolFromSmiles
Expand Down Expand Up @@ -30,6 +32,7 @@ def load_compounds(path: str, neutralize: bool = False):
pandas dataframe with the compounds to use.
"""
if Loaders._verify_file(path):
path = Path(path)
compounds = pd.read_csv(path, header=0, sep='\t')
if 'smiles' not in compounds.columns:
raise ValueError('The compounds file must contain a column named "smiles".')
Expand All @@ -47,7 +50,7 @@ def load_compounds(path: str, neutralize: bool = False):
raise FileNotFoundError(f"File {path} not found.")

@staticmethod
def load_reaction_rules(path, orgs='ALL'):
def load_reaction_rules(path: str, orgs: Union[str, List[str]] = 'ALL') -> pd.DataFrame:
"""
Load the reaction rules to use.

Expand All @@ -65,7 +68,8 @@ def load_reaction_rules(path, orgs='ALL'):
"""
if not Loaders._verify_file(path):
raise FileNotFoundError(f"File {path} not found.")
if path.endswith('.bz2'):
path = Path(path)
if path.suffix == '.bz2':
rules = pd.read_csv(path, header=0, sep='\t', compression='bz2')
else:
rules = pd.read_csv(path, header=0, sep='\t')
Expand All @@ -87,15 +91,14 @@ def match_org(value, orgs_list):
return False

if not isinstance(orgs, str):
# TODO: check if adding spontaneous reactions actually makes sense
orgs.append('spontaneous_reaction')
rules['has_org'] = rules.apply(lambda x: match_org(x['Organisms'], orgs), axis=1)
rules = rules[rules['has_org']]
rules.drop('has_org', axis=1, inplace=True)
return rules

@staticmethod
def load_organisms(path):
def load_organisms(path: str) -> Union[str, List[str]]:
"""
Load the organisms to use.

Expand All @@ -106,17 +109,18 @@ def load_organisms(path):

Returns
-------
pd.DataFrame:
pandas dataframe with the organisms to use.
Union[str, List[str]]:
List of organisms identifiers.
"""
if path is None or path == 'None':
return 'ALL'
if Loaders._verify_file(path):
path = Path(path)
orgs = pd.read_csv(path, header=0, sep='\t')
if 'org_id' not in orgs.columns:
raise ValueError('The organisms file must contain a column named "org_id".')
logging.info(f'Using {list(orgs.org_id.values)} as the Organisms.')
return list(orgs.org_id.values)
logging.info(f'Using {orgs.org_id.to_list()} as the Organisms.')
return orgs.org_id.to_list()
elif len(path.split('.')) > 1:
raise FileNotFoundError(f"File {path} not found.")
else:
Expand All @@ -140,6 +144,7 @@ def load_byproducts_to_remove(path):
"""
if path is None or path == 'None':
return []
path = Path(path)
byproducts = pd.read_csv(path, header=0, sep='\t')
if 'smiles' not in byproducts.columns:
raise ValueError('The molecules to remove file must contain a column named "smiles".')
Expand All @@ -162,6 +167,7 @@ def load_patterns_to_remove(path):
"""
if path is None or path == 'None':
return []
path = Path(path)
patterns = pd.read_csv(path, header=0, sep='\t')
if 'smarts' not in patterns.columns:
raise ValueError('The patterns to remove file must contain a column named "smarts".')
Expand All @@ -182,7 +188,7 @@ def _verify_file(path: str):
bool:
True if the file exists, False otherwise.
"""
if not os.path.exists(path):
if not Path(path).exists():
return False
return True

Expand Down
Loading
Loading