Skip to content

Rpc client u subscription #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 8 additions & 90 deletions include/up-cpp/client/usubscription/v3/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,97 +16,15 @@
#include <up-cpp/communication/RpcClient.h>
#include <up-cpp/communication/Subscriber.h>
#include <up-cpp/datamodel/builder/Payload.h>
#include <up-cpp/utils/ProtoConverter.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
#include <uprotocol/v1/umessage.pb.h>

#include <utility>
#include "RequestBuilder.h"
#include "USubscriptionUUriBuilder.h"

namespace uprotocol::client::usubscription::v3 {
using uprotocol::core::usubscription::v3::SubscriptionRequest;
using uprotocol::core::usubscription::v3::UnsubscribeRequest;
using uprotocol::core::usubscription::v3::Update;
using uprotocol::core::usubscription::v3::uSubscription;

/**
* @struct ConsumerOptions
* @brief Additional details for uSubscription service.
*
* Each member represents an optional parameter for the uSubscription service.
*/
struct ConsumerOptions {
/// Permission level of the subscription request
std::optional<uint32_t> permission_level;
/// TAP token for access.
std::optional<std::string> token;
/// Expiration time of the subscription.
std::optional<std::chrono::system_clock::time_point> when_expire;
/// Sample period for the subscription messages in milliseconds.
std::optional<std::chrono::milliseconds> sample_period_ms;
/// Details of the subscriber.
std::optional<google::protobuf::Any> subscriber_details;
/// Details of the subscription.
std::optional<google::protobuf::Any> subscription_details;
};

/// @struct uSubscriptionUUriBuilder
/// @brief Structure to build uSubscription request URIs.
///
/// This structure is used to build URIs for uSubscription service. It uses the
/// service options from uSubscription proto to set the authority name, ue_id,
/// ue_version_major, and the notification topic resource ID in the URI.
struct USubscriptionUUriBuilder {
private:
/// URI for the uSubscription service
v1::UUri uri_;
/// Resource ID of the notification topic
uint32_t sink_resource_id_;

public:
/// @brief Constructor for uSubscriptionUUriBuilder.
USubscriptionUUriBuilder() {
// Get the service descriptor
const google::protobuf::ServiceDescriptor* service =
uSubscription::descriptor();
const auto& service_options = service->options();

// Get the service options
const auto& service_name =
service_options.GetExtension(uprotocol::service_name);
const auto& service_version_major =
service_options.GetExtension(uprotocol::service_version_major);
const auto& service_id =
service_options.GetExtension(uprotocol::service_id);
const auto& notification_topic =
service_options.GetExtension(uprotocol::notification_topic, 0);

// Set the values in the URI
uri_.set_authority_name(service_name);
uri_.set_ue_id(service_id);
uri_.set_ue_version_major(service_version_major);
sink_resource_id_ = notification_topic.id();
}

/// @brief Get the URI with a specific resource ID.
///
/// @param resource_id The resource ID to set in the URI.
///
/// @return The URI with the specified resource ID.
v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const {
v1::UUri uri = uri_; // Copy the base URI
uri.set_resource_id(resource_id);
return uri;
}

/// @brief Get the notification URI.
///
/// @return The notification URI.
v1::UUri getNotificationUri() const {
v1::UUri uri = uri_; // Copy the base URI
uri.set_resource_id(sink_resource_id_);
return uri;
}
};

/// @brief Interface for uEntities to create subscriptions.
///
Expand All @@ -133,7 +51,7 @@ struct Consumer {
const v1::UUri& subscription_topic, ListenCallback&& callback,
v1::UPriority priority,
std::chrono::milliseconds subscription_request_ttl,
ConsumerOptions consumer_options);
core::usubscription::v3::CallOptions consumer_options);

/// @brief Unsubscribe from the topic and call uSubscription service to
/// close the subscription.
Expand All @@ -160,7 +78,7 @@ struct Consumer {
/// @param subscriber_details Additional details about the subscriber.
Consumer(std::shared_ptr<transport::UTransport> transport,
v1::UUri subscription_topic,
ConsumerOptions consumer_options = {});
core::usubscription::v3::CallOptions consumer_options = {});

private:
// Transport
Expand All @@ -169,10 +87,10 @@ struct Consumer {
// Topic to subscribe to
const v1::UUri subscription_topic_;
// Additional details about uSubscription service
ConsumerOptions consumer_options_;
core::usubscription::v3::CallOptions consumer_options_;

// URI info about the uSubscription service
USubscriptionUUriBuilder uSubscriptionUUriBuilder_;
core::usubscription::v3::USubscriptionUUriBuilder uSubscriptionUUriBuilder_;

// Subscription updates
std::unique_ptr<communication::NotificationSink> noficationSinkHandle_;
Expand All @@ -191,10 +109,10 @@ struct Consumer {
friend std::unique_ptr<Consumer>
std::make_unique<Consumer, std::shared_ptr<transport::UTransport>,
const uprotocol::v1::UUri,
uprotocol::client::usubscription::v3::ConsumerOptions>(
uprotocol::core::usubscription::v3::CallOptions>(
std::shared_ptr<uprotocol::transport::UTransport>&&,
const uprotocol::v1::UUri&&,
uprotocol::client::usubscription::v3::ConsumerOptions&&);
uprotocol::core::usubscription::v3::CallOptions&&);

/// @brief Build SubscriptionRequest for subscription request
SubscriptionRequest buildSubscriptionRequest();
Expand Down
101 changes: 101 additions & 0 deletions include/up-cpp/client/usubscription/v3/RequestBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache License Version 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: Apache-2.0

#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H
#define UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H
#include <up-cpp/utils/ProtoConverter.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>

#include <utility>

namespace uprotocol::core::usubscription::v3 {

/// @struct CallOptions
/// @brief Additional details for uSubscription service.
///
/// Each member represents an optional parameter for the uSubscription service.
struct CallOptions {
/// Permission level of the subscription request
std::optional<uint32_t> permission_level;
/// TAP token for access.
std::optional<std::string> token;
/// Expiration time of the subscription.
std::optional<std::chrono::system_clock::time_point> when_expire;
/// Sample period for the subscription messages in milliseconds.
std::optional<std::chrono::milliseconds> sample_period_ms;
/// Details of the subscriber.
std::optional<google::protobuf::Any> subscriber_details;
/// Details of the subscription.
std::optional<google::protobuf::Any> subscription_details;
};

/// @brief Builds different requests using specified options.
///
/// This struct facilitates the construction of requests based on
/// `USubscriptionOptions`, providing methods to build different requests.
struct RequestBuilder {
/// @brief Builds a subscription request for a given topic.
///
/// @param topic The `v1::UUri` representing the topic for the subscription.
///
/// @return A `SubscriptionRequest` configured for the specified topic.
static SubscriptionRequest buildSubscriptionRequest(
const v1::UUri& topic, const CallOptions& options = {});

/// @brief Builds an unsubscription request for a given topic.
///
/// @param topic The `v1::UUri` representing the topic to unsubscribe from.
///
/// @return An `UnsubscribeRequest` configured for the specified topic.
static UnsubscribeRequest buildUnsubscribeRequest(const v1::UUri& topic);

/// @brief Build fetch subscritions request for a given topic.
///
/// @param topic The `v1::UUri` representing the topic to fetch.
///
/// @return A `FetchSubscriptionsRequest` configured for the specified
/// topic.
static FetchSubscriptionsRequest buildFetchSubscriptionsRequest(
const v1::UUri& topic);

/// @brief Build fetch subscritions request for a given subscriber.
///
/// @param subscriber The `SubscriberInfo` representing the subscriber to
/// fetch.
///
/// @return A `FetchSubscriptionsRequest` configured for the specified
/// subscriber.
static FetchSubscriptionsRequest buildFetchSubscriptionsRequest(
const SubscriberInfo& subscriber);

/// @brief Build fetch subscribers request for a given topic.
///
/// @param topic The `v1::UUri` representing the topic to fetch.
///
/// @return A `FetchSubscribersRequest` configured for the specified topic.
static FetchSubscribersRequest buildFetchSubscribersRequest(
const v1::UUri& topic);

/// @brief Build a notifications request for a given topic. Subscription
/// change
/// notifications MUST use topic SubscriptionsChange with resource id
/// 0x8000, as per the protobuf definition.
///
/// @param topic The `v1::UUri` representing the topic to (un)register
/// for/from.
///
/// @return A `NotificationsRequest` configured for the specified topic.
static NotificationsRequest buildNotificationsRequest(
const v1::UUri& topic);
};

} // namespace uprotocol::core::usubscription::v3
#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H
134 changes: 134 additions & 0 deletions include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache License Version 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: Apache-2.0

#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H
#define UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H

#include <up-cpp/communication/NotificationSink.h>
#include <up-cpp/communication/RpcClient.h>
#include <up-cpp/transport/UTransport.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>

#include "up-cpp/client/usubscription/v3/USubscription.h"
#include "up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h"

/// The uEntity (type) identifier of the uSubscription service.
constexpr uint32_t USUBSCRIPTION_TYPE_ID = 0x00000000;
/// The (latest) major version of the uSubscription service.
constexpr uint8_t UE_VERSION_MAJOR = 0x03;
/// The resource identifier of uSubscription's _subscribe_ operation.
constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001;
/// The resource identifier of uSubscription's _unsubscribe_ operation.
constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002;
/// The resource identifier of uSubscription's _fetch subscriptions_ operation.
constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003;
/// The resource identifier of uSubscription's _register for notifications_
/// operation.
constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006;
/// The resource identifier of uSubscription's _unregister for notifications_
/// operation.
constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007;
/// The resource identifier of uSubscription's _fetch subscribers_ operation.
constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008;

constexpr auto USUBSCRIPTION_REQUEST_TTL = std::chrono::milliseconds(5000);

namespace uprotocol::core::usubscription::v3 {
using v3::SubscriptionRequest;
using v3::UnsubscribeRequest;

struct USubscriptionOptions {
std::string authority_name;
uint16_t instance_id = 0x0000;
};

/// @brief Client which implements the USubscription interface
struct RpcClientUSubscription : USubscription {
using RpcClientUSubscriptionOrStatus =
utils::Expected<std::unique_ptr<RpcClientUSubscription>, v1::UStatus>;
using ListenCallback = transport::UTransport::ListenCallback;
using ListenHandle = transport::UTransport::ListenHandle;

/// @brief Subscribes from a given topic
///
/// @param subscription_request The request object containing the topic to
/// subscribe to
/// @return Returns a future that reslves to a SubscriptionResponse on
/// success and a UStatus else
communication::RpcClient::InvokeProtoFuture<SubscriptionResponse> subscribe(
const SubscriptionRequest& subscription_request) override;

/// @brief Unsubscribes from a given topic
///
/// @param unsubscribe_request The request object containing the topic to
/// unsubscribe from
/// @return Returns an UnsubscribeResponse on success and a UStatus else
communication::RpcClient::InvokeProtoFuture<UnsubscribeResponse>
unsubscribe(const UnsubscribeRequest& unsubscribe_request) override;

/// @brief Fetches the list of topics the client is subscribed to
///
/// @param fetch_subscriptions_request The request object
/// @return Returns a future that reslves to a FetchSubscriptionsResponse on
/// success and a UStatus else
communication::RpcClient::InvokeProtoFuture<FetchSubscriptionsResponse>
fetch_subscriptions(
const FetchSubscriptionsRequest& fetch_subscriptions_request) override;

/// @brief Fetches the list of subscribers for a given topic
///
/// @param fetch_subscribers_request The request object containing the topic
/// for which the subscribers are to be fetched
/// @return Returns a FetchSubscribersResponse on success and a UStatus else
communication::RpcClient::InvokeProtoFuture<FetchSubscribersResponse>
fetch_subscribers(
const FetchSubscribersRequest& fetch_subscribers_request) override;

/// @brief Registers to receive notifications
///
/// @param register_notifications_request The request object containing
/// the details to register for notifications
/// @return Returns a future that resolves to a NotificationResponse on
/// success and a UStatus else
communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
register_for_notifications(
const NotificationsRequest& register_notifications_request) override;

/// @brief Unregisters from receiving notifications.
///
/// @param unregister_notifications_request The request object containing
/// the details needed to stop receiving notifications.
/// @return Returns future that resolves to a NotificationResponse on
/// success and a UStatus else
communication::RpcClient::InvokeProtoFuture<NotificationsResponse>
unregister_for_notifications(
const NotificationsRequest& unregister_notifications_request) override;

/// @brief Constructor
///
/// @param transport Transport used to send messages
/// @param options Struct containing all options for the USubscription
/// client
explicit RpcClientUSubscription(
std::shared_ptr<transport::UTransport> transport,
const USubscriptionOptions& options);

~RpcClientUSubscription() override = default;

private:
std::shared_ptr<transport::UTransport> transport_;
std::shared_ptr<communication::RpcClient> rpc_client_;
USubscriptionUUriBuilder uuri_builder_;
};

} // namespace uprotocol::core::usubscription::v3

#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H
Loading
Loading