22 changes: 22 additions & 0 deletions nipype/pipeline/engine/nodes.py
@@ -19,6 +19,7 @@

from ... import config, logging
from ...utils.misc import flatten, unflatten, str2bool, dict_diff
from ...utils.ram_estimator import RamEstimator
from ...utils.filemanip import (
md5,
ensure_list,
@@ -207,6 +208,11 @@ def __init__(
self.needed_outputs = needed_outputs
self.config = None

# Dynamic RAM estimator
self.ram_estimator = None
self.ram_estimator_str = None
self._ram_estimated = False

if hasattr(self._interface, "write_cmdline"):
self._interface.write_cmdline = True

@@ -258,6 +264,21 @@ def mem_gb(self):

return self._mem_gb

@property
def mem_gb_runtime(self):
"""Get estimated memory (GB), updated based on input if a ram estimator was specified"""
if (
self.ram_estimator is not None
and self._ram_estimated is False
and isinstance(self.ram_estimator, RamEstimator)
):
self._get_inputs()
self._mem_gb, self.ram_estimator_str = self.ram_estimator(self.inputs)
self._ram_estimated = True

return self.mem_gb

@property
def n_procs(self):
"""Get the estimated number of processes/threads"""
@@ -1234,6 +1255,7 @@ def _make_nodes(self, cwd=None):
**deepcopy(self._interface.inputs.trait_get())
)
node.interface.resource_monitor = self._interface.resource_monitor
node.ram_estimator = self.ram_estimator
for field in self.iterfield:
if self.nested:
fieldvals = flatten(ensure_list(getattr(self.inputs, field)))
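
The ``_make_nodes`` hook above propagates the estimator to every
generated subnode, so attaching one to a MapNode is enough (a sketch;
the interface and the multiplier value are illustrative):

    from nipype.interfaces.fsl import BET
    from nipype.pipeline.engine import MapNode
    from nipype.utils.ram_estimator import RamEstimator

    # Each mapped task inherits the estimator; its estimate is treated
    # as the per-task memory requirement.
    bet = MapNode(BET(), iterfield=["in_file"], name="bet")
    bet.ram_estimator = RamEstimator(input_multipliers={"in_file": 8})
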
3 changes: 3 additions & 0 deletions nipype/pipeline/engine/utils.py
@@ -178,6 +178,9 @@ def write_node_report(node, result=None, is_mapnode=False):
if hasattr(result.runtime, prop):
rst_dict[prop] = getattr(result.runtime, prop)

if node.ram_estimator_str:
rst_dict["ram_estimator"] = node.ram_estimator_str

lines.append(write_rst_dict(rst_dict))

# Collect terminal output
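
Given the estimator's ``__call__`` implementation below, the stored
string is pipe-separated; with illustrative numbers it renders as:

    in_file: voxels=902629, multiplier=32, contribution=0.027 GB | Overhead=0.3 GB, total estimated RAM=0.327 GB | Clamp=0.500 GB
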
3 changes: 2 additions & 1 deletion nipype/pipeline/plugins/multiproc.py
@@ -333,8 +333,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
if not submit:
continue

# Check requirements of this job
next_job_gb = min(self.procs[jobid].mem_gb, self.memory_gb)
next_job_gb = min(self.procs[jobid].mem_gb_runtime, self.memory_gb)
next_job_th = min(self.procs[jobid].n_procs, self.processors)
next_job_gpu_th = min(self.procs[jobid].n_procs, self.n_gpu_procs)

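
For context, the MultiProc scheduler consumes ``mem_gb_runtime``
roughly as follows (a simplified illustration, not the plugin's
actual code):

    # A job is submitted only if its estimated memory and threads fit
    # within the resources currently free on the machine.
    if next_job_gb <= free_memory_gb and next_job_th <= free_processors:
        free_memory_gb -= next_job_gb
        free_processors -= next_job_th
        # ... submit the job ...
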
230 changes: 230 additions & 0 deletions nipype/utils/ram_estimator.py
@@ -0,0 +1,230 @@
import math
import os

import nibabel as nib

from ..interfaces.base import isdefined


class RamEstimator:
"""
Base class for Nipype RAM estimators.

A RamEstimator provides a lightweight, user-defined mechanism to
estimate the peak RAM usage (in GB) of a Nipype node *before execution*,
based on the node inputs.

The estimator aggregates RAM contributions from selected input traits
using user-defined multipliers and returns:

- an estimated RAM value (mem_gb)
- a human-readable debug string describing the estimate

The estimator is intended to be attached to a Node or MapNode via the
``node.ram_estimator`` attribute and is evaluated automatically when
executing workflows with the ``MultiProcPlugin``.

Notes
-----
* The estimator is only used when running a workflow with
``MultiProcPlugin`` (or subclasses thereof).
* For ``MapNode`` instances, the estimator is inherited by all subnodes
and evaluated on a *single representative iteration*.
The resulting ``mem_gb`` is interpreted as the per-task memory
requirement (i.e., the worst-case memory for one mapped job).
* The debug string produced by the estimator is stored in the node
runtime report (``_report/report.rst``) under the ``runtime`` section.

Examples
--------
Define a tool-specific RAM estimator and attach it to a node::

class FlirtRamEstimator(RamEstimator):
def __init__(self):
super().__init__(
input_multipliers={
'in_file': 32,
'reference': 4,
},
overhead_gb=0.3,
min_gb=0.5,
max_gb=4.0
)

from nipype.interfaces.fsl import FLIRT
from nipype.pipeline.engine import Node

flirt = Node(
FLIRT(dof=6),
name="flirt"
)

flirt.ram_estimator = FlirtRamEstimator()
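
Run the workflow with the ``MultiProc`` plugin so that the estimate
is used for scheduling (a sketch; ``wf`` is a hypothetical workflow
containing the node)::

    wf.run(plugin="MultiProc", plugin_args={"memory_gb": 16})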
"""

def __init__(
self,
input_multipliers=None,
overhead_gb=0.3,
min_gb=0.5,
max_gb=8.0
):
"""
Parameters
----------
input_multipliers : dict, optional
Mapping ``input_name -> multiplier``.

The interpretation of the multiplier depends on the input type:

* File-like image inputs:
``multiplier`` scales with the total number of spatial voxels.
The contribution is computed as::

contribution_gb = voxels * multiplier / 1024**3

* Numeric inputs (int, float, or lists thereof):
``multiplier`` scales linearly with the numeric value(s).

The choice of multipliers is tool-specific and left to the user.
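
For example, a hypothetical 91 x 109 x 91 image (902,629 voxels)
with multiplier 4 would contribute ``902629 * 4 / 1024**3``,
i.e. roughly 0.0034 GB.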

overhead_gb : float, optional
Fixed RAM overhead (in GB) added to the estimate to account for
control structures, buffers, and library overhead.

min_gb : float, optional
Minimum allowed RAM estimate (GB).

max_gb : float, optional
Maximum allowed RAM estimate (GB).
"""

self.input_multipliers = input_multipliers or {}
self.overhead_gb = overhead_gb
self.min_gb = min_gb
self.max_gb = max_gb

@staticmethod
def voxels(path):
"""Return number of spatial voxels (ignores time dimension)."""
img = nib.load(path)
shape = img.header.get_data_shape()
return math.prod(shape[:3])

@staticmethod
def clamp(value, min_val=None, max_val=None):
"""Clamp a value between min_val and max_val."""
if min_val is not None:
value = max(min_val, value)
if max_val is not None:
value = min(max_val, value)
return value

def __call__(self, inputs):
"""
Estimate RAM usage based on Nipype input traits.

- File-like image inputs contribute via voxel count
- Numeric inputs contribute via their numeric value
- Lists are supported for both files and numbers

Returns
-------
mem_gb : float
Estimated RAM in GB
estimator_string : str
Debug string for node report
"""
total_gb = 0.0
debug_lines = []

traits = inputs.traits()

for attr, multiplier in self.input_multipliers.items():
if attr not in traits:
debug_lines.append(f"{attr}: trait not found")
continue

val = getattr(inputs, attr, None)

if not isdefined(val) or val is None:
debug_lines.append(f"{attr}: undefined")
continue

# --------------------------------------------------
# FILE-LIKE VALUES (string or list/tuple of strings)
# --------------------------------------------------
paths = None

if isinstance(val, str):
paths = [val]

elif isinstance(val, (list, tuple)) and any(isinstance(v, str) for v in val):
paths = [v for v in val if isinstance(v, str)]

if paths is not None:
vox_total = 0
valid_files = 0

for p in paths:
if not isinstance(p, str) or not os.path.exists(p):
continue
try:
vox_total += self.voxels(p)
valid_files += 1
except Exception:
# exists but is not a readable image (e.g., a text file)
continue

if valid_files > 0:
contribution = vox_total * multiplier / (1024 ** 3)
total_gb += contribution

debug_lines.append(
f"{attr}: voxels={vox_total}, multiplier={multiplier}, "
f"contribution={contribution:.3f} GB"
)
else:
debug_lines.append(f"{attr}: no readable image files")

continue

# --------------------------------------------------
# NUMERIC VALUES (scalar or list/tuple of numbers)
# --------------------------------------------------
values = None

if isinstance(val, (int, float)):
values = [val]

elif isinstance(val, (list, tuple)) and all(isinstance(v, (int, float)) for v in val):
values = val

if values is not None:
contribution = sum(float(v) for v in values) * multiplier
total_gb += contribution

debug_lines.append(
f"{attr}: values={values}, multiplier={multiplier}, "
f"contribution={contribution:.3f} GB"
)
continue

# --------------------------------------------------
# UNSUPPORTED TYPE
# --------------------------------------------------
debug_lines.append(
f"{attr}: unsupported value type ({type(val).__name__})"
)

# ------------------------------------------------------
# OVERHEAD + CLAMP
# ------------------------------------------------------
mem_gb = total_gb + self.overhead_gb
debug_lines.append(
f"Overhead={self.overhead_gb} GB, total estimated RAM={mem_gb:.3f} GB"
)

mem_gb = self.clamp(mem_gb, self.min_gb, self.max_gb)
debug_lines.append(f"Clamp={mem_gb:.3f} GB")

estimator_string = " | ".join(debug_lines)
return float(mem_gb), estimator_string
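
Taken together, a minimal end-to-end sketch of the feature (file
names, multiplier values, and the ``reg`` workflow are illustrative,
not part of the PR):

    from nipype.interfaces.fsl import FLIRT
    from nipype.pipeline.engine import Node, Workflow
    from nipype.utils.ram_estimator import RamEstimator

    # Tool-specific multipliers are user-supplied guesses that should
    # be calibrated against observed peak RAM for the tool at hand.
    flirt = Node(FLIRT(dof=6), name="flirt")
    flirt.ram_estimator = RamEstimator(
        input_multipliers={"in_file": 32, "reference": 4},
        overhead_gb=0.3,
        min_gb=0.5,
        max_gb=4.0,
    )
    flirt.inputs.in_file = "sub-01_T1w.nii.gz"
    flirt.inputs.reference = "MNI152_T1_1mm.nii.gz"

    wf = Workflow(name="reg", base_dir=".")
    wf.add_nodes([flirt])

    # Only the MultiProc plugin consults the estimator; mem_gb_runtime
    # is compared against the memory_gb budget when scheduling.
    wf.run(plugin="MultiProc", plugin_args={"memory_gb": 16, "n_procs": 4})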