Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 202 additions & 17 deletions include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h
Original file line number Diff line number Diff line change
@@ -1,41 +1,226 @@
// SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
//
// Created by max on 28.04.25.
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache License Version 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: Apache-2.0

#ifndef RPCCLIENTUSUBSCRIPTION_H

Check warning on line 12 in include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h

View workflow job for this annotation

GitHub Actions / Lint C++ sources

include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h:12:9 [llvm-header-guard]

header guard does not follow preferred style
#define RPCCLIENTUSUBSCRIPTION_H

#include <up-cpp/communication/NotificationSink.h>
#include <up-cpp/communication/RpcClient.h>
#include <up-cpp/communication/Subscriber.h>
#include <up-cpp/datamodel/builder/Payload.h>
#include <up-cpp/utils/ProtoConverter.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
#include <uprotocol/v1/umessage.pb.h>

#include "USubscription.h"
#include "up-cpp/communication/RpcClient.h"
#include <utility>
#include "up-cpp/client/usubscription/v3/USubscription.h"

namespace uprotocol::core::usubscription::v3 {
using uprotocol::core::usubscription::v3::SubscriptionRequest;
using uprotocol::core::usubscription::v3::UnsubscribeRequest;
using uprotocol::core::usubscription::v3::Update;

Check warning on line 29 in include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h

View workflow job for this annotation

GitHub Actions / Lint C++ sources

include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h:29:43 [misc-unused-using-decls]

using decl 'Update' is unused
using uprotocol::core::usubscription::v3::uSubscription;

/**
* @struct RpcClientUSubscriptionOptions
* @brief Additional details for uSubscription service.
*
* Each member represents an optional parameter for the uSubscription service.
*/
struct RpcClientUSubscriptionOptions {
/// Permission level of the subscription request
std::optional<uint32_t> permission_level;
/// TAP token for access.
std::optional<std::string> token;
/// Expiration time of the subscription.
std::optional<std::chrono::system_clock::time_point> when_expire;
/// Sample period for the subscription messages in milliseconds.
std::optional<std::chrono::milliseconds> sample_period_ms;
/// Details of the subscriber.
std::optional<google::protobuf::Any> subscriber_details;
/// Details of the subscription.
std::optional<google::protobuf::Any> subscription_details;
};

struct RpcClientUSubscription : USubscription {
/// @struct uSubscriptionUUriBuilder
/// @brief Structure to build uSubscription request URIs.
///
/// This structure is used to build URIs for uSubscription service. It uses the
/// service options from uSubscription proto to set the authority name, ue_id,
/// ue_version_major, and the notification topic resource ID in the URI.
struct USubscriptionUUriBuilder {
private:
/// URI for the uSubscription service
v1::UUri uri_;
/// Resource ID of the notification topic
uint32_t sink_resource_id_;

public:
/// @brief Constructor for USubscriptionUUriBuilder.
USubscriptionUUriBuilder() {
// Get the service descriptor
const google::protobuf::ServiceDescriptor* service =
uSubscription::descriptor();
const auto& service_options = service->options();

// Get the service options
const auto& service_name =
service_options.GetExtension(uprotocol::service_name);
const auto& service_version_major =
service_options.GetExtension(uprotocol::service_version_major);
const auto& service_id =
service_options.GetExtension(uprotocol::service_id);
const auto& notification_topic =
service_options.GetExtension(uprotocol::notification_topic, 0);

// Set the values in the URI
uri_.set_authority_name(service_name);
uri_.set_ue_id(service_id);
uri_.set_ue_version_major(service_version_major);
sink_resource_id_ = notification_topic.id();
}

/// @brief Get the URI with a specific resource ID.
///
/// @param resource_id The resource ID to set in the URI.
///
/// @return The URI with the specified resource ID.
v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const {
v1::UUri uri = uri_; // Copy the base URI
uri.set_resource_id(resource_id);
return uri;
}

/// @brief Get the notification URI.
///
/// @return The notification URI.
v1::UUri getNotificationUri() const {
v1::UUri uri = uri_; // Copy the base URI
uri.set_resource_id(sink_resource_id_);
return uri;
}
};

explicit RpcClientUSubscription(std::unique_ptr<communication::RpcClient> client)
: client_(std::move(client)){};
/// @brief Interface for uEntities to create subscriptions.
///
/// Like all L3 client APIs, the RpcClientUSubscription is a wrapper on top of the
/// L2 Communication APIs and USubscription service.
struct RpcClientUSubscription : public USubscription{
using RpcClientUSubscriptionOrStatus =
utils::Expected<std::unique_ptr<RpcClientUSubscription>, v1::UStatus>;
using ListenCallback = transport::UTransport::ListenCallback;
using ListenHandle = transport::UTransport::ListenHandle;

/// @brief Create a subscription
///
/// @param transport Transport to register with.
/// @param subscription_topic Topic to subscribe to.
/// @param callback Function that is called when publish message is
/// received.
/// @param priority Priority of the subscription request.
/// @param subscribe_request_ttl Time to live for the subscription request.
/// @param rpc_client_usubscription_options Additional details for uSubscription service.
// [[nodiscard]] static RpcClientUSubscriptionOrStatus create(
// std::shared_ptr<transport::UTransport> transport,
// const v1::UUri& subscription_topic, ListenCallback&& callback,
// RpcClientUSubscriptionOptions rpc_client_usubscription_options);

/// @brief Subscribe to the topic
///
utils::Expected<SubscriptionResponse, v1::UStatus> subscribe(const SubscriptionRequest& subscription_request) override;
// void subscribe(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::SubscriptionRequest* request,
// ::uprotocol::core::usubscription::v3::SubscriptionResponse* response,
// ::google::protobuf::Closure* done) override;

/// @brief Unsubscribe from the topic and call uSubscription service to
/// close the subscription.
// void Unsubscribe(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::UnsubscribeRequest* request,
// ::uprotocol::core::usubscription::v3::UnsubscribeResponse* response,
// ::google::protobuf::Closure* done) override;

// /// @brief Fetch all subscriptions for a given topic or subscriber contained inside a [`FetchSubscriptionsRequest`]
// void FetchSubscriptions(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::FetchSubscriptionsRequest* request,
// ::uprotocol::core::usubscription::v3::FetchSubscriptionsResponse* response,
// ::google::protobuf::Closure* done) override;

// /// @brief Register for notifications relevant to a given topic inside a [`NotificationsRequest`]
// /// changing in subscription status.
// void RegisterForNotifications(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request,
// ::uprotocol::core::usubscription::v3::NotificationsResponse* response,
// ::google::protobuf::Closure* done) override;

// /// @brief Unregister for notifications relevant to a given topic inside a [`NotificationsRequest`]
// /// changing in subscription status.
// void UnregisterForNotifications(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::NotificationsRequest* request,
// ::uprotocol::core::usubscription::v3::NotificationsResponse* response,
// ::google::protobuf::Closure* done) override;

// /// @brief Fetch a list of subscribers that are currently subscribed to a given topic in a [`FetchSubscribersRequest`]
// void FetchSubscribers(google::protobuf::RpcController* controller,
// const ::uprotocol::core::usubscription::v3::FetchSubscribersRequest* request,
// ::uprotocol::core::usubscription::v3::FetchSubscribersResponse* response,
// ::google::protobuf::Closure* done) override;

/// @brief Destructor
~RpcClientUSubscription() override = default;

/// This section for test code only delete later

// protected:
/// @brief Constructor
///
/// @param transport Transport to register with.
/// @param subscriber_details Additional details about the subscriber.
explicit RpcClientUSubscription(std::shared_ptr<transport::UTransport> transport,
RpcClientUSubscriptionOptions rpc_client_usubscription_options = {});

void default_call_option();
private:
// Transport
std::shared_ptr<transport::UTransport> transport_;

SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) override;
// Topic to subscribe to
const v1::UUri subscription_topic_;

UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) override;
// Additional details about uSubscription service
RpcClientUSubscriptionOptions rpc_client_usubscription_options_;

FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) override;
// URI info about the uSubscription service
USubscriptionUUriBuilder uSubscriptionUUriBuilder_;

NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) override;
// Allow the protected constructor for this class to be used in make_unique
// inside of create()
friend std::unique_ptr<RpcClientUSubscription>
std::make_unique<RpcClientUSubscription, std::shared_ptr<transport::UTransport>,
const uprotocol::v1::UUri,
uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions>(
std::shared_ptr<uprotocol::transport::UTransport>&&,
const uprotocol::v1::UUri&&,
uprotocol::core::usubscription::v3::RpcClientUSubscriptionOptions&&);

NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) override;
/// @brief Build SubscriptionRequest for subscription request
SubscriptionRequest buildSubscriptionRequest();

FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) override;
/// @brief Build UnsubscriptionRequest for unsubscription request
UnsubscribeRequest buildUnsubscriptionRequest();

private:
std::unique_ptr<communication::RpcClient> client_;
/// @brief Create a notification sink to receive subscription updates
v1::UStatus createNotificationSink();

};

} // namespace uprotocol::core::usubscription::v3
} // namespace uprotocol::core::usubscription::v3

#endif //RPCCLIENTUSUBSCRIPTION_H
#endif // RPCCLIENTUSUBSCRIPTION_H
24 changes: 13 additions & 11 deletions include/up-cpp/client/usubscription/v3/USubscription.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
//
// Created by max on 28.04.25.
//

#ifndef USUBSCRIPTION_H

Check warning on line 1 in include/up-cpp/client/usubscription/v3/USubscription.h

View workflow job for this annotation

GitHub Actions / Lint C++ sources

include/up-cpp/client/usubscription/v3/USubscription.h:1:9 [llvm-header-guard]

header guard does not follow preferred style
#define USUBSCRIPTION_H
#include "RpcClientUSubscription.h"
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
#include <uprotocol/v1/umessage.pb.h>
#include <uprotocol/v1/ustatus.pb.h>
#include "up-cpp/utils/Expected.h"

namespace uprotocol::core::usubscription::v3 {

struct USubscription {

template<typename R>
using ResponseOrStatus = utils::Expected<R, v1::UStatus>;

virtual ~USubscription() = default;

virtual SubscriptionResponse subscribe(const SubscriptionRequest& subscription_request) = 0;
virtual ResponseOrStatus<SubscriptionResponse> subscribe(const SubscriptionRequest& subscription_request) = 0;

virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0;
// virtual UnsubscribeResponse unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0;

virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0;
// virtual FetchSubscriptionsResponse fetch_subscriptions(const FetchSubscriptionsRequest& fetch_subscribers_request) = 0;

virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ;
// virtual NotificationsResponse register_for_notifications(const NotificationsRequest& register_notifications_request) =0 ;

virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0;
// virtual NotificationsResponse unregister_for_notifications(const NotificationsRequest& unregister_notifications_request) = 0;

virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0;
// virtual FetchSubscribersResponse fetch_subscribers(const FetchSubscribersRequest& fetch_subscribers_request) = 0;

};

Expand Down
Loading
Loading