Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions libdd-profiling/benches/add_samples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,20 @@ pub fn bench_add_sample_vs_add2(c: &mut Criterion) {
let functions = dict.functions();
let thread_id = get_current_thread_id();
let thread_id_key: StringId2 = strings.try_insert("thread id").unwrap().into();
let labels_api = vec![api::Label {
key: "thread id",
str: "",
num: thread_id,
num_unit: "",
}];
let labels_api = vec![
api::Label {
key: "thread id",
str: "",
num: thread_id,
num_unit: "",
},
api::Label {
key: "thread name",
str: "this thread",
num: 0,
num_unit: "",
},
];

let frames2 = frames.map(|f| {
let set_id = functions
Expand All @@ -128,6 +136,23 @@ pub fn bench_add_sample_vs_add2(c: &mut Criterion) {
});
let dict = profiling::profiles::collections::Arc::try_new(dict).unwrap();

c.bench_function("profile_add_sample_timestamped_x1000", |b| {
b.iter(|| {
let mut profile = profiling::internal::Profile::try_new(&sample_types, None).unwrap();
let (locations, values) = make_stack_api(frames.as_slice());
for i in 0..1000 {
let sample = api::Sample {
locations: locations.clone(),
values: &values,
labels: labels_api.clone(),
};
let ts = std::num::NonZeroI64::new(i + 1);
black_box(profile.try_add_sample(sample, ts)).unwrap();
}
black_box(profile.only_for_testing_num_aggregated_samples())
})
});

c.bench_function("profile_add_sample_frames_x1000", |b| {
b.iter(|| {
let mut profile = profiling::internal::Profile::try_new(&sample_types, None).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use crate::collections::identifiable::Id;
use crate::internal::Timestamp;
use crate::profiles::{DefaultObservationCodec as DefaultCodec, ObservationCodec};
use byteorder::{NativeEndian, ReadBytesExt};
use std::io::{self, Write};
use std::io::{self, BufWriter, Write};

pub type TimestampedObservations = TimestampedObservationsImpl<DefaultCodec>;

pub struct TimestampedObservationsImpl<C: ObservationCodec> {
compressed_timestamped_data: C::Encoder,
compressed_timestamped_data: BufWriter<C::Encoder>,
sample_types_len: usize,
}

Expand All @@ -40,10 +40,10 @@ impl<C: ObservationCodec> TimestampedObservationsImpl<C> {

pub fn try_new(sample_types_len: usize) -> io::Result<Self> {
Ok(Self {
compressed_timestamped_data: C::new_encoder(
Self::DEFAULT_BUFFER_SIZE,
Self::MAX_CAPACITY,
)?,
compressed_timestamped_data: BufWriter::with_capacity(
C::recommended_input_buf_size(),
C::new_encoder(Self::DEFAULT_BUFFER_SIZE, Self::MAX_CAPACITY)?,
),
sample_types_len,
})
}
Expand Down Expand Up @@ -74,8 +74,12 @@ impl<C: ObservationCodec> TimestampedObservationsImpl<C> {
}

pub fn try_into_iter(self) -> io::Result<TimestampedObservationsIterImpl<C>> {
let encoder = self
.compressed_timestamped_data
.into_inner()
.map_err(|e| e.into_error())?;
Ok(TimestampedObservationsIterImpl {
decoder: C::encoder_into_decoder(self.compressed_timestamped_data)?,
decoder: C::encoder_into_decoder(encoder)?,
sample_types_len: self.sample_types_len,
})
}
Expand Down
10 changes: 10 additions & 0 deletions libdd-profiling/src/profiles/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ pub trait ObservationCodec {

fn new_encoder(size_hint: usize, max_capacity: usize) -> io::Result<Self::Encoder>;
fn encoder_into_decoder(encoder: Self::Encoder) -> io::Result<Self::Decoder>;

/// Returns the recommended input buffer size for the encoder.
/// Used to size the `BufWriter` that wraps the encoder.
fn recommended_input_buf_size() -> usize {
0
}
}

#[allow(unused)]
Expand Down Expand Up @@ -181,6 +187,10 @@ impl ObservationCodec for ZstdObservationCodec {
Err((_enc, error)) => Err(error),
}
}

fn recommended_input_buf_size() -> usize {
zstd::Encoder::<SizeRestrictedBuffer>::recommended_input_size()
}
}

#[cfg(not(miri))]
Expand Down
Loading