Shardis: Bigger on the inside. Smarter on the outside.
Shardis is a lightweight, scalable sharding framework for .NET designed to help developers partition and route aggregates across multiple databases cleanly and efficiently. Built for domain-driven systems, event sourcing architectures, and multi-tenant platforms, Shardis ensures that data routing remains deterministic, maintainable, and completely decoupled from business logic.
- 🚀 **Deterministic Key-based Routing** – route aggregate instances consistently to the correct database shard based on a strong hashing mechanism.
- 🛠️ **Pluggable Shard Map Storage** – abstract where and how shard mappings are stored: in-memory for development, persistent stores, or distributed caches.
- 🔗 **Designed for Event Sourcing and CQRS** – integrates naturally with systems like MartenDB, EventStoreDB, and custom event stores.
- 🧩 **Simple, Extensible Architecture** – swap out routing strategies or extend shard metadata without leaking sharding concerns into your domain models.
- 🏗 **Ready for Production Scaling** – shard assignments are persistent, predictable, and optimized for horizontal scalability.
- 📊 **Instrumentation Hooks** – plug in metrics (counters, tracing) by replacing the default no-op metrics service.
Instrumentation quick reference (a minimal OpenTelemetry wiring sketch follows this list):

- ActivitySource: `Shardis` – add with `.AddSource("Shardis")` when configuring OpenTelemetry tracing.
- Meter: `Shardis` – add with `.AddMeter("Shardis")` to pick up built-in counters and histograms.
- Metric API: implement `IShardisMetrics` or use the provided `MetricShardisMetrics` (default is `NoOpShardisMetrics`).
- Important: `IShardisMetrics` now exposes `RecordRouteLatency(double)` (milliseconds) as a first-class method – implementers should record this on a histogram named `shardis.route.latency` (unit: ms). Ordered and unordered streaming paths are covered by metrics observer tests (item counts, heap samples, backpressure waits) ensuring instrumentation stability.
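A minimal wiring sketch, assuming an ASP.NET Core host with the standard `OpenTelemetry.Extensions.Hosting` package; the `"Shardis"` source/meter names come from the list above:

```csharp
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

// Tracing picks up Shardis activities; metrics pick up the built-in
// counters plus the shardis.route.latency histogram.
builder.Services.AddOpenTelemetry()
    .WithTracing(t => t.AddSource("Shardis"))
    .WithMetrics(m => m.AddMeter("Shardis"));
```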
- 🔄 **Consistent Hashing Option** – choose between simple sticky routing and a consistent hashing ring with configurable replication factor & pluggable ring hashers.
- 📥 **Ordered & Unordered Streaming Queries** – low-latency unordered fan-out plus deterministic k‑way heap merge (bounded prefetch) for globally ordered streaming.
- 📈 **Adaptive Paging (Marten)** – deterministic latency-targeted page size adjustments with oscillation & final-size telemetry.
- 🧪 **Central Public API Snapshots** – consolidated multi-assembly approval tests ensure a stable public surface; drift produces clear `.received` diffs.
- 🔁 **Pluggable Migration Executors & Providers** – core migration package plus EF Core (rowversion / checksum) and Marten (checksum) providers with a per-key copy → verify → swap pipeline, checkpointing, deterministic retries, and duration instrumentation (copy / verify / swap batch / total elapsed).
- 🔒 **Deterministic Canonicalization & Checksums** – stable JSON canonicalization + pluggable hashing (`Fnv1a64Hasher` by default) powering verification strategies (see `docs/canonicalization.md`).
- 🏥 **Shard Health & Resilience** – detect, route around, and recover from unhealthy shards without custom scaffolding. Configurable health probes, failure thresholds, and query execution strategies (best-effort, strict, require N-of-M shards).
| Package | Purpose |
|---|---|
| `Shardis` | Core routing, hashing, shard map, metrics abstractions. |
| `Shardis.Migration` | Key migration planning & execution pipeline (planner, executor, checkpoints, verification abstractions). |
| `Shardis.Migration.EntityFrameworkCore` | EF Core migration provider (rowversion + checksum verification strategies). |
| `Shardis.Migration.Marten` | Marten migration provider (checksum verification). |
| `Shardis.Migration.Sql` | (Experimental) SQL durability components: checkpoint store + shard map/history + assignment changed event hook. |
| `Shardis.Query` | Query abstraction layer (shard-aware LINQ, executors). |
| `Shardis.Query.EntityFrameworkCore` | EF Core query executor + shard factory adapters. |
| `Shardis.Query.Marten` | Marten query executor with adaptive paging. |
| `Shardis.Query.InMemory` | In-memory query executor (tests / samples). |
| `Shardis.Redis` | Redis-backed shard map store implementation. |
| `Shardis.DependencyInjection` | Per-shard resource registration (`AddShard*`) + `IShardFactory<T>` DI resolution. |
```csharp
var services = new ServiceCollection()
    .AddShards<MyDbContext>(2, shard => new MyDbContext(BuildOptionsFor(shard))); // registers factory-backed contexts per shard

await using var provider = services.BuildServiceProvider();

var factory = provider.GetRequiredService<IShardFactory<MyDbContext>>();
await using var ctx = await factory.CreateAsync(new ShardId("0"));
// use ctx...
```

The DI package centralizes per-shard provisioning logic and feeds query executors (e.g. `EntityFrameworkCoreShardQueryExecutor`) via the generic `IShardFactory<T>` abstraction.
🔜 *(Coming soon to NuGet.)*

For now, clone the repository:

```bash
git clone https://github.com/veggerby/shardis.git
cd shardis
```

Reference the Shardis project in your solution, or package it locally using your preferred method.
For key migration, prefer the dedicated Shardis.Migration package (planner + executor + verification abstractions). Add a backend provider (EF Core or Marten) for concrete data movers and verification strategies.
Canonical DI usage:
```csharp
var services = new ServiceCollection()
    .AddShardisMigration<string>()
    .BuildServiceProvider();

var planner = services.GetRequiredService<Shardis.Migration.Abstractions.IShardMigrationPlanner<string>>();
var executor = services.GetRequiredService<Shardis.Migration.Execution.ShardMigrationExecutor<string>>();

var from = new Shardis.Migration.Model.TopologySnapshot<string>(new Dictionary<Shardis.Model.ShardKey<string>, Shardis.Model.ShardId>());
var to = new Shardis.Migration.Model.TopologySnapshot<string>(new Dictionary<Shardis.Model.ShardKey<string>, Shardis.Model.ShardId>());

var plan = await planner.CreatePlanAsync(from, to, CancellationToken.None);
await executor.ExecuteAsync(plan, CancellationToken.None);
```

See `docs/MIGRATION.md`, `docs/canonicalization.md`, `src/Shardis.Migration/README.md`, and provider READMEs under `src/Shardis.Migration.*` for details and production guidance.
Setting up a basic router:
```csharp
using Shardis.Model;
using Shardis.Routing;
using Shardis.Persistence;
using Shardis.Hashing;

// Define available shards
var shards = new[]
{
    new SimpleShard(new("shard-001"), "postgres://user:pass@host1/db"),
    new SimpleShard(new("shard-002"), "postgres://user:pass@host2/db"),
    new SimpleShard(new("shard-003"), "postgres://user:pass@host3/db")
};

// Initialize the shard router
var shardRouter = new DefaultShardRouter(
    shardMapStore: new InMemoryShardMapStore(),
    availableShards: shards
);

// Route a ShardKey
var userId = new ShardKey("user-451");
var shard = shardRouter.RouteToShard(userId);

Console.WriteLine($"User {userId} routed to {shard.ShardId}");
```

### Using Dependency Injection

```csharp
// Register shards & configure options
services.AddShardis<IShard<string>, string, string>(opts =>
{
    opts.UseConsistentHashing = true;                // or false for DefaultShardRouter
    opts.ReplicationFactor = 150;                    // only for consistent hashing
    opts.RingHasher = Fnv1aShardRingHasher.Instance; // optional alternative ring hasher
    opts.Shards.Add(new SimpleShard(new("shard-001"), "postgres://host1/db"));
    opts.Shards.Add(new SimpleShard(new("shard-002"), "postgres://host2/db"));
});

// Override map store BEFORE AddShardis if desired:
services.AddSingleton<IShardMapStore<string>>(new InMemoryShardMapStore<string>());

// Provide metrics (default registered is no-op):
services.AddSingleton<IShardisMetrics, MetricShardisMetrics>();
```

- ShardKey: A value object representing the identity of an aggregate or entity to be routed.
- Shard: Represents a physical partition (e.g., a specific PostgreSQL database instance).
- ShardRouter: Routes incoming ShardKeys to the appropriate Shard based on hashing.
- ShardMapStore: Caches key-to-shard assignments to ensure stable, deterministic routing over time.
- Metrics: Routers invoke `IShardisMetrics` (hits, misses, new/existing assignment) – the default implementation is a no-op.
The following invariants are enforced at startup / construction to fail fast and keep routing deterministic:
| Invariant | Enforcement Point | Exception |
|---|---|---|
| At least one shard registered | `AddShardis` options validation | `ShardisException` |
| `ReplicationFactor` > 0 and <= 10,000 | `AddShardis` options validation & router construction | `ShardisException` |
| Non-empty shard collection for broadcasters | `ShardBroadcaster` / `ShardStreamBroadcaster` constructors | `ArgumentException` |
| Null shard collection rejected | Broadcaster constructors | `ArgumentNullException` (ParamName = `shards`) |
| Null query delegate rejected | Broadcaster `QueryAllShardsAsync` methods | `ArgumentNullException` (ParamName = `query`) |
`DefaultShardKeyHasher<TKey>.Instance` selects an implementation by key type:

| Key Type | Hasher |
|---|---|
| `string` | `StringShardKeyHasher` |
| `int` | `Int32ShardKeyHasher` |
| `uint` | `UInt32ShardKeyHasher` |
| `long` | `Int64ShardKeyHasher` |
| `Guid` | `GuidShardKeyHasher` |
| other | (throws) `ShardisException` |
Override via `opts.ShardKeyHasher` if you need a custom algorithm (e.g. xxHash, HighwayHash) – ensure determinism and stable versioning.
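For illustration, a custom hasher could look like the sketch below. The exact `IShardKeyHasher<TKey>` member shape is an assumption here (a single key → `uint` method, per the options table later in this README); verify against the shipped interface.

```csharp
using Shardis.Hashing;
using Shardis.Model;

// Hypothetical custom hasher sketch (member name/shape assumed).
// Requirements: a pure function of the key, stable across restarts and versions.
public sealed class Fnv1aStringShardKeyHasher : IShardKeyHasher<string>
{
    public uint ComputeHash(ShardKey<string> key)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        var hash = offsetBasis;
        foreach (var ch in key.Value) // key.Value assumed to expose the raw string
        {
            hash ^= ch;
            hash *= prime;
        }

        return hash;
    }
}

// opts.ShardKeyHasher = new Fnv1aStringShardKeyHasher(); // wire into AddShardis options
```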
ReplicationFactor controls virtual node count per shard. Higher values smooth distribution but increase memory and ring rebuild time. Empirically:
| ReplicationFactor | Typical Shard Count | Distribution Variance (cv heuristic) |
|---|---|---|
| 50 | ≤ 8 | ~0.40–0.45 |
| 100 (default) | 8–16 | ~0.32–0.38 |
| 150 | 16–32 | ~0.28–0.33 |
| 200+ | 32+ | Diminishing returns |
Variance numbers are approximate and workload dependent; adjust after observing real key distributions.
Replication factor hard cap: values greater than 10,000 are rejected to prevent pathological ring sizes (memory amplification + long rebuild latency).
`IShardMapStore<TKey>` exposes two atomic primitives:

- `TryAssignShardToKey` (compare-and-set): first writer wins; concurrent attempts racing to assign the same key yield exactly one `true`.
- `TryGetOrAdd`: fetch an existing assignment or create it without a separate preliminary lookup (eliminates double hashing / allocation patterns in hot routing paths).
Routers rely on these to avoid duplicate assignments under bursty traffic. Tests stress thousands of concurrent attempts to ensure a single winner.
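The single-winner semantics can be illustrated with a `ConcurrentDictionary`-backed sketch (not the shipped `InMemoryShardMapStore`, just a model of the contract):

```csharp
using System.Collections.Concurrent;

// Illustration of the CAS contract only - not the shipped implementation.
public sealed class CasMapSketch<TKey> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, string> _map = new();

    // Compare-and-set: exactly one concurrent caller per key observes true.
    public bool TryAssignShardToKey(TKey key, string shardId) => _map.TryAdd(key, shardId);

    // Fetch-or-create in one step: no preliminary lookup, no double hashing.
    // (The factory may race, but only one produced value is ever published.)
    public string TryGetOrAdd(TKey key, Func<TKey, string> assign) => _map.GetOrAdd(key, assign);
}
```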
Routers emit exactly one `RouteMiss` for the first key assignment, followed by:

- `RouteHit(existingAssignment=false)` for the initial persisted assignment.
- `RouteHit(existingAssignment=true)` for every subsequent route.

The single-miss guarantee (even under extreme concurrency) is enforced via:

- A per-key lock in the default router, collapsing races to a single creator.
- A miss de-dup dictionary in both routers, so even if optimistic creation paths surface multiple contenders, only the first records the miss.

The consistent hash router only records a miss if `TryGetOrAdd` actually created the mapping and it has not yet been recorded for that key.
`ShardBroadcaster` (materializing) and `ShardStreamBroadcaster` (streaming) enforce non-empty shard sets and parameter validation. The streaming broadcaster:

- Starts one producer task per shard.
- Supports optional bounded channel capacity (backpressure); unbounded by default.
- Cancels remaining work early for short‑circuit operations (`AnyAsync`, `FirstAsync`).
- Guarantees that consumer observation order is the actual arrival order (no artificial reordering unless using ordered merge utilities).
- Emits lifecycle callbacks via `IMergeObserver`:
  - `OnItemYielded(shardId)` – after an item is yielded to the consumer.
  - `OnShardCompleted(shardId)` – shard produced all items successfully.
  - `OnShardStopped(shardId, reason)` – exactly once per shard with `Completed` | `Canceled` | `Faulted`.
  - `OnBackpressureWaitStart/Stop()` – unordered path only, when a bounded channel is full.
  - `OnHeapSizeSample(size)` – ordered merge heap sampling (throttled by `heapSampleEvery`).
```csharp
using Shardis.Querying;
using Shardis.Model;

public sealed class LoggingObserver : IMergeObserver
{
    private int _count;

    public void OnItemYielded(ShardId shardId) => Interlocked.Increment(ref _count);
    public void OnShardCompleted(ShardId shardId) => Console.WriteLine($"Shard {shardId} completed.");
    public void OnShardStopped(ShardId shardId, ShardStopReason reason) => Console.WriteLine($"Shard {shardId} stopped: {reason} (items so far={_count}).");
    public void OnBackpressureWaitStart() { }
    public void OnBackpressureWaitStop() { }
    public void OnHeapSizeSample(int size) => Console.WriteLine($"Heap size: {size}");
}

// Wiring:
var observer = new LoggingObserver();
var broadcaster = new ShardStreamBroadcaster<IShard<string>, string>(shards, channelCapacity: 64, observer: observer, heapSampleEvery: 10);
```

Observer implementations MUST be thread-safe; callbacks can occur concurrently.
- `ShardisAsyncOrderedEnumerator` performs a k‑way merge using a min-heap keyed by the provided selector – stable for identical keys (ties broken by shard enumeration order).
- `ShardisAsyncCombinedEnumerator` simply interleaves items as each shard advances; no global ordering guarantees.
Enumerators and broadcasters honor passed `CancellationToken`s; ordered/combined enumerators propagate cancellation immediately on the next `MoveNextAsync`, and broadcasters swallow expected cancellation exceptions after signaling completion.
- Distribute user accounts across multiple PostgreSQL clusters in a SaaS platform.
- Scale event streams across multiple event stores without burdening domain logic.
- Implement tenant-based isolation by routing organizations to their assigned shards.
- Future-proof a growing CQRS/Event Sourcing system against database size limits.
Shardis is designed for extension:
- **Custom Routing Strategies** – implement your own `IShardRouter` if you need consistent hashing rings, weighted shards, or region-aware routing.
- **Persistent Shard Maps** – replace the in-memory `IShardMapStore` with implementations backed by SQL, Redis, or cloud storage.
- **Shard Migrations and Rebalancing** – coming soon: native support for safely reassigning keys and migrating aggregates between shards.
- **Metrics / Telemetry** – implement `IShardisMetrics` to export counters to OpenTelemetry / Prometheus.
Shardis is built around three core principles:
- **Determinism First**: Given the same ShardKey, the same shard must always be chosen unless explicitly migrated.
- **Separation of Concerns**: Domain models should never "know" about shards — sharding remains purely an infrastructure concern.
- **Minimal Intrusion**: Shardis integrates into your system without forcing heavy infrastructure or hosting requirements.
- Durable checkpoint store implementations (Redis) (SQL experimental implementation available)
- Segmented migration planner (large plan pagination) – ADR 0004 follow-up
- Dual-read / dual-write transition window (grace phase) for Tier 3 integrity
- Alphabetical canonicalization option (stable across type refactors)
- Server-side checksum integration (backend-provided hash short‑circuit)
- Additional map stores (harden SQL provider + add Redis durability enhancements)
- Read/Write split router support
- Multi-region / geo-sharding affinity routing
- Telemetry package with OpenTelemetry exporters (metrics + traces pre-wired, migration duration histograms)
- Checksum & canonicalization benchmarks (`ChecksumBenchmarks`)
- Performance regression guard rails (allocation + latency budgets)
Pull requests, issues, and ideas are welcome. If you find an interesting edge case or want to extend Shardis into more advanced scaling patterns, open a discussion or a PR!
See CONTRIBUTING.md.
BenchmarkDotNet benchmarks live in benchmarks/.
Run (from repo root):
```bash
dotnet run -c Release -p benchmarks/Shardis.Benchmarks.csproj --filter *RouterBenchmarks*
dotnet run -c Release -p benchmarks/Shardis.Benchmarks.csproj --filter *HasherBenchmarks*
```

Use these to compare (by `--anyCategories`):

- `router`: Default vs consistent hash routing cost
- `hasher`: Different ring hash algorithms (Default vs FNV-1a) & replication factor impact
- `migration`: Migration executor throughput across the concurrency / batch matrix
- `broadcaster`: Fast vs slow shard streaming (fairness, interleaving, backpressure sensitivity). This suite remains as a baseline ahead of the upcoming ordered vs unordered merge benchmarks.

Planned (in active design):

- `merge`: Ordered vs unordered streaming merge enumerators (k‑way heap vs combined interleave) – complements the broadcaster suite.
- `migration`: Executor throughput (copy + verify + swap) across concurrency & retry scenarios (Marten checksum path & EF Core rowversion path forthcoming).
- (Planned) `checksum`: Canonicalization + hashing throughput & allocation profile.
After optimization: the routing hot path avoids double hashing (via `TryGetOrAdd`) and maintains constant single-miss emission under high contention.
xUnit tests live in test/Shardis.Tests/ covering:
- Routing correctness
- Consistent hashing determinism
- Metrics invocation
- DI registration & overrides
- Migration planning scaffolding
- Ordered merge enumerator
Run:

```bash
dotnet test
```

Assertion policy: the test suite relies on the AwesomeAssertions NuGet package for fluent, deterministic assertions.
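AwesomeAssertions follows the familiar `Should()` style. A minimal illustrative test against the determinism invariant (router wiring mirrors the quick-start example; names are illustrative, not copied from the suite):

```csharp
using AwesomeAssertions;
using Shardis.Model;
using Shardis.Persistence;
using Shardis.Routing;
using Xunit;

public class RoutingDeterminismTests
{
    [Fact]
    public void Same_key_always_routes_to_same_shard()
    {
        var shards = new[]
        {
            new SimpleShard(new("shard-001"), "conn-1"),
            new SimpleShard(new("shard-002"), "conn-2"),
        };
        var router = new DefaultShardRouter(new InMemoryShardMapStore(), shards);

        var key = new ShardKey("user-451");

        var first = router.RouteToShard(key);
        var second = router.RouteToShard(key);

        // Deterministic routing: repeated resolution yields the same shard.
        second.ShardId.Should().Be(first.ShardId);
    }
}
```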
Additional invariants covered:
- Single route miss under high concurrency
- Dynamic ring add/remove maintains routing without KeyNotFound or inconsistent assignment
- Deterministic ordering for duplicate keys in ordered merge
- Statistical ring distribution bounds (coefficient of variation heuristic)
- Non-empty broadcaster shard enforcement & null parameter guards
All public surfaces across assemblies are snapshotted via Shardis.PublicApi.Tests using PublicApiGenerator. Baselines live under test/PublicApiApproval/*.approved.txt (committed). When an intentional API change is made:
- Run `dotnet test -c Debug -p:PublicApi` (or simply `dotnet test`).
- A `.received` file will be written alongside the affected `.approved` file.
- Inspect the diff; if intentional, replace the `.approved` content with the `.received` content and delete the `.received` file (the test does this automatically on the next green run).
The test auto-creates missing .approved files (first run does not fail). Only stable, documented APIs should be added—avoid leaking internal abstractions.
Migration implementation now lives in the dedicated Shardis.Migration package. The core repository no longer exposes the previous migration stub. For migration work, prefer the Shardis.Migration executor which provides an end-to-end execution pipeline (copy, verify, swap) with checkpointing and metrics.
Quick start:
- Add the migration services in your composition root: `services.AddShardisMigration<TKey>();`
- Use the planner / executor from the package to create a plan and execute it with durable components (data mover, verifier, swapper, checkpoint store).
See docs/MIGRATION.md, docs/adr/0002-key-migration-execution.md and the Shardis.Migration README for examples and operational guidance.
Two broadcaster abstractions exist today:
- `ShardBroadcaster` – dispatches a synchronous / Task-returning delegate to every shard and aggregates materialized results.
- `ShardStreamBroadcaster` – dispatches async streaming queries (`IAsyncEnumerable<T>` per shard) and yields a merged asynchronous stream without buffering entire shard result sets.

Utility enumerators:

- `ShardisAsyncOrderedEnumerator` – k-way merge for globally ordered streams.
- `ShardisAsyncCombinedEnumerator` – simple interleaving without ordering guarantees.
Higher-level fluent query API (LINQ-like) is under active design (see docs/api.md & docs/linq.md).
Experimental minimal provider (see ADR 0003 cross-link) allows composing simple per-shard filters and a single projection and executing unordered:
```csharp
var exec = /* IShardQueryExecutor implementation (e.g. InMemory / EntityFrameworkCore) */;

var q = Shardis.Query.ShardQuery.For<Person>(exec)
    .Where(p => p.Age >= 30)
    .Select(p => new { p.Name, p.Age });

await foreach (var row in q) { Console.WriteLine($"{row.Name} ({row.Age})"); }
```

Constraints (MVP):

- Only `Where` (multiple) + a single terminal `Select`.
- No ordering operators; use `ShardStreamBroadcaster.QueryAllShardsOrderedStreamingAsync` for global ordering, or order after materialization.
- Unordered merge semantics identical to `QueryAllShardsAsync`.
- Cancellation respected mid-stream.
Future work (tracked): join support, ordering pushdown, aggregation.
| Provider | Package | Where | Select | Ordering | Cancellation | Metrics Hooks |
|---|---|---|---|---|---|---|
| InMemory | `Shardis.Query.InMemory` | ✅ | ✅ | ❌ (post-filter only) | Cooperative (no throw) | ✅ (OnShardStart/Stop/Items/Completed/Canceled) |
| EF Core | `Shardis.Query.EntityFrameworkCore` | ✅ server-side | ✅ server-side | ❌ (use ordered streaming merge for global order) | Cooperative | ✅ |
| Marten (adapter)* | `Shardis.Marten` | ✅ | ✅ | Backend native only (no global merge) | Cooperative | (planned) |

Ordering: for global ordering across shards use `QueryAllShardsOrderedStreamingAsync(keySelector)` (streaming k-way merge) or materialize then order.
| Provider | Unordered Streaming | Ordered Merge Compatible | Native Pagination | Adaptive Paging | Notes |
|---|---|---|---|---|---|
| InMemory | Yes (in-process) | Yes | N/A | N/A | Uses compiled expression pipelines. |
| EF Core | Yes (IAsyncEnumerable) | Yes | Yes (Skip/Take) | Not yet | Relies on underlying provider translation. |
| Marten | Yes (paged) | Yes | Yes | Yes | Fixed or adaptive paging materializer. |
Adaptive paging (Marten) grows/shrinks page size within configured bounds to keep batch latency near a target window. It is deterministic (pure function of prior elapsed times) and never exceeds maxPageSize. Choose:
- Fixed page size: predictable memory footprint, steady workload.
- Adaptive: heterogeneous shard performance, aims to reduce tail latency without overfetching.
| Aspect | InMemory | EF Core | Marten (Fixed) | Marten (Adaptive) |
|---|---|---|---|---|
| Mid-item check | Between MoveNext calls | Provider awaits next row | Before each page & per item | Before each page & per item |
| On cancel effect | Stops yielding, completes gracefully | Stops enumeration, disposes | Stops paging loop | Stops paging loop, retains last page decision state |
| Exception surface | None (cooperative) | OperationCanceledException may bubble internally then swallowed | Swallows after signaling metrics | Same as fixed |
Guidance: always pass a token with timeout for interactive workloads; enumerators honor cancellation promptly.
*Marten executor currently requires a PostgreSQL instance; tests are scaffolded and skipped in CI when no connection is available.
Unordered execution intentionally interleaves per-shard results based on arrival timing. For identical logical inputs, interleaving order may vary across runs. Applications requiring deterministic global ordering must either:
- Use an ordered merge (`ShardisAsyncOrderedEnumerator`), supplying a stable key selector, or
- Materialize then order results explicitly.
All executors observe `CancellationToken` cooperatively. Enumeration stops early without throwing unless the underlying provider surfaces an `OperationCanceledException`. Metrics observers receive `OnCanceled` exactly once.
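In practice this means interactive callers can simply enumerate with a timeout-backed token; a minimal sketch (`query` is assumed to be an `IAsyncEnumerable<string>` built with `ShardQuery` as in the earlier examples):

```csharp
// Bound an interactive query with a 5-second timeout.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

var names = new List<string>();
await foreach (var name in query.WithCancellation(cts.Token))
{
    // Enumeration stops cooperatively when the token fires; no exception is
    // expected unless the underlying provider surfaces OperationCanceledException.
    names.Add(name);
}
```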
Run the new query benchmark suite:

```bash
dotnet run -c Release -p benchmarks/Shardis.Benchmarks.csproj --filter *QueryBenchmarks*
```

```csharp
// InMemory
var inMemExec = new InMemoryShardQueryExecutor(new[] { shard1Objects, shard2Objects }, UnorderedMerge.Merge);
var inMemQuery = ShardQuery.For<Person>(inMemExec).Where(p => p.Age >= 30).Select(p => p.Name);
var names1 = await inMemQuery.ToListAsync();

// EF Core (Sqlite) via shard factory
var efFactory = new EntityFrameworkCoreShardFactory<MyDbContext>(sid => new DbContextOptionsBuilder<MyDbContext>()
    .UseSqlite($"Data Source=shard-{sid.Value}.db")
    .Options);
IShardFactory<DbContext> efAdapter = new DelegatingShardFactory<DbContext>((sid, ct) => new ValueTask<DbContext>(efFactory.Create(sid)));
var efExec = new EntityFrameworkCoreShardQueryExecutor(2, efAdapter, UnorderedMerge.Merge);
var efQuery = ShardQuery.For<Person>(efExec).Where(p => p.Age >= 30).Select(p => p.Name);
var names2 = await efQuery.ToListAsync();

// Marten (single shard adapter for now)
using var session = documentStore.LightweightSession();
var martenNames = await MartenQueryExecutor.Instance
    .Execute(session, q => q.Where(p => p.Age >= 30).Select(p => p))
    .Select(p => p.Name)
    .ToListAsync();

// NOTE: Unordered merge => arrival order, not globally deterministic across shards.
```

Important: Unordered execution is intentionally non-deterministic. For deterministic ordering across shards use an ordered merge (`QueryAllShardsOrderedStreamingAsync`) or materialize then order.
Runnable sample: samples/Shardis.Query.Samples.Marten (creates shardis_marten_sample DB, seeds a few Person docs).
```csharp
var store = DocumentStore.For(o => o.Connection(connString));
await using var session = store.LightweightSession();

var exec = MartenQueryExecutor.Instance.WithPageSize(128);

await foreach (var p in exec.Execute<Person>(session, q => q.Where(x => x.Age >= 30)))
{
    Console.WriteLine($"{p.Name} ({p.Age})");
}

// Ordered
await foreach (var p in exec.ExecuteOrdered<Person, int>(session, q => q.OrderBy(x => x.Age), x => x.Age)) { }

// Adaptive paging
var adaptive = MartenQueryExecutor.Instance.WithAdaptivePaging();
await foreach (var p in adaptive.Execute<Person>(session, q => q.Where(x => x.Age >= 30))) { }
```

See the sample for seeding + database bootstrap utilities.
Shardis executors use a cooperative cancellation model: when cancellation is requested the async iterator stops yielding without throwing unless the underlying provider surfaces an OperationCanceledException. Translation/database/provider exceptions are propagated unchanged. Consumers requiring explicit cancellation signaling should inspect the token externally.
For deterministic cross-shard ordering use `OrderedMergeHelper.Merge`, supplying a key selector. Each shard stream must already be locally ordered by that key. The merge performs a streaming k-way heap merge (O(log n) per item, where n = shard count) without materializing full result sets.
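A hedged usage sketch follows (the exact `Merge` overload, the `Event` type, and the stream source are assumptions for illustration):

```csharp
// 'Event' is a hypothetical record (Guid Id, DateTimeOffset Timestamp);
// each per-shard stream below is assumed locally ordered by Timestamp.
IAsyncEnumerable<Event>[] shardStreams = GetLocallyOrderedShardStreams(); // assumed source

// Streaming k-way heap merge: O(log shardCount) per item, no full materialization.
var merged = OrderedMergeHelper.Merge(shardStreams, e => e.Timestamp);

await foreach (var evt in merged)
{
    Console.WriteLine($"{evt.Timestamp:O} {evt.Id}"); // arrives in global Timestamp order
}
```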
| Package | Dependency | Where/Select | Streaming | Ordering | Notes |
|---|---|---|---|---|---|
| Shardis.Query | none | ✔️ | n/a | n/a | Core query model & merge helpers |
| Shardis.Query.InMemory | none | ✔️ | ✔️ | ❌ | Dev/test executor |
| Shardis.Query.EntityFrameworkCore | EF Core | ✔️ | ✔️ | ❌ | Server-side translation (Sqlite tests) |
| Shardis.Query.Marten | Marten | ✔️ | ✔️ (paged/native) | ❌ | Async/paged materializer |
UnorderedMerge uses an unbounded channel by default for lowest latency. Provide a channelCapacity (e.g. 64–256) to enforce backpressure and cap memory:
```csharp
var broadcaster = new ShardStreamBroadcaster<MyShard, MySession>(shards, channelCapacity: 128);
await foreach (var item in broadcaster.QueryAllShardsAsync(s => Query(s))) { /* ... */ }

// Or directly via helper returning merged stream (capacity 128 example):
var merged = UnorderedMergeHelper.Merge(shardStreams, channelCapacity: 128);
await foreach (var row in merged) { /* consume */ }

// Minimal helper creation showing explicit capacity tradeoff
var merge = UnorderedMergeHelper.Merge(shardStreams, channelCapacity: 128); // capacity => more memory, fewer producer stalls (lower tail latency)
```

Guidance:

- 0 / null (unbounded): lowest per-item latency, potential burst amplification.
- 32–64: balance memory vs throughput for medium fan-out.
- 128–256: higher sustained throughput where producers are faster than the consumer.
- ≥512: rarely justified unless profiling shows persistent producer starvation.
Backpressure wait events are surfaced via IMergeObserver.OnBackpressureWaitStart/Stop so instrumentation can record stall time. Ordered (k-way) merges and unbounded unordered merges emit zero wait events by design.
Two observer surfaces exist:
| Interface | Purpose |
|---|---|
| `IQueryMetricsObserver` | Lifecycle + item counters (shard start/stop, items produced, completion, cancellation). |
| `IAdaptivePagingObserver` | Adaptive Marten paging decisions (previous size, next size, last batch latency). |
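As an illustration, a counting observer might look like this sketch. The member names are inferred from the lifecycle hooks listed above (shard start/stop, item production, completion, cancellation) and may not match the real `IQueryMetricsObserver` exactly:

```csharp
using Shardis.Model;

// Hypothetical sketch of an IQueryMetricsObserver implementation;
// verify member signatures against the Shardis.Query package.
public sealed class CountingQueryObserver : IQueryMetricsObserver
{
    private long _items;

    public void OnShardStart(ShardId shardId) { /* e.g. start a per-shard stopwatch */ }
    public void OnItemsProduced(ShardId shardId, int count) => Interlocked.Add(ref _items, count);
    public void OnShardStop(ShardId shardId) { /* stop the per-shard stopwatch */ }
    public void OnCompleted() => Console.WriteLine($"Query completed, {Interlocked.Read(ref _items)} items.");
    public void OnCanceled() => Console.WriteLine("Query canceled.");
}
```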
Marten executor can switch between fixed and adaptive paging:
```csharp
var fixedExec = MartenQueryExecutor.Instance.WithPageSize(256);

var adaptiveExec = MartenQueryExecutor.Instance.WithAdaptivePaging(
    minPageSize: 64,
    maxPageSize: 1024,
    targetBatchMilliseconds: 50,
    observer: myAdaptiveObserver);
```

The adaptive strategy grows/shrinks page size deterministically based on prior batch latency relative to a target window. It never exceeds bounds and emits a decision event only when the page size changes.

Additional telemetry (adaptive):

- `OnOscillationDetected(shardId, decisionsInWindow, window)` – high churn signal; consider narrowing grow/shrink factors or widening the latency target.
- `OnFinalPageSize(shardId, finalSize, totalDecisions)` – emitted once per shard enumeration for tuning & dashboards.
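A minimal observer sketch covering just the two callbacks documented above (parameter types are assumptions; the real `IAdaptivePagingObserver` may declare additional members such as a per-decision hook):

```csharp
using Shardis.Model;

// Hedged sketch of an IAdaptivePagingObserver; signatures assumed from the docs above.
public sealed class AdaptivePagingLogger : IAdaptivePagingObserver
{
    public void OnOscillationDetected(ShardId shardId, int decisionsInWindow, TimeSpan window)
        => Console.WriteLine($"[{shardId}] oscillating: {decisionsInWindow} size changes in {window.TotalSeconds:F1}s");

    public void OnFinalPageSize(ShardId shardId, int finalSize, int totalDecisions)
        => Console.WriteLine($"[{shardId}] settled at page size {finalSize} after {totalDecisions} decisions");
}
```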
CI runs a smoke allocation benchmark comparing fixed vs adaptive Marten paging. A JSON exporter is parsed and a markdown delta report (ADAPTIVE-ALLOC-DELTA.md) is uploaded. Environment thresholds:
- `ADAPTIVE_ALLOC_MAX_PCT` (default 20) – percentage delta guard
- `ADAPTIVE_ALLOC_MIN_BYTES` (default 4096 B/op) – ignore noise below this absolute per-op allocation
Currently advisory / non-blocking. After several green runs, remove the fallback `|| echo` in the workflow to make regressions fail the job.
See docs/merge-modes.md for a full matrix. Quick guidance:
```csharp
// Unordered streaming (arrival order, lowest latency)
await foreach (var item in broadcaster.QueryAllShardsAsync(s => Query(s))) { /* ... */ }

// Ordered streaming (bounded memory k-way merge)
await foreach (var item in broadcaster.QueryAllShardsOrderedStreamingAsync(s => Query(s), keySelector: x => x.Timestamp, prefetchPerShard: 2)) { /* ... */ }

// Tuning prefetch:
// 1 => minimal latency & memory (default)
// 2 => balanced latency vs throughput
// 4 => higher throughput if shards intermittently stall
int prefetch = isLowLatencyScenario ? 1 : 2; // rarely >4
await foreach (var item in broadcaster.QueryAllShardsOrderedStreamingAsync(s => Query(s), x => x.Id, prefetch)) { }
```

Cancellation & observability: early / mid-stream cancellation is tested (no deadlocks, resources released, no leaks via `WeakReference` probe). Metrics observer tests assert heap sampling (>0 for ordered), symmetric backpressure wait events for bounded channels, and zero wait events for unbounded / ordered streaming scenarios.
Memory scale: O(shards × prefetch). Increase only if profiling shows the merge heap frequently empty while shards are still producing (starvation).
Several IShardMapStore<TKey> implementations are (or will be) available:
| Implementation | Package / Location | Use Case |
|---|---|---|
| `InMemoryShardMapStore<TKey>` | Core | Tests, local dev, ephemeral scenarios |
| `RedisShardMapStore<TKey>` | `Shardis.Redis` | Low-latency distributed cache + persistence |
| SQL-backed (planned) | 🚧 | Durable relational storage |
```csharp
// Add package reference to Shardis.Redis (when published)
services.AddSingleton<IShardMapStore<string>>(sp => new RedisShardMapStore<string>("localhost:6379"));

services.AddShardis<IShard<string>, string, string>(opts =>
{
    opts.Shards.Add(new SimpleShard(new("shard-001"), "postgres://host1/db"));
    opts.Shards.Add(new SimpleShard(new("shard-002"), "postgres://host2/db"));
});
```

The Redis implementation stores assignments as simple string keys under the prefix `shardmap:`. It should be supplemented with a persistence / snapshot strategy if you rotate Redis.
Both the InMemory and Redis map stores implement `TryGetOrAdd` to minimize hash computations and branch decisions in router hot paths.
See docs/index.md for a curated set of design and roadmap documents (fluent query API, migration, backlog, benchmarks).
- Routing is deterministic (no randomness besides stable hash functions).
- No shard logic leaks into domain models; models are plain data structures.
- All public APIs are documented with XML docs.
- Hashing and ring strategies are pluggable (`IShardKeyHasher<TKey>`, `IShardRingHasher`).
- Metrics capture is optional and zero-cost when using the no-op implementation.
- Consistent hash ring rebuilds (add/remove) swap an immutable key snapshot atomically for lock-free lookups.
- Default router guarantees one `RouteMiss` per key via a per-key lock, preserving historical metric semantics.
Configured via AddShardis<TShard,TKey,TSession>(opts => { ... }):
| Option | Purpose | Default |
|---|---|---|
| UseConsistentHashing | Toggle consistent vs simple router | true |
| ReplicationFactor | Virtual nodes per shard (ring) | 100 |
| RingHasher | Ring hashing implementation | DefaultShardRingHasher |
| ShardMapStoreFactory | Custom map store factory | InMemoryShardMapStore |
| ShardKeyHasher | Override key -> uint hash | DefaultShardKeyHasher |
| RouterFactory | Provide totally custom router | null |
| Shards | Collection of shards | (empty) |
Overridable services (register before AddShardis):
- `IShardMapStore<TKey>`
- `IShardisMetrics`
Routers report two primitive events through IShardisMetrics:
- `RouteMiss(router)` – a key had no prior assignment and hashing/selection occurred.
- `RouteHit(router, shardId, existingAssignment)` – a shard was chosen; `existingAssignment` indicates whether the key already had a stored mapping.

You can plug in your own metrics export by implementing `IShardisMetrics`. A production-ready default using `System.Diagnostics.Metrics` is provided as `MetricShardisMetrics` (register it in DI to enable counters):

```csharp
services.AddSingleton<IShardisMetrics, MetricShardisMetrics>();
```

Exposed counters (names subject to refinement before the first NuGet release):
| Counter | Description |
|---|---|
| shardis.route.hits | Total route resolutions (both new + existing assignments) |
| shardis.route.misses | Keys seen for the first time before assignment |
| shardis.route.assignments.existing | Route hits where mapping already existed |
| shardis.route.assignments.new | Route hits that resulted in a new persisted assignment |
Attach these to OpenTelemetry via the .NET Metrics provider or scrape via Prometheus exporters.
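For custom exporters, a minimal `IShardisMetrics` implementation might look like the following sketch. The member set and parameter types are inferred from the events described above plus `RecordRouteLatency`; verify against the actual interface:

```csharp
using System.Diagnostics.Metrics;

// Hedged sketch of a custom IShardisMetrics exporter built on
// System.Diagnostics.Metrics; member signatures are assumptions.
public sealed class MyShardisMetrics : IShardisMetrics
{
    private static readonly Meter Meter = new("MyApp.Shardis");
    private static readonly Counter<long> Hits = Meter.CreateCounter<long>("shardis.route.hits");
    private static readonly Counter<long> Misses = Meter.CreateCounter<long>("shardis.route.misses");
    private static readonly Histogram<double> Latency =
        Meter.CreateHistogram<double>("shardis.route.latency", unit: "ms");

    public void RouteHit(string router, string shardId, bool existingAssignment) =>
        Hits.Add(1,
            new KeyValuePair<string, object?>("router", router),
            new KeyValuePair<string, object?>("shard.id", shardId),
            new KeyValuePair<string, object?>("existing", existingAssignment));

    public void RouteMiss(string router) =>
        Misses.Add(1, new KeyValuePair<string, object?>("router", router));

    public void RecordRouteLatency(double milliseconds) => Latency.Record(milliseconds);
}
```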
The fluent query layer (packages under Shardis.Query.*) records end-to-end merge latency for both unordered and ordered fan-out paths via a single unified histogram (see ADR 0006 – unified single-emission model):
| Instrument | Unit | Description |
|---|---|---|
| `shardis.query.merge.latency` | ms | Time from first shard enumeration start until merged sequence completion / failure / cancellation. |
Tags (dimensions) emitted (empty / null omitted by exporters):
| Tag | Meaning |
|---|---|
| `db.system` | Underlying EF Core provider (best-effort heuristic), e.g. `sqlite`, `postgresql`, `sqlserver`. |
| `provider` | Logical provider identifier (e.g. `efcore`, `inmemory`, `marten`). |
| `shard.count` | Total logical shards configured for the executor. |
| `target.shard.count` | Number of shards actually targeted (after `WhereShard` filtering & invalid id removal). |
| `merge.strategy` | `unordered` or `ordered` (single unified instrument; a future fully streaming ordered merge may introduce an additional instrument if semantics diverge). |
| `ordering.buffered` | `true` if the ordered (k‑way) merge path was used; `false` for unordered (present for parity). |
| `fanout.concurrency` | Effective parallel shard enumerations (min(configured limit, targeted shard count)). |
| `channel.capacity` | Configured channel capacity (unordered merge only; `-1` indicates unbounded). |
| `failure.mode` | `fail-fast` or `best-effort` (best-effort: partial shard failures suppressed). |
| `result.status` | `ok`, `failed`, or `canceled` (`ok` for best-effort when ≥1 shard succeeded even if some failed). |
| `root.type` | CLR type name of the root element (e.g. `Person`). |
| `invalid.shard.count` | Number of user-requested shard ids ignored because they do not exist. |
Enable by supplying an `IShardisQueryMetrics` implementation (the default is a no-op):

```csharp
services.AddSingleton<IShardisQueryMetrics, MetricShardisQueryMetrics>();

// Ensure OpenTelemetry is configured for the Shardis query meter
builder.Services.AddOpenTelemetry().WithMetrics(m => m.AddMeter("Shardis.Query"));
```

Tests (`QueryMergeLatencyMetricsTests`, `QueryLatencyAdditionalOpenTelemetryTests`) assert exactly one histogram record per enumeration (success, cancellation, failure, ordered, targeted, invalid-shard scenarios) and validate tag correctness including `invalid.shard.count`. Note: earlier drafts referred to a `db.provider` tag; the finalized stable schema uses `provider` for reduced cardinality and consistency with other Shardis metrics.
Use WhereShard to restrict fan‑out to a known subset of shards, reducing unnecessary work and latency:
```csharp
var exec = /* IShardQueryExecutor */;

var q = ShardQuery.For<Person>(exec)
    .WhereShard(new ShardId("1"), new ShardId("3"))
    .Where(p => p.Age >= 30);

var people = await q.ToListAsync();
```

Behavior:

- Unknown shard ids are silently ignored (the telemetry tag `invalid.shard.count` captures the count).
- `target.shard.count` reflects the number of valid shards actually enumerated.
- Concurrency capping uses the lesser of the configured maximum and `target.shard.count` (tag: `fanout.concurrency`).
- If no valid shards remain after filtering, the query quickly yields an empty result.
Rationale: many applications know a shard (or set) a priori (e.g., entity already carries a resolved ShardId); targeted execution avoids enumerating every shard only to filter after merge.
Unit tests (WhereShardTargetingTests, EfCoreExecutorConcurrencyTests, EntityFrameworkCoreTargetingInvalidShardTests) cover single-target, invalid-id handling, and concurrency semantics.
MIT License — free for personal and commercial use.
- Semantic Versioning will be used once packages are published.
- Until the first stable `1.0.0`, minor version bumps (`0.x`) may introduce breaking changes with clear CHANGELOG entries.
- Public APIs with XML docs are considered part of the contract; anything `internal` or undocumented may change.
- Experimental features are tagged in docs and may be excluded from backward compatibility guarantees until stabilized.
Planned publication sequence:
1. `Shardis` (core) – routing, hashing, map stores, metrics.
2. `Shardis.Redis` – Redis map store.
3. `Shardis.Marten` – query executor adapter (post fluent API MVP).
4. Migration utilities (once the copy + verify pipeline is complete).
"Because scaling your domain shouldn’t mean scaling your pain." 🚀