Skip to content

Commit bf9d68e

Browse files
committed
made USubscription async with InvokeProtoFuture return type
1 parent fb81873 commit bf9d68e

File tree

7 files changed

+181
-302
lines changed

7 files changed

+181
-302
lines changed

include/up-cpp/client/usubscription/v3/Consumer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
#include <up-cpp/communication/RpcClient.h>
1717
#include <up-cpp/communication/Subscriber.h>
1818
#include <up-cpp/datamodel/builder/Payload.h>
19-
#include <up-cpp/utils/ProtoConverter.h>
2019
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
21-
#include <uprotocol/v1/umessage.pb.h>
2220

2321
#include "RequestBuilder.h"
2422
#include "USubscriptionUUriBuilder.h"

include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,31 +52,29 @@ struct RpcClientUSubscription : USubscription {
5252
using ListenCallback = transport::UTransport::ListenCallback;
5353
using ListenHandle = transport::UTransport::ListenHandle;
5454

55-
template <typename Response>
56-
Response invokeResponse(communication::RpcClient rpc_client);
57-
5855
/// @brief Subscribes from a given topic
5956
///
6057
/// @param subscription_request The request object containing the topic to
6158
/// subscribe to
62-
/// @return Returns a SubscriptionResponse on success and a UStatus else
63-
utils::Expected<SubscriptionResponse, v1::UStatus> subscribe(
59+
/// @return Returns a future that reslves to a SubscriptionResponse on
60+
/// success and a UStatus else
61+
communication::RpcClient::InvokeProtoFuture<SubscriptionResponse> subscribe(
6462
const SubscriptionRequest& subscription_request) override;
6563

6664
/// @brief Unsubscribes from a given topic
6765
///
6866
/// @param unsubscribe_request The request object containing the topic to
6967
/// unsubscribe from
7068
/// @return Returns an UnsubscribeResponse on success and a UStatus else
71-
utils::Expected<UnsubscribeResponse, v1::UStatus> unsubscribe(
72-
const UnsubscribeRequest& unsubscribe_request) override;
69+
communication::RpcClient::InvokeProtoFuture<UnsubscribeResponse>
70+
unsubscribe(const UnsubscribeRequest& unsubscribe_request) override;
7371

7472
/// @brief Fetches the list of topics the client is subscribed to
7573
///
7674
/// @param fetch_subscriptions_request The request object
77-
/// @return Returns a FetchSubscriptionsResponse on success and a UStatus
78-
/// else
79-
utils::Expected<FetchSubscriptionsResponse, v1::UStatus>
75+
/// @return Returns a future that reslves to a FetchSubscriptionsResponse on
76+
/// success and a UStatus else
77+
communication::RpcClient::InvokeProtoFuture<FetchSubscriptionsResponse>
8078
fetch_subscriptions(
8179
const FetchSubscriptionsRequest& fetch_subscriptions_request) override;
8280

@@ -85,38 +83,47 @@ struct RpcClientUSubscription : USubscription {
8583
/// @param fetch_subscribers_request The request object containing the topic
8684
/// for which the subscribers are to be fetched
8785
/// @return Returns a FetchSubscribersResponse on success and a UStatus else
88-
utils::Expected<FetchSubscribersResponse, v1::UStatus> fetch_subscribers(
86+
communication::RpcClient::InvokeProtoFuture<FetchSubscribersResponse>
87+
fetch_subscribers(
8988
const FetchSubscribersRequest& fetch_subscribers_request) override;
9089

9190
/// @brief Registers to receive notifications
9291
///
9392
/// @param register_notifications_request The request object containing
9493
/// the details to register for notifications
95-
/// @return Returns a NotificationResponse on success and a UStatus else
96-
utils::Expected<NotificationsResponse, v1::UStatus>
94+
/// @return Returns a future that resolves to a NotificationResponse on
95+
/// success and a UStatus else
96+
communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
9797
register_for_notifications(
9898
const NotificationsRequest& register_notifications_request) override;
9999

100100
/// @brief Unregisters from receiving notifications.
101101
///
102102
/// @param unregister_notifications_request The request object containing
103103
/// the details needed to stop receiving notifications.
104-
/// @return Returns a NotificationResponse on success and a UStatus else
105-
utils::Expected<NotificationsResponse, v1::UStatus>
104+
/// @return Returns future that resolves to a NotificationResponse on
105+
/// success and a UStatus else
106+
communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
106107
unregister_for_notifications(
107108
const NotificationsRequest& unregister_notifications_request) override;
108109

109110
/// @brief Constructor
110111
///
111112
/// @param transport Transport used to send messages
112113
explicit RpcClientUSubscription(
113-
std::shared_ptr<transport::UTransport> transport)
114-
: transport_(std::move(transport)) {}
114+
std::shared_ptr<transport::UTransport> transport);
115115

116116
~RpcClientUSubscription() override = default;
117117

118118
private:
119119
std::shared_ptr<transport::UTransport> transport_;
120+
std::shared_ptr<communication::RpcClient> subscribe_client_;
121+
std::shared_ptr<communication::RpcClient> unsubscribe_client_;
122+
std::shared_ptr<communication::RpcClient> fetch_subscriptions_client_;
123+
std::shared_ptr<communication::RpcClient> fetch_subscribers_client_;
124+
std::shared_ptr<communication::RpcClient> register_for_notification_client_;
125+
std::shared_ptr<communication::RpcClient>
126+
unregister_for_notification_client_;
120127

121128
USubscriptionUUriBuilder uuri_builder_;
122129
};

include/up-cpp/client/usubscription/v3/USubscription.h

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTION_H
1313
#define UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTION_H
1414
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
15-
#include <uprotocol/v1/umessage.pb.h>
1615
#include <uprotocol/v1/ustatus.pb.h>
1716

17+
#include "up-cpp/communication/RpcClient.h"
1818
#include "up-cpp/utils/Expected.h"
1919

2020
namespace uprotocol::core::usubscription::v3 {
@@ -33,46 +33,57 @@ struct USubscription {
3333
/// response on success or else a status code
3434
///
3535
/// @param subscription_request containing a topic to subscribe to
36-
/// @return SubscriptionReponse on success and UStatus else
37-
virtual ResponseOrStatus<SubscriptionResponse> subscribe(
38-
const SubscriptionRequest& subscription_request) = 0;
36+
/// @return future that resolves to a SubscriptionReponse on success and
37+
/// UStatus else
38+
virtual communication::RpcClient::InvokeProtoFuture<SubscriptionResponse>
39+
subscribe(const SubscriptionRequest& subscription_request) = 0;
3940

4041
/// @brief sends an unsubscribe request to a USubscription backend and a
4142
/// response on success or else a status code
4243
///
4344
/// @param unsubscribe_request containing a topic to unsubscribe
44-
/// @return UnsubscribeResponse on success and UStatus else
45-
virtual ResponseOrStatus<UnsubscribeResponse> unsubscribe(
46-
const UnsubscribeRequest& unsubscribe_request) = 0;
45+
/// @return future that resolves to UnsubscribeResponse on success and
46+
/// UStatus else
47+
virtual communication::RpcClient::InvokeProtoFuture<UnsubscribeResponse>
48+
unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0;
4749

4850
/// @brief fetches all topics the client is subscribed to from the backend
4951
///
5052
/// @param fetch_subscriptions_request
51-
/// @return FetchSubscriptionsResponse on success and UStatus else
52-
virtual ResponseOrStatus<FetchSubscriptionsResponse> fetch_subscriptions(
53+
/// @return future that resolves to FetchSubscriptionsResponse on success
54+
/// and UStatus else
55+
virtual communication::RpcClient::InvokeProtoFuture<
56+
FetchSubscriptionsResponse>
57+
fetch_subscriptions(
5358
const FetchSubscriptionsRequest& fetch_subscriptions_request) = 0;
5459

5560
/// @brief registers for notifications to a USubscription backend
5661
///
5762
/// @param register_notifications_request
58-
/// @return NotificationResponse on success and UStatus else
59-
virtual ResponseOrStatus<NotificationsResponse> register_for_notifications(
63+
/// @return future that resolves to NotificationResponse on success and
64+
/// UStatus else
65+
virtual communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
66+
register_for_notifications(
6067
const NotificationsRequest& register_notifications_request) = 0;
6168

6269
/// @brief unregisters for notifications to a USubscription backend
6370
///
6471
/// @param unregister_notifications_request
65-
/// @return NotificationResponse on success and UStatus else
66-
virtual ResponseOrStatus<NotificationsResponse>
72+
/// @return future that resolves to NotificationResponse on success and
73+
/// UStatus else
74+
virtual communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
6775
unregister_for_notifications(
6876
const NotificationsRequest& unregister_notifications_request) = 0;
6977

7078
/// @brief fetches all subscribers for a given topic from the backend
7179
///
7280
/// @param fetch_subscriptions_request containing the topic for which the
7381
/// subscribers are fetched
74-
/// @return FetchSubscriptionsResponse on success and UStatus else
75-
virtual ResponseOrStatus<FetchSubscribersResponse> fetch_subscribers(
82+
/// @return future that resolves to FetchSubscriptionsResponse on success
83+
/// and UStatus else
84+
virtual communication::RpcClient::InvokeProtoFuture<
85+
FetchSubscribersResponse>
86+
fetch_subscribers(
7687
const FetchSubscribersRequest& fetch_subscribers_request) = 0;
7788
};
7889

include/up-cpp/communication/RpcClient.h

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,22 @@ struct RpcClient {
7676
/// for the duration of an RPC call.
7777
using InvokeHandle = Connection::Handle;
7878

79-
/// @brief Extension to std::future that also holds a callback handle
80-
class InvokeFuture {
79+
/// @brief Extension to std::future with template type that also holds a
80+
/// callback handle
81+
template <typename T>
82+
class InvokeProtoFuture {
8183
InvokeHandle callback_handle_;
82-
std::future<MessageOrStatus> future_;
84+
std::future<utils::Expected<T, v1::UStatus>> future_;
8385

8486
public:
85-
InvokeFuture();
86-
InvokeFuture(InvokeFuture&&) noexcept;
87-
InvokeFuture(std::future<MessageOrStatus>&&, InvokeHandle&&) noexcept;
87+
InvokeProtoFuture() = default;
88+
InvokeProtoFuture(InvokeProtoFuture&& other) noexcept = default;
89+
InvokeProtoFuture& operator=(InvokeProtoFuture&& other) noexcept =
90+
default;
8891

89-
InvokeFuture& operator=(InvokeFuture&&) noexcept;
92+
InvokeProtoFuture(std::future<utils::Expected<T, v1::UStatus>>&& future,
93+
InvokeHandle&& handle) noexcept
94+
: callback_handle_(std::move(handle)), future_(std::move(future)) {}
9095

9196
/// @name Passthroughs for std::future
9297
/// @{
@@ -104,6 +109,8 @@ struct RpcClient {
104109
/// @}
105110
};
106111

112+
using InvokeFuture = InvokeProtoFuture<v1::UMessage>;
113+
107114
/// @brief Invokes an RPC method by sending a request message.
108115
///
109116
/// @param A Payload builder containing the payload to be sent with the
@@ -169,36 +176,48 @@ struct RpcClient {
169176
[[nodiscard]] InvokeFuture invokeMethod();
170177

171178
template <typename T, typename R>
172-
ResponseOrStatus<T> invokeProtoMethod(const R& request_message) {
179+
InvokeProtoFuture<T> invokeProtoMethod(const R& request_message) {
180+
auto result_promise =
181+
std::make_shared<std::promise<ResponseOrStatus<T>>>();
182+
auto future = result_promise->get_future();
183+
173184
auto payload_or_status =
174185
uprotocol::utils::ProtoConverter::protoToPayload(request_message);
175186

176187
if (!payload_or_status.has_value()) {
177-
return ResponseOrStatus<T>(
178-
UnexpectedStatus(payload_or_status.error()));
188+
result_promise->set_value(ResponseOrStatus<T>(
189+
UnexpectedStatus(payload_or_status.error())));
190+
return {std::move(future), InvokeHandle()};
179191
}
180192

181193
datamodel::builder::Payload tmp_payload(payload_or_status.value());
182194

183-
auto message_or_status =
184-
this->invokeMethod(std::move(tmp_payload)).get();
185-
186-
if (!message_or_status.has_value()) {
187-
return ResponseOrStatus<T>(
188-
UnexpectedStatus(message_or_status.error()));
189-
}
190-
191-
auto response_or_status = utils::ProtoConverter::extractFromProtobuf<T>(
192-
message_or_status.value());
193-
194-
if (!response_or_status.has_value()) {
195-
spdlog::error(
196-
"invokeProtoMethod: Error when extracting response from "
197-
"protobuf.");
198-
return response_or_status;
199-
}
200-
201-
return ResponseOrStatus<T>(response_or_status.value());
195+
auto handle = invokeMethod(
196+
builder_.build(std::move(tmp_payload)),
197+
[result_promise](const MessageOrStatus& message_or_status) {
198+
if (!message_or_status.has_value()) {
199+
result_promise->set_value(ResponseOrStatus<T>(
200+
UnexpectedStatus(message_or_status.error())));
201+
return;
202+
}
203+
auto response_or_status =
204+
utils::ProtoConverter::extractFromProtobuf<T>(
205+
message_or_status.value());
206+
207+
if (!response_or_status.has_value()) {
208+
spdlog::error(
209+
"invokeProtoMethod: Error when extracting response "
210+
"from "
211+
"protobuf.");
212+
result_promise->set_value(response_or_status);
213+
return;
214+
}
215+
216+
result_promise->set_value(
217+
ResponseOrStatus<T>(response_or_status.value()));
218+
});
219+
220+
return {std::move(future), std::move(handle)};
202221
}
203222

204223
/// @brief Default move constructor (defined in RpcClient.cpp)
@@ -224,4 +243,4 @@ struct RpcClient {
224243

225244
} // namespace uprotocol::communication
226245

227-
#endif // UP_CPP_COMMUNICATION_RPCCLIENT_H
246+
#endif // UP_CPP_COMMUNICATION_RPCCLIENT_H

0 commit comments

Comments
 (0)