From 20a04005b34d399fd91c2814b9234e9857c9ecde Mon Sep 17 00:00:00 2001 From: xuzhenbao Date: Tue, 20 Jan 2026 21:36:02 +0800 Subject: [PATCH 1/3] Add remote service ranking feature --- .../gtest/src/RsaJsonRpcUnitTestSuite.cc | 30 +- .../src/rsa_json_rpc_proxy_impl.c | 44 +- .../topology_manager/CMakeLists.txt | 2 +- .../topology_manager/gtest/CMakeLists.txt | 4 +- .../TopologyManagerErrorInjectionTestSuite.cc | 135 +++++ .../gtest/src/TopologyManagerTestSuite.cc | 287 ++++++++++ .../src/TopologyManagerTestSuiteBaseClass.h | 15 +- .../topology_manager/src/topology_manager.c | 520 ++++++++++++------ libs/utils/src/array_list.c | 2 +- 9 files changed, 829 insertions(+), 210 deletions(-) diff --git a/bundles/remote_services/rsa_rpc_json/gtest/src/RsaJsonRpcUnitTestSuite.cc b/bundles/remote_services/rsa_rpc_json/gtest/src/RsaJsonRpcUnitTestSuite.cc index 9c5bc660b..eebd254bc 100644 --- a/bundles/remote_services/rsa_rpc_json/gtest/src/RsaJsonRpcUnitTestSuite.cc +++ b/bundles/remote_services/rsa_rpc_json/gtest/src/RsaJsonRpcUnitTestSuite.cc @@ -44,6 +44,8 @@ #include "celix_framework_version.h" #include #include +#include +#include extern "C" { #include "remote_interceptors_handler.h" } @@ -360,14 +362,6 @@ class RsaJsonRpcProxyUnitTestSuite2 : public RsaJsonRpcProxyUnitTestSuite { long proxyId{-1}; }; -TEST_F(RsaJsonRpcProxyUnitTestSuite, FailedToCreateSendRequestMethodLock) { - celix_autoptr(endpoint_description_t) endpoint = CreateEndpointDescription(); - long proxyId = -1L; - celix_ei_expect_celixThreadRwlock_create((void*)&rsaJsonRpcProxy_factoryCreate, 0, CELIX_ENOMEM); - auto status = rsaJsonRpc_createProxy(jsonRpc.get(), endpoint, SendRequest, nullptr, &proxyId); - EXPECT_EQ(CELIX_ENOMEM, status); -} - TEST_F(RsaJsonRpcProxyUnitTestSuite, FailedToCreateProxiesHashMap) { auto endpoint = CreateEndpointDescription(); long proxyId = -1L; @@ -458,14 +452,26 @@ TEST_F(RsaJsonRpcProxyUnitTestSuite2, InvokeProxyServiceWithInvalidParams) { } TEST_F(RsaJsonRpcProxyUnitTestSuite2, InvokeProxyServiceWhenProxyIsDestroying) { - auto found = celix_bundleContext_useService(ctx.get(), RSA_RPC_JSON_TEST_SERVICE, this, [](void *handle, void *svc) { - auto self = static_cast(handle); - rsaJsonRpc_destroyProxy(self->jsonRpc.get(), self->proxyId); + std::atomic syncStatus{0}; + std::thread t([this, &syncStatus](){ + while (syncStatus.load() != 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + syncStatus.store(2); + rsaJsonRpc_destroyProxy(jsonRpc.get(), proxyId); + }); + auto found = celix_bundleContext_useService(ctx.get(), RSA_RPC_JSON_TEST_SERVICE, &syncStatus, [](void *handle, void *svc) { + auto syncStatus = static_cast*>(handle); + syncStatus->store(1); + while (syncStatus->load() != 2) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } auto proxySvc = static_cast(svc); EXPECT_NE(nullptr, proxySvc); - EXPECT_EQ(CELIX_ILLEGAL_STATE, proxySvc->test(proxySvc->handle)); + EXPECT_EQ(CELIX_SUCCESS, proxySvc->test(proxySvc->handle)); }); EXPECT_TRUE(found); + t.join(); } TEST_F(RsaJsonRpcProxyUnitTestSuite2, FailedToPrepareInvokeRequest) { diff --git a/bundles/remote_services/rsa_rpc_json/src/rsa_json_rpc_proxy_impl.c b/bundles/remote_services/rsa_rpc_json/src/rsa_json_rpc_proxy_impl.c index a023d2eaf..a0a065e5c 100644 --- a/bundles/remote_services/rsa_rpc_json/src/rsa_json_rpc_proxy_impl.c +++ b/bundles/remote_services/rsa_rpc_json/src/rsa_json_rpc_proxy_impl.c @@ -46,7 +46,6 @@ struct rsa_json_rpc_proxy_factory { endpoint_description_t *endpointDesc; celix_long_hash_map_t *proxies;//Key:requestingBundle, Value: rsa_json_rpc_proxy_t *. Work on the celix_event thread , so locks are not required remote_interceptors_handler_t *interceptorsHandler; - celix_thread_rwlock_t sendRequestLock; //protects sendRequest celix_rsa_send_request_fp sendRequest; void* sendRequestHandle; }; @@ -65,7 +64,6 @@ static void rsaJsonRpcProxy_ungetService(void *handle, const celix_bundle_t *req static celix_status_t rsaJsonRpcProxy_create(rsa_json_rpc_proxy_factory_t *proxyFactory, const celix_bundle_t *requestingBundle, rsa_json_rpc_proxy_t **proxyOut); static void rsaJsonRpcProxy_destroy(rsa_json_rpc_proxy_t *proxy); -static void rsaJsonRpcProxy_unregisterFacSvcDone(void *data); celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx, celix_log_helper_t* logHelper, @@ -95,14 +93,6 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx, proxyFactory->sendRequestHandle = sendRequestHandle; proxyFactory->serialProtoId = serialProtoId; - - celix_status_t status = celixThreadRwlock_create(&proxyFactory->sendRequestLock, NULL); - if (status != CELIX_SUCCESS) { - celix_logHelper_error(logHelper, "Proxy: Error creating sendRequest lock. %d", status); - return status; - } - celix_autoptr(celix_thread_rwlock_t) sendRequestLock = &proxyFactory->sendRequestLock; - CELIX_BUILD_ASSERT(sizeof(long) == sizeof(void*)); // The hash_map uses the pointer as key, so this should be true celix_autoptr(celix_long_hash_map_t) proxies = proxyFactory->proxies = celix_longHashMap_create(); if (proxyFactory->proxies == NULL) { @@ -121,7 +111,7 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx, proxyFactory->factory.getService = rsaJsonRpcProxy_getService; proxyFactory->factory.ungetService = rsaJsonRpcProxy_ungetService; celix_properties_t* svcProperties = NULL; - status = celix_rsaUtils_createServicePropertiesFromEndpointProperties(endpointDesc->properties, &svcProperties); + celix_status_t status = celix_rsaUtils_createServicePropertiesFromEndpointProperties(endpointDesc->properties, &svcProperties); if (status != CELIX_SUCCESS) { return status; } @@ -135,34 +125,21 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx, celix_steal_ptr(endpointDescCopy); celix_steal_ptr(proxies); - celix_steal_ptr(sendRequestLock); *proxyFactoryOut = celix_steal_ptr(proxyFactory); return CELIX_SUCCESS; } void rsaJsonRpcProxy_factoryDestroy(rsa_json_rpc_proxy_factory_t *proxyFactory) { assert(proxyFactory != NULL); - { - celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&proxyFactory->sendRequestLock); - proxyFactory->sendRequest = NULL; - } - celix_bundleContext_unregisterServiceAsync(proxyFactory->ctx, proxyFactory->factorySvcId, - proxyFactory, rsaJsonRpcProxy_unregisterFacSvcDone); -} - -long rsaJsonRpcProxy_factorySvcId(rsa_json_rpc_proxy_factory_t *proxyFactory) { - return proxyFactory->factorySvcId; -} - -static void rsaJsonRpcProxy_unregisterFacSvcDone(void *data) { - assert(data); - rsa_json_rpc_proxy_factory_t *proxyFactory = (rsa_json_rpc_proxy_factory_t *)data; + celix_bundleContext_unregisterService(proxyFactory->ctx, proxyFactory->factorySvcId); endpointDescription_destroy(proxyFactory->endpointDesc); assert(celix_longHashMap_size(proxyFactory->proxies) == 0); celix_longHashMap_destroy(proxyFactory->proxies); - celixThreadRwlock_destroy(&proxyFactory->sendRequestLock); free(proxyFactory); - return; +} + +long rsaJsonRpcProxy_factorySvcId(rsa_json_rpc_proxy_factory_t *proxyFactory) { + return proxyFactory->factorySvcId; } static void* rsaJsonRpcProxy_getService(void *handle, const celix_bundle_t *requestingBundle, @@ -244,15 +221,8 @@ static void rsaJsonRpcProxy_serviceFunc(void *userData, void *args[], void *retu proxyFactory->endpointDesc->properties, dynFunction_getName(entry->dynFunc), &metadata); if (cont) { struct iovec requestIovec = {invokeRequest,strlen(invokeRequest) + 1}; - celixThreadRwlock_readLock(&proxyFactory->sendRequestLock); - if (proxyFactory->sendRequest != NULL) { - status = proxyFactory->sendRequest(proxyFactory->sendRequestHandle, proxyFactory->endpointDesc, + status = proxyFactory->sendRequest(proxyFactory->sendRequestHandle, proxyFactory->endpointDesc, metadata, &requestIovec, &replyIovec); - } else { - status = CELIX_ILLEGAL_STATE; - celix_logHelper_warning(proxyFactory->logHelper,"Maybe the \"%s\" service is stopping.", proxyFactory->endpointDesc->serviceName); - } - celixThreadRwlock_unlock(&proxyFactory->sendRequestLock); if (status == CELIX_SUCCESS && dynFunction_hasReturn(entry->dynFunc)) { if (replyIovec.iov_base != NULL) { int rsErrno = CELIX_SUCCESS; diff --git a/bundles/remote_services/topology_manager/CMakeLists.txt b/bundles/remote_services/topology_manager/CMakeLists.txt index ca742341e..6e43433f8 100644 --- a/bundles/remote_services/topology_manager/CMakeLists.txt +++ b/bundles/remote_services/topology_manager/CMakeLists.txt @@ -22,7 +22,7 @@ add_celix_bundle(rsa_topology_manager src/topology_manager.c src/scope.c src/activator.c - VERSION 0.9.0 + VERSION 0.10.0 SYMBOLIC_NAME "apache_celix_rs_topology_manager" GROUP "Celix/RSA" NAME "Apache Celix RS Topology Manager" diff --git a/bundles/remote_services/topology_manager/gtest/CMakeLists.txt b/bundles/remote_services/topology_manager/gtest/CMakeLists.txt index f063854e6..ec77f6abc 100644 --- a/bundles/remote_services/topology_manager/gtest/CMakeLists.txt +++ b/bundles/remote_services/topology_manager/gtest/CMakeLists.txt @@ -49,12 +49,14 @@ if (EI_TESTS) Celix::framework Celix::threads_ei # Celix::bundle_ctx_ei -# Celix::string_hash_map_ei + Celix::string_hash_map_ei Celix::long_hash_map_ei Celix::array_list_ei Celix::properties_ei Celix::utils_ei Celix::malloc_ei + Celix::filter_ei + Celix::asprintf_ei GTest::gtest GTest::gtest_main ) diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc index 1faddaec2..b8f468e59 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerErrorInjectionTestSuite.cc @@ -26,6 +26,9 @@ #include "celix_properties_ei.h" #include "celix_utils_ei.h" #include "celix_threads_ei.h" +#include "celix_string_hash_map_ei.h" +#include "celix_filter_ei.h" +#include "asprintf_ei.h" class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test { public: @@ -45,6 +48,10 @@ class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test { celix_ei_expect_calloc(nullptr, 0, nullptr); celix_ei_expect_celixThreadMutex_create(nullptr, 0, 0); celix_ei_expect_celix_longHashMap_create(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_create(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_putLong(nullptr, 1, 0); + celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_createWithOptions(nullptr, 0, nullptr); } std::shared_ptr fw{}; @@ -60,6 +67,42 @@ TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AllocingMemoryErrorTest) EXPECT_EQ(CELIX_ENOMEM, status); } +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceRankingOffsetMapErrorTest) { + celix_ei_expect_celix_stringHashMap_create((void*)topologyManager_create, 0, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, ImportedServiceRankingOffsetStringDuplicationErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1); + celix_ei_expect_celix_utils_strdup((void*)topologyManager_create, 1, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AddingImportedServiceRankingOffsetErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1); + celix_ei_expect_celix_stringHashMap_putLong((void*)topologyManager_create, 1, ENOMEM); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(ENOMEM, status); + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceMapErrorTest) { + celix_ei_expect_celix_stringHashMap_createWithOptions((void*)topologyManager_create, 0, nullptr); + void *scope{}; + topology_manager_t *tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(CELIX_ENOMEM, status); +} + TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingMutexErrorTest) { celix_ei_expect_celixThreadMutex_create((void*)topologyManager_create, 0, CELIX_ENOMEM); void *scope{}; @@ -121,6 +164,12 @@ class TopologyManagerErrorInjectionTestSuite : public TopologyManagerTestSuiteBa celix_ei_expect_celix_properties_set(nullptr, 0, 0); celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr); celix_ei_expect_celix_arrayList_add(nullptr, 0, 0); + celix_ei_expect_celix_filter_create(nullptr, 0, nullptr); + celix_ei_expect_celix_properties_getAsStringArrayList(nullptr, 0, 0); + celix_ei_expect_celix_properties_setLong(nullptr, 0, 0); + celix_ei_expect_asprintf(nullptr, 0, 0); + celix_ei_expect_celix_arrayList_createWithOptions(nullptr, 0, nullptr); + celix_ei_expect_celix_stringHashMap_put(nullptr, 0, 0); } void TestExportServiceFailure(void (*errorInject)(void)) { @@ -263,5 +312,91 @@ TEST_F(TopologyManagerErrorInjectionTestSuite, AddDynamicIpEndpointToListErrorTe }); } +TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingFilterErrorWhenImportScopeChangedTest) { + celix_ei_expect_celix_filter_create(CELIX_EI_UNKNOWN_CALLER, 0, nullptr, 2); + auto status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, AllocingMemoryForImportedServiceEntryErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_ENOMEM, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, GettingServiceImportedConfigsErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_getAsStringArrayList(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + celix_properties_unset(importEndpoint->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_ENOMEM, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, CopyImportedServicePropertiesErrorTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_copy(CELIX_EI_UNKNOWN_CALLER, 0, nullptr);//called in endpointDescription_clone + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, SettingImportedServiceRankingErrorTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_properties_setLong(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM);//called in endpointDescription_clone + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, GeneratingImportsKeyFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_asprintf(CELIX_EI_UNKNOWN_CALLER, 0, -1); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + celix_ei_expect_asprintf(CELIX_EI_UNKNOWN_CALLER, 0, -1); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + }); +} + +TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingImportsListFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_arrayList_createWithOptions(CELIX_EI_UNKNOWN_CALLER, 0, nullptr); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} +TEST_F(TopologyManagerErrorInjectionTestSuite, AddingImportsListToMapFailureTest) { + TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) { + celix_ei_expect_celix_stringHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM); + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(ENOMEM, status); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} \ No newline at end of file diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc index 6f3d028a1..4f3b4a9d5 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuite.cc @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include "TopologyManagerTestSuiteBaseClass.h" @@ -469,4 +470,290 @@ TEST_F(TopologyManagerTestSuite, ImportService2Test) { status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); EXPECT_EQ(CELIX_SUCCESS, status); }); +} + +TEST_F(TopologyManagerTestSuite, ImportHigherRankingEndpointTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10,tm_test_config_type2=20", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, "tm_test_config_type2"); + importEndpoint2->id = celix_properties_get(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, nullptr); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, ImportLowerRankingEndpointTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=20,tm_test_config_type2=10", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, "tm_test_config_type2"); + importEndpoint2->id = celix_properties_get(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, nullptr); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, AddRsaAfterImportServiceWithMultiEndpointsTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10,tm_test_config_type2=20", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + //first add the rsa and then the import endpoint + auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, "tm_test_config_type2"); + importEndpoint2->id = celix_properties_get(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, nullptr); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type2)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, CloseImportsTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + status = topologyManager_closeImports(tm); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + //Cannot add new imports after close + celix_autoptr(endpoint_description_t) importEndpoint2 = endpointDescription_clone(importEndpoint); + celix_properties_set(importEndpoint2->properties, CELIX_RSA_ENDPOINT_ID, "319bddfa-0252-4654-a3bd-298354d30208"); + status = topologyManager_addImportedService(tm, importEndpoint2, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportScopeChangedTest) { + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type2)"); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_service_filter_options_t opts{}; + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = tm_removeImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)"); + EXPECT_EQ(CELIX_SUCCESS, status); + opts.filter = (char *)"(service.imported.configs=tm_test_config_type)"; + svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_EQ(svcId, -1); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); +} + +TEST_F(TopologyManagerTestSuite, ImportedServiceRankingOverflowUpwardTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=1", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_properties_setLong(importEndpoint->properties, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MAX); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + celix_service_filter_options_t opts{}; + char filter[256]; + (void)snprintf(filter, 256, "(&(%s=tm_test_config_type)(%s=%ld))", CELIX_RSA_SERVICE_IMPORTED_CONFIGS, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MAX); + opts.filter = filter; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); +} + +TEST_F(TopologyManagerTestSuite, ImportedServiceRankingOverflowDownwardTest) { + setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=-1", 1); + tm.reset(); + void* scope = nullptr; + topology_manager_t* tmPtr{}; + auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); + EXPECT_EQ(status, CELIX_SUCCESS); + tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + + TestImportService([this](topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint) { + auto status = topologyManager_rsaAdded(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + celix_properties_setLong(importEndpoint->properties, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MIN); + status = topologyManager_addImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + + celix_service_filter_options_t opts{}; + char filter[256]; + (void)snprintf(filter, 256, "(&(%s=tm_test_config_type)(%s=%ld))", CELIX_RSA_SERVICE_IMPORTED_CONFIGS, CELIX_FRAMEWORK_SERVICE_RANKING, LONG_MIN); + opts.filter = filter; + auto svcId = celix_bundleContext_findServiceWithOptions(ctx.get(), &opts); + EXPECT_GE(svcId, 0); + + status = topologyManager_removeImportedService(tm, importEndpoint, nullptr); + EXPECT_EQ(CELIX_SUCCESS, status); + status = topologyManager_rsaRemoved(tm, rsaSvcRef, rsaSvc); + EXPECT_EQ(CELIX_SUCCESS, status); + }); + + unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS"); } \ No newline at end of file diff --git a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h index 27540175e..5c40ad29b 100644 --- a/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h +++ b/bundles/remote_services/topology_manager/gtest/src/TopologyManagerTestSuiteBaseClass.h @@ -24,6 +24,8 @@ extern "C" { #endif #include +#include +#include #include #include "celix_errno.h" #include "celix_constants.h" @@ -38,6 +40,7 @@ extern "C" { struct import_registration { endpoint_description_t *endpoint; + long proxyServiceId; }; struct export_reference { @@ -71,6 +74,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope); EXPECT_EQ(status, CELIX_SUCCESS); tm = std::shared_ptr{tmPtr, [](auto t) {topologyManager_destroy(t);}}; + tms = std::unique_ptr{scope, [](void*) {}}; } ~TopologyManagerTestSuiteBaseClass() = default; @@ -217,7 +221,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { } } - void TestImportService(void (*testBody)(topology_manager_t* tm, service_reference_pt rsaSvcRef, void* rsaSvc, endpoint_description_t *importEndpoint)) { + void TestImportService(const std::function &testBody) { remote_service_admin_t rsa{}; rsa.ctx = ctx.get(); remote_service_admin_service_t rsaSvc{}; @@ -227,10 +231,18 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { auto importReg = (import_registration_t*)calloc(1, sizeof(import_registration_t)); importReg->endpoint = endpoint; *registration = importReg; + static struct TmTestService { + void* handle; + } tmTestService; + celix_properties_t* props = celix_properties_copy(endpoint->properties); + EXPECT_NE(nullptr, props); + importReg->proxyServiceId = celix_bundleContext_registerService(admin->ctx, &tmTestService, "tmTestImportedService", props); + EXPECT_GE(importReg->proxyServiceId, 0); return CELIX_SUCCESS; }; rsaSvc.importRegistration_close = [](remote_service_admin_t* admin, import_registration_t* registration) -> celix_status_t { (void)admin; + celix_bundleContext_unregisterService(admin->ctx, registration->proxyServiceId); free(registration); return CELIX_SUCCESS; }; @@ -265,6 +277,7 @@ class TopologyManagerTestSuiteBaseClass : public ::testing::Test { std::shared_ptr ctx{}; std::shared_ptr logHelper{}; std::shared_ptr tm{}; + std::unique_ptr tms{nullptr, [](void*) {}}; }; #ifdef __cplusplus diff --git a/bundles/remote_services/topology_manager/src/topology_manager.c b/bundles/remote_services/topology_manager/src/topology_manager.c index 799017b8a..8fd4c3d06 100644 --- a/bundles/remote_services/topology_manager/src/topology_manager.c +++ b/bundles/remote_services/topology_manager/src/topology_manager.c @@ -28,6 +28,7 @@ #include #include #include +#include #include @@ -47,14 +48,16 @@ #include "service_reference.h" #include "service_registration.h" #include "celix_log_helper.h" -#include "topology_manager.h" #include "scope.h" #include "hash_map.h" #include "celix_array_list.h" +#include "celix_string_hash_map.h" +#include "celix_convert_utils.h" //The prefix of the config property which is used to store the interfaces of a port. e.g. CELIX_RSA_INTERFACES_OF_PORT_8080. The value is a comma-separated list of interface names. #define CELIX_RSA_INTERFACES_OF_PORT_PREFIX "CELIX_RSA_INTERFACES_OF_PORT_" +#define CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS "CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS" typedef struct celix_rsa_service_entry { remote_service_admin_service_t* rsa; @@ -66,6 +69,13 @@ typedef struct celix_exported_service_entry { celix_long_hash_map_t* registrations; //key:rsa service id, val:celix_array_list_t } celix_exported_service_entry_t; +typedef struct celix_imported_service_entry { + endpoint_description_t* endpoint; + import_registration_t* registration; + long rsaSvcId; + long ranking; +} celix_imported_service_entry_t; + struct topology_manager { celix_bundle_context_t *context; @@ -77,7 +87,7 @@ struct topology_manager { celix_long_hash_map_t* exportedServices;//key:service id, val:celix_exported_service_entry_t* - hash_map_pt importedServices; + celix_string_hash_map_t* importedServices;//key:the string of -, val:celix_array_list_t bool closed; @@ -87,10 +97,11 @@ struct topology_manager { scope_pt scope; celix_log_helper_t *loghelper; + celix_string_hash_map_t *importedServiceRankingOffsets; //key:config type, val:ranking offset(Type is long) }; -celix_status_t topologyManager_exportScopeChanged(void *handle, char *service_name); -celix_status_t topologyManager_importScopeChanged(void *handle, char *service_name); +celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr); +celix_status_t topologyManager_importScopeChanged(void *handle, char *filterStr); static celix_status_t topologyManager_notifyListenersEndpointAdded(topology_manager_pt manager, remote_service_admin_service_t *rsa, celix_array_list_t *registrations); static celix_status_t topologyManager_notifyListenersEndpointRemoved(topology_manager_pt manager, remote_service_admin_service_t *rsa, export_registration_t *export); @@ -99,6 +110,37 @@ static celix_status_t topologyManager_addImportedService_nolock(void *handle, en static celix_status_t topologyManager_removeImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter); static celix_status_t topologyManager_addExportedService_nolock(void * handle, service_reference_pt reference); static void topologyManager_removeExportedService_nolock(void * handle, service_reference_pt reference); +static void topologyManager_closeImportRegistration(topology_manager_t* manager, celix_imported_service_entry_t* importedEntry); + +static celix_status_t parseRankingOffsetsString(const char* offsetsStr, celix_string_hash_map_t* rankingOffsets) { + celix_autofree char* offsetsStrDup = celix_utils_strdup(offsetsStr); + if (offsetsStrDup == NULL) { + return CELIX_ENOMEM; + } + + char* save= NULL; + char* token = strtok_r(offsetsStrDup, ",", &save); + while (token != NULL) { + celix_status_t status = CELIX_ILLEGAL_ARGUMENT; + char *equalSign = strchr(token, '='); + if (equalSign != NULL) { + *equalSign = '\0'; + const char *configType = token; + const char *offsetStr = equalSign + 1; + bool converted = false; + long offset = celix_utils_convertStringToLong(offsetStr, 0, &converted); + if (converted) { + status = celix_stringHashMap_putLong(rankingOffsets, configType, offset); + } + } + if (status != CELIX_SUCCESS) { + return status; + } + token = strtok_r(NULL, ",", &save); + } + + return CELIX_SUCCESS; +} celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log_helper_t *logHelper, topology_manager_pt *manager, void **scope) { celix_status_t status = CELIX_SUCCESS; @@ -112,6 +154,22 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log tm->loghelper = logHelper; tm->closed = false; + celix_autoptr(celix_string_hash_map_t) importedServiceRankingOffsets = tm->importedServiceRankingOffsets = celix_stringHashMap_create(); + if (importedServiceRankingOffsets == NULL) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating string hash map for imported service ranking offsets."); + return CELIX_ENOMEM; + } + const char *offsetsStr = celix_bundleContext_getProperty(context, CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS, NULL); + if (offsetsStr != NULL) { + status = parseRankingOffsetsString(offsetsStr, importedServiceRankingOffsets); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error parsing imported service ranking offsets string '%s'.", offsetsStr); + return status; + } + } + status = celixThreadMutex_create(&tm->lock, NULL); if (status != CELIX_SUCCESS) { celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating mutex."); @@ -145,12 +203,19 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log //TODO remove deprecated hashMap (*manager)->listenerList = hashMap_create(serviceReference_hashCode, NULL, serviceReference_equals2, NULL); - (*manager)->importedServices = hashMap_create(NULL, NULL, NULL, NULL); + celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS; + opts.simpleRemovedCallback = (void*)celix_arrayList_destroy; + celix_autoptr(celix_string_hash_map_t) importedServices = tm->importedServices = celix_stringHashMap_createWithOptions(&opts); + if (importedServices == NULL) { + celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating string hash map for imported services."); + hashMap_destroy(tm->listenerList, false, false); + return CELIX_ENOMEM; + } status = scope_scopeCreate(tm, &tm->scope); if (status != CELIX_SUCCESS) { celix_logHelper_error(logHelper, "TOPOLOGY_MANAGER: Error creating scope."); - hashMap_destroy(tm->importedServices, false, false); hashMap_destroy(tm->listenerList, false, false); return status; } @@ -158,11 +223,13 @@ celix_status_t topologyManager_create(celix_bundle_context_t *context, celix_log scope_setImportScopeChangedCallback(tm->scope, topologyManager_importScopeChanged); *scope = tm->scope; + celix_steal_ptr(importedServices); celix_steal_ptr(exportedServices); celix_steal_ptr(networkIfNames); celix_steal_ptr(dynamicIpEndpoints); celix_steal_ptr(rsaMap); celix_steal_ptr(lock); + celix_steal_ptr(importedServiceRankingOffsets); celix_steal_ptr(tm); return status; @@ -175,7 +242,7 @@ celix_status_t topologyManager_destroy(topology_manager_pt manager) { celixThreadMutex_lock(&manager->lock); - hashMap_destroy(manager->importedServices, false, false); + celix_stringHashMap_destroy(manager->importedServices); hashMap_destroy(manager->listenerList, false, false); assert(celix_longHashMap_size(manager->exportedServices) == 0); @@ -196,52 +263,28 @@ celix_status_t topologyManager_destroy(topology_manager_pt manager) { celixThreadMutex_unlock(&manager->lock); celixThreadMutex_destroy(&manager->lock); + celix_stringHashMap_destroy(manager->importedServiceRankingOffsets); + free(manager); return status; } celix_status_t topologyManager_closeImports(topology_manager_pt manager) { - celix_status_t status; - - status = celixThreadMutex_lock(&manager->lock); - - manager->closed = true; - - hash_map_iterator_pt iter = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - endpoint_description_t *ep = hashMapEntry_getKey(entry); - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports != NULL) { - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", ep->serviceName, ep->id); - hash_map_iterator_pt importsIter = hashMapIterator_create(imports); - - while (hashMapIterator_hasNext(importsIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importsIter); - - remote_service_admin_service_t *rsa = hashMapEntry_getKey(entry); - import_registration_t *import = hashMapEntry_getValue(entry); - - status = rsa->importRegistration_close(rsa->admin, import); - if (status == CELIX_SUCCESS) { - hashMapIterator_remove(importsIter); - } - } - hashMapIterator_destroy(importsIter); - - hashMapIterator_remove(iter); + celix_auto(celix_mutex_lock_guard_t) lockGuard = celixMutexLockGuard_init(&manager->lock); - hashMap_destroy(imports, false, false); - } - } - hashMapIterator_destroy(iter); + manager->closed = true; - status = celixThreadMutex_unlock(&manager->lock); + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_array_list_t* imports = iter.value.ptrValue; + int size = celix_arrayList_size(imports); + for (int i = 0; i < size; ++i) { + topologyManager_closeImportRegistration(manager, celix_arrayList_get(imports, i)); + } + } + celix_stringHashMap_clear(manager->importedServices); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_rsaAdding(void * handle, service_reference_pt reference, void **service) { @@ -535,6 +578,68 @@ static void topologyManager_removeDynamicIpEndpointsForExportedService(topology_ return; } +//XXX: call in locked section +static void topologyManager_closeImportRegistration(topology_manager_t* manager, celix_imported_service_entry_t* importedEntry) { + if (importedEntry->registration != NULL) { + celix_rsa_service_entry_t* rsaSvcEntry = celix_longHashMap_get(manager->rsaMap, importedEntry->rsaSvcId); + remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; + celix_status_t status = rsa->importRegistration_close(rsa->admin, importedEntry->registration); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error closing import registration for imported service (%s; %s), error:%d.", importedEntry->endpoint->serviceName, importedEntry->endpoint->id, status); + } + importedEntry->registration = NULL; + importedEntry->rsaSvcId = -1; + } +} + +//XXX: call in locked section +static void topologyManager_updateActiveImportedService(topology_manager_t* manager, const celix_array_list_t* imports, const char* removingEndpointId, long removingRsaSvcId) { + const int importsSize = celix_arrayList_size(imports); + const celix_imported_service_entry_t* highestRankingEntry = NULL; + for (int i = 0; i < importsSize; ++i) { + celix_imported_service_entry_t* entry = celix_arrayList_get(imports, i); + endpoint_description_t* ep = entry->endpoint; + if (entry->registration != NULL && entry->rsaSvcId != removingRsaSvcId + && !celix_utils_stringEquals(entry->endpoint->id, removingEndpointId)) { + highestRankingEntry = entry;//Imports are in descending order of ranking, so the first found is the highest ranking. + break; + } + + if (entry->registration != NULL || !scope_allowImport(manager->scope, ep) || + celix_utils_stringEquals(entry->endpoint->id, removingEndpointId)) { + continue; + } + + CELIX_LONG_HASH_MAP_ITERATE(manager->rsaMap, iter) { + if (removingRsaSvcId == iter.key) { + continue; + } + celix_rsa_service_entry_t* rsaSvcEntry = iter.value.ptrValue; + import_registration_t* import = NULL; + remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; + celix_status_t status = rsa->importService(rsa->admin, ep, &import); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error importing service (%s; %s), error:%d.", ep->serviceName, ep->id, status); + } else if (import != NULL) { + entry->registration = import; + entry->rsaSvcId = iter.key; + highestRankingEntry = entry; + break; + } + } + if (highestRankingEntry != NULL) { + break; + } + } + + for (int i = 0; i < importsSize; ++i) { + celix_imported_service_entry_t* entry = celix_arrayList_get(imports, i); + if (highestRankingEntry != entry) { + topologyManager_closeImportRegistration(manager, entry); + } + } +} + celix_status_t topologyManager_rsaAdded(void * handle, service_reference_pt rsaSvcRef, void * service) { topology_manager_pt manager = (topology_manager_pt) handle; celix_properties_t *serviceProperties = NULL; @@ -567,29 +672,11 @@ celix_status_t topologyManager_rsaAdded(void * handle, service_reference_pt rsaS celix_steal_ptr(rsaSvcEntry); // add already imported services to new rsa - hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(importedServicesIterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); - endpoint_description_t *endpoint = hashMapEntry_getKey(entry); - if (scope_allowImport(manager->scope, endpoint)) { - import_registration_t *import = NULL; - celix_status_t status = rsa->importService(rsa->admin, endpoint, &import); - if (status == CELIX_SUCCESS) { - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports == NULL) { - imports = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(manager->importedServices, endpoint, imports); - } - - hashMap_put(imports, service, import); - } - } + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_array_list_t* imports = iter.value.ptrValue; + topologyManager_updateActiveImportedService(manager, imports, NULL, -1); } - hashMapIterator_destroy(importedServicesIterator); - // add already exported services to new rsa CELIX_LONG_HASH_MAP_ITERATE(manager->exportedServices, iter) { celix_exported_service_entry_t* svcEntry = iter.value.ptrValue; @@ -634,7 +721,6 @@ celix_status_t topologyManager_rsaModified(void * handle, service_reference_pt r } celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt reference, void * service) { - celix_status_t status = CELIX_SUCCESS; topology_manager_pt manager = (topology_manager_pt) handle; remote_service_admin_service_t *rsa = (remote_service_admin_service_t *) service; long rsaSvcId = serviceReference_getServiceId(reference); @@ -662,22 +748,10 @@ celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt re } } - hash_map_iterator_pt importedSvcIter = hashMapIterator_create(manager->importedServices); - - while (hashMapIterator_hasNext(importedSvcIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedSvcIter); - hash_map_pt imports = hashMapEntry_getValue(entry); - - import_registration_t *import = (import_registration_t *)hashMap_remove(imports, rsa); - - if (import != NULL) { - celix_status_t subStatus = rsa->importRegistration_close(rsa->admin, import); - if (subStatus != CELIX_SUCCESS) { - celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Failed to close imported endpoint."); - } - } + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_array_list_t* imports = iter.value.ptrValue; + topologyManager_updateActiveImportedService(manager, imports, NULL, rsaSvcId); } - hashMapIterator_destroy(importedSvcIter); free(celix_longHashMap_get(manager->rsaMap, rsaSvcId)); celix_longHashMap_remove(manager->rsaMap, rsaSvcId); @@ -686,7 +760,7 @@ celix_status_t topologyManager_rsaRemoved(void * handle, service_reference_pt re celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_INFO, "TOPOLOGY_MANAGER: Removed RSA"); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr) { @@ -748,42 +822,187 @@ celix_status_t topologyManager_exportScopeChanged(void *handle, char *filterStr) return status; } -celix_status_t topologyManager_importScopeChanged(void *handle, char *service_name) { - celix_status_t status = CELIX_SUCCESS; - endpoint_description_t *endpoint; - topology_manager_pt manager = (topology_manager_pt) handle; - bool found = false; +celix_status_t topologyManager_importScopeChanged(void *handle, char *filterStr) { + topology_manager_pt manager = (topology_manager_pt) handle; + celix_autoptr(celix_filter_t) filter = celix_filter_create(filterStr); + if (filter == NULL) { + celix_logHelper_logTssErrors(manager->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(manager->loghelper,"filter creating failed\n"); + return CELIX_ILLEGAL_ARGUMENT; + } - // add already imported services to new rsa - celixThreadMutex_lock(&manager->lock); + celix_auto(celix_mutex_lock_guard_t) lockGuard = celixMutexLockGuard_init(&manager->lock); + + CELIX_STRING_HASH_MAP_ITERATE(manager->importedServices, iter) { + celix_array_list_t* imports = iter.value.ptrValue; + bool found = false; + int size = celix_arrayList_size(imports); + for (int i = 0; i < size; ++i) { + celix_imported_service_entry_t* entry = celix_arrayList_get(imports, i); + if (celix_filter_match(filter, entry->endpoint->properties)) { + topologyManager_closeImportRegistration(manager, entry); + found = true; + } + } + if (found) { + topologyManager_updateActiveImportedService(manager, imports, NULL, -1); + } + } - hash_map_iterator_pt importedServicesIterator = hashMapIterator_create(manager->importedServices); - while (!found && hashMapIterator_hasNext(importedServicesIterator)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importedServicesIterator); - endpoint = hashMapEntry_getKey(entry); + return CELIX_SUCCESS; +} - const char* name = celix_properties_get(endpoint->properties, CELIX_FRAMEWORK_SERVICE_NAME, ""); - // Test if a service with the same name is imported - if (strcmp(name, service_name) == 0) { - found = true; - } - } - hashMapIterator_destroy(importedServicesIterator); +static celix_status_t topologyManager_getRankingOffsetForEndpoint(topology_manager_t* tm, const endpoint_description_t* endpoint, long* offset) { + *offset = 0; + celix_autoptr(celix_array_list_t) serviceImportedConfigs = NULL; + celix_status_t status = celix_properties_getAsStringArrayList(endpoint->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS, NULL, &serviceImportedConfigs); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(tm->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error getting property %s.", CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + return status; + } else if (serviceImportedConfigs == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Endpoint description is missing property %s.", CELIX_RSA_SERVICE_IMPORTED_CONFIGS); + return CELIX_ILLEGAL_ARGUMENT; + } + int size = celix_arrayList_size(serviceImportedConfigs); + for (int i = 0; i < size; ++i) { + const char* config = celix_arrayList_getString(serviceImportedConfigs, i); + if (celix_stringHashMap_hasKey(tm->importedServiceRankingOffsets, config)) { + //According to the OSGi specification, the service.imported.configs property lists configuration types that must refer to the same endpoint. + //So we take the first match. + *offset = celix_stringHashMap_getLong(tm->importedServiceRankingOffsets, config, 0); + break; + } + } + return CELIX_SUCCESS; +} - if (found) { - status = topologyManager_removeImportedService_nolock(manager, endpoint, NULL); +static int willOverflowOnLongAdd(long a, long b) { + if (b > 0 && a > LONG_MAX - b) { + return 1; + } + if (b < 0 && a < LONG_MIN - b) { + return -1; + } + return 0; +} - if (status != CELIX_SUCCESS) { - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_ERROR, "TOPOLOGY_MANAGER: Removal of imported service (%s; %s) failed.", endpoint->serviceName, endpoint->id); - } else { - status = topologyManager_addImportedService_nolock(manager, endpoint, NULL); - } - } +static endpoint_description_t* topologyManager_createAdjustedRankingEndpoint(topology_manager_t* tm, const endpoint_description_t* endpoint) { + long rankingOffset = 0; + celix_status_t status = topologyManager_getRankingOffsetForEndpoint(tm, endpoint, &rankingOffset); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error getting ranking offset for imported service."); + return NULL; + } + celix_autoptr(endpoint_description_t) endpointCopy = endpointDescription_clone(endpoint); + if (endpointCopy == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error cloning endpoint description."); + return NULL; + } + if (rankingOffset != 0) { + //According to OSGi specification, If no service.ranking service property is specified or its type is not Integer, + // then a ranking of 0 must be used. + long originalRanking = celix_properties_getAsLong(endpointCopy->properties, CELIX_FRAMEWORK_SERVICE_RANKING, 0); + long newRanking; + int overflow = willOverflowOnLongAdd(originalRanking, rankingOffset); + if (overflow < 0) { + newRanking = LONG_MIN; + } else if (overflow > 0) { + newRanking = LONG_MAX; + } else { + newRanking = originalRanking + rankingOffset; + } + status = celix_properties_setLong(endpointCopy->properties, CELIX_FRAMEWORK_SERVICE_RANKING, newRanking); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(tm->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error setting property %s.", CELIX_FRAMEWORK_SERVICE_RANKING); + return NULL; + } + } + return celix_steal_ptr(endpointCopy); +} - //should unlock until here ?, avoid endpoint is released during topologyManager_removeImportedService - celixThreadMutex_unlock(&manager->lock); +static celix_imported_service_entry_t* topologyManager_createImportedServiceEntry(topology_manager_t* tm, const endpoint_description_t* endpoint) { + celix_autofree celix_imported_service_entry_t* entry = (celix_imported_service_entry_t*)calloc(1, sizeof(*entry)); + if (entry == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error allocating import registration entry."); + return NULL; + } + entry->rsaSvcId = -1; + celix_autoptr(endpoint_description_t) adjustedRankingEP = entry->endpoint = topologyManager_createAdjustedRankingEndpoint(tm, endpoint); + if (adjustedRankingEP == NULL) { + celix_logHelper_error(tm->loghelper, "TOPOLOGY_MANAGER: Error creating adjusted ranking endpoint description."); + return NULL; + } + entry->ranking = celix_properties_getAsLong(adjustedRankingEP->properties, CELIX_FRAMEWORK_SERVICE_RANKING, 0); + celix_steal_ptr(adjustedRankingEP); + return celix_steal_ptr(entry); +} - return status; +static void topologyManager_importedServiceEntryDestroy(void* entryPtr) { + celix_imported_service_entry_t* entry = entryPtr; + if (entry != NULL) { + endpointDescription_destroy(entry->endpoint); + free(entry); + } +} + +CELIX_DEFINE_AUTOPTR_CLEANUP_FUNC(celix_imported_service_entry_t, topologyManager_importedServiceEntryDestroy) + +static int compareImportedServiceEntryRanking(celix_array_list_entry_t a, celix_array_list_entry_t b) { + long ranking1 = ((celix_imported_service_entry_t*)a.voidPtrVal)->ranking; + long ranking2 = ((celix_imported_service_entry_t*)b.voidPtrVal)->ranking; + return ranking1 == ranking2 ? 0 : (ranking1 < ranking2 ? 1 : -1);// descending sort +} + +//XXX: call in locked section +static celix_status_t topologyManager_addImportedServiceToMap(topology_manager_t* manager, const char* importsKey, celix_imported_service_entry_t* importedEntry) { + celix_autoptr(celix_imported_service_entry_t) importedEntryAutoPtr = importedEntry; + celix_array_list_t* imports = celix_stringHashMap_get(manager->importedServices, importsKey); + if (imports == NULL) { + celix_array_list_create_options_t opts = CELIX_EMPTY_ARRAY_LIST_CREATE_OPTIONS; + opts.elementType = CELIX_ARRAY_LIST_ELEMENT_TYPE_POINTER; + opts.initialCapacity = 2; + opts.compareCallback = compareImportedServiceEntryRanking; + opts.simpleRemovedCallback = topologyManager_importedServiceEntryDestroy; + celix_autoptr(celix_array_list_t) importsAutoPtr = celix_arrayList_createWithOptions(&opts); + if (importsAutoPtr == NULL) { + celix_logHelper_logTssErrors(manager->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error creating array list for imported service entries."); + return ENOMEM; + } + celix_status_t status = celix_stringHashMap_put(manager->importedServices, importsKey, importsAutoPtr); + if ( status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(manager->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding imported service entries to map."); + return status; + } + imports = celix_steal_ptr(importsAutoPtr); + } + celix_status_t status = celix_arrayList_add(imports, celix_steal_ptr(importedEntryAutoPtr)); + if (status != CELIX_SUCCESS) { + celix_logHelper_logTssErrors(manager->loghelper, CELIX_LOG_LEVEL_ERROR); + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding imported service entry to list."); + return status; + } + celix_arrayList_sort(imports); + return CELIX_SUCCESS; +} + +//XXX: call in locked section +static void topologyManager_removeImportedServiceFromMap(topology_manager_t* manager, const char* importsKey, const char* endpointId) { + celix_array_list_t* imports = celix_stringHashMap_get(manager->importedServices, importsKey); + int size = celix_arrayList_size(imports); + for (int i = 0; i < size; ++i) { + celix_imported_service_entry_t* entry = celix_arrayList_get(imports, i); + if (celix_utils_stringEquals(entry->endpoint->id, endpointId)) { + celix_arrayList_removeAt(imports, i); + break; + } + } + if (celix_arrayList_size(imports) == 0) { + celix_stringHashMap_remove(manager->importedServices, importsKey); + } } static celix_status_t topologyManager_addImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { @@ -798,24 +1017,26 @@ static celix_status_t topologyManager_addImportedService_nolock(void *handle, en return CELIX_SUCCESS; } - hash_map_pt imports = hashMap_create(NULL, NULL, NULL, NULL); - hashMap_put(manager->importedServices, endpoint, imports); + celix_autoptr(celix_imported_service_entry_t) importedEntry = topologyManager_createImportedServiceEntry(manager, endpoint); + if (importedEntry == NULL) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error creating import registration entry."); + return ENOMEM; + } + celix_autofree char* importsKey = NULL; + if (asprintf(&importsKey, "%s-%ld", importedEntry->endpoint->frameworkUUID, importedEntry->endpoint->serviceId) < 0) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error allocating key for imported service entries."); + return ENOMEM; + } + status = topologyManager_addImportedServiceToMap(manager, importsKey, celix_steal_ptr(importedEntry)); + if (status != CELIX_SUCCESS) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error adding imported service entry to map."); + return status; + } - if (scope_allowImport(manager->scope, endpoint)) { - CELIX_LONG_HASH_MAP_ITERATE(manager->rsaMap, iter) { - celix_rsa_service_entry_t* rsaSvcEntry = iter.value.ptrValue; - import_registration_t *import = NULL; - remote_service_admin_service_t *rsa = rsaSvcEntry->rsa; - celix_status_t substatus = rsa->importService(rsa->admin, endpoint, &import); - if (substatus == CELIX_SUCCESS) { - hashMap_put(imports, rsa, import); - } else { - status = substatus; - } - } - } + celix_array_list_t* imports = celix_stringHashMap_get(manager->importedServices, importsKey); + topologyManager_updateActiveImportedService(manager, imports, NULL, -1); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_addImportedService(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { @@ -834,40 +1055,25 @@ celix_status_t topologyManager_addImportedService(void *handle, endpoint_descrip } static celix_status_t topologyManager_removeImportedService_nolock(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { - celix_status_t status = CELIX_SUCCESS; - topology_manager_pt manager = handle; + topology_manager_pt manager = handle; - celix_logHelper_log(manager->loghelper, CELIX_LOG_LEVEL_DEBUG, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", endpoint->serviceName, endpoint->id); + celix_logHelper_debug(manager->loghelper, "TOPOLOGY_MANAGER: Remove imported service (%s; %s).", endpoint->serviceName, endpoint->id); - hash_map_iterator_pt iter = hashMapIterator_create(manager->importedServices); - while (hashMapIterator_hasNext(iter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(iter); - endpoint_description_t *ep = hashMapEntry_getKey(entry); - hash_map_pt imports = hashMapEntry_getValue(entry); - - if (imports != NULL && strcmp(endpoint->id, ep->id) == 0) { - hash_map_iterator_pt importsIter = hashMapIterator_create(imports); - - while (hashMapIterator_hasNext(importsIter)) { - hash_map_entry_pt entry = hashMapIterator_nextEntry(importsIter); - remote_service_admin_service_t *rsa = hashMapEntry_getKey(entry); - import_registration_t *import = hashMapEntry_getValue(entry); - celix_status_t substatus = rsa->importRegistration_close(rsa->admin, import); - if (substatus == CELIX_SUCCESS) { - hashMapIterator_remove(importsIter); - } else { - status = substatus; - } - } - hashMapIterator_destroy(importsIter); - hashMapIterator_remove(iter); + celix_autofree char* importsKey = NULL; + if (asprintf(&importsKey, "%s-%ld", endpoint->frameworkUUID, endpoint->serviceId) < 0) { + celix_logHelper_error(manager->loghelper, "TOPOLOGY_MANAGER: Error allocating key for imported service entries."); + return CELIX_ENOMEM; + } + celix_array_list_t* imports= celix_stringHashMap_get(manager->importedServices, importsKey); + if (imports == NULL) { + celix_logHelper_debug(manager->loghelper, "TOPOLOGY_MANAGER: No imported service entries found for service (%s; %s).", endpoint->serviceName, endpoint->id); + return CELIX_SUCCESS; + } + topologyManager_updateActiveImportedService(manager, imports, endpoint->id, -1); - hashMap_destroy(imports, false, false); - } - } - hashMapIterator_destroy(iter); + topologyManager_removeImportedServiceFromMap(manager, importsKey, endpoint->id); - return status; + return CELIX_SUCCESS; } celix_status_t topologyManager_removeImportedService(void *handle, endpoint_description_t *endpoint, char *matchedFilter) { diff --git a/libs/utils/src/array_list.c b/libs/utils/src/array_list.c index 361525f2e..80f4a6c09 100644 --- a/libs/utils/src/array_list.c +++ b/libs/utils/src/array_list.c @@ -198,7 +198,7 @@ celix_array_list_t* celix_arrayList_createWithOptions(const celix_array_list_cre return NULL; } - list->capacity = 10; + list->capacity = opts->initialCapacity != 0 ? opts->initialCapacity : 10; list->elementData = calloc(list->capacity, sizeof(celix_array_list_entry_t)); if (!list->elementData) { celix_err_push("Failed to allocate memory for elementData"); From 0352a75e733a669391f6cc605b7f9da170513252 Mon Sep 17 00:00:00 2001 From: xuzhenbao Date: Sun, 15 Mar 2026 22:06:04 +0800 Subject: [PATCH 2/3] Add comments for CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS --- bundles/remote_services/README.md | 6 +++--- .../remote_services/topology_manager/src/topology_manager.c | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bundles/remote_services/README.md b/bundles/remote_services/README.md index bc090a9c7..7bbd83540 100644 --- a/bundles/remote_services/README.md +++ b/bundles/remote_services/README.md @@ -27,9 +27,9 @@ The Remote Service Admin Service subproject contains an adapted implementation o The topology manager decides which services should be imported and exported according to a defined policy. Currently, only one policy is implemented in Celix, the *promiscuous* policy, which simply imports and exports all services. -| **Bundle** | `Celix::rsa_topology_manager` | -|--|-----------------------------------------| -| **Configuration** | *None* | +| **Bundle** | `Celix::rsa_topology_manager` | +|--|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Configuration** | `CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS`: defines the ranking offsets for imported services. The value of this property is a comma-separated list of `=`. The `` is the value of the `service.exported.configs` property of the service registration, and the `` is an integer that is added to the ranking of the imported service. This allows you to influence the ranking of imported services based on their configuration type. For example, if you want to give a higher ranking to remote services with configuration type "shm" than remote services with configuration type "http", you can set this property to `http=-2,shm=-1`. | ### Remote Service Admin diff --git a/bundles/remote_services/topology_manager/src/topology_manager.c b/bundles/remote_services/topology_manager/src/topology_manager.c index 8fd4c3d06..d8556297d 100644 --- a/bundles/remote_services/topology_manager/src/topology_manager.c +++ b/bundles/remote_services/topology_manager/src/topology_manager.c @@ -57,6 +57,7 @@ //The prefix of the config property which is used to store the interfaces of a port. e.g. CELIX_RSA_INTERFACES_OF_PORT_8080. The value is a comma-separated list of interface names. #define CELIX_RSA_INTERFACES_OF_PORT_PREFIX "CELIX_RSA_INTERFACES_OF_PORT_" +//The config property for the ranking offset of imported services. The value is a comma-separated list of =. e.g. CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS="celix.remote.admin.shm=-1,org.amdatu.remote.admin.http=-2". The ranking offset will be added to the original service ranking of the imported service. This allows the user to configure the ranking of imported services based on different configuration types. #define CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS "CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS" typedef struct celix_rsa_service_entry { From bddb2413358ebc1b69eccb545f1036f786be9e1f Mon Sep 17 00:00:00 2001 From: xuzhenbao Date: Sun, 15 Mar 2026 22:07:40 +0800 Subject: [PATCH 3/3] Modify rsa_json_rpc bundle version --- bundles/remote_services/rsa_rpc_json/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bundles/remote_services/rsa_rpc_json/CMakeLists.txt b/bundles/remote_services/rsa_rpc_json/CMakeLists.txt index 22a70b560..19e558e73 100644 --- a/bundles/remote_services/rsa_rpc_json/CMakeLists.txt +++ b/bundles/remote_services/rsa_rpc_json/CMakeLists.txt @@ -40,7 +40,7 @@ if (RSA_JSON_RPC) ) add_celix_bundle(rsa_json_rpc - VERSION 2.0.0 + VERSION 2.1.0 SYMBOLIC_NAME "apache_celix_rsa_json_rpc" NAME "Apache Celix Remote Service Admin JSON RPC" GROUP "Celix/RSA"