Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bundles/remote_services/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ The Remote Service Admin Service subproject contains an adapted implementation o

The topology manager decides which services should be imported and exported according to a defined policy. Currently, only one policy is implemented in Celix, the *promiscuous* policy, which simply imports and exports all services.

| **Bundle** | `Celix::rsa_topology_manager` |
|--|-----------------------------------------|
| **Configuration** | *None* |
| **Bundle** | `Celix::rsa_topology_manager` |
|--|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Configuration** | `CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS`: defines the ranking offsets for imported services. The value of this property is a comma-separated list of `<config type>=<ranking offset>`. The `<config type>` is the value of the `service.exported.configs` property of the service registration, and the `<ranking offset>` is an integer that is added to the ranking of the imported service. This allows you to influence the ranking of imported services based on their configuration type. For example, if you want to give a higher ranking to remote services with configuration type "shm" than remote services with configuration type "http", you can set this property to `http=-2,shm=-1`. |

### Remote Service Admin

Expand Down
2 changes: 1 addition & 1 deletion bundles/remote_services/rsa_rpc_json/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ if (RSA_JSON_RPC)
)

add_celix_bundle(rsa_json_rpc
VERSION 2.0.0
VERSION 2.1.0
SYMBOLIC_NAME "apache_celix_rsa_json_rpc"
NAME "Apache Celix Remote Service Admin JSON RPC"
GROUP "Celix/RSA"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
#include "celix_framework_version.h"
#include <gtest/gtest.h>
#include <cstdlib>
#include <thread>
#include <atomic>
extern "C" {
#include "remote_interceptors_handler.h"
}
Expand Down Expand Up @@ -360,14 +362,6 @@ class RsaJsonRpcProxyUnitTestSuite2 : public RsaJsonRpcProxyUnitTestSuite {
long proxyId{-1};
};

TEST_F(RsaJsonRpcProxyUnitTestSuite, FailedToCreateSendRequestMethodLock) {
celix_autoptr(endpoint_description_t) endpoint = CreateEndpointDescription();
long proxyId = -1L;
celix_ei_expect_celixThreadRwlock_create((void*)&rsaJsonRpcProxy_factoryCreate, 0, CELIX_ENOMEM);
auto status = rsaJsonRpc_createProxy(jsonRpc.get(), endpoint, SendRequest, nullptr, &proxyId);
EXPECT_EQ(CELIX_ENOMEM, status);
}

TEST_F(RsaJsonRpcProxyUnitTestSuite, FailedToCreateProxiesHashMap) {
auto endpoint = CreateEndpointDescription();
long proxyId = -1L;
Expand Down Expand Up @@ -458,14 +452,26 @@ TEST_F(RsaJsonRpcProxyUnitTestSuite2, InvokeProxyServiceWithInvalidParams) {
}

TEST_F(RsaJsonRpcProxyUnitTestSuite2, InvokeProxyServiceWhenProxyIsDestroying) {
auto found = celix_bundleContext_useService(ctx.get(), RSA_RPC_JSON_TEST_SERVICE, this, [](void *handle, void *svc) {
auto self = static_cast<RsaJsonRpcProxyUnitTestSuite2*>(handle);
rsaJsonRpc_destroyProxy(self->jsonRpc.get(), self->proxyId);
std::atomic<int> syncStatus{0};
std::thread t([this, &syncStatus](){
while (syncStatus.load() != 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
syncStatus.store(2);
rsaJsonRpc_destroyProxy(jsonRpc.get(), proxyId);
});
auto found = celix_bundleContext_useService(ctx.get(), RSA_RPC_JSON_TEST_SERVICE, &syncStatus, [](void *handle, void *svc) {
auto syncStatus = static_cast<std::atomic<int>*>(handle);
syncStatus->store(1);
while (syncStatus->load() != 2) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
auto proxySvc = static_cast<rsa_rpc_json_test_service_t*>(svc);
EXPECT_NE(nullptr, proxySvc);
EXPECT_EQ(CELIX_ILLEGAL_STATE, proxySvc->test(proxySvc->handle));
EXPECT_EQ(CELIX_SUCCESS, proxySvc->test(proxySvc->handle));
});
EXPECT_TRUE(found);
t.join();
}

TEST_F(RsaJsonRpcProxyUnitTestSuite2, FailedToPrepareInvokeRequest) {
Expand Down
44 changes: 7 additions & 37 deletions bundles/remote_services/rsa_rpc_json/src/rsa_json_rpc_proxy_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct rsa_json_rpc_proxy_factory {
endpoint_description_t *endpointDesc;
celix_long_hash_map_t *proxies;//Key:requestingBundle, Value: rsa_json_rpc_proxy_t *. Work on the celix_event thread , so locks are not required
remote_interceptors_handler_t *interceptorsHandler;
celix_thread_rwlock_t sendRequestLock; //protects sendRequest
celix_rsa_send_request_fp sendRequest;
void* sendRequestHandle;
};
Expand All @@ -65,7 +64,6 @@ static void rsaJsonRpcProxy_ungetService(void *handle, const celix_bundle_t *req
static celix_status_t rsaJsonRpcProxy_create(rsa_json_rpc_proxy_factory_t *proxyFactory,
const celix_bundle_t *requestingBundle, rsa_json_rpc_proxy_t **proxyOut);
static void rsaJsonRpcProxy_destroy(rsa_json_rpc_proxy_t *proxy);
static void rsaJsonRpcProxy_unregisterFacSvcDone(void *data);

celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx,
celix_log_helper_t* logHelper,
Expand Down Expand Up @@ -95,14 +93,6 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx,
proxyFactory->sendRequestHandle = sendRequestHandle;
proxyFactory->serialProtoId = serialProtoId;


celix_status_t status = celixThreadRwlock_create(&proxyFactory->sendRequestLock, NULL);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(logHelper, "Proxy: Error creating sendRequest lock. %d", status);
return status;
}
celix_autoptr(celix_thread_rwlock_t) sendRequestLock = &proxyFactory->sendRequestLock;

CELIX_BUILD_ASSERT(sizeof(long) == sizeof(void*)); // The hash_map uses the pointer as key, so this should be true
celix_autoptr(celix_long_hash_map_t) proxies = proxyFactory->proxies = celix_longHashMap_create();
if (proxyFactory->proxies == NULL) {
Expand All @@ -121,7 +111,7 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx,
proxyFactory->factory.getService = rsaJsonRpcProxy_getService;
proxyFactory->factory.ungetService = rsaJsonRpcProxy_ungetService;
celix_properties_t* svcProperties = NULL;
status = celix_rsaUtils_createServicePropertiesFromEndpointProperties(endpointDesc->properties, &svcProperties);
celix_status_t status = celix_rsaUtils_createServicePropertiesFromEndpointProperties(endpointDesc->properties, &svcProperties);
if (status != CELIX_SUCCESS) {
return status;
}
Expand All @@ -135,34 +125,21 @@ celix_status_t rsaJsonRpcProxy_factoryCreate(celix_bundle_context_t* ctx,

celix_steal_ptr(endpointDescCopy);
celix_steal_ptr(proxies);
celix_steal_ptr(sendRequestLock);
*proxyFactoryOut = celix_steal_ptr(proxyFactory);
return CELIX_SUCCESS;
}

void rsaJsonRpcProxy_factoryDestroy(rsa_json_rpc_proxy_factory_t *proxyFactory) {
assert(proxyFactory != NULL);
{
celix_auto(celix_rwlock_wlock_guard_t) wLockGuard = celixRwlockWlockGuard_init(&proxyFactory->sendRequestLock);
proxyFactory->sendRequest = NULL;
}
celix_bundleContext_unregisterServiceAsync(proxyFactory->ctx, proxyFactory->factorySvcId,
proxyFactory, rsaJsonRpcProxy_unregisterFacSvcDone);
}

long rsaJsonRpcProxy_factorySvcId(rsa_json_rpc_proxy_factory_t *proxyFactory) {
return proxyFactory->factorySvcId;
}

static void rsaJsonRpcProxy_unregisterFacSvcDone(void *data) {
assert(data);
rsa_json_rpc_proxy_factory_t *proxyFactory = (rsa_json_rpc_proxy_factory_t *)data;
celix_bundleContext_unregisterService(proxyFactory->ctx, proxyFactory->factorySvcId);
endpointDescription_destroy(proxyFactory->endpointDesc);
assert(celix_longHashMap_size(proxyFactory->proxies) == 0);
celix_longHashMap_destroy(proxyFactory->proxies);
celixThreadRwlock_destroy(&proxyFactory->sendRequestLock);
free(proxyFactory);
return;
}

long rsaJsonRpcProxy_factorySvcId(rsa_json_rpc_proxy_factory_t *proxyFactory) {
return proxyFactory->factorySvcId;
}

static void* rsaJsonRpcProxy_getService(void *handle, const celix_bundle_t *requestingBundle,
Expand Down Expand Up @@ -244,15 +221,8 @@ static void rsaJsonRpcProxy_serviceFunc(void *userData, void *args[], void *retu
proxyFactory->endpointDesc->properties, dynFunction_getName(entry->dynFunc), &metadata);
if (cont) {
struct iovec requestIovec = {invokeRequest,strlen(invokeRequest) + 1};
celixThreadRwlock_readLock(&proxyFactory->sendRequestLock);
if (proxyFactory->sendRequest != NULL) {
status = proxyFactory->sendRequest(proxyFactory->sendRequestHandle, proxyFactory->endpointDesc,
status = proxyFactory->sendRequest(proxyFactory->sendRequestHandle, proxyFactory->endpointDesc,
metadata, &requestIovec, &replyIovec);
} else {
status = CELIX_ILLEGAL_STATE;
celix_logHelper_warning(proxyFactory->logHelper,"Maybe the \"%s\" service is stopping.", proxyFactory->endpointDesc->serviceName);
}
celixThreadRwlock_unlock(&proxyFactory->sendRequestLock);
if (status == CELIX_SUCCESS && dynFunction_hasReturn(entry->dynFunc)) {
if (replyIovec.iov_base != NULL) {
int rsErrno = CELIX_SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion bundles/remote_services/topology_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ add_celix_bundle(rsa_topology_manager
src/topology_manager.c
src/scope.c
src/activator.c
VERSION 0.9.0
VERSION 0.10.0
SYMBOLIC_NAME "apache_celix_rs_topology_manager"
GROUP "Celix/RSA"
NAME "Apache Celix RS Topology Manager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ if (EI_TESTS)
Celix::framework
Celix::threads_ei
# Celix::bundle_ctx_ei
# Celix::string_hash_map_ei
Celix::string_hash_map_ei
Celix::long_hash_map_ei
Celix::array_list_ei
Celix::properties_ei
Celix::utils_ei
Celix::malloc_ei
Celix::filter_ei
Celix::asprintf_ei
GTest::gtest
GTest::gtest_main
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
#include "celix_properties_ei.h"
#include "celix_utils_ei.h"
#include "celix_threads_ei.h"
#include "celix_string_hash_map_ei.h"
#include "celix_filter_ei.h"
#include "asprintf_ei.h"

class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test {
public:
Expand All @@ -45,6 +48,10 @@ class TopologyManagerCreatingErrorInjectionTestSuite : public ::testing::Test {
celix_ei_expect_calloc(nullptr, 0, nullptr);
celix_ei_expect_celixThreadMutex_create(nullptr, 0, 0);
celix_ei_expect_celix_longHashMap_create(nullptr, 0, nullptr);
celix_ei_expect_celix_stringHashMap_create(nullptr, 0, nullptr);
celix_ei_expect_celix_stringHashMap_putLong(nullptr, 1, 0);
celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr);
celix_ei_expect_celix_stringHashMap_createWithOptions(nullptr, 0, nullptr);
}

std::shared_ptr<celix_framework_t> fw{};
Expand All @@ -60,6 +67,42 @@ TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AllocingMemoryErrorTest)
EXPECT_EQ(CELIX_ENOMEM, status);
}

TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceRankingOffsetMapErrorTest) {
celix_ei_expect_celix_stringHashMap_create((void*)topologyManager_create, 0, nullptr);
void *scope{};
topology_manager_t *tmPtr{};
auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope);
EXPECT_EQ(CELIX_ENOMEM, status);
}

TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, ImportedServiceRankingOffsetStringDuplicationErrorTest) {
setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1);
celix_ei_expect_celix_utils_strdup((void*)topologyManager_create, 1, nullptr);
void *scope{};
topology_manager_t *tmPtr{};
auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope);
EXPECT_EQ(CELIX_ENOMEM, status);
unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS");
}

TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, AddingImportedServiceRankingOffsetErrorTest) {
setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "configType1=10,configType2=20", 1);
celix_ei_expect_celix_stringHashMap_putLong((void*)topologyManager_create, 1, ENOMEM);
void *scope{};
topology_manager_t *tmPtr{};
auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope);
EXPECT_EQ(ENOMEM, status);
unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS");
}

TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingImportedServiceMapErrorTest) {
celix_ei_expect_celix_stringHashMap_createWithOptions((void*)topologyManager_create, 0, nullptr);
void *scope{};
topology_manager_t *tmPtr{};
auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope);
EXPECT_EQ(CELIX_ENOMEM, status);
}

TEST_F(TopologyManagerCreatingErrorInjectionTestSuite, CreatingMutexErrorTest) {
celix_ei_expect_celixThreadMutex_create((void*)topologyManager_create, 0, CELIX_ENOMEM);
void *scope{};
Expand Down Expand Up @@ -121,6 +164,12 @@ class TopologyManagerErrorInjectionTestSuite : public TopologyManagerTestSuiteBa
celix_ei_expect_celix_properties_set(nullptr, 0, 0);
celix_ei_expect_celix_utils_strdup(nullptr, 0, nullptr);
celix_ei_expect_celix_arrayList_add(nullptr, 0, 0);
celix_ei_expect_celix_filter_create(nullptr, 0, nullptr);
celix_ei_expect_celix_properties_getAsStringArrayList(nullptr, 0, 0);
celix_ei_expect_celix_properties_setLong(nullptr, 0, 0);
celix_ei_expect_asprintf(nullptr, 0, 0);
celix_ei_expect_celix_arrayList_createWithOptions(nullptr, 0, nullptr);
celix_ei_expect_celix_stringHashMap_put(nullptr, 0, 0);
}

void TestExportServiceFailure(void (*errorInject)(void)) {
Expand Down Expand Up @@ -263,5 +312,91 @@ TEST_F(TopologyManagerErrorInjectionTestSuite, AddDynamicIpEndpointToListErrorTe
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingFilterErrorWhenImportScopeChangedTest) {
celix_ei_expect_celix_filter_create(CELIX_EI_UNKNOWN_CALLER, 0, nullptr, 2);
auto status = tm_addImportScope(tms.get(), (char*)"(service.imported.configs=tm_test_config_type)");
EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, status);
}

TEST_F(TopologyManagerErrorInjectionTestSuite, AllocingMemoryForImportedServiceEntryErrorTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_calloc(CELIX_EI_UNKNOWN_CALLER, 0, nullptr);
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(CELIX_ENOMEM, status);

status = topologyManager_removeImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(CELIX_SUCCESS, status);
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, GettingServiceImportedConfigsErrorTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_celix_properties_getAsStringArrayList(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM);
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);

celix_properties_unset(importEndpoint->properties, CELIX_RSA_SERVICE_IMPORTED_CONFIGS);
status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(CELIX_ENOMEM, status);
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, CopyImportedServicePropertiesErrorTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_celix_properties_copy(CELIX_EI_UNKNOWN_CALLER, 0, nullptr);//called in endpointDescription_clone
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, SettingImportedServiceRankingErrorTest) {
setenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS", "tm_test_config_type=10", 1);
tm.reset();
void* scope = nullptr;
topology_manager_t* tmPtr{};
auto status = topologyManager_create(ctx.get(), logHelper.get(), &tmPtr, &scope);
EXPECT_EQ(status, CELIX_SUCCESS);
tm = std::shared_ptr<topology_manager_t>{tmPtr, [](auto t) {topologyManager_destroy(t);}};

TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_celix_properties_setLong(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM);//called in endpointDescription_clone
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);
});

unsetenv("CELIX_RSA_IMPORTED_SERVICE_RANKING_OFFSETS");
}

TEST_F(TopologyManagerErrorInjectionTestSuite, GeneratingImportsKeyFailureTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_asprintf(CELIX_EI_UNKNOWN_CALLER, 0, -1);
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);

celix_ei_expect_asprintf(CELIX_EI_UNKNOWN_CALLER, 0, -1);
status = topologyManager_removeImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, CreatingImportsListFailureTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_celix_arrayList_createWithOptions(CELIX_EI_UNKNOWN_CALLER, 0, nullptr);
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);

status = topologyManager_removeImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(CELIX_SUCCESS, status);
});
}

TEST_F(TopologyManagerErrorInjectionTestSuite, AddingImportsListToMapFailureTest) {
TestImportService([](topology_manager_t* tm, service_reference_pt, void*, endpoint_description_t *importEndpoint) {
celix_ei_expect_celix_stringHashMap_put(CELIX_EI_UNKNOWN_CALLER, 0, ENOMEM);
auto status = topologyManager_addImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(ENOMEM, status);

status = topologyManager_removeImportedService(tm, importEndpoint, nullptr);
EXPECT_EQ(CELIX_SUCCESS, status);
});
}
Loading
Loading