Skip to content

Commit 70f6f33

Browse files
authored
test: create framework for testing memory (#4921)
Adds a new integration test `resource_test` which contains a utility for making assertions about memory use within a span. This needs to be in a separate integration test because it hooks into the global memory allocator, and doing that for all tests would probably make them quite slow. Here's an example of how this can be used: ```rust let alloc_tracker = AllocTracker::new(); { let _guard = alloc_tracker.enter(); // Do something that allocates, even in async tasks } let stats = alloc_tracker.stats(); assert_lt!(stats.max_bytes_allocated, 100_000_000, "Used more than 100MB"); assert_eq!(stats.total_bytes_allocated, stats.total_bytes_deallocated, "No memory leaked"); ``` Closes #4761
1 parent d8f9dcb commit 70f6f33

File tree

8 files changed

+301
-1
lines changed

8 files changed

+301
-1
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/lance-datafusion/src/datagen.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33

44
use std::sync::Arc;
55

6+
use arrow_array::RecordBatchReader;
67
use datafusion::{
78
execution::SendableRecordBatchStream,
89
physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan},
910
};
1011
use datafusion_common::DataFusionError;
1112
use futures::TryStreamExt;
12-
use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
13+
use lance_core::Error;
14+
use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RoundingBehavior, RowCount};
1315

1416
use crate::exec::OneShotExec;
1517

@@ -20,6 +22,13 @@ pub trait DatafusionDatagenExt {
2022
num_batches: BatchCount,
2123
) -> SendableRecordBatchStream;
2224

25+
/// Consumes the generator and returns a DataFusion record batch stream whose
/// batches are sized by a byte budget (`batch_size`) rather than a row count.
///
/// `num_batches` is the number of batches to produce. `rounding_behavior` is
/// forwarded to the generator unchanged — presumably it decides how a byte
/// budget that is not a whole number of rows is rounded; confirm against
/// `lance_datagen`.
///
/// Returns an error if the generator cannot build a byte-sized reader.
fn into_df_stream_bytes(
26+
self,
27+
batch_size: ByteCount,
28+
num_batches: BatchCount,
29+
rounding_behavior: RoundingBehavior,
30+
) -> Result<SendableRecordBatchStream, Error>;
31+
2332
fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan>;
2433
}
2534

@@ -34,6 +43,18 @@ impl DatafusionDatagenExt for BatchGeneratorBuilder {
3443
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
3544
}
3645

46+
fn into_df_stream_bytes(
47+
self,
48+
batch_size: ByteCount,
49+
num_batches: BatchCount,
50+
rounding_behavior: RoundingBehavior,
51+
) -> Result<SendableRecordBatchStream, Error> {
52+
let stream = self.into_reader_bytes(batch_size, num_batches, rounding_behavior)?;
53+
let schema = stream.schema();
54+
let stream = futures::stream::iter(stream).map_err(DataFusionError::from);
55+
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
56+
}
57+
3758
fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> {
3859
let stream = self.into_df_stream(batch_size, num_batches);
3960
Arc::new(OneShotExec::new(stream))

rust/lance/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ tempfile.workspace = true
105105
test-log.workspace = true
106106
tracing-chrome = "0.7.1"
107107
rstest = { workspace = true }
108+
tracking-allocator = { version = "0.4", features = ["tracing-compat"] }
108109
# For S3 / DynamoDB tests
109110
aws-config = { workspace = true }
110111
aws-sdk-s3 = { workspace = true }

rust/lance/tests/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Tests for memory and IO usage.
2+
3+
## Debugging memory usage
4+
5+
Once you've identified a test that is using too much memory, you can use
6+
bytehound to find the source of the memory usage. (Note: we need to run
7+
bytehound on the binary, not on cargo, so we have to extract the test binary path.)
8+
9+
The `RUST_ALLOC_TIMINGS` environment variable tells the tracking allocator
10+
to logs the start and end of each allocation tracking session, which makes it
11+
easier to correlate the bytehound output with the code.
12+
13+
```shell
14+
TEST_BINARY=$(cargo test --test resource_tests --no-run 2>&1 | tail -n1 | sed -n 's/.*(\([^)]*\)).*/\1/p')
15+
LD_PRELOAD=/usr/local/lib/libbytehound.so \
16+
RUST_ALLOC_TIMINGS=true \
17+
$TEST_BINARY resource_test::write::test_insert_memory
18+
bytehound server memory-profiling_*.dat
19+
```
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
mod utils;
4+
mod write;
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
use all_asserts::assert_ge;
4+
use std::alloc::System;
5+
use std::collections::HashMap;
6+
use std::sync::{Arc, LazyLock, Mutex, Once};
7+
use tracing::Instrument;
8+
use tracing_subscriber::layer::SubscriberExt;
9+
use tracing_subscriber::Registry;
10+
use tracking_allocator::{
11+
AllocationGroupId, AllocationGroupToken, AllocationLayer, AllocationRegistry,
12+
AllocationTracker, Allocator,
13+
};
14+
15+
// Install the tracking allocator, wrapping the system allocator, as the
// global allocator for this test binary.
#[global_allocator]
16+
static GLOBAL: Allocator<System> = Allocator::system();
17+
18+
/// Allocation statistics accumulated for a single allocation group.
///
/// Byte totals are signed so that the net figure is a plain subtraction.
/// `PartialEq`/`Eq` are derived so two snapshots can be compared directly in
/// test assertions.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct AllocStats {
    /// Largest net allocated byte count recorded for the group.
    pub max_bytes_allocated: isize,
    /// Sum of the sizes of every recorded allocation, in bytes.
    pub total_bytes_allocated: isize,
    /// Sum of the sizes of every recorded deallocation, in bytes.
    pub total_bytes_deallocated: isize,
    /// Number of recorded allocations.
    pub total_allocations: usize,
    /// Number of recorded deallocations.
    pub total_deallocations: usize,
}

impl AllocStats {
    /// Returns the bytes allocated minus the bytes deallocated.
    ///
    /// A non-zero value after a tracked scope has finished means memory
    /// allocated in that scope had not been freed when the stats were read.
    #[must_use]
    pub fn net_bytes_allocated(&self) -> isize {
        self.total_bytes_allocated - self.total_bytes_deallocated
    }
}
32+
33+
static GLOBAL_STATS: LazyLock<Arc<Mutex<HashMap<AllocationGroupId, AllocStats>>>> =
34+
std::sync::LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
35+
36+
struct MemoryTracker;
37+
38+
impl AllocationTracker for MemoryTracker {
39+
fn allocated(
40+
&self,
41+
_addr: usize,
42+
object_size: usize,
43+
_wrapped_size: usize,
44+
group_id: AllocationGroupId,
45+
) {
46+
if group_id == AllocationGroupId::ROOT {
47+
// We don't track root allocations
48+
return;
49+
}
50+
let mut guard = GLOBAL_STATS.lock().unwrap();
51+
let stats = guard.entry(group_id).or_default();
52+
stats.total_bytes_allocated += object_size as isize;
53+
stats.total_allocations += 1;
54+
stats.max_bytes_allocated = stats.max_bytes_allocated.max(stats.net_bytes_allocated());
55+
}
56+
57+
fn deallocated(
58+
&self,
59+
_addr: usize,
60+
object_size: usize,
61+
_wrapped_size: usize,
62+
source_group_id: AllocationGroupId,
63+
current_group_id: AllocationGroupId,
64+
) {
65+
let group_id = if source_group_id != AllocationGroupId::ROOT {
66+
source_group_id
67+
} else {
68+
current_group_id
69+
};
70+
if group_id == AllocationGroupId::ROOT {
71+
// We don't track root allocations
72+
return;
73+
}
74+
let mut guard = GLOBAL_STATS.lock().unwrap();
75+
let stats = guard.entry(group_id).or_default();
76+
stats.total_bytes_deallocated += object_size as isize;
77+
stats.total_deallocations += 1;
78+
}
79+
}
80+
81+
static INIT: Once = Once::new();
82+
83+
// The alloc tracker holds a span and an associated allocation group id.
84+
pub struct AllocTracker {
85+
group_id: AllocationGroupId,
86+
span: tracing::Span,
87+
}
88+
89+
impl AllocTracker {
90+
pub fn init() {
91+
INIT.call_once(init_memory_tracking);
92+
}
93+
94+
pub fn new() -> Self {
95+
Self::init();
96+
97+
let token = AllocationGroupToken::register().expect("failed to register token");
98+
let group_id = token.id();
99+
100+
let span = tracing::span!(tracing::Level::INFO, "AllocTracker");
101+
token.attach_to_span(&span);
102+
103+
Self { group_id, span }
104+
}
105+
106+
pub fn enter(&self) -> AllocGuard<'_> {
107+
AllocGuard::new(self)
108+
}
109+
110+
pub fn stats(self) -> AllocStats {
111+
let mut stats = GLOBAL_STATS.lock().unwrap();
112+
stats.remove(&self.group_id).unwrap_or_default()
113+
}
114+
}
115+
116+
pub struct AllocGuard<'a> {
117+
_guard: tracing::span::Entered<'a>,
118+
}
119+
120+
impl<'a> AllocGuard<'a> {
121+
#[allow(clippy::print_stderr)]
122+
pub fn new(tracker: &'a AllocTracker) -> Self {
123+
if std::env::var("RUST_ALLOC_TIMINGS").is_ok() {
124+
eprintln!("alloc:enter:{}", chrono::Utc::now().to_rfc3339());
125+
}
126+
AllocGuard {
127+
_guard: tracker.span.enter(),
128+
}
129+
}
130+
}
131+
132+
impl Drop for AllocGuard<'_> {
133+
#[allow(clippy::print_stderr)]
134+
fn drop(&mut self) {
135+
if std::env::var("RUST_ALLOC_TIMINGS").is_ok() {
136+
eprintln!("alloc:exit:{}", chrono::Utc::now().to_rfc3339());
137+
}
138+
}
139+
}
140+
141+
pub fn init_memory_tracking() {
142+
let registry = Registry::default().with(AllocationLayer::new());
143+
tracing::subscriber::set_global_default(registry)
144+
.expect("failed to install tracing subscriber");
145+
146+
let tracker = MemoryTracker;
147+
AllocationRegistry::set_global_tracker(tracker).expect("failed to set global tracker");
148+
AllocationRegistry::enable_tracking();
149+
}
150+
151+
// Sanity check for the tracker itself: memory that is allocated inside a
// tracked scope and still alive when the stats are read must show up as a
// non-zero net allocation.
#[test]
152+
fn check_memory_leak() {
153+
// Make sure AllocTracker can detect leaks
154+
// `leaked` is declared outside the tracked scope so it outlives the guard.
let mut leaked = Vec::new();
155+
let tracker = AllocTracker::new();
156+
{
157+
let _guard = tracker.enter();
158+
// 1 MiB that is allocated and freed inside the tracked scope.
let v = vec![0u8; 1024 * 1024];
159+
// 1 KiB that is allocated inside the tracked scope and never freed before
// the stats are read.
leaked.resize(1024, 0u8);
160+
drop(v);
161+
}
162+
let stats = tracker.stats();
163+
// Both buffers were alive at the same time, so the peak is their sum.
assert_eq!(stats.max_bytes_allocated, (1024 * 1024) + 1024);
164+
assert_eq!(stats.total_bytes_allocated, (1024 * 1024) + 1024);
165+
// Only the 1 MiB buffer was freed.
assert_eq!(stats.total_bytes_deallocated, (1024 * 1024));
166+
assert_eq!(stats.total_allocations, 2);
167+
// The 1 KiB still held by `leaked` is reported as the net allocation.
assert_eq!(stats.net_bytes_allocated(), 1024);
168+
}
169+
170+
// Checks that allocations made inside spawned tasks are recorded by the
// tracker when the tasks are instrumented with the current span.
#[tokio::test]
171+
async fn check_test_spawn_alloc() {
172+
let tracker = AllocTracker::new();
173+
{
174+
let _guard = tracker.enter();
175+
// First task: allocates and frees 256 KiB.
let future1 = async {
176+
let v = vec![0u8; 256 * 1024];
177+
drop(v);
178+
};
179+
// `in_current_span` instruments the future with the span that is current
// here, i.e. the tracker's span entered by `_guard`.
let handle = tokio::spawn(future1.in_current_span());
180+
// Second task: allocates and frees 512 KiB.
let future2 = async {
181+
let v = vec![0u8; 512 * 1024];
182+
drop(v);
183+
};
184+
let handle2 = tokio::spawn(future2.in_current_span());
185+
handle.await.unwrap();
186+
handle2.await.unwrap();
187+
}
188+
let stats = tracker.stats();
189+
// NOTE(review): presumably the two vectors plus one allocation per spawned
// task — confirm against the tokio version in use.
assert_eq!(stats.total_allocations, 4);
190+
// Lower bounds only: anything beyond the two vectors adds extra bytes.
assert_ge!(stats.total_bytes_allocated, 256 * 1024 + 512 * 1024);
191+
assert_ge!(stats.total_bytes_deallocated, 256 * 1024 + 512 * 1024);
192+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
use super::utils::AllocTracker;
4+
use all_asserts::assert_le;
5+
use arrow_schema::DataType;
6+
use lance::dataset::InsertBuilder;
7+
use lance_datafusion::datagen::DatafusionDatagenExt;
8+
use lance_datagen::{array, gen_batch, BatchCount, ByteCount, RoundingBehavior};
9+
10+
// Writes ten 10MB batches through `InsertBuilder` and asserts that the peak
// tracked allocation stays within twice the size of a single batch.
#[tokio::test]
11+
async fn test_insert_memory() {
12+
// Create a stream of 100MB of data, in batches
13+
let batch_size = 10 * 1024 * 1024; // 10MB
14+
let num_batches = BatchCount::from(10);
15+
// The input stream is built before the tracker is created.
let data = gen_batch()
16+
.col("a", array::rand_type(&DataType::Int32))
17+
.into_df_stream_bytes(
18+
ByteCount::from(batch_size),
19+
num_batches,
20+
RoundingBehavior::RoundDown,
21+
)
22+
.unwrap();
23+
24+
let alloc_tracker = AllocTracker::new();
25+
{
26+
// Everything in this scope runs with the tracker's span entered.
let _guard = alloc_tracker.enter();
27+
28+
// write out to temporary directory
29+
let tmp_dir = tempfile::tempdir().unwrap();
30+
let tmp_path = tmp_dir.path().to_str().unwrap();
31+
let _dataset = InsertBuilder::new(tmp_path)
32+
.execute_stream(data)
33+
.await
34+
.unwrap();
35+
}
36+
37+
let stats = alloc_tracker.stats();
38+
// Allow for 2x the batch size to account for overheads.
39+
// The key test is that we don't load all 100MB into memory at once
40+
assert_le!(
41+
stats.max_bytes_allocated,
42+
(batch_size * 2) as isize,
43+
"Max memory usage exceeded"
44+
);
45+
}

rust/lance/tests/resource_tests.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright The Lance Authors
3+
4+
// The memory tests don't work currently on MacOS because they rely on thread
5+
// local storage in the allocator, which seems to have some issues on MacOS.
6+
#[cfg(target_os = "linux")]
7+
mod resource_test;

0 commit comments

Comments
 (0)