
Commit ef78ca9

fix mongodb problem: only one worker can be spawned by a single n3fit run; the server is now started only if a connection to the database fails; the --restart option is removed and restarting now happens automatically
1 parent a470562 commit ef78ca9
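
For context, a minimal sketch of the connection check this commit describes (a hypothetical stand-in for `MongodRunner.is_up`, not the actual implementation; it assumes pymongo is installed): the server is launched only when a ping to the database fails.

from pymongo import MongoClient
from pymongo.errors import PyMongoError


def is_up(host="localhost", port=27017, timeout_ms=2000):
    """Hypothetical check: return True if a mongod server is already reachable."""
    try:
        client = MongoClient(host=host, port=port, serverSelectionTimeoutMS=timeout_ms)
        # `ping` raises if no server answers within the timeout
        client.admin.command("ping")
        return True
    except PyMongoError:
        return False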

File tree

5 files changed: +212 −299 lines

n3fit/src/n3fit/hyper_optimization/hyper_scan.py

Lines changed: 66 additions & 86 deletions
@@ -15,12 +15,11 @@
 
 import copy
 import logging
-import os
 
+import hyperopt
 from hyperopt.pyll.base import scope
 import numpy as np
 
-import hyperopt
 from n3fit.backends import MetaLayer, MetaModel
 from n3fit.hyper_optimization.filetrials import FileTrials
 
@@ -101,6 +100,8 @@ def optimizer_arg_wrapper(hp_key, option_dict):
         choice = hp_uniform(hp_key, min_lr, max_lr)
     elif sampling == "log":
         choice = hp_loguniform(hp_key, min_lr, max_lr)
+    else:
+        raise ValueError(f"Sampling {sampling} not understood")
     return choice
 
 
@@ -129,58 +130,49 @@ def hyper_scan_wrapper(replica_path_set, model_trainer, hyperscanner, max_evals=
     # Tell the trainer we are doing hpyeropt
     model_trainer.set_hyperopt(True, keys=hyperscanner.hyper_keys)
 
-    if hyperscanner.restart_hyperopt:
-        # For parallel hyperopt restarts, extract the database tar file
-        if hyperscanner.parallel_hyperopt:
-            tar_file_to_extract = f"{replica_path_set}/{hyperscanner.db_name}.tar.gz"
-            log.info("Restarting hyperopt run using the MongoDB database %s", tar_file_to_extract)
-            MongoFileTrials.extract_mongodb_database(tar_file_to_extract, path=os.getcwd())
-        else:
-            # For sequential hyperopt restarts, reset the state of `FileTrials` saved in the pickle file
-            pickle_file_to_load = f"{replica_path_set}/tries.pkl"
-            log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load)
-            trials = FileTrials.from_pkl(pickle_file_to_load)
-
+    # Generate the trials object, as a MongoFileTrial or a simple sequential FileTrial
     if hyperscanner.parallel_hyperopt:
-        # start MongoDB database by launching `mongod`
-        hyperscanner.mongod_runner.ensure_database_dir_exists()
-        mongod = hyperscanner.mongod_runner.start()
+        # If we are running in parallel:
+        # 1) Check whether the database is already on, start it otherwise
+        if not hyperscanner.mongod_runner.is_up():
+            hyperscanner.mongod_runner.start()
 
-    # Generate the trials object
-    if hyperscanner.parallel_hyperopt:
-        # Instantiate `MongoFileTrials`
-        # Mongo database should have already been initiated at this point
+        # Instantiate `MongoFileTrials` as trials to give to the worker later
         trials = MongoFileTrials(
             replica_path_set,
-            db_host=hyperscanner.db_host,
-            db_port=hyperscanner.db_port,
-            db_name=hyperscanner.db_name,
-            num_workers=hyperscanner.num_mongo_workers,
+            hyperscanner.mongod_runner,
+            num_workers=1,  # Only one worker per n3fit job will run
             parameters=hyperscanner.as_dict(),
         )
     else:
+        # If we are not running in parallel, check whether there's a pickle to load and restart
+        # For sequential hyperopt restarts, reset the state of `FileTrials` saved in the pickle file
+        pickle_file_to_load = replica_path_set / "tries.pkl"
+        if pickle_file_to_load.exists():
+            log.info("Restarting hyperopt run using the pickle file %s", pickle_file_to_load)
+            trials = FileTrials.from_pkl(pickle_file_to_load)
         # Instantiate `FileTrials`
         trials = FileTrials(replica_path_set, parameters=hyperscanner.as_dict())
 
     # Initialize seed for hyperopt
     trials.rstate = np.random.default_rng(HYPEROPT_SEED)
+    # And prepare the generic arguments to fmin
+    fmin_args = {
+        "fn": model_trainer.hyperparametrizable,
+        "space": hyperscanner.as_dict(),
+        "algo": hyperopt.tpe.suggest,
+        "max_evals": max_evals,
+        "trials": trials,
+        "rstate": trials.rstate,
+    }
 
-    # Call to hyperopt.fmin
-    fmin_args = dict(
-        fn=model_trainer.hyperparametrizable,
-        space=hyperscanner.as_dict(),
-        algo=hyperopt.tpe.suggest,
-        max_evals=max_evals,
-        trials=trials,
-        rstate=trials.rstate,
-    )
     if hyperscanner.parallel_hyperopt:
         trials.start_mongo_workers()
-        hyperopt.fmin(**fmin_args, show_progressbar=True, max_queue_len=trials.num_workers)
+        # TODO benchmark how the behaviour depends on max_queue_len (if it does)
+        hyperopt.fmin(**fmin_args, show_progressbar=True, max_queue_len=4)
        trials.stop_mongo_workers()
         # stop mongod command and compress database
-        hyperscanner.mongod_runner.stop(mongod)
-        trials.compress_mongodb_database()
+        hyperscanner.mongod_runner.stop()
     else:
         hyperopt.fmin(**fmin_args, show_progressbar=False, trials_save_file=trials.pkl_file)
 
@@ -212,56 +204,47 @@ class HyperScanner:
     It takes cares of known correlation between parameters by tying them together
     It also provides methods for updating the parameter dictionaries after using hyperopt
 
-    It takes as inpujt the dictionaries defining the NN/fit and the hyperparameter scan
+    It takes as input the dictionaries defining the NN/fit and the hyperparameter scan
     from the NNPDF runcard and substitutes in `parameters` samplers according to the
     `hyper_scan` dictionary.
 
+    In the sampling dict,
+
+
+    Parameters
+    ----------
+    `parameters`: dict
+        the `fitting[parameters]` dictionary of the NNPDF runcard
+    `sampling_dict`: dict
+        the `hyperscan` dictionary of the NNPDF runcard defining the search space of the scan
+    `steps`: int
+        when taking discrete steps between two parameters, number of steps to take
 
-    # Arguments:
-        - `parameters`: the `fitting[parameters]` dictionary of the NNPDF runcard
-        - `sampling_dict`: the `hyperscan` dictionary of the NNPDF runcard defining
-                           the search space of the scan
-        - `steps`: when taking discrete steps between two parameters, number of steps
-                   to take
-
-    # Parameters accepted by `sampling_dict`:
-        - `stopping`:
-            - min_epochs, max_epochs
-            - min_patience, max_patience
     """
 
-    def __init__(self, parameters, sampling_dict, steps=5):
+    def __init__(
+        self, parameters, sampling_dict, steps=5, db_host=None, db_port=None, db_path=None
+    ):
         self._original_parameters = parameters
         self.parameter_keys = parameters.keys()
         self.parameters = copy.deepcopy(parameters)
         self.steps = steps
 
-        # adding extra options for restarting
-        restart_config = sampling_dict.get("restart")
-        self.restart_hyperopt = True if restart_config else False
-
         # adding extra options for parallel execution
-        parallel_config = sampling_dict.get("parallel")
-        if parallel_config is None:
-            self.parallel_hyperopt = False
-        elif _has_pymongo:
+        self._db_path = db_path
+        self._db_host = db_host
+        self._db_port = db_port
+        self.mongod_runner = None
+        self.parallel_hyperopt = False
+
+        if db_path is not None:
+            # If we get a db_path, assume we want to run in parallel, therefore check whether we can
+            if not _has_pymongo:
+                raise ModuleNotFoundError(
+                    "Could not import pymongo modules, please install with `.[parallelhyperopt]`"
+                )
             self.parallel_hyperopt = True
-        else:
-            raise ModuleNotFoundError(
-                "Could not import pymongo modules, please install with `.[parallelhyperopt]`"
-            )
-
-        self.parallel_hyperopt = True if parallel_config else False
-
-        # setting up MondoDB options
-        if self.parallel_hyperopt:
-            # add output_path to db name to avoid conflicts
-            db_name = f'{sampling_dict.get("db_name")}-{sampling_dict.get("output_path")}'
-            self.db_host = sampling_dict.get("db_host")
-            self.db_port = sampling_dict.get("db_port")
-            self.db_name = db_name
-            self.num_mongo_workers = sampling_dict.get("num_mongo_workers")
-            self.mongod_runner = MongodRunner(self.db_name, self.db_port)
+            self.mongod_runner = MongodRunner(self._db_path, self._db_host, self._db_port)
 
         self.hyper_keys = set([])
 
@@ -323,14 +306,11 @@ def _update_param(self, key, sampler):
 
         if key not in self.parameter_keys and key != "parameters":
             raise ValueError(
-                "Trying to update a parameter not declared in the `parameters` dictionary: {0} @ HyperScanner._update_param".format(
-                    key
-                )
+                f"Trying to update a parameter not declared in the `parameters` dictionary: {key} @ HyperScanner._update_param"
             )
 
         self.hyper_keys.add(key)
-        log.info("Adding key {0} with value {1}".format(key, sampler))
-
+        log.info(f"Adding key {key} with value {sampler}")
         self.parameters[key] = sampler
 
     def stopping(self, min_epochs=None, max_epochs=None, min_patience=None, max_patience=None):
@@ -376,8 +356,8 @@ def optimizer(self, optimizers):
         ]
         and will sample one from this list.
 
-        Note that the keys within the dictionary (`optimizer_name` and `learning_rate`) should be named
-        as the keys used by the compiler of the model as they are used as they come.
+        Note that the keys within the dictionary (`optimizer_name` and `learning_rate`)
+        should be named as the keys used by the compiler of the model.
         """
         # Get all accepted optimizer to check against
         all_optimizers = MetaModel.accepted_optimizers
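
As a reference for the docstring above, a hypothetical `optimizers` list; the learning-rate fields mirror the `sampling` options handled by `optimizer_arg_wrapper` earlier in the diff, but the exact field names here are illustrative:

# Illustrative only: each entry names an optimizer and, optionally,
# a learning-rate range to be sampled (cf. optimizer_arg_wrapper)
optimizers = [
    {"optimizer_name": "Adam", "learning_rate": {"sampling": "log", "min": 1e-4, "max": 1e-1}},
    {"optimizer_name": "RMSprop"},
]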
@@ -393,7 +373,7 @@
             name = optimizer[optname_key]
             optimizer_dictionary = {optname_key: name}
 
-            if name not in all_optimizers.keys():
+            if name not in all_optimizers:
                 raise NotImplementedError(
                     f"HyperScanner: Optimizer {name} not implemented in MetaModel.py"
                 )
@@ -476,8 +456,8 @@ def architecture(
         else:
             if min_units is None or max_units is None:
                 raise ValueError(
-                    "A max/min number of units must always be defined if the number of layers is to be sampled"
-                    "i.e., make sure you add the keywords 'min_units' and 'max_units' to the 'architecutre' dict"
+                    "A max/min number of units must always be defined when the number of layers"
+                    "is to be sampled, i.e., add 'min_units' and 'max_units' to 'architecture' dict"
                 )
 
         activation_key = "activation_per_layer"
@@ -497,7 +477,7 @@
         for n in n_layers:
             units = []
             for i in range(n):
-                units_label = "nl{0}:-{1}/{0}".format(n, i)
+                units_label = f"nl{n}:-{i}/{n}"
                 units_sampler = hp_quniform(
                     units_label, min_units, max_units, step_size=1, make_int=True
                 )
@@ -516,7 +496,7 @@
         for ini_name in initializers:
             if ini_name not in imp_init_names:
                 raise NotImplementedError(
-                    "HyperScanner: Initializer {0} not implemented in MetaLayer.py".format(ini_name)
+                    f"HyperScanner: Initializer {ini_name} not implemented in MetaLayer.py"
                 )
             # For now we are going to use always all initializers and with default values
             ini_choices.append(ini_name)
