Lightweight CRDT utilities for Drift that add conflict-free replication to your local SQLite database. It provides:
- CRDT columns (
isDeleted,hlc,nodeId,modified) via amixinfor Drift tables - Safe writes that automatically set CRDT metadata and resolve conflicts using Hybrid Logical Clocks (HLC)
- Read helpers that transparently filter tombstoned rows
- Export/import of changesets to sync multiple nodes/devices
An end-to-end example is available in example/.
Many apps need offline-first storage and conflict resolution across devices. This package composes with Drift to add minimal, explicit primitives built on HLCs so you can:
- Write data normally while embedding CRDT metadata
- Observe tables without deleted rows
- Compute and merge changesets between nodes
dart pub add drift drift_crdtAdd the CrdtColumns mixin to any table that should be replicated. Include your app-specific columns as usual.
class Todos extends Table with CrdtColumns {
@override
Set<Column> get primaryKey => {}; // remember to add your primary key
}CRDT columns added by the mixin:
isDeleted(bool): tombstone flag for logical deleteshlc(text): write HLCnodeId(text): node id extracted from the HLCmodified(text): last write HLC
import 'package:drift_crdt/drift_crdt.dart';
final crdt = DriftCrdt<AppDatabase>(db);
await crdt.init(); // optionally pass a UNIQUE node id (a user id, for example)Use the included reader helpers. They automatically filter out tombstoned rows (isDeleted = true).
final rows = await crdt.select(crdt.db.todos);
// This is equivalent to:
final rows = await (db.select(db.todos)..where((t) => t.isDeleted.equals(false))).get();final stream = crdt.watch(crdt.db.todos);
// This is equivalent to:
final stream = (db.select(db.todos)..where((t) => t.isDeleted.equals(false))).watch();final stream = crdt.watch(
crdt.db.todos,
where: (t) => [
t.title.equals('Learn Flutter'),
t.done.equals(false),
],
);
// This is equivalent to:
final stream =
await (db.select(db.todos)
..where((t) => t.title.equals('Learn Flutter'))
..where((t) => t.done.equals(false))
..where((t) => t.isDeleted.equals(false)))
.get();This uses directly the Drift API, it just filters out tombstoned rows.
Use write to perform inserts/updates/deletes. CRDT columns (hlc, nodeId, modified, isDeleted) are filled automatically and conflicts are resolved by HLC ordering.
await crdt.write((w) async {
await w.insert(
crdt.db.todos,
TodosCompanion(id: 'id-1', title: 'Learn Dart'),
);
});await crdt.write((w) async {
await w.insertOnConflictUpdate(
crdt.db.todos,
TodosCompanion(id: 'id-1', title: 'Learn Dart'),
);
});await crdt.write((w) async {
await w.update(
crdt.db.todos,
TodosCompanion(done: Value(true)),
);
});await crdt.write((w) async {
await w.delete(
crdt.db.todos,
where: (t) => [t.id.equals('id-1')],
);
});By default, all writes are performed in a transaction so you can write something like this:
await crdt.write((w) async {
final q = crdt.db.select(crdt.db.todos)
..where((t) => t.id.equals('id-1'));
final row = await q.getSingle();
await w.update(
crdt.db.todos,
TodosCompanion(done: Value(!row.done)),
where: (t) => [t.id.equals('id-1')],
);
});Because of how Drift works you can't use the <MyTable>Companion.insert constructor because it forces you to provide also the required CRDTs columns.
For the same reason you also can't use the generated <MyClass> DataClass class.
Note that you can still use the <MyTable>Companion default constructor and and if you forget to provide some of columns that are marked as required by your table, you will get a runtime error.
If you need full control over Drift APIs (e.g. raw queries), you can use writeUnsafe function.
It requires a callback function that will provide you with some helpful parameters:
db: the database instanceparams: a record that contains the CRDT params that you can use to fill your own columnsfilters: a record that contains the WHERE clauses that you need to apply to your query to prevent conflicts. It contains the same filters just in a different format:filters.hlcFilteris a filter that you can use inwherewhen su you the Drift's query builderfilters.customHlcFilteris a filter that you can use when you usedb.custom*methods.
You must use the params record to fill your own columns.
await crdt.writeUnsafe((db, params, filters) async {
final query = db.update(db.todos)..where(filters.hlcFilter);
await query.write(
TodosCompanion(
isDeleted: const Value(true),
hlc: Value(params.hlc),
nodeId: Value(params.nodeId),
modified: Value(params.modified),
),
);
return (result: null, affectedTables: {db.todos});
});Manually setting the CRDT columns everytime is a bit verbose, so you can use the withParams extension method to fill the CRDT columns.
await crdt.writeUnsafe((db, params, filters) async {
final query = db.update(db.todos)..where(filters.hlcFilter);
await query.write(
const TodosCompanion(isDeleted: Value(true)).withParams(params),
);
return (result: null, affectedTables: {db.todos});
});You must use the filters record to apply the WHERE clauses to your query.
You must use the filters.hlcFilter filter to apply the WHERE clauses to your query.
await crdt.writeUnsafe((db, params, filters) async {
final query = db.update(db.todos)..where(filters.hlcFilter);
/* ... */
});You must use the filters.customHlcFilter filter to apply the WHERE clauses to your query.
await crdt.writeUnsafe((db, params, filters) async {
final sql = '''
UPDATE ${db.todos.actualTableName} SET isDeleted = 1
WHERE ${filters.customHlcFilter(db.todos.actualTableName)}
''';
await db.customUpdate(
sql,
updates: {db.todos},
updateKind: UpdateKind.update,
);
return (result: null, affectedTables: {db.todos});
});You must not use writeUnsafe to perform native delete operation otherwise you will break the CRDT logic.
Instead you must update the isDeleted column to true.
BAD ❌
await crdt.writeUnsafe((db, params, filters) async {
final query = db.delete(db.todos)..where(filters.hlcFilter);
await query.go();
return (result: null, affectedTables: {db.todos});
});GOOD ✅ Good:
await crdt.writeUnsafe((db, params, filters) async {
final query = db.update(db.todos)..where(filters.hlcFilter);
await query.write(
const TodosCompanion(isDeleted: Value(true)).withParams(params),
);
return (result: null, affectedTables: {db.todos});
});You can use the withParams extension method with the delete parameter set to true to automatically set the isDeleted column to true.
await crdt.writeUnsafe((db, params, filters) async {
final query = db.update(db.todos)..where(filters.hlcFilter);
await query.write(
const TodosCompanion().withParams(params, delete: true),
);
return (result: null, affectedTables: {db.todos});
});Export a changeset from one node and merge it into another. The package uses HLCs to ensure deterministic conflict resolution.
// On node A`
final changeset = await crdtA.getChangeset();
// Send `changeset` to node B via your transport of choice
// On node B
await crdtB.merge(changeset);You can scope changesets:
// Only specific tables
await crdt.getChangeset(onlyTables: ['todos']);
// Filter by node id
await crdt.getChangeset(onlyNodeId: 'node-A');
await crdt.getChangeset(exceptNodeId: 'node-A');
// Time-based filters using HLCs
await crdt.getChangeset(modifiedOn: someHlc);
await crdt.getChangeset(modifiedAfter: someHlc);See example/lib/data/ for the complete setup (tables, mappers, repositories, and database class).
CrdtColumns: mixin with required CRDT columnsDriftCrdt<T extends GeneratedDatabase>:init([nodeId])- Reads:
select,selectSingle(OrNull),watch,watchSingle(OrNull) - Writes:
write((w) => w.insert|insertOnConflictUpdate|update|delete(...)) - Advanced:
writeUnsafe((db, params, filters) => ...) - Sync:
getChangeset(...),merge(changeset),getLastModified(...)
- Deletions are logical (tombstone). Reads exclude rows where
isDeleted == true. - You control transport of changesets between nodes; this package only serializes and merges.
MIT