diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs index 2e933df514..589130a47c 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_client.rs @@ -26,7 +26,7 @@ use crate::api::{ protocol_implementations::posix::protocol_id::POSIX_PROTOCOL_ID, }; -/// Client factory implementation for particular shared memory protocol +/// Client factory implementation for POSIX Shared Memory protocol #[zenoh_macros::unstable_doc] #[derive(Debug)] pub struct PosixShmClient; diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_buddy.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_buddy.rs index 79a3e7460f..c561030ae2 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_buddy.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_buddy.rs @@ -110,16 +110,9 @@ impl ShmProviderBackend for PosixShmProviderBackendBuddy { fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult { tracing::trace!("PosixShmProviderBackendBuddy::alloc({:?})", layout); - let alloc_layout = unsafe { - Layout::from_size_align_unchecked( - layout.size().get(), - layout.alignment().get_alignment_value().get(), - ) - }; - let alloc = { let mut lock = zlock!(self.heap); - lock.alloc(alloc_layout) + lock.alloc(layout.into()) }; match alloc { diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_talc.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_talc.rs index abae36d0ed..d8b7b6bfc4 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_talc.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/posix_shm_provider_backend_talc.rs @@ -113,16 +113,9 @@ impl ShmProviderBackend for PosixShmProviderBackendTalc { fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult { tracing::trace!("PosixShmProviderBackendTalc::alloc({:?})", layout); - let alloc_layout = unsafe { - Layout::from_size_align_unchecked( - layout.size().get(), - layout.alignment().get_alignment_value().get(), - ) - }; - let alloc = { let mut lock = zlock!(self.talc); - unsafe { lock.malloc(alloc_layout) } + unsafe { lock.malloc(layout.into()) } }; match alloc { diff --git a/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs b/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs index cff39f921a..8165db96aa 100644 --- a/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs +++ b/commons/zenoh-shm/src/api/protocol_implementations/posix/protocol_id.rs @@ -14,6 +14,6 @@ use crate::api::common::types::ProtocolID; -/// Protocol identifier to use when creating ShmProvider +/// Protocol identifier to use for POSIX Protocol #[zenoh_macros::unstable_doc] pub const POSIX_PROTOCOL_ID: ProtocolID = 0; diff --git a/commons/zenoh-shm/src/api/provider/memory_layout.rs b/commons/zenoh-shm/src/api/provider/memory_layout.rs index 2a5aa5e836..2352ec2f45 100644 --- a/commons/zenoh-shm/src/api/provider/memory_layout.rs +++ b/commons/zenoh-shm/src/api/provider/memory_layout.rs @@ -24,6 +24,18 @@ pub struct MemoryLayout { alignment: AllocAlignment, } +impl From<&MemoryLayout> for core::alloc::Layout { + fn from(value: &MemoryLayout) -> Self { + // SAFERY: this is safe because `MemoryLayout`'s size and alignment are already checked + unsafe { + core::alloc::Layout::from_size_align_unchecked( + value.size().get(), + value.alignment().get_alignment_value().get(), + ) + } + } +} + impl From<&MemoryLayout> for MemoryLayout { fn from(other: &MemoryLayout) -> Self { *other diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 5c85abd228..81fb92e9fc 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -28,7 +28,7 @@ publish = false [features] default = ["zenoh-ext/default", "zenoh/default"] -shared-memory = ["zenoh/shared-memory"] +shared-memory = ["talc", "zenoh-result", "zenoh-shm", "zenoh/shared-memory"] unstable = ["zenoh/unstable"] [dependencies] @@ -36,11 +36,14 @@ clap = { workspace = true, features = ["derive"] } futures = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } +talc = { workspace = true, optional = true } tokio = { workspace = true, features = ["io-std", "rt-multi-thread", "time"] } zenoh = { workspace = true, default-features = false } zenoh-ext = { workspace = true, default-features = false, features = [ "unstable", ] } +zenoh-result = { workspace = true, optional = true } +zenoh-shm = { workspace = true, optional = true } [dev-dependencies] rand = { workspace = true, features = ["default"] } @@ -79,7 +82,7 @@ path = "examples/z_pub.rs" [[example]] name = "z_pub_shm" -path = "examples/z_pub_shm.rs" +path = "examples/shm/z_pub_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -88,7 +91,7 @@ path = "examples/z_sub.rs" [[example]] name = "z_sub_shm" -path = "examples/z_sub_shm.rs" +path = "examples/shm/z_sub_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -105,7 +108,7 @@ path = "examples/z_queryable.rs" [[example]] name = "z_queryable_shm" -path = "examples/z_queryable_shm.rs" +path = "examples/shm/z_queryable_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -118,7 +121,7 @@ path = "examples/z_get.rs" [[example]] name = "z_get_shm" -path = "examples/z_get_shm.rs" +path = "examples/shm/z_get_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -147,7 +150,7 @@ path = "examples/z_sub_thr.rs" [[example]] name = "z_pub_shm_thr" -path = "examples/z_pub_shm_thr.rs" +path = "examples/shm/z_pub_shm_thr.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -156,7 +159,7 @@ path = "examples/z_ping.rs" [[example]] name = "z_ping_shm" -path = "examples/z_ping_shm.rs" +path = "examples/shm/z_ping_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] @@ -165,15 +168,25 @@ path = "examples/z_pong.rs" [[example]] name = "z_alloc_shm" -path = "examples/z_alloc_shm.rs" +path = "examples/shm/z_alloc_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] name = "z_bytes_shm" -path = "examples/z_bytes_shm.rs" +path = "examples/shm/z_bytes_shm.rs" required-features = ["shared-memory", "unstable"] [[example]] name = "z_posix_shm_provider" -path = "examples/z_posix_shm_provider.rs" +path = "examples/shm/z_posix_shm_provider.rs" +required-features = ["shared-memory", "unstable"] + +[[example]] +name = "z_custom_shm_provider" +path = "examples/shm/z_custom_shm_provider.rs" +required-features = ["shared-memory", "unstable"] + +[[example]] +name = "z_shm_provider" +path = "examples/shm/z_shm_provider.rs" required-features = ["shared-memory", "unstable"] diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/shm/z_alloc_shm.rs similarity index 86% rename from examples/examples/z_alloc_shm.rs rename to examples/examples/shm/z_alloc_shm.rs index 4c4fb1dcb8..2a95c9b047 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/shm/z_alloc_shm.rs @@ -32,25 +32,9 @@ async fn main() { async fn run() -> zenoh::Result<()> { // Create an SHM provider - let provider = { - // Option 1: simple way to create default ShmProvider initialized with default-configured - { - // SHM backend (PosixShmProviderBackend) - let _simple = - ShmProviderBuilder::default_backend(MemoryLayout::try_from(42).unwrap()).wait()?; - } - - // Option 2: comprehensive ShmProvider creation - { - // Create specific backed - // NOTE: For extended PosixShmProviderBackend API please check z_posix_shm_provider.rs - let comprehensive = - PosixShmProviderBackend::builder((65536, AllocAlignment::ALIGN_8_BYTES)).wait()?; - - // ...and an SHM provider with specified backend - ShmProviderBuilder::backend(comprehensive).wait() - } - }; + // Note: for SHM provider creation API, please see z_shm_provider.rs + let provider = + ShmProviderBuilder::default_backend((65536, AllocAlignment::ALIGN_8_BYTES)).wait()?; // Allocate SHM buffer diff --git a/examples/examples/z_bytes_shm.rs b/examples/examples/shm/z_bytes_shm.rs similarity index 100% rename from examples/examples/z_bytes_shm.rs rename to examples/examples/shm/z_bytes_shm.rs diff --git a/examples/examples/shm/z_custom_shm_provider.rs b/examples/examples/shm/z_custom_shm_provider.rs new file mode 100644 index 0000000000..3c310c6730 --- /dev/null +++ b/examples/examples/shm/z_custom_shm_provider.rs @@ -0,0 +1,262 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + alloc::Layout, + num::NonZeroUsize, + ptr::NonNull, + slice, + sync::{Arc, Mutex}, + time::Duration, +}; + +use talc::{ErrOnOom, Talc}; +use zenoh::{ + shm::{ + AllocAlignment, AllocatedChunk, BlockOn, ChunkAllocResult, ChunkDescriptor, ChunkID, + GarbageCollect, MemoryLayout, ProtocolID, PtrInSegment, SegmentID, ShmClient, + ShmClientStorage, ShmProviderBackend, ShmProviderBuilder, ShmSegment, WithProtocolID, + ZAllocError, ZLayoutError, + }, + Config, Wait, +}; +use zenoh_examples::shm::print_sample_info; +use zenoh_result::ZResult; +use zenoh_shm::posix_shm::array::ArrayInSHM; + +/// Protocol identifier to use when creating ShmProvider +pub const EXAMPLE_PROTOCOL_ID: ProtocolID = 9999; + +#[derive(Debug)] +pub(crate) struct PosixShmSegment { + segment: ArrayInSHM, +} + +impl PosixShmSegment { + pub(crate) fn create(alloc_size: NonZeroUsize) -> ZResult { + let segment = ArrayInSHM::create(alloc_size)?; + Ok(Self { segment }) + } + + pub(crate) fn open(id: SegmentID) -> ZResult { + let segment = ArrayInSHM::open(id)?; + Ok(Self { segment }) + } + + pub(crate) fn allocated_chunk( + self: Arc, + buf: NonNull, + layout: &MemoryLayout, + ) -> AllocatedChunk { + AllocatedChunk { + descriptor: ChunkDescriptor::new( + self.segment.id(), + unsafe { self.segment.index(buf.as_ptr()) }, + layout.size(), + ), + data: PtrInSegment::new(buf.as_ptr(), self), + } + } +} + +impl ShmSegment for PosixShmSegment { + fn map(&self, chunk: ChunkID) -> ZResult<*mut u8> { + Ok(unsafe { self.segment.elem_mut(chunk) }) + } +} + +/// Client factory implementation for our shared memory protocol +#[derive(Debug)] +pub struct ExampleShmClient; + +impl WithProtocolID for ExampleShmClient { + fn id(&self) -> ProtocolID { + EXAMPLE_PROTOCOL_ID + } +} + +impl ShmClient for ExampleShmClient { + /// Attach to particular shared memory segment + fn attach(&self, segment: SegmentID) -> ZResult> { + Ok(Arc::new(PosixShmSegment::open(segment)?)) + } +} + +/// An example backend based on POSIX shared memory. +pub struct ExampleShmProviderBackend { + segment: Arc, + talc: Mutex>, + alignment: AllocAlignment, +} + +impl ExampleShmProviderBackend { + pub fn new(layout: &MemoryLayout) -> ZResult { + let segment = Arc::new(PosixShmSegment::create(layout.size())?); + + let talc = { + let ptr = unsafe { segment.segment.elem_mut(0) }; + let mut talc = Talc::new(ErrOnOom); + unsafe { + talc.claim(slice::from_raw_parts_mut(ptr, layout.size().get()).into()) + .map_err(|_| "Error initializing Talc backend!")?; + } + talc + }; + + println!( + "Created PosixShmProviderBackendTalc id {}, layout {:?}", + segment.segment.id(), + layout + ); + + Ok(Self { + segment, + talc: Mutex::new(talc), + alignment: layout.alignment(), + }) + } +} + +impl WithProtocolID for ExampleShmProviderBackend { + fn id(&self) -> ProtocolID { + EXAMPLE_PROTOCOL_ID + } +} + +impl ShmProviderBackend for ExampleShmProviderBackend { + fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult { + let alloc = { + let mut lock = self.talc.lock().unwrap(); + unsafe { lock.malloc(layout.into()) } + }; + + match alloc { + Ok(buf) => Ok(self.segment.clone().allocated_chunk(buf, layout)), + Err(_) => Err(ZAllocError::OutOfMemory), + } + } + + fn free(&self, chunk: &ChunkDescriptor) { + let alloc_layout = unsafe { + Layout::from_size_align_unchecked( + chunk.len.get(), + self.alignment.get_alignment_value().get(), + ) + }; + + let ptr = unsafe { self.segment.segment.elem_mut(chunk.chunk) }; + + unsafe { + self.talc + .lock() + .unwrap() + .free(NonNull::new_unchecked(ptr), alloc_layout) + }; + } + + fn defragment(&self) -> usize { + 0 + } + + fn available(&self) -> usize { + 0 + } + + fn layout_for(&self, layout: MemoryLayout) -> Result { + layout.extend(self.alignment) + } +} + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let key_expr = "examples/custom_shm_provider"; + + tokio::task::spawn(publisher_session(key_expr)); + tokio::task::spawn(subscriber_session(key_expr)); + + println!("Press CTRL-C to quit..."); + tokio::time::sleep(Duration::from_secs(u64::MAX)).await; +} + +async fn publisher_session(key_expr: &'static str) { + let layout: MemoryLayout = 4096.try_into().unwrap(); + + println!("Creating example SHM provider..."); + + let backend = ExampleShmProviderBackend::new(&layout).unwrap(); + let provider = ShmProviderBuilder::backend(backend).wait(); + + println!("Opening session..."); + let session = zenoh::open(Config::default()).await.unwrap(); + + println!("Declaring Publisher on '{}'...", key_expr); + let publisher = session.declare_publisher(key_expr).await.unwrap(); + + let payload = "SHM"; + let mut idx = 0u64; + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + // We reserve a small space at the beginning of the buffer to include the iteration index + // of the write. This is simply to have the same format as zn_pub. + let prefix = format!("[{idx:4}] "); + let prefix_len = prefix.len(); + let slice_len = prefix_len + payload.len(); + + let mut buffer = provider + .alloc(slice_len) + .with_policy::>() + .await + .unwrap(); + + buffer[0..prefix_len].copy_from_slice(prefix.as_bytes()); + buffer[prefix_len..slice_len].copy_from_slice(payload.as_bytes()); + + // Write the data + println!( + "Put SHM Data ('{}': '{}')", + key_expr, + String::from_utf8_lossy(&buffer[0..slice_len]) + ); + publisher.put(buffer).await.unwrap(); + + idx += 1; + } +} + +async fn subscriber_session(key_expr: &'static str) { + let custom_shm_client_storage = Arc::new( + ShmClientStorage::builder() + .with_client(Arc::new(ExampleShmClient)) + .build(), + ); + + println!("Opening session..."); + let session = zenoh::open(Config::default()) + .with_shm_clients(custom_shm_client_storage) + .await + .unwrap(); + + println!("Declaring Subscriber on '{}'...", key_expr); + let _subscriber = session + .declare_subscriber(key_expr) + .callback(print_sample_info) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(u64::MAX)).await; +} diff --git a/examples/examples/z_get_shm.rs b/examples/examples/shm/z_get_shm.rs similarity index 100% rename from examples/examples/z_get_shm.rs rename to examples/examples/shm/z_get_shm.rs diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/shm/z_ping_shm.rs similarity index 100% rename from examples/examples/z_ping_shm.rs rename to examples/examples/shm/z_ping_shm.rs diff --git a/examples/examples/z_posix_shm_provider.rs b/examples/examples/shm/z_posix_shm_provider.rs similarity index 100% rename from examples/examples/z_posix_shm_provider.rs rename to examples/examples/shm/z_posix_shm_provider.rs diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/shm/z_pub_shm.rs similarity index 100% rename from examples/examples/z_pub_shm.rs rename to examples/examples/shm/z_pub_shm.rs diff --git a/examples/examples/z_pub_shm_thr.rs b/examples/examples/shm/z_pub_shm_thr.rs similarity index 100% rename from examples/examples/z_pub_shm_thr.rs rename to examples/examples/shm/z_pub_shm_thr.rs diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/shm/z_queryable_shm.rs similarity index 100% rename from examples/examples/z_queryable_shm.rs rename to examples/examples/shm/z_queryable_shm.rs diff --git a/examples/examples/shm/z_shm_provider.rs b/examples/examples/shm/z_shm_provider.rs new file mode 100644 index 0000000000..c260c5afab --- /dev/null +++ b/examples/examples/shm/z_shm_provider.rs @@ -0,0 +1,37 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::{ + shm::{AllocAlignment, PosixShmProviderBackendBinaryHeap, ShmProviderBuilder}, + Wait, +}; + +fn main() { + // Option 1: simple way to create default ShmProvider + { + // SHM backend (PosixShmProviderBackend) + let _simple = ShmProviderBuilder::default_backend(4096).wait().unwrap(); + } + + // Option 2: ShmProvider with specific backend + { + // Create specific backed + let backend = + PosixShmProviderBackendBinaryHeap::builder((4096, AllocAlignment::ALIGN_8_BYTES)) + .wait() + .unwrap(); + + // ...and an SHM provider with specified backend + let _specific = ShmProviderBuilder::backend(backend).wait(); + } +} diff --git a/examples/examples/shm/z_sub_shm.rs b/examples/examples/shm/z_sub_shm.rs new file mode 100644 index 0000000000..66a28b6ca6 --- /dev/null +++ b/examples/examples/shm/z_sub_shm.rs @@ -0,0 +1,49 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::{config::Config, key_expr::KeyExpr}; +use zenoh_examples::{shm::print_sample_info, CommonArgs}; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh::init_log_from_env_or("error"); + + let (config, key_expr) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Declaring Subscriber on '{}'...", &key_expr); + let subscriber = session.declare_subscriber(&key_expr).await.unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(sample) = subscriber.recv_async().await { + print_sample_info(sample); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct SubArgs { + #[arg(short, long, default_value = "demo/example/**")] + /// The Key Expression to subscribe to. + key: KeyExpr<'static>, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>) { + let args = SubArgs::parse(); + (args.common.into(), args.key) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs deleted file mode 100644 index 2d9c24f079..0000000000 --- a/examples/examples/z_sub_shm.rs +++ /dev/null @@ -1,107 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::borrow::Cow; - -use clap::Parser; -#[cfg(all(feature = "shared-memory", feature = "unstable"))] -use zenoh::shm::zshmmut; -use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr}; -use zenoh_examples::CommonArgs; - -#[tokio::main] -async fn main() { - // Initiate logging - zenoh::init_log_from_env_or("error"); - - let (config, key_expr) = parse_args(); - - println!("Opening session..."); - let session = zenoh::open(config).await.unwrap(); - - println!("Declaring Subscriber on '{}'...", &key_expr); - let subscriber = session.declare_subscriber(&key_expr).await.unwrap(); - - println!("Press CTRL-C to quit..."); - while let Ok(mut sample) = subscriber.recv_async().await { - let kind = sample.kind(); - let key_str = sample.key_expr().as_str().to_owned(); - - // Print overall payload information - let (payload_type, payload) = handle_bytes(sample.payload_mut()); - print!(">> [Subscriber] Received {kind} ('{key_str}': '{payload}') [{payload_type}] ",); - - // Print attachment information - if let Some(att) = sample.attachment_mut() { - let (attachment_type, attachment) = handle_bytes(att); - print!(" ({attachment_type}: {attachment})"); - } - - println!(); - } -} - -#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] -struct SubArgs { - #[arg(short, long, default_value = "demo/example/**")] - /// The Key Expression to subscribe to. - key: KeyExpr<'static>, - #[command(flatten)] - common: CommonArgs, -} - -fn parse_args() -> (Config, KeyExpr<'static>) { - let args = SubArgs::parse(); - (args.common.into(), args.key) -} - -fn handle_bytes(bytes: &mut ZBytes) -> (&str, Cow<'_, str>) { - // Determine buffer type for indication purpose - let bytes_type = { - // if Zenoh is built without SHM support, the only buffer type it can receive is RAW - #[cfg(not(feature = "shared-memory"))] - { - "RAW" - } - - // if Zenoh is built with SHM support but without SHM API (that is unstable), it can - // receive buffers of any type, but there is no way to detect the buffer type - #[cfg(all(feature = "shared-memory", not(feature = "unstable")))] - { - "UNKNOWN" - } - - // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type - #[cfg(all(feature = "shared-memory", feature = "unstable"))] - match bytes.as_shm_mut() { - // try to mutate SHM buffer to get it's mutability property - Some(shm) => match <&mut zshmmut>::try_from(shm) { - Ok(_shm_mut) => "SHM (MUT)", - Err(_) => "SHM (IMMUT)", - }, - None => "RAW", - } - }; - - // In order to indicate the real underlying buffer type the code above is written ^^^ - // Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently. - // In other words, the common application compiled with "shared-memory" feature will be able to - // handle incoming SHM data without any changes in the application code. - // - // Refer to z_bytes.rs to see how to deserialize different types of message - let bytes_string = bytes - .try_to_string() - .unwrap_or_else(|e| e.to_string().into()); - - (bytes_type, bytes_string) -} diff --git a/examples/src/lib.rs b/examples/src/lib.rs index ab9d212dbe..6b611d467c 100644 --- a/examples/src/lib.rs +++ b/examples/src/lib.rs @@ -94,3 +94,64 @@ impl From<&CommonArgs> for Config { config } } + +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +pub mod shm { + pub fn print_sample_info(mut sample: zenoh::sample::Sample) { + let kind = sample.kind(); + let key_str = sample.key_expr().as_str().to_owned(); + + // Print overall payload information + let (payload_type, payload) = handle_bytes(sample.payload_mut()); + print!(">> [Subscriber] Received {kind} ('{key_str}': '{payload}') [{payload_type}] ",); + + // Print attachment information + if let Some(att) = sample.attachment_mut() { + let (attachment_type, attachment) = handle_bytes(att); + print!(" ({attachment_type}: {attachment})"); + } + + println!(); + } + + fn handle_bytes(bytes: &mut zenoh::bytes::ZBytes) -> (&str, std::borrow::Cow<'_, str>) { + // Determine buffer type for indication purpose + let bytes_type = { + // if Zenoh is built without SHM support, the only buffer type it can receive is RAW + #[cfg(not(feature = "shared-memory"))] + { + "RAW" + } + + // if Zenoh is built with SHM support but without SHM API (that is unstable), it can + // receive buffers of any type, but there is no way to detect the buffer type + #[cfg(all(feature = "shared-memory", not(feature = "unstable")))] + { + "UNKNOWN" + } + + // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type + #[cfg(all(feature = "shared-memory", feature = "unstable"))] + match bytes.as_shm_mut() { + // try to mutate SHM buffer to get it's mutability property + Some(shm) => match <&mut zenoh::shm::zshmmut>::try_from(shm) { + Ok(_shm_mut) => "SHM (MUT)", + Err(_) => "SHM (IMMUT)", + }, + None => "RAW", + } + }; + + // In order to indicate the real underlying buffer type the code above is written ^^^ + // Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently. + // In other words, the common application compiled with "shared-memory" feature will be able to + // handle incoming SHM data without any changes in the application code. + // + // Refer to z_bytes.rs to see how to deserialize different types of message + let bytes_string = bytes + .try_to_string() + .unwrap_or_else(|e| e.to_string().into()); + + (bytes_type, bytes_string) + } +}