diff --git a/src/ros2cs/ros2cs_common/interfaces/IExtendedDisposable.cs b/src/ros2cs/ros2cs_common/interfaces/IExtendedDisposable.cs
index ad4c0ff6..60440e57 100644
--- a/src/ros2cs/ros2cs_common/interfaces/IExtendedDisposable.cs
+++ b/src/ros2cs/ros2cs_common/interfaces/IExtendedDisposable.cs
@@ -20,6 +20,11 @@ namespace ROS2
/// Use instead of IDisposable
public interface IExtendedDisposable : IDisposable
{
+ /// If the object is in a disposed state.
+ ///
+ /// Being in a disposed state does not mean that an object has ben disposed successfully.
+ /// Call to assert that an object has been disposed successfully.
+ ///
bool IsDisposed { get; }
}
diff --git a/src/ros2cs/ros2cs_core/CMakeLists.txt b/src/ros2cs/ros2cs_core/CMakeLists.txt
index f6a2f929..f08ab886 100644
--- a/src/ros2cs/ros2cs_core/CMakeLists.txt
+++ b/src/ros2cs/ros2cs_core/CMakeLists.txt
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 Robotec.ai
+# Copyright 2019-2023 Robotec.ai
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -79,6 +79,9 @@ set(CS_INTERFACES
interfaces/IService.cs
interfaces/IPublisher.cs
interfaces/ISubscription.cs
+ interfaces/IContext.cs
+ interfaces/IExecutor.cs
+ interfaces/IWaitable.cs
)
set(CS_NATIVE
@@ -91,6 +94,9 @@ set(CS_NATIVE
set(CS_UTILS
utils/Utils.cs
+ utils/LockedDictionary.cs
+ utils/LockedCollection.cs
+ utils/MappedValueDictionary.cs
)
set(CS_SOURCES
@@ -103,9 +109,12 @@ set(CS_SOURCES
Node.cs
Publisher.cs
QualityOfServiceProfile.cs
- Ros2cs.cs
Subscription.cs
WaitSet.cs
+ Context.cs
+ GuardCondition.cs
+ executors/ManualExecutor.cs
+ executors/TaskExecutor.cs
properties/AssemblyInfo.cs
)
diff --git a/src/ros2cs/ros2cs_core/Client.cs b/src/ros2cs/ros2cs_core/Client.cs
index fd39ab16..f2b64756 100644
--- a/src/ros2cs/ros2cs_core/Client.cs
+++ b/src/ros2cs/ros2cs_core/Client.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
// limitations under the License.
using System;
-using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
@@ -23,393 +22,371 @@
namespace ROS2
{
- /// Client with a topic and Types for Messages
- /// Instances are created by
- /// Message Type to be send
- /// Message Type to be received
- public class Client: IClient
- where I : Message, new()
- where O : Message, new()
- {
- ///
- public string Topic { get { return topic; } }
-
- public rcl_client_t Handle { get { return serviceHandle; } }
-
- ///
- public IReadOnlyDictionary> PendingRequests {get; private set;}
-
- ///
- IReadOnlyDictionary IClientBase.PendingRequests {get { return (IReadOnlyDictionary)this.PendingRequests; }}
-
- private string topic;
-
- ///
- public object Mutex { get { return mutex; } }
-
- private object mutex = new object();
-
///
- /// Mapping from request id without Response to .
+ /// Client with a topic and types for messages wrapping a rcl client.
///
///
- /// The is stored separately to allow
- /// to work even if the source returns multiple tasks.
+ /// This is the implementation produced by ,
+ /// use this method to create new instances.
///
- private Dictionary, Task)> Requests;
-
- private Ros2csLogger logger = Ros2csLogger.GetInstance();
-
- rcl_client_t serviceHandle;
-
- IntPtr serviceOptions = IntPtr.Zero;
-
- rcl_node_t nodeHandle;
-
- ///
- public bool IsDisposed { get { return disposed; } }
- private bool disposed = false;
-
- ///
- /// Internal constructor for Client
- ///
- /// Use to construct new Instances
- public Client(string pubTopic, Node node, QualityOfServiceProfile qos = null)
- {
- topic = pubTopic;
- nodeHandle = node.nodeHandle;
-
- QualityOfServiceProfile qualityOfServiceProfile = qos;
- if (qualityOfServiceProfile == null)
- qualityOfServiceProfile = new QualityOfServiceProfile(QosPresetProfile.SERVICES_DEFAULT);
-
- Requests = new Dictionary, Task)>();
- PendingRequests = new PendingTasksView(Requests);
-
- serviceOptions = NativeRclInterface.rclcs_client_create_options(qualityOfServiceProfile.handle);
-
- IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
-
- serviceHandle = NativeRcl.rcl_get_zero_initialized_client();
- Utils.CheckReturnEnum(NativeRcl.rcl_client_init(
- ref serviceHandle,
- ref nodeHandle,
- typeSupportHandle,
- topic,
- serviceOptions));
- }
-
- ~Client()
- {
- Dispose();
- }
-
- public void Dispose()
- {
- DestroyClient();
- }
-
- /// "Destructor" supporting disposable model
- private void DestroyClient()
+ ///
+ ///
+ public sealed class Client : IClient, IRawClient
+ where I : Message, new()
+ where O : Message, new()
{
- lock (mutex)
- {
- if (!disposed)
+ ///
+ public string Topic { get; private set; }
+
+ ///
+ /// This dictionary is thread safe.
+ ///
+ ///
+ public IReadOnlyDictionary> PendingRequests { get; private set; }
+
+ ///
+ /// This dictionary is thread safe.
+ ///
+ ///
+ IReadOnlyDictionary IClientBase.PendingRequests { get { return this.UntypedPendingRequests; } }
+
+ ///
+ /// Wrapper for .
+ ///
+ private readonly IReadOnlyDictionary UntypedPendingRequests;
+
+ ///
+ public bool IsDisposed
{
- lock (Requests)
- {
- foreach (var source in Requests.Values)
+ get
{
- bool success = source.Item1.TrySetException(new ObjectDisposedException("client has been disposed"));
- Debug.Assert(success);
+ bool ok = NativeRclInterface.rclcs_client_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
}
- Requests.Clear();
- }
- Utils.CheckReturnEnum(NativeRcl.rcl_client_fini(ref serviceHandle, ref nodeHandle));
- NativeRclInterface.rclcs_client_dispose_options(serviceOptions);
- logger.LogInfo("Client destroyed");
- disposed = true;
}
- }
- }
-
- ///
- public bool IsServiceAvailable()
- {
- bool available = false;
- Utils.CheckReturnEnum(NativeRcl.rcl_service_server_is_available(
- ref nodeHandle,
- ref serviceHandle,
- ref available
- ));
- return available;
- }
- ///
- public void TakeMessage()
- {
- MessageInternals msg = new O() as MessageInternals;
- rcl_rmw_request_id_t request_header = default(rcl_rmw_request_id_t);
- int ret;
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ /// Handle to the rcl client.
+ ///
+ public IntPtr Handle { get; private set; } = IntPtr.Zero;
+
+ ///
+ /// Handle to the rcl client options.
+ ///
+ private IntPtr Options = IntPtr.Zero;
+
+ ///
+ /// Node associated with this instance.
+ ///
+ private readonly Node Node;
+
+ ///
+ /// Mapping from request id without Response to .
+ ///
+ ///
+ /// The is stored separately to allow
+ /// to work even if the source returns multiple tasks.
+ /// Furthermore, this object is used for locking.
+ ///
+ private readonly Dictionary, Task)> Requests = new Dictionary, Task)>();
+
+ ///
+ /// Create a new instance.
+ ///
+ ///
+ /// The caller is responsible for adding the instance to .
+ /// This action is not thread safe.
+ ///
+ /// Topic to subscribe to.
+ /// Node to associate with.
+ /// QOS setting for this subscription.
+ /// If was disposed.
+ internal Client(string topic, Node node, QualityOfServiceProfile qos = null)
{
- return;
+ this.Topic = topic;
+ this.Node = node;
+
+ var lockedRequests = new LockedDictionary, Task)>(this.Requests);
+ this.PendingRequests = new MappedValueDictionary, Task), Task>(
+ lockedRequests,
+ tuple => tuple.Item2
+ );
+ this.UntypedPendingRequests = new MappedValueDictionary, Task), Task>(
+ lockedRequests,
+ tuple => tuple.Item2
+ );
+
+ QualityOfServiceProfile qualityOfServiceProfile = qos ?? new QualityOfServiceProfile(QosPresetProfile.SERVICES_DEFAULT);
+
+ this.Options = NativeRclInterface.rclcs_client_create_options(qualityOfServiceProfile.handle);
+
+ IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
+
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_client();
+ int ret = NativeRcl.rcl_client_init(
+ this.Handle,
+ this.Node.Handle,
+ typeSupportHandle,
+ this.Topic,
+ this.Options
+ );
+
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
}
- ret = NativeRcl.rcl_take_response(
- ref serviceHandle,
- ref request_header,
- msg.Handle
- );
- }
- if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_CLIENT_TAKE_FAILED)
- {
- Utils.CheckReturnEnum(ret);
- ProcessResponse(request_header.sequence_number, msg);
- }
- }
- ///
- /// Populates managed fields with native values and finishes the corresponding
- ///
- /// Message that will be populated and used as the task result
- /// sequence number received when sending the Request
- private void ProcessResponse(long sequence_number, MessageInternals msg)
- {
- bool exists = false;
- (TaskCompletionSource, Task) source = default((TaskCompletionSource, Task));
- lock (Requests)
- {
- if (Requests.TryGetValue(sequence_number, out source))
+ ///
+ /// This method is not thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ public bool IsServiceAvailable()
{
- exists = true;
- Requests.Remove(sequence_number);
+ bool available = false;
+ Utils.CheckReturnEnum(NativeRcl.rcl_service_server_is_available(
+ this.Node.Handle,
+ this.Handle,
+ out available
+ ));
+ GC.KeepAlive(this);
+ return available;
}
- }
- if (exists)
- {
- msg.ReadNativeMessage();
- source.Item1.SetResult((O)msg);
- }
- else
- {
- Debug.Print("received unknown sequence number or got disposed");
- }
- }
-
- ///
- /// Send a Request to the Service
- ///
- /// Message to be send
- /// sequence number of the Request
- private long SendRequest(I msg)
- {
- long sequence_number = default(long);
- MessageInternals msgInternals = msg as MessageInternals;
- msgInternals.WriteNativeMessage();
- Utils.CheckReturnEnum(
- NativeRcl.rcl_send_request(
- ref serviceHandle,
- msgInternals.Handle,
- ref sequence_number
- )
- );
- return sequence_number;
- }
- ///
- /// Associate a task with a sequence number
- ///
- /// source used to controll the
- /// sequence number received when sending the Request
- /// The associated task.
- private Task RegisterSource(TaskCompletionSource source, long sequence_number)
- {
- Task task = source.Task;
- lock (Requests)
- {
- Requests.Add(sequence_number, (source, task));
- }
- return task;
- }
-
- ///
- public O Call(I msg)
- {
- var task = CallAsync(msg);
- task.Wait();
- return task.Result;
- }
-
- ///
- public Task CallAsync(I msg)
- {
- return CallAsync(msg, TaskCreationOptions.None);
- }
+ ///
+ /// This method is thread safe.
+ ///
+ ///
+ public bool TryProcess()
+ {
+ rcl_rmw_request_id_t header = default(rcl_rmw_request_id_t);
+ O message = new O();
+ (TaskCompletionSource, Task) source;
+ bool exists = false;
- ///
- public Task CallAsync(I msg, TaskCreationOptions options)
- {
- TaskCompletionSource source;
- lock (mutex)
- {
- if (!Ros2cs.Ok() || disposed)
- {
- throw new InvalidOperationException("Cannot service as the class is already disposed or shutdown was called");
- }
- // prevent TakeMessage from receiving Responses before we called RegisterSource
- long sequence_number = SendRequest(msg);
- source = new TaskCompletionSource(options);
- return RegisterSource(source, sequence_number);
- }
- }
+ lock (this.Requests)
+ {
+ // prevent taking responses before RegisterSource was called
+ int ret = NativeRcl.rcl_take_response(
+ this.Handle,
+ ref header,
+ (message as MessageInternals).Handle
+ );
+ GC.KeepAlive(this);
+
+ switch ((RCLReturnEnum)ret)
+ {
+ case RCLReturnEnum.RCL_RET_CLIENT_TAKE_FAILED:
+ case RCLReturnEnum.RCL_RET_CLIENT_INVALID:
+ return false;
+ default:
+ Utils.CheckReturnEnum(ret);
+ break;
+ }
+
+ if (this.Requests.TryGetValue(header.sequence_number, out source))
+ {
+ exists = true;
+ this.Requests.Remove(header.sequence_number);
+ }
+ }
+ if (exists)
+ {
+ (message as MessageInternals).ReadNativeMessage();
+ source.Item1.SetResult(message);
+ }
+ else
+ {
+ Debug.Print("received request which was not pending, maybe canceled");
+ }
+ return true;
+ }
- ///
- public bool Cancel(Task task)
- {
- var pair = default(KeyValuePair, Task)>);
- try
- {
- lock(this.Requests)
+ ///
+ /// The provided message can be modified or disposed after this call.
+ /// Furthermore, this method is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ public O Call(I msg)
{
- pair = this.Requests.First(entry => entry.Value.Item2 == task);
- // has to be true
- this.Requests.Remove(pair.Key);
+ var task = CallAsync(msg);
+ task.Wait();
+ return task.Result;
}
- }
- catch (InvalidOperationException)
- {
- return false;
- }
- pair.Value.Item1.SetCanceled();
- return true;
- }
- ///
- /// Wrapper to avoid exposing to users.
- ///
- ///
- /// The locking used is required because the user may access the view while is running.
- ///
- private class PendingTasksView : IReadOnlyDictionary>, IReadOnlyDictionary
- {
- public Task this[long key]
- {
- get
+ ///
+ /// This method is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ public Task CallAsync(I msg)
{
- lock (this.Requests)
- {
- return this.Requests[key].Item2;
- }
+ return CallAsync(msg, TaskCreationOptions.None);
}
- }
- Task IReadOnlyDictionary.this[long key]
- {
- get { return this[key]; }
- }
+ ///
+ /// This method is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ public Task CallAsync(I msg, TaskCreationOptions options)
+ {
+ var source = new TaskCompletionSource(options);
+ lock (this.Requests)
+ {
+ // prevents TryProcess from receiving Responses before we called RegisterSource
+ long sequence_number = SendRequest(msg);
+ return RegisterSource(source, sequence_number);
+ }
+ }
- public IEnumerable Keys
- {
- get
+ ///
+ /// Send a Request to the Service
+ ///
+ /// Message to be send
+ /// sequence number of the Request
+ private long SendRequest(I msg)
{
- lock (this.Requests)
- {
- return this.Requests.Keys.ToArray();
- }
+ long sequence_number = default(long);
+ MessageInternals msgInternals = msg as MessageInternals;
+ msgInternals.WriteNativeMessage();
+ Utils.CheckReturnEnum(
+ NativeRcl.rcl_send_request(
+ this.Handle,
+ msgInternals.Handle,
+ out sequence_number
+ )
+ );
+ GC.KeepAlive(this);
+ return sequence_number;
}
- }
- public IEnumerable> Values
- {
- get
+ ///
+ /// Associate a task with a sequence number
+ ///
+ /// source used to controll the
+ /// sequence number received when sending the Request
+ /// The associated task.
+ private Task RegisterSource(TaskCompletionSource source, long sequence_number)
{
- lock (this.Requests)
- {
- return this.Requests.Values.Select(value => value.Item2).ToArray();
- }
+ // handle Task not being a singleton
+ Task task = source.Task;
+ Requests.Add(sequence_number, (source, task));
+ return task;
}
- }
- IEnumerable IReadOnlyDictionary.Values
- {
- get { return this.Values; }
- }
+ ///
+ /// Tasks are automatically removed on completion and have to be removed only when canceled.
+ /// Furthermore, this method is thread safe.
+ ///
+ ///
+ public bool Cancel(Task task)
+ {
+ var pair = default(KeyValuePair, Task)>);
+ lock (this.Requests)
+ {
+ try
+ {
+ pair = this.Requests.First(entry => entry.Value.Item2 == task);
+ }
+ catch (InvalidOperationException)
+ {
+ return false;
+ }
+ // has to be true
+ bool success = this.Requests.Remove(pair.Key);
+ Debug.Assert(success, "failed to remove matching request");
+ }
+ pair.Value.Item1.SetCanceled();
+ return true;
+ }
- public int Count
- {
- get
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the client is in use.
+ /// Disposal is automatically performed on finalization by the GC.
+ /// Any pending tasks are removed and set to have faulted with
+ /// .
+ ///
+ ///
+ public void Dispose()
{
- lock (this.Requests)
- {
- return this.Requests.Count;
- }
+ this.Dispose(true);
+ // finalizer not needed when we disposed successfully
+ GC.SuppressFinalize(this);
}
- }
- private readonly IReadOnlyDictionary, Task)> Requests;
+ /// Disposal logic.
+ /// If this method is not called in a finalizer.
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ // only do if Node.CurrentClients and this.Requests have not been finalized
+ // save since if we are being finalized we are not in a wait set anymore
+ if (disposing)
+ {
+ bool success = this.Node.RemoveClient(this);
+ Debug.Assert(success, "failed to remove client");
+ this.DisposeAllTasks();
+ }
- public PendingTasksView(IReadOnlyDictionary, Task)> requests)
- {
- this.Requests = requests;
- }
+ Utils.CheckReturnEnum(NativeRcl.rcl_client_fini(this.Handle, this.Node.Handle));
+ this.FreeHandles();
+ }
- public bool ContainsKey(long key)
- {
- lock (this.Requests)
+ ///
+ void IRawClient.DisposeFromNode()
{
- return this.Requests.ContainsKey(key);
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ this.DisposeAllTasks();
+ Utils.CheckReturnEnum(NativeRcl.rcl_client_fini(this.Handle, this.Node.Handle));
+ this.FreeHandles();
}
- }
- public bool TryGetValue(long key, out Task value)
- {
- bool success = false;
- (TaskCompletionSource, Task) source = default((TaskCompletionSource, Task));
- lock (this.Requests)
+ ///
+ /// Dispose all tasks currently pending.
+ ///
+ private void DisposeAllTasks()
{
- success = this.Requests.TryGetValue(key, out source);
+ lock (this.Requests)
+ {
+ foreach (var source in this.Requests.Values)
+ {
+ source.Item1.TrySetException(new ObjectDisposedException($"client for topic '{this.Topic}'"));
+ }
+ this.Requests.Clear();
+ }
}
- value = source.Item2;
- return success;
- }
-
- bool IReadOnlyDictionary.TryGetValue(long key, out Task value)
- {
- bool success = this.TryGetValue(key, out var task);
- value = task;
- return success;
- }
-
- public IEnumerator>> GetEnumerator()
- {
- lock (this.Requests)
+
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
{
- return this.Requests
- .Select(pair => new KeyValuePair>(pair.Key, pair.Value.Item2))
- .ToArray()
- .AsEnumerable()
- .GetEnumerator();
+ NativeRclInterface.rclcs_free_client(this.Handle);
+ this.Handle = IntPtr.Zero;
+ NativeRclInterface.rclcs_client_dispose_options(this.Options);
+ this.Options = IntPtr.Zero;
}
- }
-
- IEnumerator IEnumerable.GetEnumerator()
- {
- return this.GetEnumerator();
- }
- IEnumerator> IEnumerable>.GetEnumerator()
- {
- lock (this.Requests)
+ ~Client()
{
- return this.Requests
- .Select(pair => new KeyValuePair(pair.Key, pair.Value.Item2))
- .ToArray()
- .AsEnumerable()
- .GetEnumerator();
+ this.Dispose(false);
}
- }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/Clock.cs b/src/ros2cs/ros2cs_core/Clock.cs
index 0afc5ddc..bc89aefc 100644
--- a/src/ros2cs/ros2cs_core/Clock.cs
+++ b/src/ros2cs/ros2cs_core/Clock.cs
@@ -48,7 +48,7 @@ public RosTime Now
{
RosTime time = new RosTime();
long queryNowNanoseconds = 0;
- NativeRcl.rcl_clock_get_now(handle, ref queryNowNanoseconds);
+ NativeRcl.rcl_clock_get_now(handle, out queryNowNanoseconds);
time.sec = (int)(queryNowNanoseconds / (long)1e9);
time.nanosec = (uint)(queryNowNanoseconds - time.sec*((long)1e9));
return time;
diff --git a/src/ros2cs/ros2cs_core/Context.cs b/src/ros2cs/ros2cs_core/Context.cs
new file mode 100644
index 00000000..07ba9a4f
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/Context.cs
@@ -0,0 +1,282 @@
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+
+namespace ROS2
+{
+ ///
+ /// ROS Context encapsulating the non-global state of an init/shutdown cycle.
+ ///
+ ///
+ /// If the instance is not disposed it will be shut down by the garbage collector.
+ /// Since the collection tracking the nodes might be finalized at this point
+ /// the handle will be leaked.
+ ///
+ public sealed class Context : IContext
+ {
+ ///
+ /// Will be disposed on disposal of this instance.
+ /// Furthermore, access to the collection is thread safe.
+ ///
+ public IReadOnlyDictionary Nodes { get; private set; }
+
+ ///
+ public bool IsDisposed { get { return !this.Ok(); } }
+
+ ///
+ public event Action OnShutdown;
+
+ ///
+ /// Handle to the rcl_context_t
+ ///
+ internal IntPtr Handle { get; private set; } = IntPtr.Zero;
+
+ ///
+ /// Collection nodes active in this context.
+ ///
+ ///
+ /// Also used for synchronisation when creating / removing nodes.
+ ///
+ private Dictionary ROSNodes = new Dictionary();
+
+ ///
+ /// Collection of guard conditions active in this context.
+ ///
+ ///
+ /// Also used for synchronisation when creating / removing guard conditions.
+ ///
+ private HashSet GuardConditions = new HashSet();
+
+ ///
+ /// Collection of wait sets active in this context;
+ ///
+ ///
+ /// Also used for synchronisation when creating / removing guard conditions.
+ ///
+ private HashSet WaitSets = new HashSet();
+
+ ///
+ /// Get the current RMW implementation.
+ ///
+ /// The current implementation as string.
+ public static string GetRMWImplementation()
+ {
+ return Utils.PtrToString(NativeRmwInterface.rmw_native_interface_get_implementation_identifier());
+ }
+
+ ///
+ /// Create a new ROS Context.
+ ///
+ public Context()
+ {
+ this.Nodes = new MappedValueDictionary(
+ new LockedDictionary(this.ROSNodes),
+ node => node
+ );
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_context();
+ int ret = NativeRclInterface.rclcs_init(this.Handle, NativeRcl.rcutils_get_default_allocator());
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
+ }
+
+ ///
+ ///
+ /// This method is thread safe.
+ ///
+ public bool Ok()
+ {
+ return NativeRclInterface.rclcs_context_is_valid(this.Handle);
+ }
+
+ ///
+ ///
+ /// This method is thread safe.
+ ///
+ public bool TryCreateNode(string name, out INode node)
+ {
+ lock (this.ROSNodes)
+ {
+ if (this.ROSNodes.ContainsKey(name))
+ {
+ node = default(INode);
+ return false;
+ }
+ else
+ {
+ Node ROSNode = new Node(name, this);
+ this.ROSNodes.Add(name, ROSNode);
+ node = ROSNode;
+ return true;
+ }
+ }
+ }
+
+ ///
+ /// Remove a Node.
+ ///
+ ///
+ /// This method is intended to be used by and does not dispose the node.
+ /// Furthermore, it is thread safe.
+ ///
+ /// Name of the node.
+ /// If the node existed in this context and has been removed.
+ internal bool RemoveNode(string name)
+ {
+ lock (this.ROSNodes)
+ {
+ return this.ROSNodes.Remove(name);
+ }
+ }
+
+ ///
+ /// Create a guard condition.
+ ///
+ ///
+ /// This method is thread safe.
+ ///
+ /// Callback executed by the executor when the guard condition is triggered.
+ /// A new guard condition instance.
+ internal GuardCondition CreateGuardCondition(Action callback)
+ {
+ lock (this.GuardConditions)
+ {
+ GuardCondition guardCondition = new GuardCondition(this, callback);
+ this.GuardConditions.Add(guardCondition);
+ return guardCondition;
+ }
+ }
+
+ ///
+ /// Remove a guard condition.
+ ///
+ ///
+ /// This method is intended to be used by and does not dispose the guard condition.
+ /// Furthermore, it is thread safe.
+ ///
+ /// Guard condition to remove.
+ /// If the guard condition existed in this context and has been removed.
+ internal bool RemoveGuardCondition(GuardCondition guardCondition)
+ {
+ lock (this.GuardConditions)
+ {
+ return this.GuardConditions.Remove(guardCondition);
+ }
+ }
+
+ ///
+ /// Create a wait set.
+ ///
+ ///
+ /// This method is thread safe.
+ ///
+ /// A new wait set instance.
+ internal WaitSet CreateWaitSet()
+ {
+ lock (this.WaitSets)
+ {
+ WaitSet waitSet = new WaitSet(this);
+ this.WaitSets.Add(waitSet);
+ return waitSet;
+ }
+ }
+
+ ///
+ /// Remove a wait set.
+ ///
+ ///
+ /// This method is intended to be used by and does not dispose the wait set.
+ /// Furthermore, it is thread safe.
+ ///
+ /// Wait set to remove.
+ /// If the wait set existed in this context and has been removed.
+ internal bool RemoveWaitSet(WaitSet waitSet)
+ {
+ lock (this.WaitSets)
+ {
+ return this.WaitSets.Remove(waitSet);
+ }
+ }
+
+ ///
+ /// This method is not thread safe.
+ /// Do not call while the context or any entities
+ /// associated with it are in use.
+ ///
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ // finalizer not needed when we disposed successfully
+ GC.SuppressFinalize(this);
+ }
+
+ /// Disposal logic.
+ /// If this method is not called in a finalizer.
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+ int ret = NativeRcl.rcl_shutdown(this.Handle);
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_ALREADY_SHUTDOWN)
+ {
+ Utils.CheckReturnEnum(ret);
+ }
+ // only continue if the collections of the active primitives have not been finalized
+ if (disposing)
+ {
+ this.OnShutdown?.Invoke();
+ foreach (var node in this.ROSNodes.Values)
+ {
+ node.DisposeFromContext();
+ }
+ this.ROSNodes.Clear();
+ foreach (var guardCondition in this.GuardConditions)
+ {
+ guardCondition.DisposeFromContext();
+ }
+ this.GuardConditions.Clear();
+ foreach (var waitSet in this.WaitSets)
+ {
+ waitSet.DisposeFromContext();
+ }
+ this.WaitSets.Clear();
+ // only safe when all primitives are gone, not calling Dispose() will leak the Handle
+ Utils.CheckReturnEnum(NativeRcl.rcl_context_fini(this.Handle));
+ this.FreeHandles();
+ }
+ }
+
+ ///
+ /// Free the handles of this instance.
+ ///
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_context(this.Handle);
+ // to allow .IsDisposed to work
+ this.Handle = IntPtr.Zero;
+ }
+
+ ~Context()
+ {
+ this.Dispose(false);
+ }
+ }
+}
diff --git a/src/ros2cs/ros2cs_core/GuardCondition.cs b/src/ros2cs/ros2cs_core/GuardCondition.cs
new file mode 100644
index 00000000..556b92f2
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/GuardCondition.cs
@@ -0,0 +1,165 @@
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Diagnostics;
+
+namespace ROS2
+{
+ ///
+ /// Guard condition used to interrupt waits wrapping a rcl guard condition.
+ ///
+ internal sealed class GuardCondition : IWaitable, IExtendedDisposable {
+
+ ///
+ /// Handle to the rcl guard condition.
+ ///
+ public IntPtr Handle { get; private set; } = IntPtr.Zero;
+
+ ///
+ public bool IsDisposed
+ {
+ get
+ {
+ bool ok = NativeRclInterface.rclcs_guard_condition_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
+ }
+ }
+
+ ///
+ /// Context associated with this instance.
+ ///
+ private readonly Context Context;
+
+ ///
+ /// Callback invoked when the guard condition
+ /// is processed.
+ ///
+ private readonly Action Callback;
+
+ ///
+ /// Create a new instance.
+ ///
+ /// Context to associate with.
+ /// Callback to invoke when processed.
+ /// If is disposed.
+ internal GuardCondition(Context context, Action callback)
+ {
+ this.Context = context;
+ this.Callback = callback;
+ int ret = NativeRclInterface.rclcs_get_guard_condition(
+ context.Handle,
+ out IntPtr handle
+ );
+ if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_INVALID_ARGUMENT)
+ {
+ throw new ObjectDisposedException("rcl context");
+ }
+ Utils.CheckReturnEnum(ret);
+ this.Handle = handle;
+ }
+
+ ///
+ /// Trigger the guard condition to make it become ready.
+ ///
+ ///
+ /// It seems that the guard condition stays ready until waited on.
+ /// This method is thread safe.
+ ///
+ /// If the guard condition was disposed.
+ public void Trigger()
+ {
+ int ret = NativeRcl.rcl_trigger_guard_condition(this.Handle);
+ GC.KeepAlive(this);
+
+ if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_INVALID_ARGUMENT)
+ {
+ throw new ObjectDisposedException("rcl guard condition");
+ }
+ Utils.CheckReturnEnum(ret);
+ }
+
+ ///
+ /// This method is thread safe
+ /// is the callback is thread safe.
+ ///
+ ///
+ public bool TryProcess()
+ {
+ this.Callback();
+ return true;
+ }
+
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the guard condition is in use.
+ /// Disposal is automatically performed on finalization by the GC.
+ ///
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// Disposal logic.
+ /// If this method is not called in a finalizer
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ // only do if Context.GuardConditions has not been finalized
+ if (disposing)
+ {
+ bool success = this.Context.RemoveGuardCondition(this);
+ Debug.Assert(success, message: "failed to remove guard condition");
+ }
+
+ this.DisposeFromContext();
+ }
+
+ /// Dispose without modifying the context.
+ internal void DisposeFromContext()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ Utils.CheckReturnEnum(NativeRcl.rcl_guard_condition_fini(this.Handle));
+ this.FreeHandles();
+ }
+
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_guard_condition(this.Handle);
+ this.Handle = IntPtr.Zero;
+ }
+
+ ~GuardCondition()
+ {
+ this.Dispose(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ros2cs/ros2cs_core/Node.cs b/src/ros2cs/ros2cs_core/Node.cs
index 6911c2d1..40bbfca4 100644
--- a/src/ros2cs/ros2cs_core/Node.cs
+++ b/src/ros2cs/ros2cs_core/Node.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,271 +13,373 @@
// limitations under the License.
using System;
-using System.Linq;
using System.Collections.Generic;
+using System.Diagnostics;
namespace ROS2
{
- /// Represents a managed ros2 (rcl) node
- ///
- public class Node: INode
- {
- public string Name { get { return name; } }
- private string name;
- private Ros2csLogger logger = Ros2csLogger.GetInstance();
-
- internal List Subscriptions
+ ///
+ /// Node wrapping a rcl node.
+ ///
+ ///
+ /// This is the implementation produced by ,
+ /// use this method to create instances.
+ ///
+ ///
+ ///
+ public sealed class Node : INode
{
- get
- {
- lock (mutex)
- {
- return subscriptions.ToList();
- }
- }
- }
+ ///
+ public string Name { get; private set; }
- internal List Clients
- {
- get
- {
- lock (mutex)
+ ///
+ public IContext Context { get { return this.ROSContext; } }
+
+ ///
+ /// Users have to guarantee that a node is associated with at most one executor at any given time
+ /// to prevent undefined behaviour when multithreading is used.
+ /// It is recommended to not set this property directly and leave this task to the executor.
+ /// Setting this property is thread safe.
+ ///
+ ///
+ public IExecutor Executor
{
- return clients.ToList();
+ get { return this._Executor; }
+ set
+ {
+ // prevent a executor switch while
+ // a primitive is being removed
+ lock (this.Lock)
+ {
+ this._Executor = value;
+ }
+ }
}
- }
- }
- internal List Services
- {
- get
- {
- lock (mutex)
+ private IExecutor _Executor = null;
+
+ ///
+ public bool IsDisposed
{
- return services.ToList();
+ get { return !NativeRclInterface.rclcs_node_is_valid(this.Handle); }
}
- }
- }
- internal rcl_node_t nodeHandle;
- private IntPtr defaultNodeOptions;
- private HashSet subscriptions;
- private HashSet publishers;
- private HashSet clients;
- private HashSet services;
- private readonly object mutex = new object();
- private bool disposed = false;
-
- public bool IsDisposed { get { return disposed; } }
-
- /// Node constructor
- /// Nodes are created through CreateNode method of Ros2cs class
- /// unique, non-namespaced node name
- /// (rcl) context for the node. Global context is passed to this method
- internal Node(string nodeName, ref rcl_context_t context)
- {
- name = nodeName;
- string nodeNamespace = "/";
- subscriptions = new HashSet();
- publishers = new HashSet();
- clients = new HashSet();
- services = new HashSet();
-
- nodeHandle = NativeRcl.rcl_get_zero_initialized_node();
- defaultNodeOptions = NativeRclInterface.rclcs_node_create_default_options();
- Utils.CheckReturnEnum(NativeRcl.rcl_node_init(ref nodeHandle, nodeName, nodeNamespace, ref context, defaultNodeOptions));
- logger.LogInfo("Node initialized");
- }
+ ///
+ /// Handle to the rcl node
+ ///
+ internal IntPtr Handle { get; private set; } = IntPtr.Zero;
- /// Finalizer supporting IDisposable model
- ~Node()
- {
- DestroyNode();
- }
+ ///
+ /// Handle to the rcl node options
+ ///
+ private IntPtr Options = IntPtr.Zero;
- /// Release managed and native resources. IDisposable implementation
- public void Dispose()
- {
- DestroyNode();
- }
+ ///
+ /// Context associated with this instance.
+ ///
+ private readonly Context ROSContext;
- /// "Destructor" supporting IDisposable model
- /// Disposes all subscriptions and publishers and clients before finilizing node
- internal void DestroyNode()
- {
- lock (mutex)
- {
- if (!disposed)
+ ///
+ /// Lock used to allow thread safe access to node primitives.
+ ///
+ private readonly object Lock = new object();
+
+ ///
+ /// This collection is thread safe.
+ ///
+ ///
+ public IReadOnlyCollection Publishers { get; private set; }
+
+ private readonly HashSet CurrentPublishers = new HashSet();
+
+ ///
+ /// This collection is thread safe.
+ ///
+ ///
+ public IReadOnlyCollection Subscriptions { get; private set; }
+
+ private readonly HashSet CurrentSubscriptions = new HashSet();
+
+ ///
+ /// This collection is thread safe.
+ ///
+ ///
+ public IReadOnlyCollection Services { get; private set; }
+
+ private readonly HashSet CurrentServices = new HashSet();
+
+ ///
+ /// This collection is thread safe.
+ ///
+ ///
+ public IReadOnlyCollection Clients { get; private set; }
+
+ private readonly HashSet CurrentClients = new HashSet();
+
+ ///
+ /// Create a new instance.
+ ///
+ ///
+ /// The caller is responsible for adding the instance to .
+ /// This action is not thread safe.
+ ///
+ /// Name of the node.
+ /// Context to associate with.
+ /// If is disposed.
+ internal Node(string name, Context context)
{
- foreach(ISubscriptionBase subscription in subscriptions)
- {
- subscription.Dispose();
- }
- subscriptions.Clear();
-
- foreach(IPublisherBase publisher in publishers)
- {
- publisher.Dispose();
- }
- publishers.Clear();
-
- foreach(IClientBase client in clients)
- {
- client.Dispose();
- }
- clients.Clear();
-
- foreach(IServiceBase service in services)
- {
- service.Dispose();
- }
- services.Clear();
-
- Utils.CheckReturnEnum(NativeRcl.rcl_node_fini(ref nodeHandle));
- NativeRclInterface.rclcs_node_dispose_options(defaultNodeOptions);
- disposed = true;
- logger.LogInfo("Node " + name + " destroyed");
+ this.Name = name;
+ this.ROSContext = context;
+ this.Publishers = new LockedCollection(this.CurrentPublishers, this.Lock);
+ this.Subscriptions = new LockedCollection(this.CurrentSubscriptions, this.Lock);
+ this.Services = new LockedCollection(this.CurrentServices, this.Lock);
+ this.Clients = new LockedCollection(this.CurrentClients, this.Lock);
+
+ this.Options = NativeRclInterface.rclcs_node_create_default_options();
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_node();
+ int ret = NativeRcl.rcl_node_init(
+ this.Handle,
+ this.Name,
+ "/",
+ this.ROSContext.Handle,
+ this.Options
+
+ );
+ switch ((RCLReturnEnum)ret)
+ {
+ case RCLReturnEnum.RCL_RET_OK:
+ break;
+ // does not return RCL_RET_NOT_INIT if the context is NULL
+ case RCLReturnEnum.RCL_RET_INVALID_ARGUMENT:
+ this.FreeHandles();
+ throw new ObjectDisposedException("RCL Context");
+ default:
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ break;
+ }
}
- }
- }
- /// Create a client for this node for a given topic, qos and message type
- ///
- public Client CreateClient(string topic, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new()
- {
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ /// This method is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ ///
+ public IPublisher CreatePublisher(string topic, QualityOfServiceProfile qos = null) where T : Message, new()
{
- logger.LogWarning("Cannot create client as the class is already disposed or shutdown was called");
- return null;
+ lock (this.Lock)
+ {
+ Publisher publisher = new Publisher(topic, this, qos);
+ bool success = this.CurrentPublishers.Add(publisher);
+ Debug.Assert(success, "publisher already exists");
+ return publisher;
+ }
}
- Client client = new Client(topic, this, qos);
- clients.Add(client);
- logger.LogInfo("Created Client for topic " + topic);
- return client;
- }
- }
- /// Remove a client
- ///
- public bool RemoveClient(IClientBase client)
- {
- lock (mutex)
- {
- if (clients.Contains(client))
+ ///
+ /// Remove a publisher.
+ ///
+ ///
+ /// This method is intended to be used by and does not dispose the publisher.
+ /// Furthermore, it is thread safe.
+ ///
+ /// Publisher to be removed.
+ /// If the publisher existed on this node and has been removed.
+ internal bool RemovePublisher(IRawPublisher publisher)
{
- logger.LogInfo("Removing client for topic " + client.Topic);
- client.Dispose();
- return clients.Remove(client);
+ lock (this.Lock)
+ {
+ return this.CurrentPublishers.Remove(publisher);
+ }
}
- return false;
- }
- }
- /// Create a service for this node for a given topic, callback, qos and message type
- ///
- public Service CreateService(string topic, Func callback, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new()
- {
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ /// This method schedules a rescan on the current executor and is thread safe
+ /// if of the current executor is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ ///
+ public ISubscription CreateSubscription(string topic, Action callback, QualityOfServiceProfile qos = null) where T : Message, new()
{
- logger.LogWarning("Cannot create service as the class is already disposed or shutdown was called");
- return null;
+ Subscription subscription;
+ lock (this.Lock)
+ {
+ subscription = new Subscription(topic, this, callback, qos);
+ bool success = this.CurrentSubscriptions.Add(subscription);
+ Debug.Assert(success, "subscription already exists");
+ }
+ this.Executor?.TryScheduleRescan(this);
+ return subscription;
}
- Service service = new Service(topic, this, callback, qos);
- services.Add(service);
- logger.LogInfo("Created service for topic " + topic);
- return service;
- }
- }
+ ///
+ internal bool RemoveSubscription(IRawSubscription primitive)
+ {
+ return this.RemovePrimitive(primitive, this.CurrentSubscriptions);
+ }
- /// Remove a service
- ///
- public bool RemoveService(IServiceBase service)
- {
- lock (mutex)
- {
- if (services.Contains(service))
+ ///
+ /// This method schedules a rescan on the current executor and is thread safe
+ /// if of the current executor is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ ///
+ public IClient CreateClient(string topic, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new()
{
- logger.LogInfo("Removing service for topic " + service.Topic);
- service.Dispose();
- return services.Remove(service);
+ Client client;
+ lock (this.Lock)
+ {
+ client = new Client(topic, this, qos);
+ bool success = this.CurrentClients.Add(client);
+ Debug.Assert(success, "client already exists");
+ }
+ this.Executor?.TryScheduleRescan(this);
+ return client;
}
- return false;
- }
- }
- /// Create a publisher for this node for a given topic, qos and message type
- ///
- public Publisher CreatePublisher(string topic, QualityOfServiceProfile qos = null) where T : Message, new()
- {
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ internal bool RemoveClient(IRawClient primitive)
{
- logger.LogWarning("Cannot create publisher as the class is already disposed or shutdown was called");
- return null;
+ return this.RemovePrimitive(primitive, this.CurrentClients);
}
- Publisher publisher = new Publisher(topic, this, qos);
- publishers.Add(publisher);
- logger.LogInfo("Created Publisher for topic " + topic);
- return publisher;
- }
- }
+ ///
+ /// This method schedules a rescan on the current executor and is thread safe
+ /// if of the current executor is thread safe.
+ ///
+ /// If the instance was disposed.
+ ///
+ ///
+ public IService CreateService(string topic, Func callback, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new()
+ {
+ Service service;
+ lock (this.Lock)
+ {
+ service = new Service(topic, this, callback, qos);
+ bool success = this.CurrentServices.Add(service);
+ Debug.Assert(success, "service already exists");
+ }
+ this.Executor?.TryScheduleRescan(this);
+ return service;
+ }
- /// Create a subscription for this node for a given topic, callback, qos and message type
- ///
- public Subscription CreateSubscription(string topic, Action callback, QualityOfServiceProfile qos = null) where T : Message, new()
- {
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ internal bool RemoveService(IRawService primitive)
{
- logger.LogWarning("Cannot create subscription as the class is already disposed or shutdown was called");
- return null;
+ return this.RemovePrimitive(primitive, this.CurrentServices);
}
- Subscription subscription = new Subscription(topic, this, callback, qos);
- subscriptions.Add(subscription);
- logger.LogInfo("Created subscription for topic " + topic);
- return subscription;
- }
- }
+ ///
+ /// Remove a primitive and wait for it to be disposable.
+ ///
+ ///
+ /// This method is intended to be used by
+ /// of the primitive and does not dispose it.
+ /// Furthermore, it is thread safe if and
+ /// of the current executor are thread safe.
+ ///
+ /// Type of the primitive.
+ /// Primitive to remove.
+ /// Collection of the current primitives.
+ /// Whether the primitive existed.
+ private bool RemovePrimitive(T primitive, ICollection collection)
+ {
+ bool removed;
+ IExecutor currentExecutor;
+ lock (this.Lock)
+ {
+ removed = collection.Remove(primitive);
+ // use the executor in use when removing the primitive,
+ // new executors will only see the updated collection
+ currentExecutor = this.Executor;
+ }
+ if (removed && !(currentExecutor is null))
+ {
+ currentExecutor.TryScheduleRescan(this);
+ currentExecutor.Wait();
+ }
+ return removed;
+ }
- /// Remove a publisher
- ///
- public bool RemovePublisher(IPublisherBase publisher)
- {
- lock (mutex)
- {
- if (publishers.Contains(publisher))
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the node or any of its primitives are in use.
+ /// Furthermore, it is NOT performed on finalization by the GC.
+ ///
+ ///
+ public void Dispose()
{
- logger.LogInfo("Removing publisher for topic " + publisher.Topic);
- publisher.Dispose();
- return publishers.Remove(publisher);
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ bool success = this.ROSContext.RemoveNode(this.Name);
+ Debug.Assert(success, "failed to remove node");
+
+ // no finalizer since the hash sets may have been finalized
+ this.DisposeFromContext();
}
- return false;
- }
- }
- /// Remove a subscription
- ///
- public bool RemoveSubscription(ISubscriptionBase subscription)
- {
- lock (mutex)
- {
- if (subscriptions.Contains(subscription))
+ ///
+ /// Dispose this node without modifying the context.
+ ///
+ internal void DisposeFromContext()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ if (!(this.Executor is null))
+ {
+ bool success = this.Executor.Remove(this);
+ Debug.Assert(success, "node was not added to its old executor");
+ }
+
+ foreach (IRawPublisher publisher in this.CurrentPublishers)
+ {
+ publisher.DisposeFromNode();
+ }
+ this.CurrentPublishers.Clear();
+
+ foreach (IRawSubscription subscription in this.CurrentSubscriptions)
+ {
+ subscription.DisposeFromNode();
+ }
+ this.CurrentSubscriptions.Clear();
+
+ foreach (IRawService service in this.CurrentServices)
+ {
+ service.DisposeFromNode();
+ }
+ this.CurrentServices.Clear();
+
+ foreach (IRawClient client in this.CurrentClients)
+ {
+ client.DisposeFromNode();
+ }
+ this.CurrentClients.Clear();
+
+ Utils.CheckReturnEnum(NativeRcl.rcl_node_fini(this.Handle));
+ this.FreeHandles();
+ }
+
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
{
- logger.LogInfo("Removing subscription for topic " + subscription.Topic);
- subscription.Dispose();
- return subscriptions.Remove(subscription);
+ NativeRclInterface.rclcs_free_node(this.Handle);
+ this.Handle = IntPtr.Zero;
+ NativeRclInterface.rclcs_node_dispose_options(this.Options);
+ this.Options = IntPtr.Zero;
}
- return false;
- }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/Publisher.cs b/src/ros2cs/ros2cs_core/Publisher.cs
index cfe9cb68..6b834c61 100644
--- a/src/ros2cs/ros2cs_core/Publisher.cs
+++ b/src/ros2cs/ros2cs_core/Publisher.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -18,79 +18,163 @@
namespace ROS2
{
- /// Publisher of a topic with a given type
- /// Publishers are created through INode.CreatePublisher
- public class Publisher: IPublisher where T : Message, new ()
- {
- public string Topic { get { return topic; } }
- private string topic;
-
- private Ros2csLogger logger = Ros2csLogger.GetInstance();
- rcl_publisher_t publisherHandle;
- IntPtr publisherOptions = IntPtr.Zero;
- rcl_node_t nodeHandle;
- private bool disposed = false;
-
- public bool IsDisposed { get { return disposed; } }
-
- /// Internal constructor for Publsher. Use INode.CreatePublisher to construct
- ///
- public Publisher(string pubTopic, Node node, QualityOfServiceProfile qos = null)
+ ///
+ /// Publisher of a topic with a given type wrapping a rcl publisher.
+ ///
+ ///
+ /// This is the implementation produced by ,
+ /// use this method to create new instances.
+ ///
+ ///
+ ///
+ public sealed class Publisher : IPublisher, IRawPublisher where T : Message, new()
{
- topic = pubTopic;
- nodeHandle = node.nodeHandle;
+ ///
+ public string Topic { get; private set; }
- QualityOfServiceProfile qualityOfServiceProfile = qos;
- if (qualityOfServiceProfile == null)
- qualityOfServiceProfile = new QualityOfServiceProfile();
+ ///
+ public bool IsDisposed
+ {
+ get
+ {
+ bool ok = NativeRclInterface.rclcs_publisher_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
+ }
+ }
- publisherOptions = NativeRclInterface.rclcs_publisher_create_options(qualityOfServiceProfile.handle);
+ ///
+ /// Handle to the rcl publisher
+ ///
+ private IntPtr Handle = IntPtr.Zero;
- IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
+ ///
+ /// Handle to the rcl publisher options
+ ///
+ private IntPtr Options = IntPtr.Zero;
- publisherHandle = NativeRcl.rcl_get_zero_initialized_publisher();
- Utils.CheckReturnEnum(NativeRcl.rcl_publisher_init(
- ref publisherHandle,
- ref nodeHandle,
- typeSupportHandle,
- topic,
- publisherOptions));
- }
+ ///
+ /// Node associated with this instance.
+ ///
+ private readonly Node Node;
- ~Publisher()
- {
- Dispose();
- }
+ ///
+ /// Create a new instance.
+ ///
+ ///
+ /// The caller is responsible for adding the instance to .
+ /// This action is not thread safe.
+ ///
+ /// Topic to publish to.
+ /// Node to associate with.
+ /// QOS setting for this publisher.
+ /// If was disposed.
+ internal Publisher(string topic, Node node, QualityOfServiceProfile qos = null)
+ {
+ this.Topic = topic;
+ this.Node = node;
- public void Dispose()
- {
- DestroyPublisher();
- }
+ QualityOfServiceProfile qualityOfServiceProfile = qos ?? new QualityOfServiceProfile();
- /// "Destructor" supporting disposable model
- private void DestroyPublisher()
- {
- if (!disposed)
- {
- Utils.CheckReturnEnum(NativeRcl.rcl_publisher_fini(ref publisherHandle, ref nodeHandle));
- NativeRclInterface.rclcs_publisher_dispose_options(publisherOptions);
- logger.LogInfo("Publisher destroyed");
- disposed = true;
- }
- }
+ this.Options = NativeRclInterface.rclcs_publisher_create_options(qualityOfServiceProfile.handle);
- /// Publish a message
- ///
- public void Publish(T msg)
- {
- if (!Ros2cs.Ok() || disposed)
- {
- logger.LogWarning("Cannot publish as the class is already disposed or shutdown was called");
- return;
- }
- MessageInternals msgInternals = msg as MessageInternals;
- msgInternals.WriteNativeMessage();
- Utils.CheckReturnEnum(NativeRcl.rcl_publish(ref publisherHandle, msgInternals.Handle, IntPtr.Zero));
+ IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
+
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_publisher();
+ int ret = NativeRcl.rcl_publisher_init(
+ this.Handle,
+ this.Node.Handle,
+ typeSupportHandle,
+ this.Topic,
+ this.Options
+ );
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
+ }
+
+
+ ///
+ /// Message memory is copied into native structures and
+ /// the message can be safely changed or disposed after this call.
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously.
+ ///
+ /// If the instance was disposed.
+ ///
+ public void Publish(T msg)
+ {
+ MessageInternals msgInternals = msg as MessageInternals;
+ // may not be thread safe
+ msgInternals.WriteNativeMessage();
+ // confused by the rcl documentation, assume it is not thread safe
+ Utils.CheckReturnEnum(NativeRcl.rcl_publish(this.Handle, msgInternals.Handle, IntPtr.Zero));
+ GC.KeepAlive(this);
+ }
+
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the publisher is in use.
+ /// Disposal is automatically performed on finalization by the GC.
+ ///
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ // finalizer not needed when we disposed successfully
+ GC.SuppressFinalize(this);
+ }
+
+ /// Disposal logic.
+ /// If this method is not called in a finalizer.
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ // only do if Node.CurrentPublishers has not been finalized
+ if (disposing)
+ {
+ bool success = this.Node.RemovePublisher(this);
+ Debug.Assert(success, "failed to remove publisher");
+ }
+
+ (this as IRawPublisher).DisposeFromNode();
+ }
+
+ ///
+ void IRawPublisher.DisposeFromNode()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ Utils.CheckReturnEnum(NativeRcl.rcl_publisher_fini(this.Handle, this.Node.Handle));
+ this.FreeHandles();
+ }
+
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_publisher(this.Handle);
+ this.Handle = IntPtr.Zero;
+ NativeRclInterface.rclcs_publisher_dispose_options(this.Options);
+ this.Options = IntPtr.Zero;
+ }
+
+ ~Publisher()
+ {
+ this.Dispose(false);
+ }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/Ros2cs.cs b/src/ros2cs/ros2cs_core/Ros2cs.cs
deleted file mode 100644
index b7303178..00000000
--- a/src/ros2cs/ros2cs_core/Ros2cs.cs
+++ /dev/null
@@ -1,277 +0,0 @@
-// Copyright 2019-2021 Robotec.ai
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-using System;
-using System.Linq;
-using System.Diagnostics;
-using System.Collections.Generic;
-using System.Threading;
-
-namespace ROS2
-{
- /// Primary ros2 C# static class
- /// This class interfaces with rcl library to handle initalization, shutdown,
- /// creation and removal of nodes as well as spinning (no executors are implemented).
- /// Note that the interface is through rcl and not rclcpp, the primary reason is that marshalling
- /// into generic interface api is not feasible, especially when we don't know all possible instantiations
- /// (as it is the case with custom generated messages).
- ///
- public static class Ros2cs
- {
- private static readonly Destructor destructor = new Destructor();
- private static readonly object mutex = new object();
- private static bool initialized = false; // for most part equivalent to rcl::ok()
- private static rcl_context_t global_context; // a simplification, we only use global default context
- private static rcl_allocator_t default_allocator;
- private static List nodes = new List(); // kept to shutdown everything in order
-
- private static WaitSet WaitSet;
-
- /// Globally initialize ros2 (rcl)
- /// Note that only a single context is used.
- /// If needed, support for multiple contexts can be added
- /// in a rather straightforward way throughout api.
- public static void Init()
- {
- lock (mutex)
- {
- if (initialized)
- {
- return;
- }
-
- default_allocator = NativeRcl.rcutils_get_default_allocator();
- global_context = NativeRcl.rcl_get_zero_initialized_context();
- Utils.CheckReturnEnum(NativeRclInterface.rclcs_init(ref global_context, default_allocator));
- WaitSet = new WaitSet(ref global_context);
- initialized = true;
- }
- }
-
- public static string GetRMWImplementation()
- {
- return Utils.PtrToString(NativeRmwInterface.rmw_native_interface_get_implementation_identifier());
- }
-
- /// Globally shutdown ros2 (rcl)
- /// Can be called multiple times with no effects after the first one.
- /// Shutdowns ros2 and disposes all the nodes. Ok() function will return false after Shutdown is called.
- ///
- public static void Shutdown()
- {
- lock (mutex)
- {
- if (!initialized)
- {
- return;
- }
- initialized = false;
-
- Ros2csLogger.GetInstance().LogInfo("Ros2cs shutdown");
- Utils.CheckReturnEnum(NativeRcl.rcl_shutdown(ref global_context));
-
- foreach (var node in nodes)
- {
- node.Dispose();
- }
- nodes.Clear();
- }
- }
-
- /// Whether ros2 C# is initialized
- ///
- /// Only when this function returns true a node can be created and spinning works
- ///
- public static bool Ok()
- {
- return initialized && NativeRcl.rcl_context_is_valid(ref global_context);
- }
-
- /// Helper class to handle Ros2cs finalization
- /// Could be understood as Ros2cs destructor. Can be called from GC if Shutdown
- /// was not called explicitly. Also, handles context finalization.
- private sealed class Destructor
- {
- ~Destructor()
- {
- Ros2csLogger.GetInstance().LogInfo("Ros2cs destructor called");
- Ros2cs.Shutdown();
- NativeRcl.rcl_context_fini(ref global_context);
- }
- }
-
- /// Create a ros2 (rcl) node
- /// Creates a node in the global context and adds it to an internal collection.
- /// Checks for name uniqueness. Throws if name is not unique or Ok() is not true.
- /// Note that node options are not exposed. Default node options are used.
- /// This can be extended by exposing desired configurations and adding a library call to set
- /// them in the native code.
- /// A valid node name, which will be first checked for uniqueness,
- /// then validated inside rcl according to naming rules (will throw exception if invalid).
- /// INode interface, which can be used to create subs and pubs
- public static INode CreateNode(string nodeName)
- {
- lock (mutex)
- {
- if (!Ok())
- {
- Ros2csLogger.GetInstance().LogError("Ros2cs is not initialized, cannot create node");
- throw new NotInitializedException();
- }
-
- foreach (var node in nodes)
- {
- if (node.Name == nodeName)
- {
- throw new InvalidOperationException("Node with name " + nodeName + " already exists, cannot create");
- }
- }
-
- var new_node = new Node(nodeName, ref global_context);
- nodes.Add(new_node);
- return new_node;
- }
- }
-
- /// Remove and dispose ros2 (rcl) node
- /// You can call Shutdown to dispose all the nodes, this is only needed when
- /// node needs to be removed while others are still meant to be running
- /// a node to remove as returned by previous CreateNode call
- /// Whether the node was in the internal collection, which should always be true
- /// unless this is called more than once for a node (which is ok). Return value can be
- /// safely ignored
- public static bool RemoveNode(INode node)
- {
- lock (mutex)
- {
- if (!initialized)
- {
- return false; // removal is handled with shutdown already
- }
- node.Dispose();
- return nodes.Remove(node);
- }
- }
-
- /// Spin on a single node
- /// Spin should be called in a dedicate spinning thread in your
- /// application layer since it runs in a blocking infinite loop. Will return when some work is
- /// executed (a callback for each subscription that received a message) or after a timeout.
- /// Note that you don't need to spin if you are only publishing (like in ros2)
- /// Only subscriptions are executed currently, no timers or other executables
- /// A node to spin on
- /// Maximum time to wait for execution item (e. g. subscription)
- public static void Spin(INode node, double timeoutSec = 0.1)
- {
- var nodes = new List{ node };
- Spin(nodes, timeoutSec);
- }
-
- /// Spin overload for multiple nodes
- /// This overload saves on implicit List creation
- ///
- public static void Spin(List nodes, double timeoutSec = 0.1)
- {
- while (initialized)
- {
- if (!SpinOnce(nodes, timeoutSec))
- {
- Thread.Sleep(TimeSpan.FromSeconds(timeoutSec));
- }
- }
- }
-
- /// Spin only once
- /// This overload is meant for when the while loop is better to
- /// handle in the application layer
- /// Whether the spin was successful (wait set not empty or Ros2cs not initialized)
- ///
- public static bool SpinOnce(INode node, double timeoutSec = 0.1)
- {
- var nodes = new List{ node };
- return SpinOnce(nodes, timeoutSec);
- }
-
- private static bool warned_once = false;
-
- /// SpinOnce overload for multiple nodes
- /// This overload saves on implicit List creation
- /// Whether the spin was successful (wait set not empty or Ros2cs not initialized)
- ///
- public static bool SpinOnce(List nodes, double timeoutSec = 0.1)
- {
- lock (mutex)
- { // Figure out how to minimize this lock
- if (!initialized)
- {
- return false;
- }
-
- // TODO - This can be optimized so that we cache the list and invalidate only with changes
- var allSubscriptions = new List();
- var allClients = new List();
- var allServices = new List();
- foreach (INode node_interface in nodes)
- {
- Node node = node_interface as Node;
- if (node == null)
- continue; //Rare situation in which we are disposing
-
- allSubscriptions.AddRange(node.Subscriptions.Where(s => s != null));
- allClients.AddRange(node.Clients.Where(c => c != null));
- allServices.AddRange(node.Services.Where(s => s != null));
- }
-
- // TODO - investigate performance impact
- WaitSet.Resize(
- (ulong)allSubscriptions.Count,
- (ulong)allClients.Count,
- (ulong)allServices.Count
- );
- foreach(var subscription in allSubscriptions)
- {
- AddResult result = WaitSet.TryAddSubscription(subscription, out ulong _);
- Debug.Assert(result != AddResult.FULL, "no space for subscription in WaitSet");
- }
- foreach(var client in allClients)
- {
- AddResult result = WaitSet.TryAddClient(client, out ulong _);
- Debug.Assert(result != AddResult.FULL, "no space for client in WaitSet");
- }
- foreach(var service in allServices)
- {
- AddResult result = WaitSet.TryAddService(service, out ulong _);
- Debug.Assert(result != AddResult.FULL, "no space for Service in WaitSet");
- }
- bool success;
- try
- {
- success = WaitSet.Wait(TimeSpan.FromSeconds(timeoutSec));
- }
- catch (WaitSetEmptyException)
- {
- return false;
- }
- if (success)
- {
- // Sequential processing
- allSubscriptions.ForEach(subscription => subscription.TakeMessage());
- allClients.ForEach(client => client.TakeMessage());
- allServices.ForEach(service => service.TakeMessage());
- }
- return true;
- }
- }
- }
-}
diff --git a/src/ros2cs/ros2cs_core/Service.cs b/src/ros2cs/ros2cs_core/Service.cs
index 73b84a6d..1783f0b6 100644
--- a/src/ros2cs/ros2cs_core/Service.cs
+++ b/src/ros2cs/ros2cs_core/Service.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,150 +13,219 @@
// limitations under the License.
using System;
+using System.Diagnostics;
using ROS2.Internal;
namespace ROS2
{
- /// Service with a topic and Types for Messages
- /// Instances are created by
- /// Message Type to be received
- /// Message Type to be send
- public class Service: IService
- where I : Message, new ()
- where O : Message, new ()
- {
- public rcl_service_t Handle { get { return serviceHandle; } }
- private rcl_service_t serviceHandle;
-
- ///
- /// Topic of this Service
- ///
- public string Topic { get { return topic; } }
- private string topic;
-
- ///
- public bool IsDisposed { get { return disposed; } }
- private bool disposed = false;
-
- ///
- private rcl_node_t nodeHandle;
-
///
- /// Callback to be called to process incoming requests
+ /// Service with a topic and types for Messages wrapping a rcl service.
///
- private readonly Func callback;
- private IntPtr serviceOptions;
+ ///
+ /// This is the implementation produced by ,
+ /// use this method to create new instances.
+ ///
+ ///
+ ///
+ public sealed class Service : IService, IRawService
+ where I : Message, new()
+ where O : Message, new()
+ {
+ ///
+ public string Topic { get; private set; }
- ///
- public object Mutex { get { return mutex; } }
- private object mutex = new object();
+ ///
+ public bool IsDisposed
+ {
+ get
+ {
+ bool ok = NativeRclInterface.rclcs_service_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
+ }
+ }
- ///
- /// Internal constructor for Service
- ///
- /// Use to construct new Instances
- internal Service(string subTopic, Node node, Func cb, QualityOfServiceProfile qos = null)
- {
- callback = cb;
- nodeHandle = node.nodeHandle;
- topic = subTopic;
- serviceHandle = NativeRcl.rcl_get_zero_initialized_service();
-
- QualityOfServiceProfile qualityOfServiceProfile = qos;
- if (qualityOfServiceProfile == null)
- {
- qualityOfServiceProfile = new QualityOfServiceProfile(QosPresetProfile.SERVICES_DEFAULT);
- }
-
- serviceOptions = NativeRclInterface.rclcs_service_create_options(qualityOfServiceProfile.handle);
-
- I msg = new I();
- MessageInternals msgInternals = msg as MessageInternals;
- IntPtr typeSupportHandle = msgInternals.TypeSupportHandle;
- msg.Dispose();
-
- Utils.CheckReturnEnum(NativeRcl.rcl_service_init(
- ref serviceHandle,
- ref node.nodeHandle,
- typeSupportHandle,
- topic,
- serviceOptions));
- }
+ ///
+ /// Handle to the rcl service
+ ///
+ public IntPtr Handle { get; private set; } = IntPtr.Zero;
+
+ ///
+ /// Handle to the rcl service options
+ ///
+ private IntPtr Options = IntPtr.Zero;
+
+ ///
+ /// Node associated with this instance.
+ ///
+ private readonly Node Node;
+
+ ///
+ /// Callback to be called to process incoming requests.
+ ///
+ private readonly Func Callback;
+
+ ///
+ /// Create a new instance.
+ ///
+ ///
+ /// The caller is responsible for adding the instance to .
+ /// This action is not thread safe.
+ ///
+ /// Topic to receive requests from.
+ /// Node to associate with.
+ /// Callback to be called to process incoming requests.
+ /// QOS setting for this subscription.
+ /// If was disposed.
+ internal Service(string topic, Node node, Func callback, QualityOfServiceProfile qos = null)
+ {
+ this.Topic = topic;
+ this.Node = node;
+ this.Callback = callback;
+
+ QualityOfServiceProfile qualityOfServiceProfile = qos ?? new QualityOfServiceProfile(QosPresetProfile.SERVICES_DEFAULT);
+
+ this.Options = NativeRclInterface.rclcs_service_create_options(qualityOfServiceProfile.handle);
+
+ IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
+
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_service();
+ int ret = NativeRcl.rcl_service_init(
+ this.Handle,
+ this.Node.Handle,
+ typeSupportHandle,
+ this.Topic,
+ this.Options
+ );
+
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
+ }
- ///
- /// Send Response Message with rcl/rmw layers
- ///
- /// request id received when taking the Request
- /// Message to be send
- private void SendResp(rcl_rmw_request_id_t header, O msg)
- {
- RCLReturnEnum ret;
- MessageInternals msgInternals = msg as MessageInternals;
- msgInternals.WriteNativeMessage();
- ret = (RCLReturnEnum)NativeRcl.rcl_send_response(ref serviceHandle, ref header, msgInternals.Handle);
- }
+ ///
+ /// This method is not thread safe.
+ ///
+ ///
+ public bool TryProcess()
+ {
+ rcl_rmw_request_id_t header = default(rcl_rmw_request_id_t);
+ I message = new I();
+ int ret = NativeRcl.rcl_take_request(
+ this.Handle,
+ ref header,
+ (message as MessageInternals).Handle
+ );
+ GC.KeepAlive(this);
+
+ switch ((RCLReturnEnum)ret)
+ {
+ case RCLReturnEnum.RCL_RET_SERIVCE_TAKE_FAILD:
+ case RCLReturnEnum.RCL_RET_SERVICE_INVALID:
+ return false;
+ default:
+ Utils.CheckReturnEnum(ret);
+ break;
+ }
+
+ Utils.CheckReturnEnum(ret);
+ this.ProcessRequest(header, message);
+ return true;
+ }
- ///
- // TODO(adamdbrw) this should not be public - add an internal interface
- public void TakeMessage()
- {
- RCLReturnEnum ret;
- rcl_rmw_request_id_t header = default(rcl_rmw_request_id_t);
- MessageInternals message;
+ ///
+ /// Populates managed fields with native values and calls the callback with the created message
+ ///
+ /// Sending the Response is also takes care of by this method
+ /// Message that will be populated and provided to the callback
+ /// request id received when taking the Request
+ private void ProcessRequest(rcl_rmw_request_id_t header, I message)
+ {
+ (message as MessageInternals).ReadNativeMessage();
+ this.SendResp(header, this.Callback(message));
+ }
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ /// Send Response Message with rcl/rmw layers
+ ///
+ /// request id received when taking the Request
+ /// Message to be send
+ private void SendResp(rcl_rmw_request_id_t header, O msg)
{
- return;
+ MessageInternals msgInternals = msg as MessageInternals;
+ msgInternals.WriteNativeMessage();
+ Utils.CheckReturnEnum(NativeRcl.rcl_send_response(
+ this.Handle,
+ ref header,
+ msgInternals.Handle
+ ));
+ GC.KeepAlive(this);
}
- message = new I() as MessageInternals;
- ret = (RCLReturnEnum)NativeRcl.rcl_take_request(ref serviceHandle, ref header, message.Handle);
- }
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the service is in use.
+ /// Disposal is automatically performed on finalization by the GC.
+ ///
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ // finalizer not needed when we disposed successfully
+ GC.SuppressFinalize(this);
+ }
- if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_OK)
- {
- ProcessRequest(header, message);
- }
- }
+ /// Disposal logic.
+ /// If this method is not called in a finalizer.
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ // only do if Node.CurrentServices has not been finalized
+ // save since if we are being finalized we are not in a wait set anymore
+ if (disposing)
+ {
+ bool success = this.Node.RemoveService(this);
+ Debug.Assert(success, "failed to remove service");
+ }
+
+ (this as IRawService).DisposeFromNode();
+ }
- ///
- /// Populates managed fields with native values and calls the callback with the created message
- ///
- /// Sending the Response is also takes care of by this method
- /// Message that will be populated and provided to the callback
- /// request id received when taking the Request
- private void ProcessRequest(rcl_rmw_request_id_t header, MessageInternals message)
- {
- message.ReadNativeMessage();
- O response = callback((I)message);
- SendResp(header, response);
- }
+ ///
+ void IRawService.DisposeFromNode()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
- ~Service()
- {
- DestroyService();
- }
+ Utils.CheckReturnEnum(NativeRcl.rcl_service_fini(this.Handle, this.Node.Handle));
+ this.FreeHandles();
+ }
- public void Dispose()
- {
- DestroyService();
- }
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_service(this.Handle);
+ this.Handle = IntPtr.Zero;
+ NativeRclInterface.rclcs_service_dispose_options(this.Options);
+ this.Options = IntPtr.Zero;
+ }
- /// "Destructor" supporting disposable model
- private void DestroyService()
- {
- lock (mutex)
- {
- if (!disposed)
+ ~Service()
{
- Utils.CheckReturnEnum(NativeRcl.rcl_service_fini(ref serviceHandle, ref nodeHandle));
- NativeRclInterface.rclcs_node_dispose_options(serviceOptions);
- disposed = true;
- Ros2csLogger.GetInstance().LogInfo("Service destroyed");
+ this.Dispose(false);
}
- }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/Subscription.cs b/src/ros2cs/ros2cs_core/Subscription.cs
index 19885f97..4132b9c1 100644
--- a/src/ros2cs/ros2cs_core/Subscription.cs
+++ b/src/ros2cs/ros2cs_core/Subscription.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,122 +13,187 @@
// limitations under the License.
using System;
+using System.Diagnostics;
using ROS2.Internal;
namespace ROS2
{
- /// Subscription to a topic with a given type
- /// Subscriptions are created through INode interface (CreateSubscription)
- public class Subscription: ISubscription where T : Message, new ()
- {
- public rcl_subscription_t Handle { get { return subscriptionHandle; } }
- private rcl_subscription_t subscriptionHandle;
-
- public string Topic { get { return topic; } }
- private string topic;
-
- public bool IsDisposed { get { return disposed; } }
- private bool disposed = false;
-
- private rcl_node_t nodeHandle;
- private readonly Action callback;
- private IntPtr subscriptionOptions;
-
- public object Mutex { get { return mutex; } }
- private object mutex = new object();
-
- /// Tries to get a message from rcl/rmw layers. Calls the callback if successful
- // TODO(adamdbrw) this should not be public - add an internal interface
- public void TakeMessage()
+ ///
+ /// Subscription of a topic with a given type wrapping a rcl subscription.
+ ///
+ ///
+ /// This is the implementation produced by ,
+ /// use this method to create new instances.
+ ///
+ ///
+ ///
+ public sealed class Subscription : ISubscription, IRawSubscription where T : Message, new()
{
- RCLReturnEnum ret;
- MessageInternals message;
- lock (mutex)
- {
- if (disposed || !Ros2cs.Ok())
+ ///
+ public string Topic { get; private set; }
+
+ ///
+ public bool IsDisposed
{
- return;
+ get
+ {
+ bool ok = NativeRclInterface.rclcs_subscription_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
+ }
}
- message = CreateMessage();
- ret = (RCLReturnEnum)NativeRcl.rcl_take(ref subscriptionHandle, message.Handle, IntPtr.Zero, IntPtr.Zero);
- }
-
- bool gotMessage = ret == RCLReturnEnum.RCL_RET_OK;
+ ///
+ /// Handle to the rcl subscription
+ ///
+ public IntPtr Handle { get; private set; } = IntPtr.Zero;
+
+ ///
+ /// Handle to the rcl subscription options
+ ///
+ private IntPtr Options = IntPtr.Zero;
+
+ ///
+ /// Node associated with this instance.
+ ///
+ private readonly Node Node;
+
+ ///
+ /// Callback invoked when a message is received.
+ ///
+ private readonly Action Callback;
+
+ ///
+ /// Create a new instance.
+ ///
+ ///
+ /// The caller is responsible for adding the instance to .
+ /// This action is not thread safe.
+ ///
+ /// Topic to subscribe to.
+ /// Node to associate with.
+ /// Callback invoked when a message is received.
+ /// QOS setting for this subscription.
+ /// If was disposed.
+ internal Subscription(string topic, Node node, Action callback, QualityOfServiceProfile qos = null)
+ {
+ this.Topic = topic;
+ this.Node = node;
+ this.Callback = callback;
+
+ QualityOfServiceProfile qualityOfServiceProfile = qos ?? new QualityOfServiceProfile();
+
+ this.Options = NativeRclInterface.rclcs_subscription_create_options(qualityOfServiceProfile.handle);
+
+ IntPtr typeSupportHandle = MessageTypeSupportHelper.GetTypeSupportHandle();
+
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_subscription();
+ int ret = NativeRcl.rcl_subscription_init(
+ this.Handle,
+ this.Node.Handle,
+ typeSupportHandle,
+ this.Topic,
+ this.Options
+ );
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
+ }
- if (gotMessage)
- {
- TriggerCallback(message);
- }
- }
+ ///
+ /// This method is not thread safe.
+ ///
+ ///
+ public bool TryProcess()
+ {
+ T message = new T();
+ int ret = NativeRcl.rcl_take(
+ this.Handle,
+ (message as MessageInternals).Handle,
+ IntPtr.Zero,
+ IntPtr.Zero
+ );
+ GC.KeepAlive(this);
+
+ switch ((RCLReturnEnum)ret)
+ {
+ case RCLReturnEnum.RCL_RET_SUBSCRIPTION_TAKE_FAILED:
+ case RCLReturnEnum.RCL_RET_SUBSCRIPTION_INVALID:
+ return false;
+ default:
+ Utils.CheckReturnEnum(ret);
+ break;
+ }
+
+ (message as MessageInternals).ReadNativeMessage();
+ this.Callback(message);
+ return true;
+ }
- /// Construct a message of the subscription type
- private MessageInternals CreateMessage()
- {
- return new T() as MessageInternals;
- }
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the subscription is in use.
+ /// Disposal is automatically performed on finalization by the GC.
+ ///
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ // finalizer not needed when we disposed successfully
+ GC.SuppressFinalize(this);
+ }
- /// Populates managed fields with native values and calls the callback with created message
- /// Message that will be populated and returned through callback
- private void TriggerCallback(MessageInternals message)
- {
- message.ReadNativeMessage();
- callback((T)message);
- }
+ /// Disposal logic.
+ /// If this method is not called in a finalizer.
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ // only do if Node.CurrentSubscriptions has not been finalized
+ // save since if we are being finalized we are not in a wait set anymore
+ if (disposing)
+ {
+ bool success = this.Node.RemoveSubscription(this);
+ Debug.Assert(success, "failed to remove subscription");
+ }
+
+ (this as IRawSubscription).DisposeFromNode();
+ }
- /// Internal constructor for Subscription. Use INode.CreateSubscription to construct
- ///
- internal Subscription(string subTopic, Node node, Action cb, QualityOfServiceProfile qos = null)
- {
- callback = cb;
- nodeHandle = node.nodeHandle;
- topic = subTopic;
- subscriptionHandle = NativeRcl.rcl_get_zero_initialized_subscription();
-
- QualityOfServiceProfile qualityOfServiceProfile = qos;
- if (qualityOfServiceProfile == null)
- {
- qualityOfServiceProfile = new QualityOfServiceProfile();
- }
-
- subscriptionOptions = NativeRclInterface.rclcs_subscription_create_options(qualityOfServiceProfile.handle);
-
- T msg = new T();
- MessageInternals msgInternals = msg as MessageInternals;
- IntPtr typeSupportHandle = msgInternals.TypeSupportHandle;
- msg.Dispose();
-
- Utils.CheckReturnEnum(NativeRcl.rcl_subscription_init(
- ref subscriptionHandle,
- ref node.nodeHandle,
- typeSupportHandle,
- topic,
- subscriptionOptions));
- }
+ ///
+ void IRawSubscription.DisposeFromNode()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
- ~Subscription()
- {
- DestroySubscription();
- }
+ Utils.CheckReturnEnum(NativeRcl.rcl_subscription_fini(this.Handle, this.Node.Handle));
+ this.FreeHandles();
+ }
- public void Dispose()
- {
- DestroySubscription();
- }
+ ///
+ /// Free the rcl handles and replace them with null pointers.
+ ///
+ ///
+ /// The handles are not finalised by this method.
+ ///
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_subscription(this.Handle);
+ this.Handle = IntPtr.Zero;
+ NativeRclInterface.rclcs_subscription_dispose_options(this.Options);
+ this.Options = IntPtr.Zero;
+ }
- /// "Destructor" supporting disposable model
- private void DestroySubscription()
- {
- lock (mutex)
- {
- if (!disposed)
+ ~Subscription()
{
- Utils.CheckReturnEnum(NativeRcl.rcl_subscription_fini(ref subscriptionHandle, ref nodeHandle));
- NativeRclInterface.rclcs_node_dispose_options(subscriptionOptions);
- disposed = true;
- Ros2csLogger.GetInstance().LogInfo("Subscription destroyed");
+ this.Dispose(false);
}
- }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/WaitSet.cs b/src/ros2cs/ros2cs_core/WaitSet.cs
index 18d21935..3bfa9c44 100644
--- a/src/ros2cs/ros2cs_core/WaitSet.cs
+++ b/src/ros2cs/ros2cs_core/WaitSet.cs
@@ -1,4 +1,5 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+// Copyright 2019-2022 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,181 +14,491 @@
// limitations under the License.
using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
namespace ROS2
{
- internal enum AddResult
- {
- SUCCESS,
- FULL,
- DISPOSED
- }
+ ///
+ /// Collection used for waiting on resources to become ready.
+ /// All methods and properties are NOT thread safe.
+ ///
+ internal sealed class WaitSet : IReadOnlyCollection, IExtendedDisposable
+ {
+ ///
+ /// The instances currently in the wait set.
+ ///
+ public ICollection Subscriptions { get { return this.CurrentSubscriptions; } }
- internal class WaitSet
- {
- internal ulong SubscriptionCount {get { return Handle.size_of_subscriptions.ToUInt64(); }}
+ ///
+ /// The instances currently in the wait set.
+ ///
+ public ICollection Clients { get { return this.CurrentClients; } }
- internal ulong ClientCount {get { return Handle.size_of_clients.ToUInt64(); }}
+ ///
+ /// The instances currently in the wait set.
+ ///
+ public ICollection Services { get { return this.CurrentServices; } }
- internal ulong ServiceCount {get { return Handle.size_of_services.ToUInt64(); }}
+ ///
+ /// The instances currently in the wait set.
+ ///
+ public ICollection GuardConditions { get { return this.CurrentGuardConditions; } }
- private rcl_wait_set_t Handle;
+ ///
+ /// Context associated with this wait set.
+ ///
+ public IContext Context
+ {
+ get => this.ROSContext;
+ }
- internal WaitSet(ref rcl_context_t context)
- {
- Handle = NativeRcl.rcl_get_zero_initialized_wait_set();
- Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_init(
- ref Handle,
- (UIntPtr)0,
- (UIntPtr)0,
- (UIntPtr)0,
- (UIntPtr)0,
- (UIntPtr)0,
- (UIntPtr)0,
- ref context,
- NativeRcl.rcutils_get_default_allocator()));
- }
+ private Context ROSContext;
- ~WaitSet()
- {
- Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_fini(ref Handle));
- }
+ ///
+ public bool IsDisposed
+ {
+ get
+ {
+ bool ok = NativeRclInterface.rclcs_wait_set_is_valid(this.Handle);
+ GC.KeepAlive(this);
+ return !ok;
+ }
+ }
- internal void Clear()
- {
- Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_clear(ref Handle));
- }
+ ///
+ public int Count
+ {
+ get
+ {
+ return this.Subscriptions.Count
+ + this.Clients.Count
+ + this.Services.Count
+ + this.GuardConditions.Count;
+ }
+ }
- internal void Resize(ulong subscriptionCount, ulong clientCount, ulong serviceCount)
- {
- Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_resize(
- ref Handle,
- (UIntPtr)subscriptionCount,
- (UIntPtr)0,
- (UIntPtr)0,
- (UIntPtr)clientCount,
- (UIntPtr)serviceCount,
- (UIntPtr)0));
- }
+ ///
+ /// Handle to the rcl wait set.
+ ///
+ private IntPtr Handle = IntPtr.Zero;
- internal AddResult TryAddSubscription(ISubscriptionBase subscription, out ulong index)
- {
- UIntPtr native_index = default(UIntPtr);
- int ret;
- lock (subscription.Mutex)
- {
- if (subscription.IsDisposed)
- {
- index = default(ulong);
- return AddResult.DISPOSED;
- }
-
- rcl_subscription_t subscription_handle = subscription.Handle;
- ret = NativeRcl.rcl_wait_set_add_subscription(
- ref Handle,
- ref subscription_handle,
- ref native_index
- );
- }
-
- if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_WAIT_SET_FULL)
- {
- index = default(ulong);
- return AddResult.FULL;
- }
- else
- {
- Utils.CheckReturnEnum(ret);
- index = native_index.ToUInt64();
- return AddResult.SUCCESS;
- }
- }
+ ///
+ /// Modification version used to detect if the wait set was modified.
+ ///
+ private uint Version = 0;
- internal AddResult TryAddClient(IClientBase client, out ulong index)
- {
- UIntPtr native_index = default(UIntPtr);
- int ret;
- lock (client.Mutex)
- {
- if (client.IsDisposed)
- {
- index = default(ulong);
- return AddResult.DISPOSED;
- }
-
- rcl_client_t client_handle = client.Handle;
- ret = NativeRcl.rcl_wait_set_add_client(
- ref Handle,
- ref client_handle,
- ref native_index
- );
- }
-
- if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_WAIT_SET_FULL)
- {
- index = default(ulong);
- return AddResult.FULL;
- }
- else
- {
- Utils.CheckReturnEnum(ret);
- index = native_index.ToUInt64();
- return AddResult.SUCCESS;
- }
- }
+ // are exposed as collections to prevent users from depending on the changing indexes
+ private readonly List CurrentSubscriptions = new List();
- internal AddResult TryAddService(IServiceBase service, out ulong index)
- {
- UIntPtr native_index = default(UIntPtr);
- int ret;
-
- lock (service.Mutex)
- {
- if (service.IsDisposed)
- {
- index = default(ulong);
- return AddResult.DISPOSED;
- }
-
- rcl_service_t service_handle = service.Handle;
- ret = NativeRcl.rcl_wait_set_add_service(
- ref Handle,
- ref service_handle,
- ref native_index
- );
- }
-
-
- if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_WAIT_SET_FULL)
- {
- index = default(ulong);
- return AddResult.FULL;
- }
- else
- {
- Utils.CheckReturnEnum(ret);
- index = native_index.ToUInt64();
- return AddResult.SUCCESS;
- }
- }
+ private readonly List CurrentClients = new List();
- internal bool Wait()
- {
- return Wait(TimeSpan.FromTicks(-1));
- }
+ private readonly List CurrentServices = new List();
- internal bool Wait(TimeSpan timeout)
- {
- int ret = NativeRcl.rcl_wait(ref Handle, timeout.Ticks * 100);
- if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_TIMEOUT)
- {
- return false;
- }
- else
- {
- Utils.CheckReturnEnum(ret);
- return true;
- }
+ private readonly List CurrentGuardConditions = new List();
+
+ ///
+ /// Construct a new instance.
+ ///
+ /// Associated context
+ internal WaitSet(Context context)
+ {
+ this.ROSContext = context;
+ this.Handle = NativeRclInterface.rclcs_get_zero_initialized_wait_set();
+ int ret = NativeRcl.rcl_wait_set_init(
+ this.Handle,
+ new UIntPtr(Convert.ToUInt32(this.CurrentSubscriptions.Capacity)),
+ new UIntPtr(Convert.ToUInt32(this.CurrentGuardConditions.Capacity)),
+ UIntPtr.Zero,
+ new UIntPtr(Convert.ToUInt32(this.CurrentClients.Capacity)),
+ new UIntPtr(Convert.ToUInt32(this.CurrentServices.Capacity)),
+ UIntPtr.Zero,
+ context.Handle,
+ NativeRcl.rcutils_get_default_allocator()
+ );
+ if ((RCLReturnEnum)ret != RCLReturnEnum.RCL_RET_OK)
+ {
+ this.FreeHandles();
+ Utils.CheckReturnEnum(ret);
+ }
+ }
+
+ ///
+ public IEnumerator GetEnumerator()
+ {
+ return this.Subscriptions
+ .Concat(this.Clients)
+ .Concat(this.Services)
+ .Concat(this.GuardConditions)
+ .GetEnumerator();
+ }
+
+ ///
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+
+ ///
+ /// Wait for something to become ready.
+ ///
+ ///
+ /// This will invalidate previous wait results.
+ ///
+ /// The resources that became ready
+ ///
+ /// The wait set can only be waited on if it contains something
+ ///
+ public WaitResult Wait()
+ {
+ if (this.TryWait(TimeSpan.FromTicks(-1), out WaitResult ready))
+ {
+ return ready;
+ }
+ // should never happen
+ throw new TimeoutException("infinite wait timed out");
+ }
+
+ ///
+ /// Resize the wait set to have the same size as the collections holding the resources and clear it.
+ ///
+ ///
+ /// No allocation will be done if the new size of the wait set matches the current size.
+ ///
+ private void PrepareWaitSet()
+ {
+ int ret = NativeRcl.rcl_wait_set_resize(
+ this.Handle,
+ new UIntPtr(Convert.ToUInt32(this.CurrentSubscriptions.Count)),
+ new UIntPtr(Convert.ToUInt32(this.CurrentGuardConditions.Count)),
+ UIntPtr.Zero,
+ new UIntPtr(Convert.ToUInt32(this.CurrentClients.Count)),
+ new UIntPtr(Convert.ToUInt32(this.CurrentServices.Count)),
+ UIntPtr.Zero
+ );
+ if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_INVALID_ARGUMENT)
+ {
+ throw new ObjectDisposedException("RCL wait set");
+ }
+ Utils.CheckReturnEnum(ret);
+ }
+
+ ///
+ /// Check if the wait set contains something at an index.
+ ///
+ /// Delegate used for accessing the array of that resource
+ /// Index to check
+ /// Whether the wait set already contains a resource
+ /// The wait set does not contain the index
+ private bool IsAdded(NativeRclInterface.WaitSetGetType getter, int index)
+ {
+ if (getter(this.Handle, new UIntPtr(Convert.ToUInt32(index)), out IntPtr ptr))
+ {
+ return ptr != IntPtr.Zero;
+ }
+ throw new IndexOutOfRangeException($"wait set has no index {index}");
+ }
+
+ ///
+ /// Fill the wait set of a resource.
+ ///
+ ///
+ /// The wrapper will be updated if the wait set adds resources at different indexes.
+ ///
+ /// Type of the resource
+ /// Delegate used for adding to the wait set
+ /// Delegate used for accessing the wait set
+ /// Resources to add
+ private void FillWaitSet(NativeRcl.WaitSetAddType adder, NativeRclInterface.WaitSetGetType getter, IList wrappers)
+ where T : IWaitable
+ {
+ if (wrappers.Count == 0)
+ {
+ return;
+ }
+ int filled = 0;
+ int index = 0;
+ // add index to wait set until it is filled
+ while (true)
+ {
+ Utils.CheckReturnEnum(adder(this.Handle, wrappers[index].Handle, out UIntPtr destination));
+ filled += 1;
+ int newIndex = Convert.ToInt32(destination.ToUInt32());
+ if (newIndex != index)
+ {
+ // different wait set index, update wrappers and repeat with not added resource
+ (wrappers[index], wrappers[newIndex]) = (wrappers[newIndex], wrappers[index]);
+ continue;
+ }
+ if (filled >= wrappers.Count)
+ {
+ // all wrappers filled, skip searching for next index to prevent triggering IndexOutOfRangeException
+ break;
+ }
+ // some wrappers are not added yet, advance to next index not already in wait set
+ // IndexOutOfRangeException indicates that not all wrappers could be added and
+ // should not be ignored since it hints at a bug or threading issue
+ do
+ {
+ index += 1;
+ }
+ while (this.IsAdded(getter, index));
+ }
+ }
+
+ ///
+ /// Fill the wait set of all resources.
+ ///
+ ///
+ /// This will clear and resize the wait set first.
+ ///
+ private void FillWaitSet()
+ {
+ this.PrepareWaitSet();
+ this.FillWaitSet(
+ NativeRcl.rcl_wait_set_add_subscription,
+ NativeRclInterface.rclcs_wait_set_get_subscription,
+ this.CurrentSubscriptions
+ );
+ this.FillWaitSet(
+ NativeRcl.rcl_wait_set_add_client,
+ NativeRclInterface.rclcs_wait_set_get_client,
+ this.CurrentClients
+ );
+ this.FillWaitSet(
+ NativeRcl.rcl_wait_set_add_service,
+ NativeRclInterface.rclcs_wait_set_get_service,
+ this.CurrentServices
+ );
+ this.FillWaitSet(
+ NativeRcl.rcl_wait_set_add_guard_condition,
+ NativeRclInterface.rclcs_wait_set_get_guard_condition,
+ this.CurrentGuardConditions
+ );
+ }
+
+ /// Timeout for waiting, infinite if negative
+ /// The resources that became ready
+ /// Whether the wait did not timed out
+ ///
+ public bool TryWait(TimeSpan timeout, out WaitResult result)
+ {
+ // invalidate last wait result
+ this.Version += 1;
+
+ this.FillWaitSet();
+
+ long nanoSeconds = timeout.Ticks * (1_000_000_000 / TimeSpan.TicksPerSecond);
+ int ret = NativeRcl.rcl_wait(this.Handle, nanoSeconds);
+ if ((RCLReturnEnum)ret == RCLReturnEnum.RCL_RET_TIMEOUT)
+ {
+ result = default(WaitResult);
+ return false;
+ }
+ Utils.CheckReturnEnum(ret);
+
+ result = new WaitResult(this);
+ return true;
+ }
+
+ ///
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+
+ /// Disposal logic.
+ /// If this method is not called in a finalizer
+ private void Dispose(bool disposing)
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ if (disposing)
+ {
+ bool success = this.ROSContext.RemoveWaitSet(this);
+ Debug.Assert(success, "failed to remove wait set");
+ this.ClearCollections();
+ }
+
+ Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_fini(this.Handle));
+ this.FreeHandles();
+ }
+
+ /// Dispose without modifying the context.
+ internal void DisposeFromContext()
+ {
+ if (this.Handle == IntPtr.Zero)
+ {
+ return;
+ }
+
+ this.ClearCollections();
+
+ Utils.CheckReturnEnum(NativeRcl.rcl_wait_set_fini(this.Handle));
+ this.FreeHandles();
+ }
+
+ private void ClearCollections()
+ {
+ this.Subscriptions.Clear();
+ this.Clients.Clear();
+ this.Services.Clear();
+ this.GuardConditions.Clear();
+ }
+
+ private void FreeHandles()
+ {
+ NativeRclInterface.rclcs_free_wait_set(this.Handle);
+ this.Handle = IntPtr.Zero;
+ }
+
+ ~WaitSet()
+ {
+ this.Dispose(false);
+ }
+
+ ///
+ /// Result of waiting on a wait set.
+ ///
+ ///
+ /// The enumerables are invalidated when waiting on the wait set again,
+ /// which is only done for debugging purposes and not done when the
+ /// collections containing the primitives change.
+ ///
+ public struct WaitResult : IEnumerable
+ {
+ ///
+ /// Subscriptions which are ready.
+ ///
+ public IEnumerable ReadySubscriptions
+ {
+ get => this.CurrentPrimitives(
+ NativeRclInterface.rclcs_wait_set_get_subscription,
+ this.WaitSet.CurrentSubscriptions
+ );
+ }
+
+ ///
+ /// Clients which are ready.
+ ///
+ public IEnumerable ReadyClients
+ {
+ get => this.CurrentPrimitives(
+ NativeRclInterface.rclcs_wait_set_get_client,
+ this.WaitSet.CurrentClients
+ );
+ }
+
+ ///
+ /// Services which are ready.
+ ///
+ public IEnumerable ReadyServices
+ {
+ get => this.CurrentPrimitives(
+ NativeRclInterface.rclcs_wait_set_get_service,
+ this.WaitSet.CurrentServices
+ );
+ }
+
+ ///
+ /// Guard conditions which are ready.
+ ///
+ public IEnumerable ReadyGuardConditions
+ {
+ get => this.CurrentPrimitives(
+ NativeRclInterface.rclcs_wait_set_get_guard_condition,
+ this.WaitSet.CurrentGuardConditions
+ );
+ }
+
+ ///
+ /// Wait set associated with this result.
+ ///
+ private readonly WaitSet WaitSet;
+
+ ///
+ /// Version when this result was created.
+ ///
+ private readonly uint CreatedVersion;
+
+ internal WaitResult(WaitSet waitSet)
+ {
+ this.WaitSet = waitSet;
+ this.CreatedVersion = waitSet.Version;
+ }
+
+ ///
+ /// Assert that the wait set is valid and has not been waited on.
+ ///
+ /// If the wait set was disposed.
+ private void AssertOk()
+ {
+ if (this.WaitSet.Version != this.CreatedVersion || this.WaitSet.IsDisposed)
+ {
+ throw new ObjectDisposedException("rcl wait set");
+ }
+ }
+
+ ///
+ /// Primitives currently in the wait set.
+ ///
+ ///
+ /// After waiting the only primitives left in
+ /// the wait set are ready.
+ ///
+ /// Primitive type
+ /// Function to access the wait set array.
+ /// List holding the primitives.
+ /// Enumerable of the primitives.
+ private IEnumerable CurrentPrimitives(NativeRclInterface.WaitSetGetType getter, IList primitives) where T : IWaitable
+ {
+ this.AssertOk();
+ for (UIntPtr index = UIntPtr.Zero; getter(this.WaitSet.Handle, index, out IntPtr ptr); index += 1)
+ {
+ if (ptr != IntPtr.Zero)
+ {
+ yield return primitives[Convert.ToInt32(index.ToUInt32())];
+ this.AssertOk();
+ }
+ }
+ }
+
+ ///
+ public IEnumerator GetEnumerator()
+ {
+ return this.ReadySubscriptions
+ .Concat(this.ReadyClients)
+ .Concat(this.ReadyServices)
+ .Concat(this.ReadyGuardConditions)
+ .GetEnumerator();
+ }
+
+ ///
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+
+ ///
+ /// Deconstruct the result into the resources which are ready.
+ ///
+ public void Deconstruct(
+ out IEnumerable subscriptions,
+ out IEnumerable clients,
+ out IEnumerable services,
+ out IEnumerable guard_conditions)
+ {
+ subscriptions = this.ReadySubscriptions;
+ clients = this.ReadyClients;
+ services = this.ReadyServices;
+ guard_conditions = this.ReadyGuardConditions;
+ }
+ }
}
- }
}
diff --git a/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs b/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs
new file mode 100644
index 00000000..3889f75b
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/executors/ManualExecutor.cs
@@ -0,0 +1,583 @@
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ROS2.Executors
+{
+ ///
+ /// Executor which has to be spun manually.
+ ///
+ ///
+ /// Spinning is impossible if a rescan is scheduled
+ /// to allow waiting to stop when the executor is not spinning.
+ ///
+ public sealed class ManualExecutor : IExecutor
+ {
+ ///
+ /// Context associated with this executor.
+ ///
+ public IContext Context
+ {
+ get { return this.WaitSet.Context; }
+ }
+
+ ///
+ /// Whether the executor is currently spinning.
+ ///
+ public bool IsSpinning
+ {
+ get => this._IsSpinning;
+ private set => this._IsSpinning = value;
+ }
+
+ private volatile bool _IsSpinning = false;
+
+ ///
+ /// Whether a rescan is scheduled.
+ ///
+ public bool RescanScheduled
+ {
+ get => this._RescanScheduled;
+ private set => this._RescanScheduled = value;
+ }
+
+ // volatile since it may be changed by multiple threads
+ private volatile bool _RescanScheduled = false;
+
+ ///
+ /// To prevent from being starved by multiple spins.
+ ///
+ private long SpinId = 0;
+
+ ///
+ public bool IsDisposed
+ {
+ get { return this.WaitSet.IsDisposed || this.InterruptCondition.IsDisposed; }
+ }
+
+ ///
+ /// This property is thread safe.
+ ///
+ ///
+ public int Count
+ {
+ get
+ {
+ lock (this.Nodes)
+ {
+ return this.Nodes.Count;
+ }
+ }
+ }
+
+ ///
+ /// This property is thread safe.
+ ///
+ ///
+ public bool IsReadOnly
+ {
+ get { return false; }
+ }
+
+ ///
+ /// Wait set used while spinning.
+ ///
+ ///
+ /// Is also used to notify when the executor finished spinning.
+ ///
+ private readonly WaitSet WaitSet;
+
+ ///
+ /// Guard condition used for interrupting waits.
+ ///
+ private readonly GuardCondition InterruptCondition;
+
+ ///
+ /// Nodes in the executor.
+ ///
+ private readonly HashSet Nodes = new HashSet();
+
+ ///
+ /// Create a new instance.
+ ///
+ /// Context to associate with.
+ /// If is disposed.
+ public ManualExecutor(Context context) : this(
+ context.CreateWaitSet(),
+ context.CreateGuardCondition(() => { })
+ )
+ { }
+
+ ///
+ internal ManualExecutor(WaitSet waitSet, GuardCondition interruptCondition)
+ {
+ this.WaitSet = waitSet;
+ this.InterruptCondition = interruptCondition;
+ this.WaitSet.GuardConditions.Add(this.InterruptCondition);
+ }
+
+ ///
+ /// This method is thread safe when setting
+ /// is thread safe
+ /// and not changed concurrently.
+ ///
+ /// If the node already has an executor.
+ ///
+ public void Add(INode node)
+ {
+ if (!(node.Executor is null))
+ {
+ throw new InvalidOperationException("node already has an executor");
+ }
+ // make sure the node knows its
+ // new executor before it can be added to the wait set
+ node.Executor = this;
+ lock (this.Nodes)
+ {
+ this.Nodes.Add(node);
+ }
+ this.ScheduleRescan();
+ }
+
+ ///
+ /// This method is thread safe when setting
+ /// is thread safe.
+ ///
+ /// If the executor was disposed.
+ ///
+ public bool Remove(INode node)
+ {
+ bool removed;
+ lock (this.Nodes)
+ {
+ removed = this.Nodes.Remove(node);
+ }
+ if (removed)
+ {
+ try
+ {
+ Debug.Assert(
+ Object.ReferenceEquals(node.Executor, this),
+ "node has different executor"
+ );
+ this.ScheduleRescan();
+ this.Wait();
+ }
+ finally
+ {
+ // clear executor after it
+ // is safe to do so
+ node.Executor = null;
+ }
+ }
+ return removed;
+ }
+
+ ///
+ /// This method is thread safe when setting
+ /// is thread safe.
+ ///
+ /// If the executor was disposed.
+ ///
+ public void Clear()
+ {
+ // use thread safe enumerator
+ foreach (INode node in this)
+ {
+ this.Remove(node);
+ }
+ }
+
+ ///
+ /// This method is thread safe.
+ ///
+ ///
+ public bool Contains(INode node)
+ {
+ lock (this.Nodes)
+ {
+ return this.Nodes.Contains(node);
+ }
+ }
+
+ ///
+ /// This method is thread safe.
+ ///
+ ///
+ public void CopyTo(INode[] array, int index)
+ {
+ if (array is null)
+ {
+ throw new ArgumentException("array is null");
+ }
+ if (index < 0)
+ {
+ throw new ArgumentOutOfRangeException("index is less than 0");
+ }
+ lock (this.Nodes)
+ {
+ foreach (INode item in this.Nodes)
+ {
+ try
+ {
+ array[index] = item;
+ }
+ catch (IndexOutOfRangeException e)
+ {
+ throw new ArgumentException("array is too small", e);
+ }
+ index += 1;
+ }
+ }
+ }
+
+ ///
+ /// The enumerator is thread safe.
+ ///
+ ///
+ public IEnumerator GetEnumerator()
+ {
+ lock (this.Nodes)
+ {
+ return this.Nodes.ToArray().AsEnumerable().GetEnumerator();
+ }
+ }
+
+ ///
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+
+ ///
+ /// This method is thread safe.
+ ///
+ ///
+ public void ScheduleRescan()
+ {
+ this.RescanScheduled = true;
+ }
+
+ ///
+ /// This method is an alias for .
+ ///
+ ///
+ public bool TryScheduleRescan(INode node)
+ {
+ this.ScheduleRescan();
+ return true;
+ }
+
+ ///
+ /// This method is thread safe and uses .
+ ///
+ ///
+ public void Wait()
+ {
+ if (this.RescanScheduled)
+ {
+ lock (this.WaitSet)
+ {
+ this.WaitUntilDone(this.SpinId);
+ }
+ }
+ }
+
+ ///
+ /// This method is thread safe.
+ ///
+ /// If the executor was disposed.
+ /// If the timeout is negative or too big.
+ ///
+ public bool TryWait(TimeSpan timeout)
+ {
+ if (timeout.Ticks < 0)
+ {
+ throw new ArgumentOutOfRangeException("timeout is negative");
+ }
+ if (this.RescanScheduled)
+ {
+ lock (this.WaitSet)
+ {
+ // read id inside the lock to prevent an outdated id from being copied
+ return this.WaitUntilDone(this.SpinId, timeout);
+ }
+ }
+ return true;
+ }
+
+ ///
+ /// Utility method to wait until the current spin has finished.
+ ///
+ ///
+ /// This replaces a which did starve waiters
+ /// when spinning multiple times.
+ ///
+ /// Current spin id.
+ private void WaitUntilDone(long spinId)
+ {
+ // the condition is checked with the lock held to prevent
+ // a the spin from pulsing before the wait can be started
+ while (this.IsSpinning && this.SpinId == spinId)
+ {
+ try
+ {
+ // stop a possible current spin
+ this.Interrupt();
+ }
+ catch (ObjectDisposedException)
+ {
+ // if the context is shut down then the
+ // guard condition might be disposed but
+ // nodes still have to be removed
+ }
+ Monitor.Wait(this.WaitSet);
+ }
+ }
+
+ ///
+ /// Utility method to wait until the current spin has finished.
+ ///
+ /// Current spin id.
+ /// Timeout when waiting
+ /// Whether the wait did not time out.
+ /// Timeout is too big.
+ private bool WaitUntilDone(long spinId, TimeSpan timeout)
+ {
+ int milliSeconds;
+ try
+ {
+ milliSeconds = Convert.ToInt32(timeout.TotalMilliseconds);
+ }
+ catch (OverflowException e)
+ {
+ throw new ArgumentOutOfRangeException("timeout too big", e);
+ }
+ int remainingTimeout = milliSeconds;
+ uint startTime = (uint)Environment.TickCount;
+ while (this.IsSpinning && this.SpinId == spinId)
+ {
+ try
+ {
+ // stop a possible current spin
+ this.Interrupt();
+ }
+ catch (ObjectDisposedException)
+ {
+ // if the context is shut down then the
+ // guard condition might be disposed but
+ // nodes still have to be removed
+ }
+ if (!Monitor.Wait(this.WaitSet, remainingTimeout))
+ {
+ // if the wait timed out return immediately
+ return false;
+ }
+ // update the timeout for the next wait
+ uint elapsed = (uint)Environment.TickCount - startTime;
+ if (elapsed > int.MaxValue)
+ {
+ return false;
+ }
+ remainingTimeout = milliSeconds - (int)elapsed;
+ if (remainingTimeout <= 0)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ ///
+ /// Interrupt the next or current .
+ ///
+ ///
+ /// This method only causes the wait to be skipped, work which is ready will be executed.
+ /// This method is thread safe.
+ ///
+ /// If the executor or context was disposed.
+ public void Interrupt()
+ {
+ this.InterruptCondition.Trigger();
+ }
+
+ ///
+ /// Try to process work if no rescan is scheduled.
+ ///
+ ///
+ /// This method is thread safe if it itself or is not executed concurrently.
+ ///
+ /// Maximum time to wait for work to become available.
+ /// If the executor or context was disposed.
+ /// Whether work could be processed since no rescan was scheduled.
+ public bool TrySpin(TimeSpan timeout)
+ {
+ this.IsSpinning = true;
+ try
+ {
+ // check after setting IsSpinning to
+ // prevent race condition
+ if (this.RescanScheduled)
+ {
+ return false;
+ }
+ if (this.WaitSet.TryWait(timeout, out var result))
+ {
+ foreach (IWaitable waitable in result)
+ {
+ waitable.TryProcess();
+ }
+ }
+ }
+ finally
+ {
+ // update flag before waking threads
+ this.IsSpinning = false;
+ lock (this.WaitSet)
+ {
+ // prevent other threads from reading stale result
+ // overflow is acceptable
+ unchecked { this.SpinId++; }
+ // notify other threads that we finished spinning
+ Monitor.PulseAll(this.WaitSet);
+ }
+ }
+ return true;
+ }
+
+ ///
+ /// Rescan the nodes of this executor for
+ /// new objects to wait for and clear scheduled rescans.
+ ///
+ ///
+ /// This method is thread safe if it itself or is not executed concurrently
+ /// and enumerating the primitives of the nodes is thread safe.
+ ///
+ public void Rescan()
+ {
+ // clear the wait set first to
+ // assert that the removed objects
+ // can be disposed even on error
+ this.WaitSet.Subscriptions.Clear();
+ this.WaitSet.Services.Clear();
+ this.WaitSet.Clients.Clear();
+ // clear the flag to prevent clearing rescans
+ // scheduled just after we finished rescaning
+ this.RescanScheduled = false;
+ try
+ {
+ // use the thread safe GetEnumerator wrapper
+ foreach (INode node in this)
+ {
+ foreach (ISubscriptionBase subscription in node.Subscriptions)
+ {
+ this.WaitSet.Subscriptions.Add(subscription);
+ }
+ foreach (IServiceBase service in node.Services)
+ {
+ this.WaitSet.Services.Add(service);
+ }
+ foreach (IClientBase client in node.Clients)
+ {
+ this.WaitSet.Clients.Add(client);
+ }
+ }
+ }
+ catch (Exception)
+ {
+ // schedule rescan since the wait set may not be filled completely
+ this.ScheduleRescan();
+ throw;
+ }
+ }
+
+ ///
+ /// Utility which spins while a condition is true
+ /// and handles automatic rescanning.
+ ///
+ ///
+ /// The condition check is performed before each spin.
+ ///
+ /// Condition which has to be true to continue spinning.
+ public void SpinWhile(Func condition)
+ {
+ this.SpinWhile(condition, TimeSpan.FromSeconds(0.1));
+ }
+
+ ///
+ /// Maximum time to wait for work to become available.
+ public void SpinWhile(Func condition, TimeSpan timeout)
+ {
+ while (condition())
+ {
+ if (!this.TrySpin(timeout))
+ {
+ this.Rescan();
+ }
+ }
+ }
+
+ ///
+ /// Create a task which calls when started.
+ ///
+ ///
+ /// The resulting task prevents and from being called
+ /// and this instance as well as its context from being disposed safely while it is running.
+ ///
+ /// Maximum time to wait for work to become available.
+ /// Token to cancel the task.
+ /// Task representing the spin operation.
+ public Task CreateSpinTask(TimeSpan timeout, CancellationToken cancellationToken)
+ {
+ return new Task(() => {
+ using (cancellationToken.Register(this.Interrupt))
+ {
+ this.SpinWhile(() => !cancellationToken.IsCancellationRequested, timeout);
+ }
+ cancellationToken.ThrowIfCancellationRequested();
+ }, cancellationToken, TaskCreationOptions.LongRunning);
+ }
+
+ ///
+ /// This method is not thread safe and may not be called from
+ /// multiple threads simultaneously or while the executor is in use.
+ /// Furthermore, it does not dispose the nodes of this executor.
+ /// Remember that is called when disposing
+ /// with nodes in the executor.
+ ///
+ ///
+ public void Dispose()
+ {
+ // remove nodes one by one to
+ // prevent node.Executor from being out
+ // of sync if an exception occurs
+ foreach (INode node in this.Nodes.ToArray())
+ {
+ this.Nodes.Remove(node);
+ // waiting not required since the executor
+ // should not be running
+ node.Executor = null;
+ }
+ this.WaitSet.Dispose();
+ this.InterruptCondition.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs b/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs
new file mode 100644
index 00000000..042fb51e
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/executors/TaskExecutor.cs
@@ -0,0 +1,191 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace ROS2.Executors
+{
+ ///
+ /// Executor which wraps a and automatically
+ /// executes the task created by .
+ ///
+ ///
+ /// The spin task is automatically stopped when
+ /// is called or the context is shut down.
+ ///
+ public sealed class TaskExecutor : IExecutor
+ {
+ ///
+ /// Task managed by this executor.
+ ///
+ public Task Task { get; private set; }
+
+ private readonly CancellationTokenSource CancellationSource = new CancellationTokenSource();
+
+ private readonly ManualExecutor Executor;
+
+ private readonly Context Context;
+
+ /// Context associated with this executor.
+ /// Maximum time to wait for work to become available.
+ public TaskExecutor(Context context, TimeSpan timeout)
+ {
+ this.Context = context;
+ this.Executor = new ManualExecutor(context);
+ this.Task = this.Executor.CreateSpinTask(timeout, this.CancellationSource.Token);
+ try
+ {
+ context.OnShutdown += this.StopSpinTask;
+ this.Task.Start();
+ }
+ catch (SystemException)
+ {
+ try
+ {
+ context.OnShutdown -= this.StopSpinTask;
+ }
+ finally
+ {
+ this.Executor.Dispose();
+ }
+ throw;
+ }
+ }
+
+ ///
+ public bool IsDisposed
+ {
+ get => this.Executor.IsDisposed;
+ }
+
+ ///
+ public int Count
+ {
+ get => this.Executor.Count;
+ }
+
+ ///
+ public bool IsReadOnly
+ {
+ get => this.Executor.IsReadOnly;
+ }
+
+ ///
+ public void Add(INode node)
+ {
+ this.Executor.Add(node);
+ }
+
+ ///
+ public void Clear()
+ {
+ this.Executor.Clear();
+ }
+
+ ///
+ public bool Contains(INode node)
+ {
+ return this.Executor.Contains(node);
+ }
+
+ ///
+ public void CopyTo(INode[] array, int arrayIndex)
+ {
+ this.Executor.CopyTo(array, arrayIndex);
+ }
+
+ ///
+ public bool Remove(INode node)
+ {
+ return this.Executor.Remove(node);
+ }
+
+ ///
+ public IEnumerator GetEnumerator()
+ {
+ return this.Executor.GetEnumerator();
+ }
+
+ ///
+ IEnumerator IEnumerable.GetEnumerator()
+ {
+ return this.GetEnumerator();
+ }
+
+ ///
+ public void ScheduleRescan()
+ {
+ this.Executor.ScheduleRescan();
+ }
+
+ ///
+ public bool TryScheduleRescan(INode node)
+ {
+ return this.Executor.TryScheduleRescan(node);
+ }
+
+ ///
+ public void Wait()
+ {
+ this.Executor.Wait();
+ }
+
+ ///
+ public bool TryWait(TimeSpan timeout)
+ {
+ return this.Executor.TryWait(timeout);
+ }
+
+ ///
+ /// Stop the spin task and return after it has stopped.
+ ///
+ ///
+ /// This function returns immediately if the spin task
+ /// has already been stopped.
+ ///
+ private void StopSpinTask()
+ {
+ try
+ {
+ this.CancellationSource.Cancel();
+ }
+ catch (ObjectDisposedException)
+ {
+ // task has been canceled before
+ }
+ try
+ {
+ this.Task.Wait();
+ }
+ catch (AggregateException e)
+ {
+ e.Handle(inner => inner is TaskCanceledException);
+ }
+ catch (ObjectDisposedException)
+ {
+ // task has already stopped
+ }
+ }
+
+ ///
+ ///
+ /// The wrapper handles stopping the spin task.
+ ///
+ public void Dispose()
+ {
+ try
+ {
+ this.StopSpinTask();
+ }
+ catch (AggregateException)
+ {
+ // prevent faulted task from preventing disposal
+ }
+ this.Context.OnShutdown -= this.StopSpinTask;
+ this.Task.Dispose();
+ this.Executor.Dispose();
+ this.CancellationSource.Dispose();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/ros2cs/ros2cs_core/interfaces/IClient.cs b/src/ros2cs/ros2cs_core/interfaces/IClient.cs
index 96a4e1a2..6ae8d5b6 100644
--- a/src/ros2cs/ros2cs_core/interfaces/IClient.cs
+++ b/src/ros2cs/ros2cs_core/interfaces/IClient.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -17,81 +17,74 @@
namespace ROS2
{
- /// Non-generic base interface for all subscriptions
- ///
- public interface IClientBase: IExtendedDisposable
- {
- ///
- /// Tries to get a Response message from rcl/rmw layers
- ///
+ /// Non-generic base interface for all clients.
///
- /// Marks the corresponding as finished if successful
+ /// This interface is useful for managing client collections and disposal.
+ /// Create instances with .
///
- // TODO this should not be public - add an internal interface
- void TakeMessage();
+ public interface IClientBase : IExtendedDisposable, IWaitable
+ {
+ /// Topic of this client.
+ string Topic { get; }
- ///
- /// topic name which was used when calling
- ///
- string Topic {get;}
+ ///
+ /// Requests which are pending for this client.
+ ///
+ IReadOnlyDictionary PendingRequests { get; }
- ///
- /// Requests which are pending for this client.
- ///
- IReadOnlyDictionary PendingRequests {get;}
+ ///
+ /// Check if the service to be called is available
+ ///
+ /// if the service is avilable
+ bool IsServiceAvailable();
- ///
- /// Remove a pending and cancel it.
- ///
- ///
- /// Tasks are automatically removed on completion and have to be removed only when canceled.
- ///
- /// Task to be removed.
- /// Whether the Task was removed successfully.
- bool Cancel(Task task);
-
- rcl_client_t Handle {get;}
-
- /// service mutex for internal use
- object Mutex { get; }
- }
+ ///
+ /// Remove a pending and cancel it.
+ ///
+ ///
+ /// Tasks are automatically removed on completion and have to be removed only when canceled.
+ ///
+ /// Task to be removed.
+ /// Whether the Task was removed successfully.
+ bool Cancel(Task task);
+ }
- /// Generic base interface for all subscriptions
- /// Message Type to be send
- /// Message Type to be received
- ///
- public interface IClient: IClientBase
- where I: Message
- where O: Message
- {
- ///
- /// Requests which are pending for this client.
- ///
- new IReadOnlyDictionary> PendingRequests {get;}
+ /// Internal client extensions.
+ internal interface IRawClient : IClientBase
+ {
+ /// Dispose without modifying the node.
+ void DisposeFromNode();
+ }
- ///
- /// Check if the service to be called is available
- ///
- /// if the service is avilable
- bool IsServiceAvailable();
+ /// Generic base interface for all clients.
+ /// Message Type to be send.
+ /// Message Type to be received.
+ public interface IClient : IClientBase
+ where I : Message
+ where O : Message
+ {
+ ///
+ /// Requests which are pending for this client.
+ ///
+ new IReadOnlyDictionary> PendingRequests { get; }
- ///
- /// Send a Request to a Service and wait for a Response
- ///
- /// The provided message can be modified or disposed after this call
- /// Message to be send
- /// Response of the Service
- O Call(I msg);
+ ///
+ /// Send a Request to a Service and wait for a Response
+ ///
+ /// The provided message can be modified or disposed after this call
+ /// Message to be send
+ /// Response of the Service
+ O Call(I msg);
- ///
- /// Send a Request to a Service and wait for a Response asynchronously
- ///
- /// Message to be send
- /// representing the Response of the Service
- Task CallAsync(I msg);
+ ///
+ /// Send a Request to a Service and wait for a Response asynchronously
+ ///
+ /// Message to be send
+ /// representing the Response of the Service
+ Task CallAsync(I msg);
- ///
- /// Options used when creating the
- Task CallAsync(I msg, TaskCreationOptions options);
- }
+ ///
+ /// Options used when creating the
+ Task CallAsync(I msg, TaskCreationOptions options);
+ }
}
diff --git a/src/ros2cs/ros2cs_core/interfaces/IContext.cs b/src/ros2cs/ros2cs_core/interfaces/IContext.cs
new file mode 100644
index 00000000..44fba76a
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/interfaces/IContext.cs
@@ -0,0 +1,51 @@
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+
+namespace ROS2
+{
+ ///
+ /// ROS Context encapsulating the non-global state of an init/shutdown cycle.
+ ///
+ ///
+ /// Instances should be disposed with which may NOT automatically performed completely on garbage collection.
+ ///
+ public interface IContext : IExtendedDisposable
+ {
+ ///
+ /// Nodes associated with this instance
+ ///
+ /// Will be disposed on disposal of this instance.
+ IReadOnlyDictionary Nodes { get; }
+
+ ///
+ /// Event triggered after context shutdown before disposing nodes and finalization.
+ ///
+ event Action OnShutdown;
+
+ ///
+ /// Check if the instance is valid (has not been disposed).
+ ///
+ bool Ok();
+
+ ///
+ /// Try to create a .
+ ///
+ /// Name of the node, has to be unqiue
+ /// If the instance could be created.
+ bool TryCreateNode(string name, out INode node);
+ }
+}
diff --git a/src/ros2cs/ros2cs_core/interfaces/IExecutor.cs b/src/ros2cs/ros2cs_core/interfaces/IExecutor.cs
new file mode 100644
index 00000000..5c658a36
--- /dev/null
+++ b/src/ros2cs/ros2cs_core/interfaces/IExecutor.cs
@@ -0,0 +1,60 @@
+// Copyright 2023 ADVITEC Informatik GmbH - www.advitec.de
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.Collections.Generic;
+
+namespace ROS2
+{
+ ///
+ /// Executor controlling the processing of callbacks of a set of nodes.
+ ///
+ ///
+ /// Adding and removing Nodes has to update .
+ /// Furthermore, removing Nodes has to guarantee that the Node is ready to be disposed.
+ ///
+ public interface IExecutor: IExtendedDisposable, ICollection
+ {
+ ///
+ /// Notify the instance that some nodes changed.
+ ///
+ ///
+ /// This is used to tell the executor when entities are created or destroyed.
+ ///
+ void ScheduleRescan();
+
+ ///
+ /// Notify the instance that a node changed.
+ ///
+ /// Node which changed.
+ /// If a rescan was scheduled.
+ ///
+ bool TryScheduleRescan(INode node);
+
+ ///
+ /// Wait for scheduled rescans to complete.
+ ///
+ ///
+ /// This is used for example to ensure that removed objects
+ /// are removed from the executor before they are disposed.
+ /// Return immediately if no rescans are scheduled.
+ ///
+ void Wait();
+
+ /// positive Amount of time to wait.
+ /// Wether no timeout occurred.
+ ///
+ bool TryWait(TimeSpan timeout);
+ }
+}
diff --git a/src/ros2cs/ros2cs_core/interfaces/INode.cs b/src/ros2cs/ros2cs_core/interfaces/INode.cs
index 90b59d4c..f314af70 100644
--- a/src/ros2cs/ros2cs_core/interfaces/INode.cs
+++ b/src/ros2cs/ros2cs_core/interfaces/INode.cs
@@ -1,4 +1,4 @@
-// Copyright 2019-2021 Robotec.ai
+// Copyright 2019-2023 Robotec.ai
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -17,68 +17,79 @@
namespace ROS2
{
- /// Ros2cs node, created with Ros2cs.CreateNode and supporting use of publishers and subscribers
- /// Is automatically disposed when Ros2cs.Shutdown is called.
- /// Can also be disposed through IDisposable interface. Ros2cs.RemoveNode should be called in such case
- // TODO(adamdbrw) wrap disposing so that user does not need to handle anything
- public interface INode: IExtendedDisposable
- {
- /// Node name as given in Ros2cs.CreateNode
- string Name {get;}
+ ///
+ /// Ros2cs node, created with .
+ ///
+ ///
+ /// Instances should be disposed with which is NOT automatically performed on shutdown.
+ ///
+ public interface INode : IExtendedDisposable
+ {
+ ///
+ /// Node name as given in .
+ ///
+ /// Is unique per context while node is not disposed.
+ string Name { get; }
- /// Create a client for this node for a given topic, qos and message type
- /// Can only be called in an initialized Ros2cs state.
- /// Topic for the client. Naming restrictions of ros2 apply and violation results in an exception
- /// Quality of Client settings. Not passing this parameter will result in default settings
- /// Client for the topic, which can be used to client messages
- Client CreateClient(string topic, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new();
+ ///
+ /// Context containing this node.
+ ///
+ IContext Context { get; }
- /// Remove a client
- /// Note that this does not call Dispose on Client
- /// Client created with earlier CreateClient call
- /// Whether removal actually took place. Safe to ignore
- bool RemoveClient(IClientBase client);
+ ///
+ /// Executor handling callbacks of this node, may be null.
+ ///
+ ///
+ /// Users have to guarantee that a node is associated with at most one executor at any given time
+ /// to prevent undefined behaviour when multithreading is used.
+ /// It is recommended to not set this property directly and leave this task to the executor.
+ ///
+ IExecutor Executor { get; set; }
- /// Create a service for this node for a given topic, callback, qos and message type
- /// Can only be called in an initialized Ros2cs state.
- /// Topic to service to. Naming restrictions of ros2 apply and violation results in an exception
- /// Action to be called when message is received (through Spin or SpinOnce). Provide a lambda or a method
- /// Quality of Service settings. Not passing this parameter will result in default settings
- /// Service for the topic
- Service CreateService(string topic, Func callback, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new();
+ /// Create a publisher for this node for a given topic, qos and message type
+ /// Topic for the publisher. Naming restrictions of ros2 apply and violation results in an exception
+ /// Quality of Service settings. Not passing this parameter will result in default settings
+ /// Publisher for the topic, which can be used to publish messages
+ IPublisher CreatePublisher(string topic, QualityOfServiceProfile qos = null) where T : Message, new();
- /// Remove a service
- /// Note that this does not call Dispose on Service
- /// Service created with earlier CreateService call
- /// Whether removal actually took place. Safe to ignore
- bool RemoveService(IServiceBase service);
+ ///
+ /// Publishers created on this node.
+ ///
+ IReadOnlyCollection Publishers { get; }
- /// Create a publisher for this node for a given topic, qos and message type
- /// Can only be called in an initialized Ros2cs state.
- /// Topic for the publisher. Naming restrictions of ros2 apply and violation results in an exception
- /// Quality of Service settings. Not passing this parameter will result in default settings
- /// Publisher for the topic, which can be used to publish messages
- Publisher CreatePublisher(string topic, QualityOfServiceProfile qos = null) where T : Message, new();
+ /// Create a subscription for this node for a given topic, callback, qos and message type
+ /// Topic to subscribe to. Naming restrictions of ros2 apply and violation results in an exception
+ /// Action to be called when message is received (through Spin or SpinOnce). Provide a lambda or a method
+ /// Quality of Service settings. Not passing this parameter will result in default settings
+ /// Subscription for the topic
+ ISubscription CreateSubscription(string topic, Action callback, QualityOfServiceProfile qos = null) where T : Message, new();
- /// Create a subscription for this node for a given topic, callback, qos and message type
- /// Can only be called in an initialized Ros2cs state.
- /// Topic to subscribe to. Naming restrictions of ros2 apply and violation results in an exception
- /// Action to be called when message is received (through Spin or SpinOnce). Provide a lambda or a method
- /// Quality of Service settings. Not passing this parameter will result in default settings
- /// Subscription for the topic
- Subscription CreateSubscription(string topic, Action callback, QualityOfServiceProfile qos = null) where T : Message, new();
+ ///
+ /// Subscriptions created on this node.
+ ///
+ IReadOnlyCollection Subscriptions { get; }
- /// Remove a publisher
- /// Note that this does not call Dispose on Publisher
- /// Publisher created with earlier CreatePublisher call
- /// Whether removal actually took place. Safe to ignore
- bool RemovePublisher(IPublisherBase publisher);
+ /// Create a client for this node for a given topic, qos and message type
+ /// Topic for the client. Naming restrictions of ros2 apply and violation results in an exception
+ /// Quality of Client settings. Not passing this parameter will result in default settings
+ /// Client for the topic, which can be used to client messages
+ IClient CreateClient(string topic, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new();
- /// Remove a subscription
- /// Note that this does not call Dispose on Subscription. If the caller also does not own
- /// the subscription, it can be garbage collected. You can also call Dispose after calling this
- /// Subscription created with earlier CreateSubscription call
- /// Whether removal actually took place. Safe to ignore
- bool RemoveSubscription(ISubscriptionBase subscription);
- }
+ ///
+ /// Clients created on this node.
+ ///
+ IReadOnlyCollection Clients { get; }
+
+ /// Create a service for this node for a given topic, callback, qos and message type
+ /// Topic to service to. Naming restrictions of ros2 apply and violation results in an exception
+ /// Action to be called when message is received. Provide a lambda or a method
+ /// Quality of Service settings. Not passing this parameter will result in default settings
+ /// Service for the topic
+ IService CreateService(string topic, Func callback, QualityOfServiceProfile qos = null) where I : Message, new() where O : Message, new();
+
+ ///