Skip to content

Commit afeeae4

Browse files
Add Observable subscriptions with RxJS support (#1357)
• Added createObservableSubscription() method to Node class • Returns ObservableSubscription with RxJS Observable for ROS 2 messages • Enables reactive programming with operators like throttleTime(), debounceTime(), map(), combineLatest() • New ObservableSubscription class exported from rclnodejs
1 parent 4a1b0db commit afeeae4

15 files changed

+990
-0
lines changed

CONTRIBUTORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
- Enhance Message Validation
4848
- Add TypeScript definitions and non-throwing variants for validator
4949
- Add MessageIntrospector for message schema inspection
50+
- Add Observable subscriptions with RxJS support
5051

5152
- **[Martins Mozeiko](https://github.com/martins-mozeiko)**
5253
- QoS new/delete fix

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ rclnodejs.init().then(() => {
2828
- [Tutorials](./tutorials/)
2929
- [Electron-based Visualization](#electron-based-visualization)
3030
- [Using TypeScript](#using-rclnodejs-with-typescript)
31+
- [Observable Subscriptions](#observable-subscriptions)
3132
- [ROS2 Interface Message Generation](#ros2-interface-message-generation)
3233
- [Performance Benchmarks](#performance-benchmarks)
3334
- [Efficient Usage Tips](./docs/EFFICIENCY.md)
@@ -133,6 +134,27 @@ rclnodejs.init().then(() => {
133134

134135
See [TypeScript demos](https://github.com/RobotWebTools/rclnodejs/tree/develop/ts_demo) for more examples.
135136

137+
## Observable Subscriptions
138+
139+
rclnodejs supports [RxJS](https://rxjs.dev/) Observable subscriptions for reactive programming with ROS 2 messages. Use operators like `throttleTime()`, `debounceTime()`, `map()`, and `combineLatest()` to build declarative message processing pipelines.
140+
141+
```javascript
142+
const { throttleTime, map } = require('rxjs');
143+
144+
const obsSub = node.createObservableSubscription(
145+
'sensor_msgs/msg/LaserScan',
146+
'/scan'
147+
);
148+
obsSub.observable
149+
.pipe(
150+
throttleTime(200),
151+
map((msg) => msg.ranges)
152+
)
153+
.subscribe((ranges) => console.log('Ranges:', ranges.length));
154+
```
155+
156+
See the [Observable Subscriptions Tutorial](./tutorials/observable-subscriptions.md) for more details.
157+
136158
## ROS2 Interface Message Generation
137159

138160
ROS client libraries convert IDL message descriptions into target language source code. rclnodejs provides the `generate-ros-messages` script to generate JavaScript message interface files and TypeScript declarations.

example/topics/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,21 @@ The `subscriber/` directory contains examples of nodes that subscribe to topics:
187187
- **Features**: Manual conversion of TypedArrays, BigInt, and special values for JSON serialization
188188
- **Run Command**: `node subscriber/subscription-json-utilities-example.js`
189189

190+
### 10. Observable Subscriber (`subscription-observable-example.js`)
191+
192+
**Purpose**: Demonstrates RxJS Observable subscriptions for reactive programming.
193+
194+
- **Message Type**: `std_msgs/msg/String`
195+
- **Topic**: `topic`
196+
- **Functionality**: Shows how to use `createObservableSubscription()` with RxJS operators
197+
- **Features**:
198+
- Throttling with `throttleTime()` for rate limiting
199+
- Message transformation with `map()`
200+
- Content filtering with `filter()`
201+
- Batching with `bufferCount()`
202+
- **Run Command**: `node subscriber/subscription-observable-example.js`
203+
- **Pair**: Works with `publisher-example.js`
204+
190205
## Validator Example
191206

192207
The `validator/` directory contains validation utilities:
@@ -211,6 +226,7 @@ Several examples work together to demonstrate complete communication:
211226
| `publisher-multiarray-example.js` | `subscription-multiarray-example.js` | Multi-dimensional array data |
212227
| `publisher-qos-example.js` | `subscription-qos-example.js` | QoS configuration |
213228
| `publisher-raw-message.js` | `subscription-raw-message.js` | Raw binary data |
229+
| `publisher-example.js` | `subscription-observable-example.js` | RxJS Observable subscription |
214230

215231
## How to Run Examples
216232

@@ -237,6 +253,7 @@ Several examples work together to demonstrate complete communication:
237253
- **Message Serialization**: TypedArray handling and JSON-safe conversion for web applications
238254
- **Name Validation**: Topic names, node names, and namespace validation utilities
239255
- **Message Validation**: Schema introspection and pre-publish message validation with detailed error reporting
256+
- **Observable Subscriptions**: RxJS-based reactive programming with operators for throttling, filtering, and combining message streams
240257

241258
## Notes
242259

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
'use strict';
16+
17+
const rclnodejs = require('../../../index.js');
18+
const { throttleTime, map, filter, bufferCount } = require('rxjs');
19+
20+
async function main() {
21+
await rclnodejs.init();
22+
23+
const node = rclnodejs.createNode('observable_subscription_example_node');
24+
25+
// Basic observable subscription
26+
const obsSub = node.createObservableSubscription(
27+
'std_msgs/msg/String',
28+
'topic'
29+
);
30+
31+
// Example 1: Throttled messages (max 2/sec)
32+
obsSub.observable
33+
.pipe(
34+
throttleTime(500),
35+
map((msg) => msg.data)
36+
)
37+
.subscribe((data) => {
38+
console.log('[Throttled]', data);
39+
});
40+
41+
// Example 2: Filtered messages (only containing "ROS")
42+
// Note: RxJS filter() operates at the application level after messages are received.
43+
// For more efficient filtering at the DDS middleware level (reducing network traffic),
44+
// use the contentFilter option. See: tutorials/content-filtering-subscription.md
45+
obsSub.observable
46+
.pipe(
47+
map((msg) => msg.data),
48+
filter((data) => data.includes('ROS'))
49+
)
50+
.subscribe((data) => {
51+
console.log('[Filtered]', data);
52+
});
53+
54+
// Example 3: Batched messages (every 3 messages)
55+
obsSub.observable
56+
.pipe(
57+
map((msg) => msg.data),
58+
bufferCount(3)
59+
)
60+
.subscribe((batch) => {
61+
console.log('[Batched]', batch.length, 'messages');
62+
});
63+
64+
console.log('Observable subscription created on /topic');
65+
console.log(
66+
'Run: ros2 topic pub /topic std_msgs/msg/String "{data: Hello ROS}" -r 5'
67+
);
68+
69+
rclnodejs.spin(node);
70+
}
71+
72+
main();

index.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const ParameterClient = require('./lib/parameter_client.js');
6262
const errors = require('./lib/errors.js');
6363
const ParameterWatcher = require('./lib/parameter_watcher.js');
6464
const MessageIntrospector = require('./lib/message_introspector.js');
65+
const ObservableSubscription = require('./lib/observable_subscription.js');
6566
const { spawn } = require('child_process');
6667
const {
6768
ValidationProblem,
@@ -236,6 +237,9 @@ let rcl = {
236237
/** {@link ParameterWatcher} class */
237238
ParameterWatcher: ParameterWatcher,
238239

240+
/** {@link ObservableSubscription} class */
241+
ObservableSubscription: ObservableSubscription,
242+
239243
/** {@link QoS} class */
240244
QoS: QoS,
241245

lib/node.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ const QoS = require('./qos.js');
4545
const Rates = require('./rate.js');
4646
const Service = require('./service.js');
4747
const Subscription = require('./subscription.js');
48+
const ObservableSubscription = require('./observable_subscription.js');
4849
const TimeSource = require('./time_source.js');
4950
const Timer = require('./timer.js');
5051
const TypeDescriptionService = require('./type_description_service.js');
@@ -820,6 +821,42 @@ class Node extends rclnodejs.ShadowNode {
820821
return subscription;
821822
}
822823

824+
/**
825+
* Create a Subscription that returns an RxJS Observable.
826+
* This allows using reactive programming patterns with ROS 2 messages.
827+
*
828+
* @param {function|string|object} typeClass - The ROS message class,
829+
* OR a string representing the message class, e.g. 'std_msgs/msg/String',
830+
* OR an object representing the message class, e.g. {package: 'std_msgs', type: 'msg', name: 'String'}
831+
* @param {string} topic - The name of the topic.
832+
* @param {object} [options] - The options argument used to parameterize the subscription.
833+
* @param {boolean} [options.enableTypedArray=true] - The topic will use TypedArray if necessary.
834+
* @param {QoS} [options.qos=QoS.profileDefault] - ROS Middleware "quality of service" settings.
835+
* @param {boolean} [options.isRaw=false] - The topic is serialized when true.
836+
* @param {string} [options.serializationMode='default'] - Controls message serialization format.
837+
* @param {object} [options.contentFilter] - The content-filter (if supported by RMW).
838+
* @param {SubscriptionEventCallbacks} [eventCallbacks] - The event callbacks for the subscription.
839+
* @return {ObservableSubscription} - An ObservableSubscription with an RxJS Observable.
840+
*/
841+
createObservableSubscription(typeClass, topic, options, eventCallbacks) {
842+
let observableSubscription = null;
843+
844+
const subscription = this.createSubscription(
845+
typeClass,
846+
topic,
847+
options,
848+
(message) => {
849+
if (observableSubscription) {
850+
observableSubscription._emit(message);
851+
}
852+
},
853+
eventCallbacks
854+
);
855+
856+
observableSubscription = new ObservableSubscription(subscription);
857+
return observableSubscription;
858+
}
859+
823860
/**
824861
* Create a Client.
825862
* @param {function|string|object} typeClass - The ROS message class,

lib/observable_subscription.js

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
'use strict';
16+
17+
const { Subject } = require('rxjs');
18+
19+
/**
20+
* A wrapper that provides RxJS Observable support for ROS 2 subscriptions.
21+
* This class wraps a standard Subscription and emits messages through an Observable.
22+
*
23+
* @class ObservableSubscription
24+
* @hideconstructor
25+
*/
26+
class ObservableSubscription {
27+
#subscription;
28+
#subject;
29+
#destroyed;
30+
31+
/**
32+
* Create an ObservableSubscription wrapper.
33+
* @param {Subscription} subscription - The underlying ROS 2 subscription
34+
*/
35+
constructor(subscription) {
36+
this.#subscription = subscription;
37+
this.#subject = new Subject();
38+
this.#destroyed = false;
39+
}
40+
41+
/**
42+
* Get the RxJS Observable for this subscription.
43+
* Use this to pipe operators and subscribe to messages.
44+
* @type {Observable}
45+
*/
46+
get observable() {
47+
return this.#subject.asObservable();
48+
}
49+
50+
/**
51+
* Get the underlying ROS 2 subscription.
52+
* @type {Subscription}
53+
*/
54+
get subscription() {
55+
return this.#subscription;
56+
}
57+
58+
/**
59+
* Get the topic name.
60+
* @type {string}
61+
*/
62+
get topic() {
63+
return this.#subscription.topic;
64+
}
65+
66+
/**
67+
* Check if this observable subscription has been destroyed.
68+
* @type {boolean}
69+
*/
70+
get isDestroyed() {
71+
return this.#destroyed;
72+
}
73+
74+
/**
75+
* Internal method to emit a message to subscribers.
76+
* Called by the subscription's processResponse.
77+
* @private
78+
* @param {any} message - The message to emit
79+
*/
80+
_emit(message) {
81+
if (!this.#destroyed) {
82+
this.#subject.next(message);
83+
}
84+
}
85+
86+
/**
87+
* Complete the observable and clean up resources.
88+
* After calling this, no more messages will be emitted.
89+
*/
90+
complete() {
91+
if (!this.#destroyed) {
92+
this.#destroyed = true;
93+
this.#subject.complete();
94+
}
95+
}
96+
97+
/**
98+
* Alias for complete() for consistency with RxJS naming.
99+
*/
100+
destroy() {
101+
this.complete();
102+
}
103+
}
104+
105+
module.exports = ObservableSubscription;

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
"debug": "^4.4.0",
8181
"json-bigint": "^1.0.0",
8282
"node-addon-api": "^8.3.1",
83+
"rxjs": "^7.8.1",
8384
"walk": "^2.3.15"
8485
},
8586
"husky": {

0 commit comments

Comments
 (0)