From 035484766e5dd2610572c4ac6c05e5909f411bcb Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 9 Dec 2025 15:42:15 +0100 Subject: [PATCH 01/13] Basic functions and small test --- include/CMakeLists.txt | 2 +- include/graphblas/bsp/rdma.hpp | 252 +++++++++++++++++++++++++++++++++ tests/unit/CMakeLists.txt | 4 + tests/unit/rdma.cpp | 82 +++++++++++ 4 files changed, 339 insertions(+), 1 deletion(-) create mode 100644 include/graphblas/bsp/rdma.hpp create mode 100644 tests/unit/rdma.cpp diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 41265746c..a9c6c6231 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -50,7 +50,7 @@ set( root_files "graphblas/io.hpp" "graphblas/iomode.hpp" "graphblas/matrix.hpp" "graphblas/monoid.hpp" "graphblas/ops.hpp" "graphblas/phase.hpp" "graphblas/pinnedvector.hpp" "graphblas/properties.hpp" "graphblas/rc.hpp" - "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/tags.hpp" + "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/rdma.hpp" "graphblas/tags.hpp" "graphblas/type_traits.hpp" "graphblas/utils.hpp" "graphblas/vector.hpp" "graphblas/synchronizedNonzeroIterator.hpp" "graphblas/nonzeroStorage.hpp" "graphblas/selection_ops.hpp" diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp new file mode 100644 index 000000000..24c62b433 --- /dev/null +++ b/include/graphblas/bsp/rdma.hpp @@ -0,0 +1,252 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * @author G. Gaio + */ + +#ifndef _H_GRB_LPF_RDMA +#define _H_GRB_LPF_RDMA + +#include +#include //size_t + +#include + +#include +#include // get_request, put_request + + +namespace grb { + + /** Superclass implementation for all LPF-backed implementations. */ + class rdma { + private: + std::map< const void* , std::pair< size_t, const lpf_memslot_t > > registered_slots; + std::map< const lpf_memslot_t , const void* > memslots; + + size_t memory_register_size = 0; + + /* + * This is a collective function. It must be called by all the processes with + * the same parameters!! + */ + RC resize_memory_register( const size_t new_register_size ) { + if( new_register_size < memory_register_size ) return grb::SUCCESS; + + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_err_t lpf_rc = LPF_SUCCESS; + + lpf_rc = lpf_rc ? lpf_rc : lpf_resize_memory_register( data.context, new_register_size ); + lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); + if( lpf_rc == LPF_SUCCESS ) { + memory_register_size = new_register_size; + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + + + /** + * Writes a message to another process' registered memory + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename IOType > + RC put( IOType* msg, const size_t &size, const size_t dst_pid, const lpf_memslot_t &dst_memslot ) { + const auto lpf_attr = LPF_MSG_DEFAULT; + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_err_t lpf_rc = LPF_SUCCESS; + lpf_memslot_t src_memslot; + + assert( memslots.find( dst_memslot ) != memslots.end() ); + + // TODO: checks for local registration! Already registered? Register full? + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &src_memslot ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + + /** + * Deregisters a local buffer for RDMA + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename IOType > + RC deregister( const lpf_memslot_t memslot ) { + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_err_t lpf_rc = LPF_SUCCESS; + + assert( registered_slots.size() < memory_register_size ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, &memslot ); + + // TODO: delete registered memory from map + auto it = memslots.find( memslot ); + registered_slots.erase( registered_slots.find( it->second ) ); + memslots.erase( it ); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + + public: + + /* + * This is a collective function: it must be called by all the processes with + * the same parameters!! + */ + rdma( const size_t register_size = 2 ) { + + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_rc = lpf_rc ? lpf_rc : lpf_resize_message_queue( data.context, 42 ); + + registered_slots.clear(); + memslots.clear(); + memory_register_size = 0; + + grb::rdma::resize_memory_register( register_size ); + } + + /* + * This is a collective function: it must be called by all the processes with + * the same parameters!! + */ + ~rdma() { + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_err_t lpf_rc = LPF_SUCCESS; + +#ifdef _DEBUG + std::cerr << "RDMA finalize called." << std::endl; + std::cerr << "\t" << registered_slots.size() << std::endl; +#endif + + for(auto it = registered_slots.begin(); lpf_rc == LPF_SUCCESS && it != registered_slots.end() ; it++){ + const lpf_memslot_t memslot = ((it->second).second); + const auto memslot_it = memslots.find( memslot ); + assert( memslot_it != memslots.end() ); + memslots.erase( memslot_it ); + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, memslot ); + } + lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); + + if( lpf_rc == LPF_SUCCESS ) { + registered_slots.clear(); + memory_register_size = 0; + } + } + /** + * Registers a buffer for RDMA + * + * This is a collective operation! + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename IOType > + RC register_global( IOType* buf, const size_t size, lpf_memslot_t &memslot ) { + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_err_t lpf_rc = LPF_SUCCESS; + + while( memory_register_size <= registered_slots.size() + 1 ){ + grb::rdma::resize_memory_register( 2*memory_register_size ); + } + + assert( registered_slots.find( buf ) == registered_slots.end() ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( + data.context, + static_cast< void* >( buf ), + size, &memslot + ); + lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); + + registered_slots.insert({ static_cast< const void* >( buf ), + std::make_pair( size, memslot ) }); + memslots.insert({ memslot, static_cast< const void* >( buf ) }); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + + template< typename IOType > + inline RC register_global( IOType &buf, lpf_memslot_t &memslot ) { + return register_global( &buf, sizeof(IOType), memslot ); + } + + /** + * Reads a message from another process' registered memory + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename IOType > + RC get( IOType* msg, const size_t size, const size_t &src_pid, const lpf_memslot_t &src_memslot ) { + const auto lpf_attr = LPF_MSG_DEFAULT; + const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + lpf_memslot_t dst_memslot; + lpf_err_t lpf_rc = LPF_SUCCESS; + + // TODO: checks for local registration! Already registered? Register full? + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &dst_memslot ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, dst_memslot ); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + template< typename IOType > + inline RC get( IOType &msg, const size_t src_pid, const lpf_memslot_t &src_memslot ) { + return get( &msg, sizeof(IOType), src_pid, src_memslot ); + } + + template< typename IOType > + inline RC put( IOType&msg, const size_t dst_pid, const lpf_memslot_t&dst_memslot ) { + return put( &msg, sizeof(IOType), dst_pid, dst_memslot ); + } + + + }; // end class ``rdma'' generic LPF implementation + +} // namespace grb + +#endif // end _H_GRB_LPF_RDMA diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 5cd03cf77..e197c1ea7 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -374,6 +374,10 @@ add_grb_executables( launch_benchmark_frommpi_manual launcherAndBenchmarker.cpp COMPILE_DEFINITIONS NO_LPF_AUTO_INIT ) +add_grb_executables( rdma rdma.cpp + BACKENDS bsp1d +) + # targets to list and build the test for this category get_property( unit_tests_list GLOBAL PROPERTY tests_category_unit ) add_custom_target( "list_tests_category_unit" diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp new file mode 100644 index 000000000..4c7fe2665 --- /dev/null +++ b/tests/unit/rdma.cpp @@ -0,0 +1,82 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include "graphblas/bsp/rdma.hpp" + +void grbProgram( const size_t &in, grb::RC &exit_status ) { + (void) in; + grb::RC rc = grb::SUCCESS; + + const size_t s = grb::spmd<>::pid(); + const size_t P = grb::spmd<>::nprocs(); + assert( s < P ); + assert( P > 1 ); + // sleep( 10 ); + int x = 42, y = 69; + + grb::rdma rdma(4); + lpf_memslot_t memslot; + rc = rc ? rc : rdma.register_global( x, memslot ); + + assert( rc == grb::SUCCESS ); + if( s == 0 ){ + rc = rc ? rc : rdma.put( y, 1, memslot ); + x = 0; + } + assert( rc == grb::SUCCESS ); + rc = rc ? rc : grb::spmd<>::sync(); + assert( rc == grb::SUCCESS ); + assert( s != 1 || x == 69 ); + + rc = rc ? rc : grb::spmd<>::sync(); + + if( s == 1 ){ + rc = rc ? rc : rdma.get( y, 0, memslot ); + } + assert( rc == grb::SUCCESS ); + rc = rc ? rc : grb::spmd<>::sync(); + assert( rc == grb::SUCCESS ); + assert( s != 1 || y == 0 ); + + assert( rc == grb::SUCCESS ); + exit_status = rc; + return; +} + +int main( int argc, char ** argv ) { + (void) argc; + size_t in = 42; + grb::RC out; + + std::cout << "This is a functional test " << argv[ 0 ] << "\n"; + grb::Launcher< grb::AUTOMATIC > launcher; + if( launcher.exec( &grbProgram, in, out, true ) != grb::SUCCESS ) { + std::cerr << "Launching test FAILED\n"; + return 255; + } + + if( out == grb::SUCCESS ){ + std::cerr << "Test OK\n"; + }else{ + std::cerr << "Test FAILED\n"; + } +} + From 6e2f24f92e5d9da6eb0a88ff4452c99ad21c8e6a Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 9 Dec 2025 16:00:56 +0100 Subject: [PATCH 02/13] fixup! Basic functions and small test --- include/CMakeLists.txt | 2 +- include/graphblas/bsp/rdma.hpp | 2 +- tests/unit/unittests.sh | 6 ++++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index a9c6c6231..41265746c 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -50,7 +50,7 @@ set( root_files "graphblas/io.hpp" "graphblas/iomode.hpp" "graphblas/matrix.hpp" "graphblas/monoid.hpp" "graphblas/ops.hpp" "graphblas/phase.hpp" "graphblas/pinnedvector.hpp" "graphblas/properties.hpp" "graphblas/rc.hpp" - "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/rdma.hpp" "graphblas/tags.hpp" + "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/tags.hpp" "graphblas/type_traits.hpp" "graphblas/utils.hpp" "graphblas/vector.hpp" "graphblas/synchronizedNonzeroIterator.hpp" "graphblas/nonzeroStorage.hpp" "graphblas/selection_ops.hpp" diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 24c62b433..218ee8146 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -128,7 +128,7 @@ namespace grb { rdma( const size_t register_size = 2 ) { const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); - lpf_rc = lpf_rc ? lpf_rc : lpf_resize_message_queue( data.context, 42 ); + lpf_err_t lpf_rc = lpf_rc ? lpf_rc : lpf_resize_message_queue( data.context, 42 ); registered_slots.clear(); memslots.clear(); diff --git a/tests/unit/unittests.sh b/tests/unit/unittests.sh index 0b49d2421..936ad9a56 100755 --- a/tests/unit/unittests.sh +++ b/tests/unit/unittests.sh @@ -889,6 +889,12 @@ for MODE in ${MODES}; do echo ">>> [x] [ ] Testing BSP1D distribution." echo " " ${LPFRUN} -np 1 ${TEST_BIN_DIR}/distribution_${MODE} + + echo "RDMA unit tests for the BSP1D backend:" + echo " " + echo ">>> [x] [ ] Testing Remote Direct Memory Access interface" + echo " " + ${LPFRUN} -np 2 ${TEST_BIN_DIR}/rdma_${MODE}_bsp1d fi done From 401e5e331f6b832951cd8d7845ca77f4a671040f Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:14:50 +0100 Subject: [PATCH 03/13] Some integration with pre-existing stuff --- include/graphblas/bsp/rdma.hpp | 166 +++++++++---------------------- include/graphblas/bsp1d/init.hpp | 8 +- tests/unit/rdma.cpp | 7 +- 3 files changed, 58 insertions(+), 123 deletions(-) diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 218ee8146..40173b6ec 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -33,63 +33,7 @@ namespace grb { /** Superclass implementation for all LPF-backed implementations. */ - class rdma { - private: - std::map< const void* , std::pair< size_t, const lpf_memslot_t > > registered_slots; - std::map< const lpf_memslot_t , const void* > memslots; - - size_t memory_register_size = 0; - - /* - * This is a collective function. It must be called by all the processes with - * the same parameters!! - */ - RC resize_memory_register( const size_t new_register_size ) { - if( new_register_size < memory_register_size ) return grb::SUCCESS; - - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); - lpf_err_t lpf_rc = LPF_SUCCESS; - - lpf_rc = lpf_rc ? lpf_rc : lpf_resize_memory_register( data.context, new_register_size ); - lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); - if( lpf_rc == LPF_SUCCESS ) { - memory_register_size = new_register_size; - return grb::SUCCESS; - } else { - return grb::PANIC; - } - } - - - /** - * Writes a message to another process' registered memory - * - * @return grb::SUCCESS When all queued communication is executed succesfully. - * @return grb::PANIC When an unrecoverable error occurs. When this value is - * returned, the library enters an undefined state. - */ - template< typename IOType > - RC put( IOType* msg, const size_t &size, const size_t dst_pid, const lpf_memslot_t &dst_memslot ) { - const auto lpf_attr = LPF_MSG_DEFAULT; - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); - lpf_err_t lpf_rc = LPF_SUCCESS; - lpf_memslot_t src_memslot; - - assert( memslots.find( dst_memslot ) != memslots.end() ); - - // TODO: checks for local registration! Already registered? Register full? - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &src_memslot ); - - lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); - - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); - - if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; - } else { - return grb::PANIC; - } - } + namespace rdma { /** * Deregisters a local buffer for RDMA @@ -100,17 +44,18 @@ namespace grb { */ template< typename IOType > RC deregister( const lpf_memslot_t memslot ) { - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + internal::BSP1D_Data & data = internal::grb_BSP1D.load(); lpf_err_t lpf_rc = LPF_SUCCESS; - assert( registered_slots.size() < memory_register_size ); - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, &memslot ); // TODO: delete registered memory from map - auto it = memslots.find( memslot ); - registered_slots.erase( registered_slots.find( it->second ) ); - memslots.erase( it ); + const auto it = data.global_memslots.find( memslot ); + assert( it != data.global_memslots.end() ); + data.registered_slots.erase( data.registered_slots.find( it->second ) ); + data.global_memslots.erase( it ); + + data.signalMemslotReleased( 1 ); if( lpf_rc == LPF_SUCCESS ) { return grb::SUCCESS; @@ -119,53 +64,8 @@ namespace grb { } } - public: - - /* - * This is a collective function: it must be called by all the processes with - * the same parameters!! - */ - rdma( const size_t register_size = 2 ) { - - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); - lpf_err_t lpf_rc = lpf_rc ? lpf_rc : lpf_resize_message_queue( data.context, 42 ); - - registered_slots.clear(); - memslots.clear(); - memory_register_size = 0; - - grb::rdma::resize_memory_register( register_size ); - } - - /* - * This is a collective function: it must be called by all the processes with - * the same parameters!! - */ - ~rdma() { - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); - lpf_err_t lpf_rc = LPF_SUCCESS; - -#ifdef _DEBUG - std::cerr << "RDMA finalize called." << std::endl; - std::cerr << "\t" << registered_slots.size() << std::endl; -#endif - - for(auto it = registered_slots.begin(); lpf_rc == LPF_SUCCESS && it != registered_slots.end() ; it++){ - const lpf_memslot_t memslot = ((it->second).second); - const auto memslot_it = memslots.find( memslot ); - assert( memslot_it != memslots.end() ); - memslots.erase( memslot_it ); - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, memslot ); - } - lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); - - if( lpf_rc == LPF_SUCCESS ) { - registered_slots.clear(); - memory_register_size = 0; - } - } /** - * Registers a buffer for RDMA + * Registers a global buffer for RDMA * * This is a collective operation! * @@ -175,14 +75,13 @@ namespace grb { */ template< typename IOType > RC register_global( IOType* buf, const size_t size, lpf_memslot_t &memslot ) { - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + internal::BSP1D_Data & data = internal::grb_BSP1D.load(); lpf_err_t lpf_rc = LPF_SUCCESS; - while( memory_register_size <= registered_slots.size() + 1 ){ - grb::rdma::resize_memory_register( 2*memory_register_size ); - } + data.ensureMemslotAvailable( 1 ); + data.signalMemslotTaken(); - assert( registered_slots.find( buf ) == registered_slots.end() ); + assert( data.registered_slots.find( buf ) == data.registered_slots.end() ); lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( data.context, @@ -191,9 +90,9 @@ namespace grb { ); lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); - registered_slots.insert({ static_cast< const void* >( buf ), + data.registered_slots.insert({ static_cast< const void* >( buf ), std::make_pair( size, memslot ) }); - memslots.insert({ memslot, static_cast< const void* >( buf ) }); + data.global_memslots.insert({ memslot, static_cast< const void* >( buf ) }); if( lpf_rc == LPF_SUCCESS ) { return grb::SUCCESS; @@ -217,11 +116,12 @@ namespace grb { template< typename IOType > RC get( IOType* msg, const size_t size, const size_t &src_pid, const lpf_memslot_t &src_memslot ) { const auto lpf_attr = LPF_MSG_DEFAULT; - const internal::BSP1D_Data & data = internal::grb_BSP1D.cload(); + internal::BSP1D_Data & data = internal::grb_BSP1D.load(); lpf_memslot_t dst_memslot; lpf_err_t lpf_rc = LPF_SUCCESS; - // TODO: checks for local registration! Already registered? Register full? + data.ensureMemslotAvailable( 1 ); + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &dst_memslot ); lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); @@ -239,6 +139,36 @@ namespace grb { return get( &msg, sizeof(IOType), src_pid, src_memslot ); } + /** + * Writes a message to another process' registered memory + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename IOType > + RC put( IOType* msg, const size_t &size, const size_t dst_pid, const lpf_memslot_t &dst_memslot ) { + const auto lpf_attr = LPF_MSG_DEFAULT; + internal::BSP1D_Data & data = internal::grb_BSP1D.load(); + lpf_err_t lpf_rc = LPF_SUCCESS; + lpf_memslot_t src_memslot; + + data.ensureMemslotAvailable( 1 ); + assert( data.global_memslots.find( dst_memslot ) != data.global_memslots.end() ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &src_memslot ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } + template< typename IOType > inline RC put( IOType&msg, const size_t dst_pid, const lpf_memslot_t&dst_memslot ) { return put( &msg, sizeof(IOType), dst_pid, dst_memslot ); diff --git a/include/graphblas/bsp1d/init.hpp b/include/graphblas/bsp1d/init.hpp index fc7c1df16..4ad59ff0b 100644 --- a/include/graphblas/bsp1d/init.hpp +++ b/include/graphblas/bsp1d/init.hpp @@ -191,9 +191,15 @@ namespace grb { /** Whether a finalize has been called. */ bool destroyed; - /** Mapper to assign IDs to BSP1D containers .*/ + /** Mapper to assign IDs to BSP1D containers. */ utils::DMapper< uintptr_t > mapper; + /** Map of registered addresses. */ + std::map< const void* , std::pair< size_t, const lpf_memslot_t > > registered_slots; + + /** Map of registered memory slots to their address. */ + std::map< const lpf_memslot_t , const void* > global_memslots; + /** * This class is default-constructible. * diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index 4c7fe2665..05684c193 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -32,13 +32,12 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { // sleep( 10 ); int x = 42, y = 69; - grb::rdma rdma(4); lpf_memslot_t memslot; - rc = rc ? rc : rdma.register_global( x, memslot ); + rc = rc ? rc : grb::rdma::register_global( x, memslot ); assert( rc == grb::SUCCESS ); if( s == 0 ){ - rc = rc ? rc : rdma.put( y, 1, memslot ); + rc = rc ? rc : grb::rdma::put( y, 1, memslot ); x = 0; } assert( rc == grb::SUCCESS ); @@ -49,7 +48,7 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { rc = rc ? rc : grb::spmd<>::sync(); if( s == 1 ){ - rc = rc ? rc : rdma.get( y, 0, memslot ); + rc = rc ? rc : grb::rdma::get( y, 0, memslot ); } assert( rc == grb::SUCCESS ); rc = rc ? rc : grb::spmd<>::sync(); From a057d3a1442568a416a06b15204adc359cc4b248 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 13:18:34 +0100 Subject: [PATCH 04/13] Restructuring rdma to conform to proposed public interface --- include/graphblas/bsp/rdma.hpp | 366 +++++++++++++++++++++---------- include/graphblas/bsp1d/init.hpp | 32 ++- tests/unit/rdma.cpp | 8 +- 3 files changed, 283 insertions(+), 123 deletions(-) diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 40173b6ec..34fe32813 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -16,6 +16,7 @@ /* * @author G. Gaio + * @date December, 2025 */ #ifndef _H_GRB_LPF_RDMA @@ -33,147 +34,278 @@ namespace grb { /** Superclass implementation for all LPF-backed implementations. */ + // template<> + // class rdma< GENERIC_BSP > { namespace rdma { + namespace internal { + + /** + * Registers a global buffer for RDMA + * + * This is a collective operation! + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename T > + RC register_global( T* buf, const size_t size ) { + grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + lpf_err_t lpf_rc = LPF_SUCCESS; + lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; + + data.ensureMemslotAvailable( 1 ); + data.signalMemslotTaken(); + + assert( data.registered_slots.find( static_cast< const void* >( buf ) ) == data.registered_slots.end() ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( + data.context, + static_cast< void* >( buf ), + size, &memslot + ); + lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); + + data.registered_slots.insert({ static_cast< const void* >( buf ), + std::make_pair( size, memslot ) }); + data.global_memslots.insert({ memslot, static_cast< const void* >( buf ) }); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } - /** - * Deregisters a local buffer for RDMA - * - * @return grb::SUCCESS When all queued communication is executed succesfully. - * @return grb::PANIC When an unrecoverable error occurs. When this value is - * returned, the library enters an undefined state. - */ - template< typename IOType > - RC deregister( const lpf_memslot_t memslot ) { - internal::BSP1D_Data & data = internal::grb_BSP1D.load(); - lpf_err_t lpf_rc = LPF_SUCCESS; - - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, &memslot ); - - // TODO: delete registered memory from map - const auto it = data.global_memslots.find( memslot ); - assert( it != data.global_memslots.end() ); - data.registered_slots.erase( data.registered_slots.find( it->second ) ); - data.global_memslots.erase( it ); + /** + * Deregisters a local buffer for RDMA + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename T > + RC deregister( const T &buf ) { +#ifdef _DEBUG + std::cout << "deregister: memslot " << memslot << std::endl; +#endif + grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + lpf_err_t lpf_rc = LPF_SUCCESS; + lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; + + + const auto it0 = data.registered_slots.find( buf ) + assert( it0 != data.registered_slots.end() ); + + memslot = it0->second->second; + data.registered_slots.erase( it0 ); + + // TODO: delete registered memory from map + const auto it1 = data.global_memslots.find( memslot ); + assert( it1 != data.global_memslots.end() ); + data.global_memslots.erase( it1 ); + + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, &memslot ); + + data.signalMemslotReleased( 1 ); + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } + } - data.signalMemslotReleased( 1 ); - - if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; - } else { - return grb::PANIC; + /** + * Writes a message to another process' registered memory + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename T1, typename T2 > + RC put( T1* src, const size_t dst_pid, T2* dst, const size_t &size ) { +#ifdef _DEBUG + std::cout << "rdma::put( " << src << ", " << size << ", " << dst_pid << ", " << dst << ") called" << std::endl; +#endif + + const auto lpf_attr = LPF_MSG_DEFAULT; + grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + lpf_err_t lpf_rc = LPF_SUCCESS; + lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; + lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; + + // dynamic checks + if( dst_pid >= data.P ) { + return grb::ILLEGAL; + } + + // check trivial dispatch + if( size == 0 ) { + return grb::SUCCESS; + } + if( data.P == 1 ) { + return grb::SUCCESS; + } + + data.ensureMemslotAvailable( 1 ); + { + const auto it = data.registered_slots.find( dst ); + assert( it != data.registered_slots.end() ); + assert( it->second.first >= size ); + dst_memslot = it->second.second; + } + + const auto it = data.registered_slots.find( src ); + if( it == data.registered_slots.end() ){ + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, src, size, &src_memslot ); + } else { + // there must be a better check... + assert( it->second.first >= size ); // is there enough space? + src_memslot = it->second.second; + } + + lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); + data.put_requests.emplace_back( src, dst_pid, dst_memslot, 0, size ); + + if( it == data.registered_slots.end() ){ + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); + } + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } } - } - /** - * Registers a global buffer for RDMA - * - * This is a collective operation! - * - * @return grb::SUCCESS When all queued communication is executed succesfully. - * @return grb::PANIC When an unrecoverable error occurs. When this value is - * returned, the library enters an undefined state. - */ - template< typename IOType > - RC register_global( IOType* buf, const size_t size, lpf_memslot_t &memslot ) { - internal::BSP1D_Data & data = internal::grb_BSP1D.load(); - lpf_err_t lpf_rc = LPF_SUCCESS; - - data.ensureMemslotAvailable( 1 ); - data.signalMemslotTaken(); - - assert( data.registered_slots.find( buf ) == data.registered_slots.end() ); - - lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( - data.context, - static_cast< void* >( buf ), - size, &memslot - ); - lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); - - data.registered_slots.insert({ static_cast< const void* >( buf ), - std::make_pair( size, memslot ) }); - data.global_memslots.insert({ memslot, static_cast< const void* >( buf ) }); - - if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; - } else { - return grb::PANIC; + /** + * Reads a message from another process' registered memory + * + * @return grb::SUCCESS When all queued communication is executed succesfully. + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename T > + RC get( const size_t &src_pid, T* src, T* dst, const size_t size ) { +#ifdef _DEBUG + std::cout << "rdma::get( " << src << ", " << size << ", " << src_pid << ", " << src_memslot << ") called" << std::endl; +#endif + const auto lpf_attr = LPF_MSG_DEFAULT; + grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; + lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; + lpf_err_t lpf_rc = LPF_SUCCESS; + + // dynamic checks + if( src_pid >= data.P ) { + return ILLEGAL; + } + + // check trivial dispatch + if( size == 0 ) { + return SUCCESS; + } + if( data.P == 1 ) { + return SUCCESS; + } + + data.ensureMemslotAvailable( 1 ); + { + const auto it = data.registered_slots.find( src ); + assert( it != data.registered_slots.end() ); + assert( it->second.first >= size ); + src_memslot = it->second.second; + } + + const auto it = data.registered_slots.find( dst ); + if( it == data.registered_slots.end() ){ + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, dst, size, &dst_memslot ); + } else { + // there must be a better check... + assert( it->second.first >= size ); + dst_memslot = it->second.second; + } + + lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); + data.get_requests.emplace_back( src_pid, src_memslot, 0, src, size ); + + if( it == data.registered_slots.end() ){ + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, dst_memslot ); + } + + if( lpf_rc == LPF_SUCCESS ) { + return grb::SUCCESS; + } else { + return grb::PANIC; + } } - } - template< typename IOType > - inline RC register_global( IOType &buf, lpf_memslot_t &memslot ) { - return register_global( &buf, sizeof(IOType), memslot ); - } + } // namespace internal - /** - * Reads a message from another process' registered memory - * - * @return grb::SUCCESS When all queued communication is executed succesfully. - * @return grb::PANIC When an unrecoverable error occurs. When this value is - * returned, the library enters an undefined state. + /* + * RDMA wrappers for internal functions */ - template< typename IOType > - RC get( IOType* msg, const size_t size, const size_t &src_pid, const lpf_memslot_t &src_memslot ) { - const auto lpf_attr = LPF_MSG_DEFAULT; - internal::BSP1D_Data & data = internal::grb_BSP1D.load(); - lpf_memslot_t dst_memslot; - lpf_err_t lpf_rc = LPF_SUCCESS; - - data.ensureMemslotAvailable( 1 ); + template< typename T > + inline RC register_global( T &buf) { + return internal::register_global( &buf, sizeof(T) ); + } - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &dst_memslot ); - lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); + template< typename T > + inline RC register_global( grb::Vector< T, grb::reference > &inout ) { + const size_t size = grb::internal::getCoordinates( inout ).size(); + const size_t bsize = size * sizeof( T ); + T* raw_ptr = grb::internal::getRaw( inout ); - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, dst_memslot ); + lpf_memslot_t slot = LPF_INVALID_MEMSLOT; + lpf_err_t lpf_rc = LPF_SUCCESS; - if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; - } else { - return grb::PANIC; - } - } - template< typename IOType > - inline RC get( IOType &msg, const size_t src_pid, const lpf_memslot_t &src_memslot ) { - return get( &msg, sizeof(IOType), src_pid, src_memslot ); + return internal::register_global( raw_ptr, bsize ); } - /** - * Writes a message to another process' registered memory - * - * @return grb::SUCCESS When all queued communication is executed succesfully. - * @return grb::PANIC When an unrecoverable error occurs. When this value is - * returned, the library enters an undefined state. - */ - template< typename IOType > - RC put( IOType* msg, const size_t &size, const size_t dst_pid, const lpf_memslot_t &dst_memslot ) { - const auto lpf_attr = LPF_MSG_DEFAULT; - internal::BSP1D_Data & data = internal::grb_BSP1D.load(); - lpf_err_t lpf_rc = LPF_SUCCESS; - lpf_memslot_t src_memslot; - data.ensureMemslotAvailable( 1 ); - assert( data.global_memslots.find( dst_memslot ) != data.global_memslots.end() ); + template< typename T > + inline RC get( const size_t src_pid, T &src, T &dst ) { + return internal::get( src_pid, &src, &dst, sizeof(T) ); + } - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, msg, size, &src_memslot ); + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + inline RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { - lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); + // we only support grb::reference for now + static_assert( grb::reference == backend ); - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); + const size_t size = grb::internal::getCoordinates( dst ).size(); + const size_t bsize = size * sizeof( T ); - if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; - } else { - return grb::PANIC; - } + return internal::get( src_pid, grb::internal::getRaw( src ), grb::internal::getRaw( dst ), bsize ); } - template< typename IOType > - inline RC put( IOType&msg, const size_t dst_pid, const lpf_memslot_t&dst_memslot ) { - return put( &msg, sizeof(IOType), dst_pid, dst_memslot ); + template< typename T > + inline RC put( T &src, const size_t dst_pid, T &dst ) { + return internal::put( &src, dst_pid, &dst, sizeof(T) ); } + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + inline RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { + // we only support grb::reference for now + static_assert( grb::reference == backend ); + const size_t size = grb::internal::getCoordinates( src ).size(); + const size_t bsize = size * sizeof( T ); + + return internal::put( grb::internal::getRaw( src ), dst_pid, grb::internal::getRaw( dst ), bsize ); + } }; // end class ``rdma'' generic LPF implementation diff --git a/include/graphblas/bsp1d/init.hpp b/include/graphblas/bsp1d/init.hpp index 4ad59ff0b..cf9395f09 100644 --- a/include/graphblas/bsp1d/init.hpp +++ b/include/graphblas/bsp1d/init.hpp @@ -54,6 +54,20 @@ namespace grb { size_t src_offset; void * dst; size_t size; + + get_request( + lpf_pid_t src_pid, + lpf_memslot_t src, + size_t src_offset, + void * dst, + size_t size + ) + : src_pid(src_pid) + , src(src) + , src_offset(src_offset) + , dst(dst) + , size(size) + {} }; /** All information corresponding to a put request. */ @@ -63,6 +77,20 @@ namespace grb { lpf_memslot_t dst; size_t dst_offset; size_t size; + + put_request( + void * src, + lpf_pid_t dst_pid, + lpf_memslot_t dst, + size_t dst_offset, + size_t size + ) + : src(src) + , dst_pid(dst_pid) + , dst(dst) + , dst_offset(dst_offset) + , size(size) + {} }; /** @@ -194,8 +222,8 @@ namespace grb { /** Mapper to assign IDs to BSP1D containers. */ utils::DMapper< uintptr_t > mapper; - /** Map of registered addresses. */ - std::map< const void* , std::pair< size_t, const lpf_memslot_t > > registered_slots; + /** Map of globally registered addresses. */ + std::map< const void* , std::pair< const size_t, const lpf_memslot_t > > registered_slots; /** Map of registered memory slots to their address. */ std::map< const lpf_memslot_t , const void* > global_memslots; diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index 05684c193..64c5fe291 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -32,12 +32,12 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { // sleep( 10 ); int x = 42, y = 69; - lpf_memslot_t memslot; - rc = rc ? rc : grb::rdma::register_global( x, memslot ); + rc = rc ? rc : grb::rdma::register_global( x ); + // rc = rc ? rc : grb::rdma::register_global( y ); // sus assert( rc == grb::SUCCESS ); if( s == 0 ){ - rc = rc ? rc : grb::rdma::put( y, 1, memslot ); + rc = rc ? rc : grb::rdma::put( y, 1, x ); x = 0; } assert( rc == grb::SUCCESS ); @@ -48,7 +48,7 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { rc = rc ? rc : grb::spmd<>::sync(); if( s == 1 ){ - rc = rc ? rc : grb::rdma::get( y, 0, memslot ); + rc = rc ? rc : grb::rdma::get( 0, x, y ); } assert( rc == grb::SUCCESS ); rc = rc ? rc : grb::spmd<>::sync(); From 400a46d02dbc04e0fc7eff306c0a3155c1174409 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:37:31 +0100 Subject: [PATCH 05/13] Transformed namespace to class + added base and reference implementation --- include/CMakeLists.txt | 2 +- include/graphblas.hpp | 1 + include/graphblas/base/rdma.hpp | 85 +++++++++++++++++++++++ include/graphblas/bsp/rdma.hpp | 95 ++++++++++++-------------- include/graphblas/bsp1d/init.hpp | 8 +-- include/graphblas/bsp1d/rdma.hpp | 31 +++++++++ include/graphblas/hyperdags/rdma.hpp | 39 +++++++++++ include/graphblas/nonblocking/rdma.hpp | 47 +++++++++++++ include/graphblas/rdma.hpp | 49 +++++++++++++ include/graphblas/reference/rdma.hpp | 94 +++++++++++++++++++++++++ tests/unit/rdma.cpp | 9 ++- 11 files changed, 400 insertions(+), 60 deletions(-) create mode 100644 include/graphblas/base/rdma.hpp create mode 100644 include/graphblas/bsp1d/rdma.hpp create mode 100644 include/graphblas/hyperdags/rdma.hpp create mode 100644 include/graphblas/nonblocking/rdma.hpp create mode 100644 include/graphblas/rdma.hpp create mode 100644 include/graphblas/reference/rdma.hpp diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 41265746c..a9c6c6231 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -50,7 +50,7 @@ set( root_files "graphblas/io.hpp" "graphblas/iomode.hpp" "graphblas/matrix.hpp" "graphblas/monoid.hpp" "graphblas/ops.hpp" "graphblas/phase.hpp" "graphblas/pinnedvector.hpp" "graphblas/properties.hpp" "graphblas/rc.hpp" - "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/tags.hpp" + "graphblas/semiring.hpp" "graphblas/spmd.hpp" "graphblas/rdma.hpp" "graphblas/tags.hpp" "graphblas/type_traits.hpp" "graphblas/utils.hpp" "graphblas/vector.hpp" "graphblas/synchronizedNonzeroIterator.hpp" "graphblas/nonzeroStorage.hpp" "graphblas/selection_ops.hpp" diff --git a/include/graphblas.hpp b/include/graphblas.hpp index dd2f63102..4de4cec42 100644 --- a/include/graphblas.hpp +++ b/include/graphblas.hpp @@ -545,6 +545,7 @@ namespace grb { #include #include #include +#include #ifdef _GRB_WITH_LPF // collects various BSP utilities diff --git a/include/graphblas/base/rdma.hpp b/include/graphblas/base/rdma.hpp new file mode 100644 index 000000000..f336932bd --- /dev/null +++ b/include/graphblas/base/rdma.hpp @@ -0,0 +1,85 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * + * Exposes facilities for Remote Direct Memory Access + * + * @author G. Gaio + * @date 16th of December, 2025 + */ + +#ifndef _H_GRB_BASE_RDMA +#define _H_GRB_BASE_RDMA + +#include //size_t + +#include // SIZE_MAX + +#include +#include + +#include "config.hpp" + + +namespace grb { + + /** + * For backends that support multiple user processes this class defines some + * basic primitives to support RDMA programming. + * + * All backends must implement this interface, including backends that do not + * support multiple user processes. The interface herein defined hence ensures + * to allow for trivial implementations for single user process backends. + */ + template< Backend implementation > + class rdma { + public: + template< typename T > + static grb::RC register_global( T &buf); + + template< typename T > + static grb::RC register_global( grb::Vector< T, grb::reference > &buf ); + + template< typename T > + static grb::RC get( const size_t src_pid, T &src, T &dst ); + + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ); + + template< typename T > + static grb::RC put( const T &src, const size_t dst_pid, T &dst ); + + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst); + }; // end class ``rdma'' + +} // namespace grb + +#endif // end _H_GRB_BASE_SPMD + diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 34fe32813..2f5057cfa 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -34,10 +34,9 @@ namespace grb { /** Superclass implementation for all LPF-backed implementations. */ - // template<> - // class rdma< GENERIC_BSP > { - namespace rdma { - namespace internal { + template<> + class rdma< GENERIC_BSP > { + private: /** * Registers a global buffer for RDMA @@ -49,26 +48,25 @@ namespace grb { * returned, the library enters an undefined state. */ template< typename T > - RC register_global( T* buf, const size_t size ) { + static grb::RC register_global( const T* buf, const size_t size ) { grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; + const void* buf_void = reinterpret_cast< const void* >( buf ); - data.ensureMemslotAvailable( 1 ); + data.ensureMemslotAvailable( 1 ); data.signalMemslotTaken(); - assert( data.registered_slots.find( static_cast< const void* >( buf ) ) == data.registered_slots.end() ); + assert( data.registered_slots.find( buf_void ) == data.registered_slots.end() ); lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( - data.context, - static_cast< void* >( buf ), + data.context, const_cast< void* >( buf_void ), size, &memslot ); lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); - data.registered_slots.insert({ static_cast< const void* >( buf ), - std::make_pair( size, memslot ) }); - data.global_memslots.insert({ memslot, static_cast< const void* >( buf ) }); + data.registered_slots.insert({ buf_void, std::make_pair( size, memslot ) }); + data.global_memslots.insert({ memslot, buf_void }); if( lpf_rc == LPF_SUCCESS ) { return grb::SUCCESS; @@ -85,7 +83,7 @@ namespace grb { * returned, the library enters an undefined state. */ template< typename T > - RC deregister( const T &buf ) { + static grb::RC deregister( const T &buf ) { #ifdef _DEBUG std::cout << "deregister: memslot " << memslot << std::endl; #endif @@ -124,7 +122,7 @@ namespace grb { * returned, the library enters an undefined state. */ template< typename T1, typename T2 > - RC put( T1* src, const size_t dst_pid, T2* dst, const size_t &size ) { + static grb::RC put( const T1* src, const size_t dst_pid, T2* dst, const size_t &size ) { #ifdef _DEBUG std::cout << "rdma::put( " << src << ", " << size << ", " << dst_pid << ", " << dst << ") called" << std::endl; #endif @@ -134,6 +132,7 @@ namespace grb { lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; + const void* src_void = reinterpret_cast< const void * >( src ); // dynamic checks if( dst_pid >= data.P ) { @@ -144,21 +143,18 @@ namespace grb { if( size == 0 ) { return grb::SUCCESS; } - if( data.P == 1 ) { - return grb::SUCCESS; - } - data.ensureMemslotAvailable( 1 ); + // data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { - const auto it = data.registered_slots.find( dst ); + const auto it = data.registered_slots.find( reinterpret_cast< const void* >( dst ) ); assert( it != data.registered_slots.end() ); assert( it->second.first >= size ); dst_memslot = it->second.second; } - const auto it = data.registered_slots.find( src ); + const auto it = data.registered_slots.find( src_void ); if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, src, size, &src_memslot ); + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( src_void ), size, &src_memslot ); } else { // there must be a better check... assert( it->second.first >= size ); // is there enough space? @@ -166,7 +162,7 @@ namespace grb { } lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); - data.put_requests.emplace_back( src, dst_pid, dst_memslot, 0, size ); + data.put_requests.emplace_back( src_void, dst_pid, dst_memslot, 0, size ); if( it == data.registered_slots.end() ){ lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); @@ -187,7 +183,7 @@ namespace grb { * returned, the library enters an undefined state. */ template< typename T > - RC get( const size_t &src_pid, T* src, T* dst, const size_t size ) { + static grb::RC get( const size_t &src_pid, const T* src, T* dst, const size_t size ) { #ifdef _DEBUG std::cout << "rdma::get( " << src << ", " << size << ", " << src_pid << ", " << src_memslot << ") called" << std::endl; #endif @@ -196,23 +192,21 @@ namespace grb { lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; lpf_err_t lpf_rc = LPF_SUCCESS; + const void* dst_void = reinterpret_cast< const void * >( dst ); // dynamic checks if( src_pid >= data.P ) { - return ILLEGAL; + return grb::ILLEGAL; } // check trivial dispatch if( size == 0 ) { - return SUCCESS; - } - if( data.P == 1 ) { - return SUCCESS; + return grb::SUCCESS; } - data.ensureMemslotAvailable( 1 ); + // data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { - const auto it = data.registered_slots.find( src ); + const auto it = data.registered_slots.find( reinterpret_cast< const void* >( src ) ); assert( it != data.registered_slots.end() ); assert( it->second.first >= size ); src_memslot = it->second.second; @@ -220,7 +214,7 @@ namespace grb { const auto it = data.registered_slots.find( dst ); if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, dst, size, &dst_memslot ); + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( dst_void ), size, &dst_memslot ); } else { // there must be a better check... assert( it->second.first >= size ); @@ -241,42 +235,44 @@ namespace grb { } } - } // namespace internal - + public: /* * RDMA wrappers for internal functions */ template< typename T > - inline RC register_global( T &buf) { - return internal::register_global( &buf, sizeof(T) ); + static inline grb::RC register_global( T &buf) { + return register_global( &buf, sizeof(T) ); } - template< typename T > - inline RC register_global( grb::Vector< T, grb::reference > &inout ) { - const size_t size = grb::internal::getCoordinates( inout ).size(); + template< + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static inline grb::RC register_global( grb::Vector< T, backend, Coords > &buf ) { + const size_t size = grb::internal::getCoordinates( buf ).size(); const size_t bsize = size * sizeof( T ); - T* raw_ptr = grb::internal::getRaw( inout ); + T* raw_ptr = grb::internal::getRaw( buf ); lpf_memslot_t slot = LPF_INVALID_MEMSLOT; lpf_err_t lpf_rc = LPF_SUCCESS; - return internal::register_global( raw_ptr, bsize ); + return register_global( raw_ptr, bsize ); } template< typename T > - inline RC get( const size_t src_pid, T &src, T &dst ) { - return internal::get( src_pid, &src, &dst, sizeof(T) ); + static inline grb::RC get( const size_t src_pid, const T &src, T &dst ) { + return get( src_pid, &src, &dst, sizeof(T) ); } template< - grb::Descriptor descr = descriptors::no_operation, grb::Backend backend = grb::reference, typename T, typename Coords > - inline RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { + static inline grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { // we only support grb::reference for now static_assert( grb::reference == backend ); @@ -284,27 +280,26 @@ namespace grb { const size_t size = grb::internal::getCoordinates( dst ).size(); const size_t bsize = size * sizeof( T ); - return internal::get( src_pid, grb::internal::getRaw( src ), grb::internal::getRaw( dst ), bsize ); + return get( src_pid, grb::internal::getRaw( src ), grb::internal::getRaw( dst ), bsize ); } template< typename T > - inline RC put( T &src, const size_t dst_pid, T &dst ) { - return internal::put( &src, dst_pid, &dst, sizeof(T) ); + static inline grb::RC put( const T &src, const size_t dst_pid, T &dst ) { + return put( &src, dst_pid, &dst, sizeof(T) ); } template< - grb::Descriptor descr = descriptors::no_operation, grb::Backend backend = grb::reference, typename T, typename Coords > - inline RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { + static inline grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { // we only support grb::reference for now static_assert( grb::reference == backend ); const size_t size = grb::internal::getCoordinates( src ).size(); const size_t bsize = size * sizeof( T ); - return internal::put( grb::internal::getRaw( src ), dst_pid, grb::internal::getRaw( dst ), bsize ); + return put( grb::internal::getRaw( src ), dst_pid, grb::internal::getRaw( dst ), bsize ); } }; // end class ``rdma'' generic LPF implementation diff --git a/include/graphblas/bsp1d/init.hpp b/include/graphblas/bsp1d/init.hpp index cf9395f09..ce5eebf23 100644 --- a/include/graphblas/bsp1d/init.hpp +++ b/include/graphblas/bsp1d/init.hpp @@ -52,14 +52,14 @@ namespace grb { lpf_pid_t src_pid; lpf_memslot_t src; size_t src_offset; - void * dst; + const void * dst; size_t size; get_request( lpf_pid_t src_pid, lpf_memslot_t src, size_t src_offset, - void * dst, + const void * dst, size_t size ) : src_pid(src_pid) @@ -72,14 +72,14 @@ namespace grb { /** All information corresponding to a put request. */ struct put_request { - void * src; + const void * src; lpf_pid_t dst_pid; lpf_memslot_t dst; size_t dst_offset; size_t size; put_request( - void * src, + const void * src, lpf_pid_t dst_pid, lpf_memslot_t dst, size_t dst_offset, diff --git a/include/graphblas/bsp1d/rdma.hpp b/include/graphblas/bsp1d/rdma.hpp new file mode 100644 index 000000000..3d7b5778e --- /dev/null +++ b/include/graphblas/bsp1d/rdma.hpp @@ -0,0 +1,31 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _H_GRB_BSP1D_RDMA +#define _H_GRB_BSP1D_RDMA + +#include + +namespace grb { + + /** Superclass implementation for all BSP-backed implementations. */ + template<> + class rdma< BSP1D > : public rdma< GENERIC_BSP > {}; + +} // namespace grb + +#endif // end _H_GRB_BSP1D_SPMD diff --git a/include/graphblas/hyperdags/rdma.hpp b/include/graphblas/hyperdags/rdma.hpp new file mode 100644 index 000000000..d1339538f --- /dev/null +++ b/include/graphblas/hyperdags/rdma.hpp @@ -0,0 +1,39 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * + * Provides the RDMA API for the HyperDAGs backend + * + * @author G. Gaio + * @date 16th of December 2025 + */ + +#include //size_t + +#include + +namespace grb { + + template<> + class rdma< hyperdags > : rdma< reference > { + + }; // end class ``rdma'' reference implementation + +} // namespace grb + diff --git a/include/graphblas/nonblocking/rdma.hpp b/include/graphblas/nonblocking/rdma.hpp new file mode 100644 index 000000000..1aaae1220 --- /dev/null +++ b/include/graphblas/nonblocking/rdma.hpp @@ -0,0 +1,47 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * + * Provides the RDMA functions for the nonblocking backend. + * + * @author G. Gaio + * @date 16th of December 2025 + */ + +#ifndef _H_GRB_NONBLOCKING_RDMA +#define _H_GRB_NONBLOCKING_RDMA + +#include //size_t + +#include +#include + + +namespace grb { + + /** The rdma class is based on that of the reference backend */ + template<> + class rdma< nonblocking > : grb::rdma< reference > { + + }; // end class ``rdma'' nonblocking implementation + +} // namespace grb + +#endif // end _H_GRB_NONBLOCKING_RDMA + diff --git a/include/graphblas/rdma.hpp b/include/graphblas/rdma.hpp new file mode 100644 index 000000000..451c2a22b --- /dev/null +++ b/include/graphblas/rdma.hpp @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * @author G. Gaio + * @date December, 2025 + */ + +#ifndef _H_GRB_RDMA +#define _H_GRB_RDMA + +#include "base/config.hpp" +#include "base/rdma.hpp" + +#ifdef _GRB_WITH_REFERENCE + #include "graphblas/reference/rdma.hpp" +#endif +#ifdef _GRB_WITH_HYPERDAGS + #include +#endif +#ifdef _GRB_WITH_NONBLOCKING + #include "graphblas/nonblocking/rdma.hpp" +#endif +#ifdef _GRB_WITH_LPF + #include "graphblas/bsp1d/rdma.hpp" +#endif + +// specify default only if requested during compilation +#ifdef _GRB_BACKEND +namespace grb { + template< Backend implementation = config::default_backend > + class rdma; +} +#endif + +#endif // end _H_GRB_RDMA diff --git a/include/graphblas/reference/rdma.hpp b/include/graphblas/reference/rdma.hpp new file mode 100644 index 000000000..083553269 --- /dev/null +++ b/include/graphblas/reference/rdma.hpp @@ -0,0 +1,94 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * @author A. N. Yzelman + * @date 28th of April, 2017 + */ + +#if ! defined _H_GRB_REFERENCE_RDMA || defined _H_GRB_REFERENCE_OMP_RDMA +#define _H_GRB_REFERENCE_RDMA + +#include //size_t + +#include + +namespace grb { + + /** No implementation notes. */ + template<> + class rdma< reference > { + public: + template< typename T > + static grb::RC register_global( T &buf) { + return grb::SUCCESS; + } + + template< typename T > + static grb::RC register_global( grb::Vector< T, grb::reference > &buf ) { + return grb::SUCCESS; + } + + template< typename T > + static grb::RC get( const size_t src_pid, T &src, T &dst ) { + assert( src_pid == 0 ); + dst = src; + return grb::SUCCESS; + } + + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { + return grb::SUCCESS; + } + + template< typename T > + static grb::RC put( const T &src, const size_t dst_pid, T &dst ) { + assert( dst_pid == 0 ); + dst = src; + return grb::SUCCESS; + } + + template< + grb::Descriptor descr = descriptors::no_operation, + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { + return grb::SUCCESS; + } + }; // end class ``rdma'' reference implementation + +} // namespace grb + +// parse again for reference_omp backend +#ifdef _GRB_WITH_OMP +#ifndef _H_GRB_REFERENCE_OMP_RDMA +#define _H_GRB_REFERENCE_OMP_RDMA +#define reference reference_omp +#include "graphblas/reference/rdma.hpp" +#undef reference +#undef _H_GRB_REFERENCE_OMP_RDMA +#endif +#endif + +#endif // end _H_GRB_REFERENCE_RDMA diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index 64c5fe291..649d115dc 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -19,7 +19,6 @@ #include #include -#include "graphblas/bsp/rdma.hpp" void grbProgram( const size_t &in, grb::RC &exit_status ) { (void) in; @@ -32,12 +31,12 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { // sleep( 10 ); int x = 42, y = 69; - rc = rc ? rc : grb::rdma::register_global( x ); - // rc = rc ? rc : grb::rdma::register_global( y ); // sus + rc = rc ? rc : grb::rdma< >::register_global( x ); + // rc = rc ? rc : grb::rdma< >::register_global( y ); // sus assert( rc == grb::SUCCESS ); if( s == 0 ){ - rc = rc ? rc : grb::rdma::put( y, 1, x ); + rc = rc ? rc : grb::rdma< >::put( y, 1, x ); x = 0; } assert( rc == grb::SUCCESS ); @@ -48,7 +47,7 @@ void grbProgram( const size_t &in, grb::RC &exit_status ) { rc = rc ? rc : grb::spmd<>::sync(); if( s == 1 ){ - rc = rc ? rc : grb::rdma::get( 0, x, y ); + rc = rc ? rc : grb::rdma< >::get( 0, x, y ); } assert( rc == grb::SUCCESS ); rc = rc ? rc : grb::spmd<>::sync(); From 66862bd26116286f0067e4a8dabfbd0562c816ad Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 15:38:10 +0100 Subject: [PATCH 06/13] Review changes --- include/graphblas/base/rdma.hpp | 12 ++++++------ include/graphblas/bsp/rdma.hpp | 7 +------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/include/graphblas/base/rdma.hpp b/include/graphblas/base/rdma.hpp index f336932bd..fc2d0e8eb 100644 --- a/include/graphblas/base/rdma.hpp +++ b/include/graphblas/base/rdma.hpp @@ -51,7 +51,7 @@ namespace grb { class rdma { public: template< typename T > - static grb::RC register_global( T &buf); + static grb::RC register_global( T &buf ); template< typename T > static grb::RC register_global( grb::Vector< T, grb::reference > &buf ); @@ -60,8 +60,8 @@ namespace grb { static grb::RC get( const size_t src_pid, T &src, T &dst ); template< - grb::Descriptor descr = descriptors::no_operation, - grb::Backend backend = grb::reference, + grb::Descriptor descr, + grb::Backend backend, typename T, typename Coords > @@ -71,12 +71,12 @@ namespace grb { static grb::RC put( const T &src, const size_t dst_pid, T &dst ); template< - grb::Descriptor descr = descriptors::no_operation, - grb::Backend backend = grb::reference, + grb::Descriptor descr, + grb::Backend backend, typename T, typename Coords > - static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst); + static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst ); }; // end class ``rdma'' } // namespace grb diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 2f5057cfa..17544162f 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -55,7 +55,6 @@ namespace grb { const void* buf_void = reinterpret_cast< const void* >( buf ); data.ensureMemslotAvailable( 1 ); - data.signalMemslotTaken(); assert( data.registered_slots.find( buf_void ) == data.registered_slots.end() ); @@ -65,6 +64,7 @@ namespace grb { ); lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); + data.signalMemslotTaken(); data.registered_slots.insert({ buf_void, std::make_pair( size, memslot ) }); data.global_memslots.insert({ memslot, buf_void }); @@ -91,7 +91,6 @@ namespace grb { lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; - const auto it0 = data.registered_slots.find( buf ) assert( it0 != data.registered_slots.end() ); @@ -216,7 +215,6 @@ namespace grb { if( it == data.registered_slots.end() ){ lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( dst_void ), size, &dst_memslot ); } else { - // there must be a better check... assert( it->second.first >= size ); dst_memslot = it->second.second; } @@ -255,9 +253,6 @@ namespace grb { const size_t bsize = size * sizeof( T ); T* raw_ptr = grb::internal::getRaw( buf ); - lpf_memslot_t slot = LPF_INVALID_MEMSLOT; - lpf_err_t lpf_rc = LPF_SUCCESS; - return register_global( raw_ptr, bsize ); } From 7245bfa9f8f216a4afe670efa4df07eeedd9c7c8 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 15:39:47 +0100 Subject: [PATCH 07/13] Implemented reference RDMA --- include/graphblas/reference/rdma.hpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/graphblas/reference/rdma.hpp b/include/graphblas/reference/rdma.hpp index 083553269..4e8c8c3d5 100644 --- a/include/graphblas/reference/rdma.hpp +++ b/include/graphblas/reference/rdma.hpp @@ -57,7 +57,8 @@ namespace grb { typename Coords > static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { - return grb::SUCCESS; + assert( dst_pid == 0 ); + return grb::set< descr >( dst, src ); } template< typename T > @@ -74,7 +75,8 @@ namespace grb { typename Coords > static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { - return grb::SUCCESS; + assert( dst_pid == 0 ); + return grb::set< descr >( dst, src ); } }; // end class ``rdma'' reference implementation From 119e7a35b19d2d9d7d7fae8bf01d6b9369d1ad26 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 15:42:42 +0100 Subject: [PATCH 08/13] fixup! Implemented reference RDMA --- include/graphblas/reference/rdma.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/graphblas/reference/rdma.hpp b/include/graphblas/reference/rdma.hpp index 4e8c8c3d5..5c23275c2 100644 --- a/include/graphblas/reference/rdma.hpp +++ b/include/graphblas/reference/rdma.hpp @@ -57,7 +57,7 @@ namespace grb { typename Coords > static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { - assert( dst_pid == 0 ); + assert( src_pid == 0 ); return grb::set< descr >( dst, src ); } From adb26f0cfbb19afba84e83d04d9c878768c1de94 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 16:54:02 +0100 Subject: [PATCH 09/13] Fixed signatures and such in base & reference --- include/graphblas/base/rdma.hpp | 14 ++++++++------ include/graphblas/reference/rdma.hpp | 19 +++++++++++++------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/include/graphblas/base/rdma.hpp b/include/graphblas/base/rdma.hpp index fc2d0e8eb..acf335052 100644 --- a/include/graphblas/base/rdma.hpp +++ b/include/graphblas/base/rdma.hpp @@ -51,13 +51,15 @@ namespace grb { class rdma { public: template< typename T > - static grb::RC register_global( T &buf ); + static inline grb::RC register_global( T &buf ); template< typename T > - static grb::RC register_global( grb::Vector< T, grb::reference > &buf ); + static inline grb::RC register_global( grb::Vector< T, grb::reference > &buf ); + + static inline grb::RC localRegisterSize( const size_t size ); template< typename T > - static grb::RC get( const size_t src_pid, T &src, T &dst ); + static inline grb::RC get( const size_t src_pid, T &src, T &dst ); template< grb::Descriptor descr, @@ -65,10 +67,10 @@ namespace grb { typename T, typename Coords > - static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ); + static inline grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ); template< typename T > - static grb::RC put( const T &src, const size_t dst_pid, T &dst ); + static inline grb::RC put( const T &src, const size_t dst_pid, T &dst ); template< grb::Descriptor descr, @@ -76,7 +78,7 @@ namespace grb { typename T, typename Coords > - static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst ); + static inline grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst ); }; // end class ``rdma'' } // namespace grb diff --git a/include/graphblas/reference/rdma.hpp b/include/graphblas/reference/rdma.hpp index 5c23275c2..0db335cae 100644 --- a/include/graphblas/reference/rdma.hpp +++ b/include/graphblas/reference/rdma.hpp @@ -34,17 +34,24 @@ namespace grb { class rdma< reference > { public: template< typename T > - static grb::RC register_global( T &buf) { + static inline grb::RC register_global( T &buf ) { + (void) buf; return grb::SUCCESS; } template< typename T > - static grb::RC register_global( grb::Vector< T, grb::reference > &buf ) { + static inline grb::RC register_global( grb::Vector< T, grb::reference > &buf ) { + (void) buf; + return grb::SUCCESS; + } + + static inline grb::RC localRegisterSize( const size_t size ) { + (void) size; return grb::SUCCESS; } template< typename T > - static grb::RC get( const size_t src_pid, T &src, T &dst ) { + static inline grb::RC get( const size_t src_pid, T &src, T &dst ) { assert( src_pid == 0 ); dst = src; return grb::SUCCESS; @@ -56,13 +63,13 @@ namespace grb { typename T, typename Coords > - static grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { + static inline grb::RC get( const size_t src_pid, const grb::Vector< T, backend, Coords > &src, grb::Vector< T, backend, Coords > &dst ) { assert( src_pid == 0 ); return grb::set< descr >( dst, src ); } template< typename T > - static grb::RC put( const T &src, const size_t dst_pid, T &dst ) { + static inline grb::RC put( const T &src, const size_t dst_pid, T &dst ) { assert( dst_pid == 0 ); dst = src; return grb::SUCCESS; @@ -74,7 +81,7 @@ namespace grb { typename T, typename Coords > - static grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { + static inline grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst ) { assert( dst_pid == 0 ); return grb::set< descr >( dst, src ); } From 04f491a3778f8eb48c09837b75a134e32d495772 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Tue, 16 Dec 2025 16:54:30 +0100 Subject: [PATCH 10/13] Better test + some fixes | a race condition remains... --- include/graphblas/bsp/rdma.hpp | 30 ++++++---- tests/unit/rdma.cpp | 105 +++++++++++++++++++++++++++++---- 2 files changed, 109 insertions(+), 26 deletions(-) diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 17544162f..6d5c83778 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -51,10 +51,11 @@ namespace grb { static grb::RC register_global( const T* buf, const size_t size ) { grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); lpf_err_t lpf_rc = LPF_SUCCESS; + grb::RC rc = grb::SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; const void* buf_void = reinterpret_cast< const void* >( buf ); - data.ensureMemslotAvailable( 1 ); + rc = rc ? rc : data.ensureMemslotAvailable( 1 ); assert( data.registered_slots.find( buf_void ) == data.registered_slots.end() ); @@ -69,7 +70,7 @@ namespace grb { data.global_memslots.insert({ memslot, buf_void }); if( lpf_rc == LPF_SUCCESS ) { - return grb::SUCCESS; + return rc; } else { return grb::PANIC; } @@ -94,7 +95,7 @@ namespace grb { const auto it0 = data.registered_slots.find( buf ) assert( it0 != data.registered_slots.end() ); - memslot = it0->second->second; + memslot = it0->second.second; data.registered_slots.erase( it0 ); // TODO: delete registered memory from map @@ -143,7 +144,7 @@ namespace grb { return grb::SUCCESS; } - // data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync + // rc = rc ? rc : data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { const auto it = data.registered_slots.find( reinterpret_cast< const void* >( dst ) ); assert( it != data.registered_slots.end() ); @@ -203,7 +204,7 @@ namespace grb { return grb::SUCCESS; } - // data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync + // rc = rc ? rc : data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { const auto it = data.registered_slots.find( reinterpret_cast< const void* >( src ) ); assert( it != data.registered_slots.end() ); @@ -234,11 +235,9 @@ namespace grb { } public: - /* - * RDMA wrappers for internal functions - */ + template< typename T > - static inline grb::RC register_global( T &buf) { + static inline grb::RC register_global( const T &buf ) { return register_global( &buf, sizeof(T) ); } @@ -248,18 +247,24 @@ namespace grb { typename T, typename Coords > - static inline grb::RC register_global( grb::Vector< T, backend, Coords > &buf ) { + static inline grb::RC register_global( const grb::Vector< T, backend, Coords > &buf ) { const size_t size = grb::internal::getCoordinates( buf ).size(); const size_t bsize = size * sizeof( T ); - T* raw_ptr = grb::internal::getRaw( buf ); + const T* raw_ptr = grb::internal::getRaw( buf ); return register_global( raw_ptr, bsize ); } + static inline grb::RC localRegisterSize( const size_t size ) { + grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + + grb::RC rc = data.ensureMemslotAvailable( size ); + return rc; + } template< typename T > static inline grb::RC get( const size_t src_pid, const T &src, T &dst ) { - return get( src_pid, &src, &dst, sizeof(T) ); + return get( src_pid, &src, &dst, sizeof(T) ); } template< @@ -296,7 +301,6 @@ namespace grb { return put( grb::internal::getRaw( src ), dst_pid, grb::internal::getRaw( dst ), bsize ); } - }; // end class ``rdma'' generic LPF implementation } // namespace grb diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index 649d115dc..f214a5f66 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -17,64 +17,143 @@ #include #include +#include #include -void grbProgram( const size_t &in, grb::RC &exit_status ) { - (void) in; - grb::RC rc = grb::SUCCESS; +const int LPF_MPI_AUTO_INITIALIZE = 0; +void grbProgram( const size_t &n, grb::RC &rc ) { const size_t s = grb::spmd<>::pid(); const size_t P = grb::spmd<>::nprocs(); + assert( s < P ); assert( P > 1 ); - // sleep( 10 ); + + if( s == 0 ){ + std::cout << "Testing RDMA of POD" << std::endl; + } int x = 42, y = 69; + if( s == 0 ){ + std::cout << "\t register" << std::endl; + } rc = rc ? rc : grb::rdma< >::register_global( x ); - // rc = rc ? rc : grb::rdma< >::register_global( y ); // sus - assert( rc == grb::SUCCESS ); + + if( s == 0 ){ + std::cout << "\t put" << std::endl; + } if( s == 0 ){ rc = rc ? rc : grb::rdma< >::put( y, 1, x ); x = 0; } - assert( rc == grb::SUCCESS ); rc = rc ? rc : grb::spmd<>::sync(); assert( rc == grb::SUCCESS ); assert( s != 1 || x == 69 ); rc = rc ? rc : grb::spmd<>::sync(); + if( s == 0 ){ + std::cout << "\t get" << std::endl; + } if( s == 1 ){ rc = rc ? rc : grb::rdma< >::get( 0, x, y ); } + rc = rc ? rc : grb::spmd<>::sync(); + assert( s != 1 || y == 0 ); + rc = rc ? rc : grb::spmd<>::sync(); + + if( rc != grb::SUCCESS ) return; + if( s == 0 ){ + std::cout << "Testing RDMA of grb::Vector" << std::endl; + } + + using T=float; + grb::Vector< T, grb::reference > a ( n ); + grb::Vector< T, grb::reference > b ( n ); + constexpr T v1 = 42.69; + + if( s == 0 ){ + std::cout << "\t register" << std::endl; + } + + rc = rc ? rc : grb::set( a, static_cast( s ) ); + rc = rc ? rc : grb::set( b, static_cast( 0 ) ); + + rc = rc ? rc : grb::rdma<>::register_global( a ); assert( rc == grb::SUCCESS ); rc = rc ? rc : grb::spmd<>::sync(); + + rc = rc ? rc : grb::rdma<>::localRegisterSize( 2 ); + + if( s == 0 ){ + std::cout << "\t put" << std::endl; + } + if( s == 0 ){ + rc = rc ? rc : grb::rdma<>::put( a, 1, a ); + } + rc = rc ? rc : grb::spmd<>::sync(); assert( rc == grb::SUCCESS ); - assert( s != 1 || y == 0 ); + if( s == 1 ){ + for(const auto &i : a ){ + assert( i.second == static_cast< T >( 0 ) ); + } + } + + rc = rc ? rc : grb::rdma<>::localRegisterSize( 2 ); + rc = rc ? rc : grb::spmd<>::sync(); + assert( rc == grb::SUCCESS ); + if( s == 0 ){ + std::cout << "\t get" << std::endl; + rc = rc ? rc : grb::set( a, v1 ); + } + rc = rc ? rc : grb::spmd<>::sync(); + if( s == 1 ){ + rc = rc ? rc : grb::rdma<>::get( 0, a, b ); + } + rc = rc ? rc : grb::spmd<>::sync(); assert( rc == grb::SUCCESS ); - exit_status = rc; + + if( s == 1 ){ + sleep(1); + for(const auto &i : b ){ + std::cerr << i.first << " " << i.second << std::endl; + assert( i.second == v1 ); + } + } + return; } int main( int argc, char ** argv ) { (void) argc; - size_t in = 42; - grb::RC out; + size_t n = 42; + grb::RC out = grb::SUCCESS; + + if( MPI_Init( &argc, &argv ) != MPI_SUCCESS ) { + std::cerr << "MPI_Init returns with non-SUCCESS exit code." << std::endl; + return 10; + } std::cout << "This is a functional test " << argv[ 0 ] << "\n"; - grb::Launcher< grb::AUTOMATIC > launcher; - if( launcher.exec( &grbProgram, in, out, true ) != grb::SUCCESS ) { + grb::Launcher< grb::FROM_MPI > launcher; + if( launcher.exec( &grbProgram, n, out, true ) != grb::SUCCESS ) { std::cerr << "Launching test FAILED\n"; return 255; } + if( MPI_Finalize() != MPI_SUCCESS ) { + std::cerr << "MPI_Finalize returns with non-SUCCESS exit code." << std::endl; + return 50; + } + if( out == grb::SUCCESS ){ std::cerr << "Test OK\n"; }else{ std::cerr << "Test FAILED\n"; } + } From 1ee28fd9f59431703a3e6cc958cf65e30a135ef0 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Wed, 17 Dec 2025 13:38:24 +0100 Subject: [PATCH 11/13] RDMA now working + documentation - TODO: local registration --- include/graphblas/bsp/rdma.hpp | 248 ++++++++++++++++++++++++--------- tests/unit/rdma.cpp | 1 - 2 files changed, 184 insertions(+), 65 deletions(-) diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index 6d5c83778..fc3b1a562 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -38,36 +38,37 @@ namespace grb { class rdma< GENERIC_BSP > { private: - /** + /* * Registers a global buffer for RDMA * - * This is a collective operation! + * \warning This is a collective operation, therefore it must be called by all the processes! * - * @return grb::SUCCESS When all queued communication is executed succesfully. + * @param[in] buf Pointer to the start of the buffer of memory to be registered. + * @param[in] size Size of the buffer in bytes. + * + * @return grb::SUCCESS When registration is completed successfully * @return grb::PANIC When an unrecoverable error occurs. When this value is * returned, the library enters an undefined state. */ - template< typename T > - static grb::RC register_global( const T* buf, const size_t size ) { + static grb::RC register_global( const void *buf, const size_t size ) { grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); lpf_err_t lpf_rc = LPF_SUCCESS; grb::RC rc = grb::SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; - const void* buf_void = reinterpret_cast< const void* >( buf ); rc = rc ? rc : data.ensureMemslotAvailable( 1 ); - assert( data.registered_slots.find( buf_void ) == data.registered_slots.end() ); + assert( data.registered_slots.find( buf ) == data.registered_slots.end() ); lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( - data.context, const_cast< void* >( buf_void ), + data.context, const_cast< void* >( buf ), size, &memslot ); lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); data.signalMemslotTaken(); - data.registered_slots.insert({ buf_void, std::make_pair( size, memslot ) }); - data.global_memslots.insert({ memslot, buf_void }); + data.registered_slots.insert({ buf, std::make_pair( size, memslot ) }); + data.global_memslots.insert({ memslot, buf }); if( lpf_rc == LPF_SUCCESS ) { return rc; @@ -77,14 +78,17 @@ namespace grb { } /** - * Deregisters a local buffer for RDMA + * Deregisters a buffer for RDMA. If the buffer is global, this is a collective + * function * - * @return grb::SUCCESS When all queued communication is executed succesfully. + * \warning This is a collective operation, therefore it must be called by all the processes! + * @param[in] buf Pointer to the start of the buffer of memory to be deregistered. + * + * @return grb::SUCCESS When deregistration is completed successfully * @return grb::PANIC When an unrecoverable error occurs. When this value is * returned, the library enters an undefined state. */ - template< typename T > - static grb::RC deregister( const T &buf ) { + static grb::RC deregister( const void *buf ) { #ifdef _DEBUG std::cout << "deregister: memslot " << memslot << std::endl; #endif @@ -92,7 +96,7 @@ namespace grb { lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; - const auto it0 = data.registered_slots.find( buf ) + const auto it0 = data.registered_slots.find( buf ); assert( it0 != data.registered_slots.end() ); memslot = it0->second.second; @@ -103,7 +107,7 @@ namespace grb { assert( it1 != data.global_memslots.end() ); data.global_memslots.erase( it1 ); - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, &memslot ); + lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, memslot ); data.signalMemslotReleased( 1 ); @@ -115,27 +119,34 @@ namespace grb { } /** - * Writes a message to another process' registered memory + * Reads a message from another process' registered memory + * + * Before calling this function, the user is responsible for ensuring that + * enough registers are free, using the function localRegisterSize. Each call + * to function will use one register if the source pointer has not been + * already registered (locally or globally), and zero otherwise. + * + * @param[in] src_pid PID of process whose memory to access. + * @param[in] src Local pointer to the buffer whose associated pointer in the source process where to read. + * @param[out] dst Pointer to the begin of the output buffer. + * @param[in] size Number of bytes to copy. * * @return grb::SUCCESS When all queued communication is executed succesfully. * @return grb::PANIC When an unrecoverable error occurs. When this value is * returned, the library enters an undefined state. */ - template< typename T1, typename T2 > - static grb::RC put( const T1* src, const size_t dst_pid, T2* dst, const size_t &size ) { + static grb::RC get( const size_t &src_pid, const void *src, void *dst, const size_t size ) { #ifdef _DEBUG - std::cout << "rdma::put( " << src << ", " << size << ", " << dst_pid << ", " << dst << ") called" << std::endl; + std::cout << "rdma::get( " << src << ", " << size << ", " << src_pid << ", " << src_memslot << ") called" << std::endl; #endif - const auto lpf_attr = LPF_MSG_DEFAULT; grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); - lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; - const void* src_void = reinterpret_cast< const void * >( src ); + lpf_err_t lpf_rc = LPF_SUCCESS; // dynamic checks - if( dst_pid >= data.P ) { + if( src_pid >= data.P ) { return grb::ILLEGAL; } @@ -144,29 +155,25 @@ namespace grb { return grb::SUCCESS; } - // rc = rc ? rc : data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { - const auto it = data.registered_slots.find( reinterpret_cast< const void* >( dst ) ); + const auto it = data.registered_slots.find( src ); assert( it != data.registered_slots.end() ); assert( it->second.first >= size ); - dst_memslot = it->second.second; + src_memslot = it->second.second; } - const auto it = data.registered_slots.find( src_void ); + const auto it = data.registered_slots.find( dst ); if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( src_void ), size, &src_memslot ); + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( dst ), size, &dst_memslot ); + data.signalMemslotTaken(); } else { - // there must be a better check... - assert( it->second.first >= size ); // is there enough space? - src_memslot = it->second.second; + assert( it->first == dst ); + assert( it->second.first >= size ); + dst_memslot = it->second.second; } - lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); - data.put_requests.emplace_back( src_void, dst_pid, dst_memslot, 0, size ); - - if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, src_memslot ); - } + lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); + data.get_requests.emplace_back( src_pid, src_memslot, 0, src, size ); if( lpf_rc == LPF_SUCCESS ) { return grb::SUCCESS; @@ -176,26 +183,35 @@ namespace grb { } /** - * Reads a message from another process' registered memory + * Writes a message to another process' registered memory. + * + * Before calling this function, the user is responsible for ensuring that + * enough registers are free, using the function localRegisterSize. Each call + * to function will use one register if the destination pointer has not been + * already registered (locally or globally), and zero otherwise. + * + * @param[in] src Pointer to the begin of the input buffer. + * @param[in] dst_pid PID of process whose memory to access. + * @param[out] dst Local pointer to the buffer whose associated pointer in the source process where to write. + * @param[in] size Number of bytes to copy. * * @return grb::SUCCESS When all queued communication is executed succesfully. * @return grb::PANIC When an unrecoverable error occurs. When this value is * returned, the library enters an undefined state. */ - template< typename T > - static grb::RC get( const size_t &src_pid, const T* src, T* dst, const size_t size ) { + static grb::RC put( const void *src, const size_t dst_pid, void *dst, const size_t &size ) { #ifdef _DEBUG - std::cout << "rdma::get( " << src << ", " << size << ", " << src_pid << ", " << src_memslot << ") called" << std::endl; + std::cout << "rdma::put( " << src << ", " << size << ", " << dst_pid << ", " << dst << ") called" << std::endl; #endif + const auto lpf_attr = LPF_MSG_DEFAULT; grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); + lpf_err_t lpf_rc = LPF_SUCCESS; lpf_memslot_t src_memslot = LPF_INVALID_MEMSLOT; lpf_memslot_t dst_memslot = LPF_INVALID_MEMSLOT; - lpf_err_t lpf_rc = LPF_SUCCESS; - const void* dst_void = reinterpret_cast< const void * >( dst ); // dynamic checks - if( src_pid >= data.P ) { + if( dst_pid >= data.P ) { return grb::ILLEGAL; } @@ -206,26 +222,24 @@ namespace grb { // rc = rc ? rc : data.ensureMemslotAvailable( 1 ); // this function calls lpf_sync { - const auto it = data.registered_slots.find( reinterpret_cast< const void* >( src ) ); + const auto it = data.registered_slots.find( dst ); assert( it != data.registered_slots.end() ); assert( it->second.first >= size ); - src_memslot = it->second.second; + dst_memslot = it->second.second; } - const auto it = data.registered_slots.find( dst ); + const auto it = data.registered_slots.find( src ); if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( dst_void ), size, &dst_memslot ); + lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( src ), size, &src_memslot ); + data.signalMemslotTaken(); } else { - assert( it->second.first >= size ); - dst_memslot = it->second.second; + assert( it->first == src ); + assert( it->second.first >= size ); // is there enough space? + src_memslot = it->second.second; } - lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); - data.get_requests.emplace_back( src_pid, src_memslot, 0, src, size ); - - if( it == data.registered_slots.end() ){ - lpf_rc = lpf_rc ? lpf_rc : lpf_deregister( data.context, dst_memslot ); - } + lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); + data.put_requests.emplace_back( src, dst_pid, dst_memslot, 0, size ); if( lpf_rc == LPF_SUCCESS ) { return grb::SUCCESS; @@ -236,12 +250,34 @@ namespace grb { public: + /* + * Registers a global buffer for RDMA on a POD variable. + * + * \warning This is a collective operation, therefore it must be called by all the processes! + * + * @param[in] buf Scalar to be registered + * + * @return grb::SUCCESS When registration is completed successfully + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ template< typename T > static inline grb::RC register_global( const T &buf ) { - return register_global( &buf, sizeof(T) ); + return register_global( reinterpret_cast< const void* >( &buf ), sizeof(T) ); } - + /* + * Registers a global buffer for RDMA + * + * \warning This is a collective operation, therefore it must be called by all the processes! + * + * @param[in] buf Pointer to the start of the buffer of memory to be reserved. + * @param[in] size Size of the buffer in bytes. + * + * @return grb::SUCCESS When registration is completed successfully + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ template< grb::Backend backend = grb::reference, typename T, @@ -250,11 +286,25 @@ namespace grb { static inline grb::RC register_global( const grb::Vector< T, backend, Coords > &buf ) { const size_t size = grb::internal::getCoordinates( buf ).size(); const size_t bsize = size * sizeof( T ); - const T* raw_ptr = grb::internal::getRaw( buf ); + const void *raw_ptr = reinterpret_cast< const void* >( grb::internal::getRaw( buf ) ); return register_global( raw_ptr, bsize ); } + /* + * Reserve space for size additional registers. This function should be used + * before calling put and/or get. These RDMA functions automatically register + * local unregistered buffers, so need space in LPF register to do so. + * One register is needed for each put/get between successive a syncs. + * + * \warning This is a collective operation. A sync could be called internally, therefore it must be called by all the processes! + * + * @param[in] size The number of registers to reserve space for. + * + * @return grb::SUCCESS The register space was successfully ensured. + * @return PANIC Could not ensure a large enough buffer space. The state + * of the library has become undefined. + */ static inline grb::RC localRegisterSize( const size_t size ) { grb::internal::BSP1D_Data & data = grb::internal::grb_BSP1D.load(); @@ -262,11 +312,45 @@ namespace grb { return rc; } + /* + * Copy a variable from a remote process. + * Source variable must have been globally registered. + * + * See private get function for more details. + * + * @param[in] src_pid Source Process ID. + * @param[in] src Object associated with the remote object to read from. + * @param[out] dst Destination object to write to. + * + * @return grb::SUCCESS The register space was successfully ensured. + * @return PANIC Could not ensure a large enough buffer space. The state + * of the library has become undefined. + */ template< typename T > static inline grb::RC get( const size_t src_pid, const T &src, T &dst ) { - return get( src_pid, &src, &dst, sizeof(T) ); + + const void *src_ptr = reinterpret_cast< const void* >( &src ); + void *dst_ptr = reinterpret_cast< void* >( &dst ); + constexpr size_t size = sizeof(T); + + return get( src_pid, src_ptr, dst_ptr, size ); } + /* + * + * Copy a grb::Vector from a remote process. + * Source grb::Vector must have been globally registered. + * + * See private get function for more details. + * + * @param[in] src_pid Source Process ID. + * @param[in] src Object associated with the remote Vector to read from. + * @param[out] dst Destination Vector to write to. + * + * @return grb::SUCCESS The register space was successfully ensured. + * @return PANIC Could not ensure a large enough buffer space. The state + * of the library has become undefined. + */ template< grb::Backend backend = grb::reference, typename T, @@ -279,15 +363,48 @@ namespace grb { const size_t size = grb::internal::getCoordinates( dst ).size(); const size_t bsize = size * sizeof( T ); + const void *src_ptr = reinterpret_cast< const void* >( grb::internal::getRaw( src ) ); + void *dst_ptr = reinterpret_cast< void* >( grb::internal::getRaw( dst ) ); - return get( src_pid, grb::internal::getRaw( src ), grb::internal::getRaw( dst ), bsize ); + return get( src_pid, src_ptr, dst_ptr, bsize ); } + /* + * Write a scalar variable to another process. + * + * Vectors must be of the same size (in the respective processes)! + * + * See private put function for more details. + * + * @param[in] src Variable to read from. + * @param[in] dst_pid Destination Process ID. + * @param[out] dst Variable associated with the scalar in the destination process to write to. + * + * @return grb::SUCCESS The register space was successfully ensured. + * @return PANIC Could not ensure a large enough buffer space. The state + * of the library has become undefined. + */ template< typename T > static inline grb::RC put( const T &src, const size_t dst_pid, T &dst ) { - return put( &src, dst_pid, &dst, sizeof(T) ); + + const void *src_ptr = reinterpret_cast< const void* >( &src ); + void *dst_ptr = reinterpret_cast< void* >( &dst ); + constexpr size_t size = sizeof(T); + + return put( src_ptr, dst_pid, dst_ptr, size ); } + /* + * Write a grb::Vector to another process. + * + * @param[in] src grb::Vector to read from. + * @param[in] dst_pid Destination Process ID. + * @param[out] dst grb::Vector whose associated object in the destination process to write to. + * + * @return grb::SUCCESS The register space was successfully ensured. + * @return PANIC Could not ensure a large enough buffer space. The state + * of the library has become undefined. + */ template< grb::Backend backend = grb::reference, typename T, @@ -296,10 +413,13 @@ namespace grb { static inline grb::RC put( const grb::Vector< T, backend, Coords > &src, const size_t dst_pid, grb::Vector< T, backend, Coords > &dst) { // we only support grb::reference for now static_assert( grb::reference == backend ); + const size_t size = grb::internal::getCoordinates( src ).size(); const size_t bsize = size * sizeof( T ); + const void *src_ptr = reinterpret_cast< const void* >( grb::internal::getRaw( src ) ); + void *dst_ptr = reinterpret_cast< void* >( grb::internal::getRaw( dst ) ); - return put( grb::internal::getRaw( src ), dst_pid, grb::internal::getRaw( dst ), bsize ); + return put( src_ptr, dst_pid, dst_ptr, bsize ); } }; // end class ``rdma'' generic LPF implementation diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index f214a5f66..e76018c33 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -119,7 +119,6 @@ void grbProgram( const size_t &n, grb::RC &rc ) { if( s == 1 ){ sleep(1); for(const auto &i : b ){ - std::cerr << i.first << " " << i.second << std::endl; assert( i.second == v1 ); } } From 08f30cabac575042d03d79f94f3c7ba47d0f5be5 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Wed, 7 Jan 2026 09:42:10 +0100 Subject: [PATCH 12/13] Added deregistration + small bugfix for random crashing with multiple registrations --- include/graphblas/base/rdma.hpp | 10 ++++-- include/graphblas/bsp/rdma.hpp | 49 ++++++++++++++++++++++++++-- include/graphblas/reference/rdma.hpp | 16 +++++++-- tests/unit/rdma.cpp | 5 ++- 4 files changed, 72 insertions(+), 8 deletions(-) diff --git a/include/graphblas/base/rdma.hpp b/include/graphblas/base/rdma.hpp index acf335052..f790ab5ff 100644 --- a/include/graphblas/base/rdma.hpp +++ b/include/graphblas/base/rdma.hpp @@ -51,10 +51,16 @@ namespace grb { class rdma { public: template< typename T > - static inline grb::RC register_global( T &buf ); + static inline grb::RC register_global( const T &buf ); template< typename T > - static inline grb::RC register_global( grb::Vector< T, grb::reference > &buf ); + static inline grb::RC register_global( const grb::Vector< T, grb::reference > &buf ); + + template< typename T > + static inline grb::RC deregister( const T &buf ); + + template< typename T > + static inline grb::RC deregister( const grb::Vector< T, grb::reference > &buf ); static inline grb::RC localRegisterSize( const size_t size ); diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index fc3b1a562..dbad48d0e 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -56,9 +56,15 @@ namespace grb { grb::RC rc = grb::SUCCESS; lpf_memslot_t memslot = LPF_INVALID_MEMSLOT; - rc = rc ? rc : data.ensureMemslotAvailable( 1 ); + const bool buffer_already_registered = (data.registered_slots.find( buf ) != data.registered_slots.end()); + if( buffer_already_registered ){ +#ifdef _DEBUG + std::cerr << "Buffer already registered. Nothing to do." << std::endl; +#endif + return grb::SUCCESS; + } - assert( data.registered_slots.find( buf ) == data.registered_slots.end() ); + rc = rc ? rc : data.ensureMemslotAvailable( 1 ); lpf_rc = lpf_rc ? lpf_rc : lpf_register_global( data.context, const_cast< void* >( buf ), @@ -102,7 +108,6 @@ namespace grb { memslot = it0->second.second; data.registered_slots.erase( it0 ); - // TODO: delete registered memory from map const auto it1 = data.global_memslots.find( memslot ); assert( it1 != data.global_memslots.end() ); data.global_memslots.erase( it1 ); @@ -291,6 +296,44 @@ namespace grb { return register_global( raw_ptr, bsize ); } + /* + * Deregisters a global buffer on a POD variable. + * + * \warning This is a collective operation, therefore it must be called by all the processes! + * + * @param[in] buf Scalar to be deregistered + * + * @return grb::SUCCESS When registration is completed successfully + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< typename T > + static inline grb::RC deregister( const T &buf ) { + return deregister( reinterpret_cast< const void* >( &buf ) ); + } + + /* + * Deregisters a global buffer. + * + * \warning This is a collective operation, therefore it must be called by all the processes! + * + * @param[in] buf Pointer to the start of the buffer of memory to be deregistered. + * + * @return grb::SUCCESS When registration is completed successfully + * @return grb::PANIC When an unrecoverable error occurs. When this value is + * returned, the library enters an undefined state. + */ + template< + grb::Backend backend = grb::reference, + typename T, + typename Coords + > + static inline grb::RC deregister( const grb::Vector< T, backend, Coords > &buf ) { + const void *raw_ptr = reinterpret_cast< const void* >( grb::internal::getRaw( buf ) ); + + return deregister( raw_ptr ); + } + /* * Reserve space for size additional registers. This function should be used * before calling put and/or get. These RDMA functions automatically register diff --git a/include/graphblas/reference/rdma.hpp b/include/graphblas/reference/rdma.hpp index 0db335cae..47b28a5ab 100644 --- a/include/graphblas/reference/rdma.hpp +++ b/include/graphblas/reference/rdma.hpp @@ -34,13 +34,25 @@ namespace grb { class rdma< reference > { public: template< typename T > - static inline grb::RC register_global( T &buf ) { + static inline grb::RC register_global( const T &buf ) { (void) buf; return grb::SUCCESS; } template< typename T > - static inline grb::RC register_global( grb::Vector< T, grb::reference > &buf ) { + static inline grb::RC register_global( const grb::Vector< T, grb::reference > &buf ) { + (void) buf; + return grb::SUCCESS; + } + + template< typename T > + static inline grb::RC deregister( const T &buf ) { + (void) buf; + return grb::SUCCESS; + } + + template< typename T > + static inline grb::RC deregister( const grb::Vector< T, grb::reference > &buf ) { (void) buf; return grb::SUCCESS; } diff --git a/tests/unit/rdma.cpp b/tests/unit/rdma.cpp index e76018c33..c893ab67d 100644 --- a/tests/unit/rdma.cpp +++ b/tests/unit/rdma.cpp @@ -62,6 +62,7 @@ void grbProgram( const size_t &n, grb::RC &rc ) { } rc = rc ? rc : grb::spmd<>::sync(); assert( s != 1 || y == 0 ); + rc = rc ? rc : grb::rdma< >::deregister( x ); rc = rc ? rc : grb::spmd<>::sync(); if( rc != grb::SUCCESS ) return; @@ -79,7 +80,7 @@ void grbProgram( const size_t &n, grb::RC &rc ) { } rc = rc ? rc : grb::set( a, static_cast( s ) ); - rc = rc ? rc : grb::set( b, static_cast( 0 ) ); + rc = rc ? rc : grb::set( b, static_cast( s+1 ) ); rc = rc ? rc : grb::rdma<>::register_global( a ); assert( rc == grb::SUCCESS ); @@ -122,6 +123,8 @@ void grbProgram( const size_t &n, grb::RC &rc ) { assert( i.second == v1 ); } } + rc = rc ? rc : grb::rdma<>::deregister( a ); + rc = rc ? rc : grb::spmd<>::sync(); return; } From ca8f86ef19f303783b876dfcbedf3f5cb6885940 Mon Sep 17 00:00:00 2001 From: Giovanni Gaio <48856010+GiovaGa@users.noreply.github.com> Date: Wed, 7 Jan 2026 10:14:17 +0100 Subject: [PATCH 13/13] Clearer registered slot backend + remember also local slots --- include/graphblas/bsp/rdma.hpp | 29 ++++++++++++++----------- include/graphblas/bsp1d/init.hpp | 37 ++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/include/graphblas/bsp/rdma.hpp b/include/graphblas/bsp/rdma.hpp index dbad48d0e..daef166be 100644 --- a/include/graphblas/bsp/rdma.hpp +++ b/include/graphblas/bsp/rdma.hpp @@ -73,7 +73,7 @@ namespace grb { lpf_rc = lpf_rc ? lpf_rc : lpf_sync( data.context, LPF_SYNC_DEFAULT ); data.signalMemslotTaken(); - data.registered_slots.insert({ buf, std::make_pair( size, memslot ) }); + data.registered_slots.insert({ buf, grb::internal::registered_slot(buf, memslot, size, true ) }); data.global_memslots.insert({ memslot, buf }); if( lpf_rc == LPF_SUCCESS ) { @@ -105,7 +105,7 @@ namespace grb { const auto it0 = data.registered_slots.find( buf ); assert( it0 != data.registered_slots.end() ); - memslot = it0->second.second; + memslot = it0->second.slot; data.registered_slots.erase( it0 ); const auto it1 = data.global_memslots.find( memslot ); @@ -163,18 +163,20 @@ namespace grb { { const auto it = data.registered_slots.find( src ); assert( it != data.registered_slots.end() ); - assert( it->second.first >= size ); - src_memslot = it->second.second; + assert( it->second.size >= size ); + assert( it->second.global ); + src_memslot = it->second.slot; } const auto it = data.registered_slots.find( dst ); if( it == data.registered_slots.end() ){ lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( dst ), size, &dst_memslot ); data.signalMemslotTaken(); + data.registered_slots.insert({dst, grb::internal::registered_slot( dst, dst_memslot, size, false )}); } else { - assert( it->first == dst ); - assert( it->second.first >= size ); - dst_memslot = it->second.second; + assert( it->second.buf == dst ); + assert( it->second.size >= size ); + dst_memslot = it->second.slot; } lpf_rc = lpf_rc ? lpf_rc : lpf_get( data.context, src_pid, src_memslot , 0, dst_memslot, 0, size, lpf_attr ); @@ -229,18 +231,21 @@ namespace grb { { const auto it = data.registered_slots.find( dst ); assert( it != data.registered_slots.end() ); - assert( it->second.first >= size ); - dst_memslot = it->second.second; + assert( it->second.buf == dst ); + assert( it->second.size >= size ); + assert( it->second.global ); + dst_memslot = it->second.slot; } const auto it = data.registered_slots.find( src ); if( it == data.registered_slots.end() ){ lpf_rc = lpf_rc ? lpf_rc : lpf_register_local( data.context, const_cast< void* >( src ), size, &src_memslot ); data.signalMemslotTaken(); + data.registered_slots.insert({src, grb::internal::registered_slot( src, src_memslot, size, false )}); } else { - assert( it->first == src ); - assert( it->second.first >= size ); // is there enough space? - src_memslot = it->second.second; + assert( it->second.buf == src ); + assert( it->second.size >= size ); // is there enough space? + src_memslot = it->second.slot; } lpf_rc = lpf_rc ? lpf_rc : lpf_put( data.context, src_memslot, 0, dst_pid, dst_memslot, 0, size, lpf_attr ); diff --git a/include/graphblas/bsp1d/init.hpp b/include/graphblas/bsp1d/init.hpp index ce5eebf23..e49ee5b57 100644 --- a/include/graphblas/bsp1d/init.hpp +++ b/include/graphblas/bsp1d/init.hpp @@ -56,11 +56,11 @@ namespace grb { size_t size; get_request( - lpf_pid_t src_pid, - lpf_memslot_t src, - size_t src_offset, + const lpf_pid_t src_pid, + const lpf_memslot_t src, + const size_t src_offset, const void * dst, - size_t size + const size_t size ) : src_pid(src_pid) , src(src) @@ -80,10 +80,10 @@ namespace grb { put_request( const void * src, - lpf_pid_t dst_pid, - lpf_memslot_t dst, - size_t dst_offset, - size_t size + const lpf_pid_t dst_pid, + const lpf_memslot_t dst, + const size_t dst_offset, + const size_t size ) : src(src) , dst_pid(dst_pid) @@ -93,6 +93,25 @@ namespace grb { {} }; + struct registered_slot { + const void * buf; + lpf_memslot_t slot; + size_t size; + bool global; + + registered_slot( + const void * buf, + const lpf_memslot_t slot, + const size_t size, + const bool global + ) + : buf(buf) + , slot(slot) + , size(size) + , global(global) + {} + }; + /** * These are all user process local data elements required to successfully * execute parallel ALP calls. @@ -223,7 +242,7 @@ namespace grb { utils::DMapper< uintptr_t > mapper; /** Map of globally registered addresses. */ - std::map< const void* , std::pair< const size_t, const lpf_memslot_t > > registered_slots; + std::map< const void* , registered_slot > registered_slots; /** Map of registered memory slots to their address. */ std::map< const lpf_memslot_t , const void* > global_memslots;