Skip to content

Commit f7c2389

Browse files
committed
test(batch-processing): add comprehensive concurrency and stress tests
- Add BatchProcessingStressTests for high-concurrency and sustained load validation - Add BatchProcessingThreadSafetyTests to verify thread-safe record processing - Add isolation tests for DynamoDB, Kinesis, SQS, and typed batch processors - Add StaticResultPropertyTests to validate static result property behavior - Add helper classes for concurrent invocation results, test event factory, and record handlers - Add project reference to BatchProcessing library in test project - Verify batch processor maintains correctness under high concurrency (20-100 concurrent invocations) - Validate thread safety with concurrent record processing and state isolation - Ensure no record leakage between concurrent invocations - Test stress scenarios with varying concurrency levels and iteration counts
1 parent 1a1a3fa commit f7c2389

11 files changed

+4461
-0
lines changed

libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/AWS.Lambda.Powertools.ConcurrencyTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
</ItemGroup>
2626

2727
<ItemGroup>
28+
<ProjectReference Include="..\..\src\AWS.Lambda.Powertools.BatchProcessing\AWS.Lambda.Powertools.BatchProcessing.csproj" />
2829
<ProjectReference Include="..\..\src\AWS.Lambda.Powertools.Idempotency\AWS.Lambda.Powertools.Idempotency.csproj" />
2930
<ProjectReference Include="..\..\src\AWS.Lambda.Powertools.Logging\AWS.Lambda.Powertools.Logging.csproj" />
3031
<ProjectReference Include="..\..\src\AWS.Lambda.Powertools.Metrics\AWS.Lambda.Powertools.Metrics.csproj" />

libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/BatchProcessing/BatchProcessingStressTests.cs

Lines changed: 593 additions & 0 deletions
Large diffs are not rendered by default.

libraries/tests/AWS.Lambda.Powertools.ConcurrencyTests/BatchProcessing/BatchProcessingThreadSafetyTests.cs

Lines changed: 894 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
using Amazon.Lambda.DynamoDBEvents;
2+
using AWS.Lambda.Powertools.BatchProcessing;
3+
using AWS.Lambda.Powertools.ConcurrencyTests.BatchProcessing.Helpers;
4+
using Xunit;
5+
6+
namespace AWS.Lambda.Powertools.ConcurrencyTests.BatchProcessing;
7+
8+
/// <summary>
9+
/// Tests for validating DynamoDB Stream batch processor isolation under concurrent execution scenarios.
10+
/// These tests verify that when multiple Lambda invocations run concurrently (multi-instance mode),
11+
/// each invocation's ProcessingResult remains isolated from other invocations.
12+
/// </summary>
13+
[Collection("BatchProcessing Concurrency Tests")]
14+
public class DynamoDbProcessorIsolationTests
15+
{
16+
/// <summary>
17+
/// Verifies that concurrent invocations using the DynamoDbStreamBatchProcessor
18+
/// each receive their own ProcessingResult with the correct record count.
19+
/// Requirements: 1.1, 1.3, 2.1
20+
/// </summary>
21+
[Theory]
22+
[InlineData(2)]
23+
[InlineData(5)]
24+
[InlineData(10)]
25+
public async Task ConcurrentInvocations_ShouldMaintainProcessingResultIsolation(int concurrencyLevel)
26+
{
27+
// Arrange
28+
var results = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>[concurrencyLevel];
29+
var barrier = new Barrier(concurrencyLevel);
30+
var recordCountsPerInvocation = Enumerable.Range(0, concurrencyLevel)
31+
.Select(i => 3 + (i * 2)) // Different record counts: 3, 5, 7, 9, etc.
32+
.ToArray();
33+
34+
// Act
35+
var tasks = new Task[concurrencyLevel];
36+
for (int i = 0; i < concurrencyLevel; i++)
37+
{
38+
int invocationIndex = i;
39+
tasks[i] = Task.Run(async () =>
40+
{
41+
var invocationId = $"dynamodb-inv-{invocationIndex}-{Guid.NewGuid():N}";
42+
var recordCount = recordCountsPerInvocation[invocationIndex];
43+
var dynamoDbEvent = TestEventFactory.CreateDynamoDbEvent(recordCount, invocationId);
44+
var expectedRecordIds = TestEventFactory.GetDynamoDbSequenceNumbers(dynamoDbEvent);
45+
var handler = new TestDynamoDbRecordHandler();
46+
47+
// Synchronize all invocations to start at the same time
48+
barrier.SignalAndWait();
49+
50+
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
51+
52+
// Create a new processor instance for each invocation to ensure isolation
53+
var processor = new TestDynamoDbStreamBatchProcessor();
54+
var result = await processor.ProcessAsync(dynamoDbEvent, handler);
55+
56+
stopwatch.Stop();
57+
58+
results[invocationIndex] = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>
59+
{
60+
InvocationId = invocationId,
61+
ExpectedRecordCount = recordCount,
62+
ExpectedRecordIds = expectedRecordIds,
63+
ActualResult = result,
64+
Duration = stopwatch.Elapsed
65+
};
66+
});
67+
}
68+
69+
await Task.WhenAll(tasks);
70+
71+
// Assert
72+
foreach (var result in results)
73+
{
74+
Assert.NotNull(result.ActualResult);
75+
Assert.Equal(result.ExpectedRecordCount, result.ActualResult.BatchRecords.Count);
76+
77+
// Verify all records in the result belong to this invocation
78+
var actualRecordIds = result.ActualResult.BatchRecords
79+
.Select(r => r.Dynamodb.SequenceNumber)
80+
.ToHashSet();
81+
Assert.True(actualRecordIds.SetEquals(result.ExpectedRecordIds),
82+
$"Invocation {result.InvocationId}: Expected records {string.Join(",", result.ExpectedRecordIds)} " +
83+
$"but got {string.Join(",", actualRecordIds)}");
84+
}
85+
}
86+
87+
/// <summary>
88+
/// Verifies that concurrent invocations with different success/failure ratios
89+
/// each receive the correct BatchItemFailures for their invocation.
90+
/// Requirements: 1.4
91+
/// </summary>
92+
[Theory]
93+
[InlineData(2)]
94+
[InlineData(5)]
95+
[InlineData(10)]
96+
public async Task ConcurrentInvocations_ShouldMaintainBatchItemFailuresIsolation(int concurrencyLevel)
97+
{
98+
// Arrange
99+
var results = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>[concurrencyLevel];
100+
var barrier = new Barrier(concurrencyLevel);
101+
var recordCount = 5;
102+
103+
// Act
104+
var tasks = new Task[concurrencyLevel];
105+
for (int i = 0; i < concurrencyLevel; i++)
106+
{
107+
int invocationIndex = i;
108+
tasks[i] = Task.Run(async () =>
109+
{
110+
var invocationId = $"dynamodb-inv-{invocationIndex}-{Guid.NewGuid():N}";
111+
var dynamoDbEvent = TestEventFactory.CreateDynamoDbEvent(recordCount, invocationId);
112+
var expectedFailureCount = invocationIndex % recordCount; // 0, 1, 2, 3, 4 failures
113+
114+
var handler = new TestDynamoDbRecordHandler
115+
{
116+
// Fail the first N records based on invocation index
117+
ShouldFail = record =>
118+
{
119+
var seqParts = record.Dynamodb.SequenceNumber.Split("-seq-");
120+
var recordIndex = int.Parse(seqParts[1]);
121+
return recordIndex < expectedFailureCount;
122+
}
123+
};
124+
125+
barrier.SignalAndWait();
126+
127+
var processor = new TestDynamoDbStreamBatchProcessor();
128+
var processingOptions = new ProcessingOptions { ThrowOnFullBatchFailure = false };
129+
var result = await processor.ProcessAsync(dynamoDbEvent, handler, processingOptions);
130+
131+
results[invocationIndex] = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>
132+
{
133+
InvocationId = invocationId,
134+
ExpectedRecordCount = recordCount,
135+
ExpectedFailureCount = expectedFailureCount,
136+
ActualResult = result
137+
};
138+
});
139+
}
140+
141+
await Task.WhenAll(tasks);
142+
143+
// Assert
144+
foreach (var result in results)
145+
{
146+
Assert.NotNull(result.ActualResult);
147+
Assert.Equal(result.ExpectedFailureCount, result.ActualResult.FailureRecords.Count);
148+
Assert.Equal(result.ExpectedFailureCount,
149+
result.ActualResult.BatchItemFailuresResponse.BatchItemFailures.Count);
150+
151+
// Verify all failure IDs belong to this invocation
152+
foreach (var failure in result.ActualResult.BatchItemFailuresResponse.BatchItemFailures)
153+
{
154+
Assert.True(failure.ItemIdentifier.StartsWith(result.InvocationId),
155+
$"Failure ID {failure.ItemIdentifier} should start with invocation ID {result.InvocationId}");
156+
}
157+
}
158+
}
159+
160+
/// <summary>
161+
/// Verifies that when one invocation completes while another is still processing,
162+
/// the completing invocation returns only its own results.
163+
/// Requirements: 1.2, 4.3
164+
/// </summary>
165+
[Theory]
166+
[InlineData(10)]
167+
[InlineData(50)]
168+
[InlineData(100)]
169+
public async Task OverlappingInvocations_CompletingInvocationShouldReturnOnlyOwnResults(int shortDelayMs)
170+
{
171+
// Arrange
172+
var longDelayMs = shortDelayMs * 3;
173+
var shortInvocationResult = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>();
174+
var longInvocationResult = new ConcurrentInvocationResult<DynamoDBEvent.DynamodbStreamRecord>();
175+
var barrier = new Barrier(2);
176+
177+
var shortRecordCount = 3;
178+
var longRecordCount = 5;
179+
180+
// Act
181+
var shortTask = Task.Run(async () =>
182+
{
183+
var invocationId = $"dynamodb-short-{Guid.NewGuid():N}";
184+
var dynamoDbEvent = TestEventFactory.CreateDynamoDbEvent(shortRecordCount, invocationId);
185+
var expectedRecordIds = TestEventFactory.GetDynamoDbSequenceNumbers(dynamoDbEvent);
186+
var handler = new TestDynamoDbRecordHandler
187+
{
188+
ProcessingDelay = TimeSpan.FromMilliseconds(shortDelayMs)
189+
};
190+
191+
barrier.SignalAndWait();
192+
193+
var processor = new TestDynamoDbStreamBatchProcessor();
194+
var result = await processor.ProcessAsync(dynamoDbEvent, handler);
195+
196+
shortInvocationResult.InvocationId = invocationId;
197+
shortInvocationResult.ExpectedRecordCount = shortRecordCount;
198+
shortInvocationResult.ExpectedRecordIds = expectedRecordIds;
199+
shortInvocationResult.ActualResult = result;
200+
});
201+
202+
var longTask = Task.Run(async () =>
203+
{
204+
var invocationId = $"dynamodb-long-{Guid.NewGuid():N}";
205+
var dynamoDbEvent = TestEventFactory.CreateDynamoDbEvent(longRecordCount, invocationId);
206+
var expectedRecordIds = TestEventFactory.GetDynamoDbSequenceNumbers(dynamoDbEvent);
207+
var handler = new TestDynamoDbRecordHandler
208+
{
209+
ProcessingDelay = TimeSpan.FromMilliseconds(longDelayMs)
210+
};
211+
212+
barrier.SignalAndWait();
213+
214+
var processor = new TestDynamoDbStreamBatchProcessor();
215+
var result = await processor.ProcessAsync(dynamoDbEvent, handler);
216+
217+
longInvocationResult.InvocationId = invocationId;
218+
longInvocationResult.ExpectedRecordCount = longRecordCount;
219+
longInvocationResult.ExpectedRecordIds = expectedRecordIds;
220+
longInvocationResult.ActualResult = result;
221+
});
222+
223+
await Task.WhenAll(shortTask, longTask);
224+
225+
// Assert - Short invocation should have only its own records
226+
Assert.NotNull(shortInvocationResult.ActualResult);
227+
Assert.Equal(shortRecordCount, shortInvocationResult.ActualResult.BatchRecords.Count);
228+
var shortActualIds = shortInvocationResult.ActualResult.BatchRecords
229+
.Select(r => r.Dynamodb.SequenceNumber)
230+
.ToHashSet();
231+
Assert.True(shortActualIds.SetEquals(shortInvocationResult.ExpectedRecordIds),
232+
"Short invocation should contain only its own records");
233+
234+
// Assert - Long invocation should have only its own records
235+
Assert.NotNull(longInvocationResult.ActualResult);
236+
Assert.Equal(longRecordCount, longInvocationResult.ActualResult.BatchRecords.Count);
237+
var longActualIds = longInvocationResult.ActualResult.BatchRecords
238+
.Select(r => r.Dynamodb.SequenceNumber)
239+
.ToHashSet();
240+
Assert.True(longActualIds.SetEquals(longInvocationResult.ExpectedRecordIds),
241+
"Long invocation should contain only its own records");
242+
243+
// Verify no cross-contamination
244+
Assert.False(shortActualIds.Overlaps(longInvocationResult.ExpectedRecordIds),
245+
"Short invocation should not contain long invocation's records");
246+
Assert.False(longActualIds.Overlaps(shortInvocationResult.ExpectedRecordIds),
247+
"Long invocation should not contain short invocation's records");
248+
}
249+
250+
/// <summary>
251+
/// Verifies that concurrent access to the batch processor does not throw
252+
/// thread-safety related exceptions.
253+
/// Requirements: 2.1
254+
/// </summary>
255+
[Theory]
256+
[InlineData(2, 10)]
257+
[InlineData(5, 20)]
258+
[InlineData(10, 10)]
259+
public async Task ConcurrentAccess_ShouldNotThrowThreadSafetyExceptions(int concurrencyLevel, int iterationsPerThread)
260+
{
261+
// Arrange
262+
var exceptions = new List<Exception>();
263+
var exceptionLock = new object();
264+
var barrier = new Barrier(concurrencyLevel);
265+
266+
// Act
267+
var tasks = Enumerable.Range(0, concurrencyLevel).Select(threadIndex => Task.Run(async () =>
268+
{
269+
try
270+
{
271+
barrier.SignalAndWait();
272+
273+
for (int iteration = 0; iteration < iterationsPerThread; iteration++)
274+
{
275+
var invocationId = $"dynamodb-thread-{threadIndex}-iter-{iteration}-{Guid.NewGuid():N}";
276+
var dynamoDbEvent = TestEventFactory.CreateDynamoDbEvent(3, invocationId);
277+
var handler = new TestDynamoDbRecordHandler();
278+
279+
var processor = new TestDynamoDbStreamBatchProcessor();
280+
var result = await processor.ProcessAsync(dynamoDbEvent, handler);
281+
282+
// Verify basic correctness
283+
Assert.Equal(3, result.BatchRecords.Count);
284+
}
285+
}
286+
catch (Exception ex)
287+
{
288+
lock (exceptionLock)
289+
{
290+
exceptions.Add(new Exception($"Thread {threadIndex}: {ex.Message}", ex));
291+
}
292+
}
293+
})).ToList();
294+
295+
await Task.WhenAll(tasks);
296+
297+
// Assert
298+
Assert.Empty(exceptions);
299+
}
300+
}

0 commit comments

Comments
 (0)