diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h index 7d2bf43aa..ef25e327e 100644 --- a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -1,41 +1,226 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation // -// Created by max on 28.04.25. +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. // +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 #ifndef RPCCLIENTUSUBSCRIPTION_H #define RPCCLIENTUSUBSCRIPTION_H +#include +#include +#include +#include +#include #include +#include -#include "USubscription.h" -#include "up-cpp/communication/RpcClient.h" +#include +#include "up-cpp/client/usubscription/v3/USubscription.h" namespace uprotocol::core::usubscription::v3 { +using uprotocol::core::usubscription::v3::SubscriptionRequest; +using uprotocol::core::usubscription::v3::UnsubscribeRequest; +using uprotocol::core::usubscription::v3::Update; +using uprotocol::core::usubscription::v3::uSubscription; + +/** + * @struct RpcClientUSubscriptionOptions + * @brief Additional details for uSubscription service. + * + * Each member represents an optional parameter for the uSubscription service. + */ +struct RpcClientUSubscriptionOptions { + /// Permission level of the subscription request + std::optional permission_level; + /// TAP token for access. + std::optional token; + /// Expiration time of the subscription. + std::optional when_expire; + /// Sample period for the subscription messages in milliseconds. + std::optional sample_period_ms; + /// Details of the subscriber. + std::optional subscriber_details; + /// Details of the subscription. + std::optional subscription_details; +}; -struct RpcClientUSubscription : USubscription { +/// @struct uSubscriptionUUriBuilder +/// @brief Structure to build uSubscription request URIs. +/// +/// This structure is used to build URIs for uSubscription service. It uses the +/// service options from uSubscription proto to set the authority name, ue_id, +/// ue_version_major, and the notification topic resource ID in the URI. +struct USubscriptionUUriBuilder { +private: + /// URI for the uSubscription service + v1::UUri uri_; + /// Resource ID of the notification topic + uint32_t sink_resource_id_; + +public: + /// @brief Constructor for USubscriptionUUriBuilder. + USubscriptionUUriBuilder() { + // Get the service descriptor + const google::protobuf::ServiceDescriptor* service = + uSubscription::descriptor(); + const auto& service_options = service->options(); + + // Get the service options + const auto& service_name = + service_options.GetExtension(uprotocol::service_name); + const auto& service_version_major = + service_options.GetExtension(uprotocol::service_version_major); + const auto& service_id = + service_options.GetExtension(uprotocol::service_id); + const auto& notification_topic = + service_options.GetExtension(uprotocol::notification_topic, 0); + + // Set the values in the URI + uri_.set_authority_name(service_name); + uri_.set_ue_id(service_id); + uri_.set_ue_version_major(service_version_major); + sink_resource_id_ = notification_topic.id(); + } + + /// @brief Get the URI with a specific resource ID. + /// + /// @param resource_id The resource ID to set in the URI. + /// + /// @return The URI with the specified resource ID. + v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(resource_id); + return uri; + } + + /// @brief Get the notification URI. + /// + /// @return The notification URI. + v1::UUri getNotificationUri() const { + v1::UUri uri = uri_; // Copy the base URI + uri.set_resource_id(sink_resource_id_); + return uri; + } +}; - explicit RpcClientUSubscription(std::unique_ptr client) - : client_(std::move(client)){}; +/// @brief Interface for uEntities to create subscriptions. +/// +/// Like all L3 client APIs, the RpcClientUSubscription is a wrapper on top of the +/// L2 Communication APIs and USubscription service. +struct RpcClientUSubscription : public USubscription{ + using RpcClientUSubscriptionOrStatus = + utils::Expected, v1::UStatus>; + using ListenCallback = transport::UTransport::ListenCallback; + using ListenHandle = transport::UTransport::ListenHandle; + + /// @brief Create a subscription + /// + /// @param transport Transport to register with. + /// @param subscription_topic Topic to subscribe to. + /// @param callback Function that is called when publish message is + /// received. + /// @param priority Priority of the subscription request. + /// @param subscribe_request_ttl Time to live for the subscription request. + /// @param rpc_client_usubscription_options Additional details for uSubscription service. + // [[nodiscard]] static RpcClientUSubscriptionOrStatus create( + // std::shared_ptr transport, + // const v1::UUri& subscription_topic, ListenCallback&& callback, + // RpcClientUSubscriptionOptions rpc_client_usubscription_options); + + /// @brief Subscribe to the topic + /// + utils::Expected subscribe(const SubscriptionRequest& subscription_request) override; + // void subscribe(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request, + // ::uprotocol::core::usubscription::v3::SubscriptionResponse* response, + // ::google::protobuf::Closure* done) override; + + /// @brief Unsubscribe from the topic and call uSubscription service to + /// close the subscription. + // void Unsubscribe(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, + // ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, + // ::google::protobuf::Closure* done) override; + + // /// @brief Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`] + // void FetchSubscriptions(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, + // ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, + // ::google::protobuf::Closure* done) override; + + // /// @brief Register for notifications relevant to a given topic inside a [`NotificationsRequest`] + // /// changing in subscription status. + // void RegisterForNotifications(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + // ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + // ::google::protobuf::Closure* done) override; + + // /// @brief Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`] + // /// changing in subscription status. + // void UnregisterForNotifications(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, + // ::uprotocol::core::usubscription::v3::NotificationsResponse* response, + // ::google::protobuf::Closure* done) override; + + // /// @brief Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`] + // void FetchSubscribers(google::protobuf::RpcController* controller, + // const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, + // ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, + // ::google::protobuf::Closure* done) override; + + /// @brief Destructor + ~RpcClientUSubscription() override = default; + + /// This section for test code only delete later + +// protected: + /// @brief Constructor + /// + /// @param transport Transport to register with. + /// @param subscriber_details Additional details about the subscriber. + explicit RpcClientUSubscription(std::shared_ptr transport, + RpcClientUSubscriptionOptions rpc_client_usubscription_options = {}); - void default_call_option(); +private: + // Transport + std::shared_ptr transport_; - SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) override; + // Topic to subscribe to + const v1::UUri subscription_topic_; - UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) override; + // Additional details about uSubscription service + RpcClientUSubscriptionOptions rpc_client_usubscription_options_; - FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) override; + // URI info about the uSubscription service + USubscriptionUUriBuilder uSubscriptionUUriBuilder_; - NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) override; + // Allow the protected constructor for this class to be used in make_unique + // inside of create() + friend std::unique_ptr + std::make_unique, + const uprotocol::v1::UUri, + uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions>( + std::shared_ptr&&, + const uprotocol::v1::UUri&&, + uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions&&); - NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) override; + /// @brief Build SubscriptionRequest for subscription request + SubscriptionRequest buildSubscriptionRequest(); - FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) override; + /// @brief Build UnsubscriptionRequest for unsubscription request + UnsubscribeRequest buildUnsubscriptionRequest(); -private: - std::unique_ptr client_; + /// @brief Create a notification sink to receive subscription updates + v1::UStatus createNotificationSink(); }; -} // namespace uprotocol::core::usubscription::v3 +} // namespace uprotocol::core::usubscription::v3 -#endif //RPCCLIENTUSUBSCRIPTION_H +#endif // RPCCLIENTUSUBSCRIPTION_H \ No newline at end of file diff --git a/include/up-cpp/client/usubscription/v3/USubscription.h b/include/up-cpp/client/usubscription/v3/USubscription.h index 53aafb349..9c387039d 100644 --- a/include/up-cpp/client/usubscription/v3/USubscription.h +++ b/include/up-cpp/client/usubscription/v3/USubscription.h @@ -1,28 +1,30 @@ -// -// Created by max on 28.04.25. -// - #ifndef USUBSCRIPTION_H #define USUBSCRIPTION_H -#include "RpcClientUSubscription.h" +#include +#include +#include +#include "up-cpp/utils/Expected.h" namespace uprotocol::core::usubscription::v3 { struct USubscription { + template + using ResponseOrStatus = utils::Expected; + virtual ~USubscription() = default; - virtual SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) = 0; + virtual ResponseOrStatus subscribe(const SubscriptionRequest& subscription_request) = 0; - virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; + // virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; - virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0; + // virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0; - virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ; + // virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ; - virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0; + // virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0; - virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0; + // virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0; }; diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp index 999e584f4..a160f1dce 100644 --- a/src/client/usubscription/v3/RpcClientUSubscription.cpp +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -1,18 +1,329 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation // -// Created by max on 28.04.25. +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. // +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include -#include "up-cpp/client/usubscription/v3/RpcClientUSubscription.h" +#include +#include "up-cpp/communication/RpcClient.h" + +constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; +// TODO(lennart) see default_call_options() for the request in Rust +constexpr auto SUBSCRIPTION_REQUEST_TTL = std::chrono::milliseconds(0x0800); // TODO(lennart) change time +auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // MUST be >= 4 namespace uprotocol::core::usubscription::v3 { - using Payload = datamodel::builder::Payload; - SubscriptionResponse RpcClientUSubscription::subscribe( - const SubscriptionRequest& subscription_request) { - Payload test_test(subscription_request); - auto invoke_handle = client_->invokeMethod(test_test, //TODO(max)); - return SubscriptionResponse(); - } +RpcClientUSubscription::RpcClientUSubscription(std::shared_ptr transport, + RpcClientUSubscriptionOptions rpc_client_usubscription_options) + : transport_(std::move(transport)), + rpc_client_usubscription_options_(std::move(rpc_client_usubscription_options)) { + // Initialize uSubscriptionUUriBuilder_ + uSubscriptionUUriBuilder_ = USubscriptionUUriBuilder(); +} + +// [[nodiscard]] RpcClientUSubscription::RpcClientUSubscriptionOrStatus RpcClientUSubscription::create( +// std::shared_ptr transport, +// const v1::UUri& subscription_topic, ListenCallback&& callback, +// RpcClientUSubscriptionOptions rpc_client_usubscription_options) { +// auto rpc_client_usubscription = std::make_unique( +// std::forward>(transport), +// std::forward(rpc_client_usubscription_options)); + +// google::protobuf::RpcController *controller = nullptr; +// ::uprotocol::core::usubscription::v3::SubscriptionRequest const *subscription_request = nullptr; +// SubscriptionResponse *subscription_response = nullptr; + +// // Attempt to connect create notification sink for updates. +// auto status = rpc_client_usubscription->createNotificationSink(); +// if (status.code() == v1::UCode::OK) { +// rpc_client_usubscription->Subscribe(controller, subscription_request, +// subscription_response, nullptr); +// if (controller == nullptr) { +// return RpcClientUSubscriptionOrStatus(std::move(rpc_client_usubscription)); +// } +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } +// // If connection fails, return the error status. +// return RpcClientUSubscriptionOrStatus(utils::Unexpected(status)); +// } + +// v1::UStatus RpcClientUSubscription::createNotificationSink() { +// auto notification_sink_callback = [this](const v1::UMessage& update) { +// if (update.has_payload()) { +// Update data; +// if (data.ParseFromString(update.payload())) { +// if (data.topic().SerializeAsString() == +// subscription_topic_.SerializeAsString()) { +// subscription_update_ = std::move(data); +// } +// } +// } +// }; + +// auto notification_topic = uSubscriptionUUriBuilder_.getNotificationUri(); + +// auto result = communication::NotificationSink::create( +// transport_, std::move(notification_sink_callback), notification_topic); + +// if (result.has_value()) { +// noficationSinkHandle_ = std::move(result).value(); +// v1::UStatus status; +// status.set_code(v1::UCode::OK); +// return status; +// } +// return result.error(); +// } + +SubscriptionRequest RpcClientUSubscription::buildSubscriptionRequest() { + auto attributes = utils::ProtoConverter::BuildSubscribeAttributes( + rpc_client_usubscription_options_.when_expire, rpc_client_usubscription_options_.subscription_details, + rpc_client_usubscription_options_.sample_period_ms); + + auto subscription_request = utils::ProtoConverter::BuildSubscriptionRequest( + subscription_topic_, attributes); + return subscription_request; +} + +RpcClientUSubscription::ResponseOrStatus RpcClientUSubscription::subscribe(const SubscriptionRequest& subscription_request) { + + communication::RpcClient rpc_client( + transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE), + priority, SUBSCRIPTION_REQUEST_TTL); + + datamodel::builder::Payload payload(subscription_request); + + auto invoke_future = + rpc_client.invokeMethod(std::move(payload)); + + auto message_or_status = invoke_future.get(); + + if (!message_or_status.has_value()) { + return ResponseOrStatus( + utils::Unexpected(message_or_status.error())); + } + + SubscriptionResponse subscription_response; + subscription_response.ParseFromString(message_or_status.value().payload()); + + if (subscription_response.topic().SerializeAsString() == + subscription_topic_.SerializeAsString()) { + return ResponseOrStatus(subscription_response); + } + + return ResponseOrStatus( + utils::Unexpected(message_or_status.error())); + +} + + + + +// UnsubscribeRequest RpcClientUSubscription::buildUnsubscriptionRequest() { +// auto unsubscribe_request = +// utils::ProtoConverter::BuildUnSubscribeRequest(subscription_topic_); +// return unsubscribe_request; +// } + +// void RpcClientUSubscription::Unsubscribe( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request, +// ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// unsubscribe_response_ = *response; +// } +// } +// } +// }; + +// // UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// subscriber_.reset(); + +// done->Run(); +// } + +// void RpcClientUSubscription::FetchSubscriptions( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request, +// ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIPTIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// fetch_subscription_response_ = *response; +// } +// } +// } +// }; + +// // FetchSubscriptionsRequest const fetch_subscriptions_request = buildFetchSubscriptionsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::RegisterForNotifications( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, +// ::uprotocol::core::usubscription::v3::NotificationsResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// notification_response_ = *response; +// } +// } +// } +// }; + +// // NotificationsRequest const register_notifications_request = buildRegisterNotificationsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::UnregisterForNotifications( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request, +// ::uprotocol::core::usubscription::v3::NotificationsResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// notification_response_ = *response; +// } +// } +// } +// }; + +// // NotificationsRequest const unregister_notifications_request = buildUnregisterNotificationsRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? + +// done->Run(); +// } + +// void RpcClientUSubscription::FetchSubscribers( +// google::protobuf::RpcController* controller, +// const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request, +// ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response, +// ::google::protobuf::Closure* done) { + +// constexpr int REQUEST_TTL_TIME = 0x8000; // TODO(lennart) time? +// constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008; +// auto request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); +// auto priority = uprotocol::v1::UPriority::UPRIORITY_UNSPECIFIED; + +// rpc_client_ = std::make_unique( +// transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(RESOURCE_ID_FETCH_SUBSCRIBERS), +// priority, request_ttl); + +// auto on_response = [this, response](const auto& maybe_response) { +// if (maybe_response.has_value() && +// maybe_response.value().has_payload()) { +// if (response->ParseFromString(maybe_response.value().payload())) { +// if (response->SerializeAsString() == // TODO(lennart) topic specific? See subscribe +// subscription_topic_.SerializeAsString()) { +// fetch_subscribers_response_ = *response; +// } +// } +// } +// }; + +// // FetchSubscribersRequest const fetch_subscribers_request = buildFetchSubscribersRequest(); +// auto payload = datamodel::builder::Payload(*request); // TODO(lennart) check if request is correct + +// rpc_handle_ = +// rpc_client_->invokeMethod(std::move(payload), std::move(on_response)); + +// // TODO(lennart) any handle for the response? - } // namespace uprotocol::core::usubscription::v3 +// done->Run(); +// } +} // namespace uprotocol::core::usubscription::v3 diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index bd7e6d67f..56c122710 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -84,6 +84,9 @@ add_coverage_test("NotificationSourceTest" coverage/communication/NotificationSo # client add_coverage_test("ConsumerTest" coverage/client/usubscription/v3/ConsumerTest.cpp) +# core +add_coverage_test("RpcClientUSubscriptionTest" coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp) + ########################## EXTRAS ############################################# add_extra_test("PublisherSubscriberTest" extra/PublisherSubscriberTest.cpp) add_extra_test("NotificationTest" extra/NotificationTest.cpp) diff --git a/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp new file mode 100644 index 000000000..fe6a3ef33 --- /dev/null +++ b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp @@ -0,0 +1,131 @@ +// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include +#include + +#include + +#include "UTransportMock.h" + +namespace { +using MsgDiff = google::protobuf::util::MessageDifferencer; + +class RpcClientUSubscriptionTest : public testing::Test { +private: + std::shared_ptr mockTransportClient_; + std::shared_ptr mockTransportServer_; + uprotocol::v1::UUri client_uuri; + uprotocol::v1::UUri server_uuri; + uprotocol::v1::UUri subscription_uuri; + +protected: + // Run once per TEST_F. + // Used to set up clean environments per test. + + std::shared_ptr getMockTransportClient() + const { + return mockTransportClient_; + } + std::shared_ptr getMockTransportServer() + const { + return mockTransportServer_; + } + uprotocol::v1::UUri& getClientUUri() { return client_uuri; } + const uprotocol::v1::UUri& getServerUUri() const { return server_uuri; } + const uprotocol::v1::UUri& getSubscriptionUUri() const { + return subscription_uuri; + } + + void SetUp() override { + constexpr uint32_t TEST_UE_ID = 0x18000; + constexpr uint32_t DEFAULT_RESOURCE_ID = 0x8000; + // Create a generic transport uri + client_uuri.set_authority_name("random_string"); + client_uuri.set_ue_id(TEST_UE_ID); + client_uuri.set_ue_version_major(3); + client_uuri.set_resource_id(0); + + // Set up a transport + mockTransportClient_ = + std::make_shared(client_uuri); + + // Craete server default uri and set up a transport + server_uuri.set_authority_name("core.usubscription"); + server_uuri.set_ue_id(0); + server_uuri.set_ue_version_major(3); + server_uuri.set_resource_id(0); + + mockTransportServer_ = + std::make_shared(server_uuri); + + // Create a generic subscription uri + subscription_uuri.set_authority_name("10.0.0.2"); + subscription_uuri.set_ue_id(TEST_UE_ID); + subscription_uuri.set_ue_version_major(3); + subscription_uuri.set_resource_id(DEFAULT_RESOURCE_ID); + }; + void TearDown() override {} + + // Run once per execution of the test application. + // Used for setup of all tests. Has access to this instance. + RpcClientUSubscriptionTest() = default; + + void buildDefaultSourceURI(); + void buildValidNotificationURI(); + void buildInValidNotificationURI(); + + // Run once per execution of the test application. + // Used only for global setup outside of tests. + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} + +public: + ~RpcClientUSubscriptionTest() override = default; +}; + +// Negative test case with no source filter +TEST_F(RpcClientUSubscriptionTest, ConstructorTestSuccess) { // NOLINT + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + + auto rpc_client_usubscription = + std::make_unique(getMockTransportClient(), + options); + + // Verify that the RpcClientUSubscription pointer is not null, indicating successful + ASSERT_NE(rpc_client_usubscription, nullptr); +} + +TEST_F(RpcClientUSubscriptionTest, SubscribeTestSuccess) { // NOLINT + + auto options = uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions(); + + uprotocol::core::usubscription::v3::SubscriptionRequest subscription_request = uprotocol::utils::ProtoConverter::BuildSubscriptionRequest( + getSubscriptionUUri(), uprotocol::core::usubscription::v3::SubscribeAttributes()); + + auto rpc_client_usubscription = + std::make_unique(getMockTransportClient(), + options); + + // Verify that the RpcClientUSubscription pointer is not null, indicating successful + ASSERT_NE(rpc_client_usubscription, nullptr); + + auto result = rpc_client_usubscription->subscribe(subscription_request); + + ASSERT_NE(&result, nullptr); +} + +} // namespace