Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ompi/communicator/comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2417,6 +2417,11 @@ int ompi_comm_get_rprocs (ompi_communicator_t *local_comm, ompi_communicator_t *
goto err_exit;
}

/* When a process gets spawned, every local_comm process needs to create
* an intercomm with the spawnees to communicate. These spawned procs needs
* to be remembered for cleaning later on */
ompi_proc_retain_spawned_jobids(rprocs, rsize);

err_exit:
/* rprocs isn't freed unless we have an error,
since it is used in the communicator */
Expand Down
5 changes: 4 additions & 1 deletion ompi/dpm/dpm.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,10 @@ int ompi_dpm_connect_accept(ompi_communicator_t *comm, int root,
} while (!opal_list_is_empty(&ilist));

/* call add_procs on the new ones */
rc = MCA_PML_CALL(add_procs(new_proc_list, opal_list_get_size(&ilist)));
rc = MCA_PML_CALL(add_procs(new_proc_list, i));
/* Register spawned procs names to clean them up after */
ompi_proc_retain_spawned_jobids(new_proc_list, i);

free(new_proc_list);
new_proc_list = NULL;
if (OMPI_SUCCESS != rc) {
Expand Down
82 changes: 80 additions & 2 deletions ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "instance.h"

#include "opal/util/arch.h"
#include "opal/util/proc.h"

#include "opal/util/show_help.h"
#include "opal/util/argv.h"
Expand All @@ -39,6 +40,7 @@
#include "ompi/dpm/dpm.h"
#include "ompi/file/file.h"
#include "ompi/mpiext/mpiext.h"
#include "ompi/runtime/ompi_rte.h"

#include "ompi/mca/hook/base/base.h"
#include "ompi/mca/op/base/base.h"
Expand Down Expand Up @@ -110,13 +112,17 @@ static void ompi_instance_construct (ompi_instance_t *instance)
instance->i_name[0] = '\0';
instance->i_flags = 0;
instance->i_keyhash = NULL;
OBJ_CONSTRUCT(&instance->i_spawned_proc_namelists, opal_list_t);
OBJ_CONSTRUCT(&instance->i_spawned_proc_lock, opal_mutex_t);
OBJ_CONSTRUCT(&instance->s_lock, opal_mutex_t);
instance->errhandler_type = OMPI_ERRHANDLER_TYPE_INSTANCE;
instance->bsend_buffer = NULL;
}

static void ompi_instance_destruct(ompi_instance_t *instance)
{
OBJ_DESTRUCT(&instance->i_spawned_proc_namelists);
OBJ_DESTRUCT(&instance->i_spawned_proc_lock);
OBJ_DESTRUCT(&instance->s_lock);
}

Expand Down Expand Up @@ -177,18 +183,90 @@ static int ompi_instance_print_error (const char *error, int ret)
return ret;
}

/* This function is only needed for the world paradigm because it's the only one
* we can spawn processes in it for now */
void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_procs, size_t list_size) {
ompi_proc_t *spawned_proc;
opal_namelist_t *registered_proc;
ompi_process_name_t name;
ompi_rte_cmp_bitmask_t mask;

/* NULL if session paradigm, not NULL if world paradigm */
if (ompi_mpi_instance_default == NULL) {
return;
}

/* return the proc-struct which matches this jobid */
mask = OMPI_RTE_CMP_JOBID;

opal_mutex_lock(&ompi_mpi_instance_default->i_spawned_proc_lock);
for (size_t i = 0; i < list_size; i++) {
/* The idea is to filter the procs that have the same jobid,
* aka the jobs in the same instance.
* After that we lookup if the jobid is already present, meaning this
* instance is already registered via the jobid of its procs.
* If the jobid is not present we add it */

int found = 0;
spawned_proc = spawned_procs[i];
if (OMPI_PROC_MY_NAME->jobid == spawned_proc->super.proc_name.jobid) {
continue;
}

name.jobid = spawned_proc->super.proc_name.jobid;
name.vpid = spawned_proc->super.proc_name.vpid;

OPAL_LIST_FOREACH(registered_proc,
&ompi_mpi_instance_default->i_spawned_proc_namelists,
opal_namelist_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask,
&registered_proc->name, &name)) {
found = 1;
break;
}
}

if (0 == found) {
opal_namelist_t *namelist = OBJ_NEW(opal_namelist_t);
namelist->name.jobid = name.jobid;
namelist->name.vpid = 0; /* not needed for lookup */
opal_list_append(&ompi_mpi_instance_default->i_spawned_proc_namelists,
&namelist->super);
}
}
opal_mutex_unlock(&ompi_mpi_instance_default->i_spawned_proc_lock);

return;
}

static int ompi_mpi_instance_cleanup_pml (void)
{
/* call del_procs on all allocated procs even though some may not be known
* to the pml layer. the pml layer is expected to be resilient and ignore
* any unknown procs. */
size_t nprocs = 0;
ompi_proc_t **procs;
opal_namelist_t *registered_name, *next;

procs = ompi_proc_get_allocated (&nprocs);
MCA_PML_CALL(del_procs(procs, nprocs));
free(procs);

/* If we are in a world paradigm and spawned processes we need to clean */
if (ompi_mpi_instance_default != NULL) {

/* Let's loop on all spawned jobids and del_proc the concerned procs */
OPAL_LIST_FOREACH_SAFE(registered_name, next,
&ompi_mpi_instance_default->i_spawned_proc_namelists,
opal_namelist_t) {

procs = ompi_proc_get_by_name(&registered_name->name, &nprocs);
MCA_PML_CALL(del_procs(procs, nprocs));
opal_list_remove_item(&ompi_mpi_instance_default->i_spawned_proc_namelists,
&registered_name->super);
}
}

return OMPI_SUCCESS;
}

Expand Down Expand Up @@ -992,14 +1070,14 @@ int ompi_mpi_instance_finalize (ompi_instance_t **instance)
{
int ret = OMPI_SUCCESS;

OBJ_RELEASE(*instance);

opal_mutex_lock (&instance_lock);
if (0 == opal_atomic_add_fetch_32 (&ompi_instance_count, -1)) {
ret = ompi_mpi_instance_finalize_common ();
}
opal_mutex_unlock (&instance_lock);

OBJ_RELEASE(*instance);

*instance = &ompi_mpi_instance_null.instance;

return ret;
Expand Down
12 changes: 11 additions & 1 deletion ompi/instance/instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ struct ompi_instance_t {

/* Attributes */
opal_hash_table_t *i_keyhash;
opal_mutex_t i_spawned_proc_lock;
opal_list_t i_spawned_proc_namelists;

/* index in Fortran <-> C translation array (for when I get around
* to implementing fortran support-- UGH) */
Expand Down Expand Up @@ -88,7 +90,7 @@ OBJ_CLASS_DECLARATION(ompi_instance_t);
* the PREDEFINED_COMMUNICATOR_PAD macro?
* A: Most likely not, but it would be good to check.
*/
#define PREDEFINED_INSTANCE_PAD 512
#define PREDEFINED_INSTANCE_PAD 1024
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit problematic as it represents an ABI break. any code using MPI_SESSION_NULL would get a runtime warning about an object being resized if it was originally linked against libmpi.so from the 5.0.x release but was not being run against the libmpi.so from main or v6.0.x. Did you have to increase this to get the code to compile?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some run the spawn tests in the ompi-tests repo and did not see any regressions from main. Note several tests continued to fail but that's not due to changes in this PR.

Copy link
Author

@Dupratj Dupratj Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize it was an ABI break.
Yes the instance size is bigger than this PREDEFINED_INSTANCE_PAD with the additions of this PR. One thing I could do is use a pointer to a struct and allocate the needed memory after that way we would not exceed the 512 bytes limit.
Unless you see a better way

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to change the char i_name[MPI_MAX_OBJECT_NAME]; to being a pointer. See for example commit 2d68804 for how this was done for communicators

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may want to do that sort of a change in separate PR.


struct ompi_predefined_instance_t {
ompi_instance_t instance;
Expand Down Expand Up @@ -120,6 +122,14 @@ int ompi_mpi_instance_retain (void);
*/
void ompi_mpi_instance_release (void);

/**
* @brief Saves jobid of spawned procs to cleanup upon finalize
*
* @param[in] spawned_proc_list list of procs that were spawned
* @param[in] list_size size of the list of procs that were spawned
*/
void ompi_proc_retain_spawned_jobids(ompi_proc_t **spawned_proc_list, size_t list_size);

/**
* @brief Create a new MPI instance
*
Expand Down
25 changes: 20 additions & 5 deletions ompi/mca/pml/ubcl/pml_ubcl_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,18 @@ static enum ubcl_endpoint_type_t mca_pml_ubcl_get_higher_transport(
void mca_pml_ubcl_endpoint_retain(ompi_proc_t *proc)
{
mca_common_ubcl_endpoint_t *endpoint = NULL;
assert(NULL != proc);
if (NULL == proc) {
OPAL_OUTPUT_VERBOSE((90, mca_pml_ubcl_component.output,
"pml_ubcl_endpoint release : proc is NULL"));
return OMPI_SUCCESS;
}

endpoint = (proc)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
assert(NULL != endpoint);

if (NULL == endpoint) {
OPAL_OUTPUT_VERBOSE((50, mca_pml_ubcl_component.output,
"pml_ubcl_endpoint release : endpoint is NULL"));
return OMPI_SUCCESS;
}
opal_atomic_fetch_add_32(&endpoint->refcount, 1);
mca_pml_ubcl_component.nprocs++;
OBJ_RETAIN(proc);
Expand Down Expand Up @@ -423,10 +430,18 @@ int mca_pml_ubcl_endpoint_release(ompi_proc_t *proc)
ubcl_error_t ret = UBCL_SUCCESS;
int ompi_error = OMPI_SUCCESS;
mca_common_ubcl_endpoint_t *endpoint = NULL;
assert(NULL != proc);

if (NULL == proc) {
OPAL_OUTPUT_VERBOSE((90, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : proc is NULL"));
return OMPI_SUCCESS;
}

endpoint = (proc)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
assert(NULL != endpoint);

if (NULL == endpoint) {
OPAL_OUTPUT_VERBOSE((50, mca_pml_ubcl_component.output, "pml_ubcl_endpoint release : endpoint is NULL"));
return OMPI_SUCCESS;
}

endpoint_refcount = opal_atomic_sub_fetch_32(&endpoint->refcount, 1);
if (0 == endpoint_refcount) {
Expand Down
25 changes: 16 additions & 9 deletions ompi/proc/proc.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,25 +417,19 @@ int ompi_proc_world_size (void)
return ompi_process_info.num_procs;
}

ompi_proc_t **ompi_proc_get_allocated (size_t *size)
ompi_proc_t **ompi_proc_get_by_name(ompi_process_name_t *name, size_t *size)
{
ompi_proc_t **procs;
ompi_proc_t *proc;
size_t count = 0;
ompi_rte_cmp_bitmask_t mask;
ompi_process_name_t my_name;

/* check bozo case */
if (NULL == ompi_proc_local_proc) {
return NULL;
}
mask = OMPI_RTE_CMP_JOBID;
my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

/* First count how many match this jobid */
opal_mutex_lock (&ompi_proc_lock);
OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), name)) {
++count;
}
}
Expand All @@ -450,7 +444,7 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size)
/* now save only the procs that match this jobid */
count = 0;
OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
/* DO NOT RETAIN THIS OBJECT - the reference count on this
* object will be adjusted by external callers. The intent
* here is to allow the reference count to drop to zero if
Expand All @@ -474,6 +468,19 @@ ompi_proc_t **ompi_proc_get_allocated (size_t *size)
return procs;
}

ompi_proc_t **ompi_proc_get_allocated (size_t *size)
{
ompi_process_name_t my_name;

/* check bozo case */
if (NULL == ompi_proc_local_proc) {
return NULL;
}
my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

return ompi_proc_get_by_name(&my_name, size);
}

ompi_proc_t **ompi_proc_world (size_t *size)
{
ompi_proc_t **procs;
Expand Down
18 changes: 18 additions & 0 deletions ompi/proc/proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ OMPI_DECLSPEC ompi_proc_t** ompi_proc_world(size_t* size);

OMPI_DECLSPEC int ompi_proc_world_size (void);

/**
* Returns the list of proc with the given name
* Returns the list of proc associated with the jobid of the given
* name. If at least one proc with the jobid, then the name is known and we
* return the procs.
*
* @note The reference count of each process in the array is
* NOT incremented.
*
* @param[in] name Name containing the jobid of wanted processes
* @param[in] size Number of processes in the ompi_proc_t array
*
* @return Array of pointers to proc instances under the same name in the current
* MPI_COMM_WORLD, or NULL if there is an internal failure.
*/
OMPI_DECLSPEC ompi_proc_t **ompi_proc_get_by_name(ompi_process_name_t *name,
size_t *size);

/**
* Returns the list of proc instances associated with this job.
*
Expand Down