Skip to content

Commit 811e032

Browse files
authored
use key_may_exist() from rocksdict (#479)
* use key_may_exist() from rocksdict
* cleanup tests and lint
* remove raw_mode=True kwarg from ReadOptions
* set use_rocksdict lowercase
1 parent d19bd4a commit 811e032

File tree

3 files changed

13 additions and 98 deletions (+13 / -98 lines changed)

3 files changed

13 additions and 98 deletions (+13 / -98 lines changed)

faust/stores/rocksdb.py

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ def open(self, path: Path, *, read_only: bool = False) -> DB:
136136
[rocksdict.DBPath(str(path), self.target_file_size_base)]
137137
)
138138
db = DB(str(path), options=self.as_options())
139-
db.set_read_options(rocksdict.ReadOptions(raw_mode=True))
139+
db.set_read_options(rocksdict.ReadOptions())
140140
return db
141141
else:
142142
return rocksdb.DB(str(path), self.as_options(), read_only=read_only)
@@ -241,11 +241,11 @@ def __init__(
241241

242242
self.driver = self.options.pop("driver", driver)
243243
if self.driver == "rocksdict":
244-
self.USE_ROCKSDICT = True
244+
self.use_rocksdict = True
245245
elif self.driver == "python-rocksdb":
246-
self.USE_ROCKSDICT = False
246+
self.use_rocksdict = False
247247
else:
248-
self.USE_ROCKSDICT = USE_ROCKSDICT
248+
self.use_rocksdict = USE_ROCKSDICT
249249

250250
self.rocksdb_options = RocksDBOptions(**self.options)
251251
if key_index_size is None:
@@ -413,7 +413,7 @@ def apply_changelog_batch(
413413
of a changelog event.
414414
"""
415415
batches: DefaultDict[int, WriteBatch]
416-
if self.USE_ROCKSDICT:
416+
if self.use_rocksdict:
417417
batches = defaultdict(lambda: WriteBatch(raw_mode=True))
418418
else:
419419
batches = defaultdict(rocksdb.WriteBatch)
@@ -424,12 +424,12 @@ def apply_changelog_batch(
424424
offset if tp not in tp_offsets else max(offset, tp_offsets[tp])
425425
)
426426
msg = event.message
427-
if self.USE_ROCKSDICT:
427+
if self.use_rocksdict:
428428
msg.key = msg.key.encode()
429429
if msg.value is None:
430430
batches[msg.partition].delete(msg.key)
431431
else:
432-
if self.USE_ROCKSDICT:
432+
if self.use_rocksdict:
433433
msg.value = msg.value.encode()
434434
batches[msg.partition].put(msg.key, msg.value)
435435

@@ -494,18 +494,11 @@ def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]:
494494
dbs = cast(Iterable[PartitionDB], self._dbs.items())
495495

496496
for partition, db in dbs:
497-
if self.USE_ROCKSDICT:
498-
# TODO: Remove this once key_may_exist is added
497+
if db.key_may_exist(key)[0]:
499498
value = db.get(key)
500499
if value is not None:
501500
self._key_index[key] = partition
502501
return _DBValueTuple(db, value)
503-
else:
504-
if db.key_may_exist(key)[0]:
505-
value = db.get(key)
506-
if value is not None:
507-
self._key_index[key] = partition
508-
return _DBValueTuple(db, value)
509502
return None
510503

511504
def _del(self, key: bytes) -> None:
@@ -629,15 +622,8 @@ def _contains(self, key: bytes) -> bool:
629622
else:
630623
for db in self._dbs_for_key(key):
631624
# bloom filter: false positives possible, but not false negatives
632-
if self.USE_ROCKSDICT:
633-
# TODO: Remove once key_may_exist is added
634-
if db.get(key) is not None:
635-
return True
636-
else:
637-
return False
638-
else:
639-
if db.key_may_exist(key)[0] and db.get(key) is not None:
640-
return True
625+
if db.key_may_exist(key)[0] and db.get(key) is not None:
626+
return True
641627
return False
642628

643629
def _dbs_for_key(self, key: bytes) -> Iterable[DB]:
@@ -662,7 +648,7 @@ def _size(self) -> int:
662648
return sum(self._size1(db) for db in self._dbs_for_actives())
663649

664650
def _visible_keys(self, db: DB) -> Iterator[bytes]:
665-
if self.USE_ROCKSDICT:
651+
if self.use_rocksdict:
666652
it = db.keys()
667653
iter = db.iter()
668654
iter.seek_to_first()
@@ -674,7 +660,7 @@ def _visible_keys(self, db: DB) -> Iterator[bytes]:
674660
yield key
675661

676662
def _visible_items(self, db: DB) -> Iterator[Tuple[bytes, bytes]]:
677-
if self.USE_ROCKSDICT:
663+
if self.use_rocksdict:
678664
it = db.items()
679665
else:
680666
it = db.iteritems() # noqa: B301

requirements/extras/rocksdict.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rocksdict>=0.3.8
1+
rocksdict==0.3.9

tests/unit/stores/test_rocksdb.py

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -757,46 +757,6 @@ class Test_Store_Rocksdict(Test_Store_RocksDB):
757757
def store(self, *, app, rocks, table):
758758
return Store("rocksdb://", app, table, driver="rocksdict")
759759

760-
def test__get__has_event(self, *, store, current_event):
761-
partition = 1
762-
message = Mock(name="message")
763-
message.partition.return_value = partition
764-
765-
current_event.return_value = message
766-
767-
db = Mock(name="db")
768-
store._db_for_partition = Mock("_db_for_partition")
769-
store._db_for_partition.return_value = db
770-
db.get.return_value = b"value"
771-
db.__getitem__ = Mock()
772-
db.__getitem__.return_value = b"value"
773-
store.table = Mock(name="table")
774-
store.table.is_global = False
775-
store.table.synchronize_all_active_partitions = False
776-
store.table.use_partitioner = False
777-
778-
assert store._get(b"key") == b"value"
779-
780-
db.get.return_value = None
781-
db.__getitem__ = Mock()
782-
db.__getitem__.return_value = None
783-
assert store._get(b"key2") is None
784-
785-
@pytest.mark.skip("key_may_exist not available in rocksdict yet")
786-
def test_get_bucket_for_key__is_in_index(self, *, store):
787-
store._key_index[b"key"] = 30
788-
db = store._dbs[30] = Mock(name="db-p30")
789-
790-
db.key_may_exist.return_value = [False]
791-
assert store._get_bucket_for_key(b"key") is None
792-
793-
db.key_may_exist.return_value = [True]
794-
db.get.return_value = None
795-
assert store._get_bucket_for_key(b"key") is None
796-
797-
db.get.return_value = b"value"
798-
assert store._get_bucket_for_key(b"key") == (db, b"value")
799-
800760
def test__iteritems(self, *, store):
801761
dbs = self._setup_items(
802762
db1=[
@@ -823,37 +783,6 @@ def test__iteritems(self, *, store):
823783
# iteritems not available in rocksdict yet
824784
db.items.assert_called_once_with()
825785

826-
def new_db(self, name, exists=False):
827-
db = Mock(name=name)
828-
db.key_may_exist.return_value = [exists]
829-
db.get.return_value = name
830-
return db
831-
832-
@pytest.mark.skip("key_may_exist not available in rocksdict yet")
833-
def test_get_bucket_for_key__not_in_index(self, *, store):
834-
dbs = {
835-
1: self.new_db(name="db1"),
836-
2: self.new_db(name="db2"),
837-
3: self.new_db(name="db3", exists=True),
838-
4: self.new_db(name="db4", exists=True),
839-
}
840-
store._dbs.update(dbs)
841-
842-
assert store._get_bucket_for_key(b"key") == (dbs[3], "db3")
843-
844-
@pytest.mark.skip("key_may_exist not available in rocksdict yet")
845-
def test__contains(self, *, store):
846-
db1 = self.new_db("db1", exists=False)
847-
db2 = self.new_db("db2", exists=True)
848-
dbs = {b"key": [db1, db2]}
849-
store._dbs_for_key = Mock(side_effect=dbs.get)
850-
851-
db2.get.return_value = None
852-
assert not store._contains(b"key")
853-
854-
db2.get.return_value = b"value"
855-
assert store._contains(b"key")
856-
857786
def test__iterkeys(self, *, store):
858787
dbs = self._setup_keys(
859788
db1=[

0 commit comments

Comments
 (0)