Skip to content

Commit d19bd4a

Browse files
authored
Add unit tests for rocksdict to guarantee reliability (#478)
* initial commit
* create new class for rocksdict testing
* commit progress
* update ci
* fix partition get call
* commit some fixed tests
* commit remaining TODO tests
* commit and finish later
* upgrade rocksdict minimum to 0.3.8
* commit cleanup
* 8 tests to go
* cleanup
* cleanup redundant tests
* cleanup
* cleanup
* undo breaking changes
* commit fixes for more tests
* commit fixes for more tests
* commit remaining tests
* bring test__size back
1 parent 5c08c27 commit d19bd4a

File tree

3 files changed

+272
-62
lines changed

3 files changed

+272
-62
lines changed

faust/stores/rocksdb.py

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class RocksDBOptions:
9595
block_cache_size: int = DEFAULT_BLOCK_CACHE_SIZE
9696
block_cache_compressed_size: int = DEFAULT_BLOCK_CACHE_COMPRESSED_SIZE
9797
bloom_filter_size: int = DEFAULT_BLOOM_FILTER_SIZE
98+
use_rocksdict: bool = USE_ROCKSDICT
9899
extra_options: Mapping
99100

100101
def __init__(
@@ -106,6 +107,7 @@ def __init__(
106107
block_cache_size: Optional[int] = None,
107108
block_cache_compressed_size: Optional[int] = None,
108109
bloom_filter_size: Optional[int] = None,
110+
use_rocksdict: Optional[bool] = None,
109111
**kwargs: Any,
110112
) -> None:
111113
if max_open_files is not None:
@@ -122,11 +124,13 @@ def __init__(
122124
self.block_cache_compressed_size = block_cache_compressed_size
123125
if bloom_filter_size is not None:
124126
self.bloom_filter_size = bloom_filter_size
127+
if use_rocksdict is not None:
128+
self.use_rocksdict = use_rocksdict
125129
self.extra_options = kwargs
126130

127131
def open(self, path: Path, *, read_only: bool = False) -> DB:
128132
"""Open RocksDB database using this configuration."""
129-
if USE_ROCKSDICT:
133+
if self.use_rocksdict:
130134
db_options = self.as_options()
131135
db_options.set_db_paths(
132136
[rocksdict.DBPath(str(path), self.target_file_size_base)]
@@ -139,7 +143,7 @@ def open(self, path: Path, *, read_only: bool = False) -> DB:
139143

140144
def as_options(self) -> Options:
141145
"""Return :class:`rocksdb.Options` object using this configuration."""
142-
if USE_ROCKSDICT:
146+
if self.use_rocksdict:
143147
db_options = Options(raw_mode=True)
144148
db_options.create_if_missing(True)
145149
db_options.set_max_open_files(self.max_open_files)
@@ -156,9 +160,6 @@ def as_options(self) -> Options:
156160
table_factory_options.set_index_type(
157161
rocksdict.BlockBasedIndexType.binary_search()
158162
)
159-
table_factory_options.set_block_cache_compressed(
160-
rocksdict.Cache(self.block_cache_compressed_size)
161-
)
162163
db_options.set_block_based_table_factory(table_factory_options)
163164
return db_options
164165
else:
@@ -221,16 +222,13 @@ def __init__(
221222
driver: Optional[str] = None,
222223
**kwargs: Any,
223224
) -> None:
224-
if rocksdict is None and rocksdb is None:
225+
if rocksdb is None and rocksdict is None:
225226
error = ImproperlyConfigured(
226227
"RocksDB bindings not installed? pip install faust-streaming-rocksdb"
227228
" or rocksdict"
228229
)
229230
try:
230-
try:
231-
import rocksdb as _rocksdb # noqa: F401
232-
except ImportError:
233-
import rocksdict as _rocksdict # noqa: F401
231+
import rocksdb as _rocksdb # noqa: F401
234232
except Exception as exc: # pragma: no cover
235233
raise error from exc
236234
else: # pragma: no cover
@@ -242,11 +240,12 @@ def __init__(
242240
self.read_only = self.options.pop("read_only", read_only)
243241

244242
self.driver = self.options.pop("driver", driver)
245-
global USE_ROCKSDICT
246243
if self.driver == "rocksdict":
247-
USE_ROCKSDICT = True
244+
self.USE_ROCKSDICT = True
248245
elif self.driver == "python-rocksdb":
249-
USE_ROCKSDICT = False
246+
self.USE_ROCKSDICT = False
247+
else:
248+
self.USE_ROCKSDICT = USE_ROCKSDICT
250249

251250
self.rocksdb_options = RocksDBOptions(**self.options)
252251
if key_index_size is None:
@@ -353,13 +352,7 @@ def persisted_offset(self, tp: TP) -> Optional[int]:
353352
354353
See :meth:`set_persisted_offset`.
355354
"""
356-
if USE_ROCKSDICT:
357-
try:
358-
offset = self._db_for_partition(tp.partition)[self.offset_key]
359-
except Exception:
360-
offset = None
361-
else:
362-
offset = self._db_for_partition(tp.partition).get(self.offset_key)
355+
offset = self._db_for_partition(tp.partition).get(self.offset_key)
363356
if offset is not None:
364357
return int(offset)
365358
return None
@@ -372,12 +365,7 @@ def set_persisted_offset(self, tp: TP, offset: int) -> None:
372365
to only read the events that occurred recently while
373366
we were not an active replica.
374367
"""
375-
if USE_ROCKSDICT:
376-
self._db_for_partition(tp.partition)[self.offset_key] = str(offset).encode()
377-
else:
378-
self._db_for_partition(tp.partition).put(
379-
self.offset_key, str(offset).encode()
380-
)
368+
self._db_for_partition(tp.partition).put(self.offset_key, str(offset).encode())
381369

382370
async def need_active_standby_for(self, tp: TP) -> bool:
383371
"""Decide if an active standby is needed for this topic partition.
@@ -425,7 +413,7 @@ def apply_changelog_batch(
425413
of a changelog event.
426414
"""
427415
batches: DefaultDict[int, WriteBatch]
428-
if USE_ROCKSDICT:
416+
if self.USE_ROCKSDICT:
429417
batches = defaultdict(lambda: WriteBatch(raw_mode=True))
430418
else:
431419
batches = defaultdict(rocksdb.WriteBatch)
@@ -436,9 +424,13 @@ def apply_changelog_batch(
436424
offset if tp not in tp_offsets else max(offset, tp_offsets[tp])
437425
)
438426
msg = event.message
427+
if self.USE_ROCKSDICT:
428+
msg.key = msg.key.encode()
439429
if msg.value is None:
440430
batches[msg.partition].delete(msg.key)
441431
else:
432+
if self.USE_ROCKSDICT:
433+
msg.value = msg.value.encode()
442434
batches[msg.partition].put(msg.key, msg.value)
443435

444436
for partition, batch in batches.items():
@@ -453,10 +445,7 @@ def _set(self, key: bytes, value: Optional[bytes]) -> None:
453445
partition = event.message.partition
454446
db = self._db_for_partition(partition)
455447
self._key_index[key] = partition
456-
if USE_ROCKSDICT:
457-
db[key] = value
458-
else:
459-
db.put(key, value)
448+
db.put(key, value)
460449

461450
def _db_for_partition(self, partition: int) -> DB:
462451
try:
@@ -481,13 +470,7 @@ def _get(self, key: bytes) -> Optional[bytes]:
481470
if partition_from_message:
482471
partition = event.message.partition
483472
db = self._db_for_partition(partition)
484-
if USE_ROCKSDICT:
485-
try:
486-
value = db[key]
487-
except Exception:
488-
value = None
489-
else:
490-
value = db.get(key)
473+
value = db.get(key)
491474
if value is not None:
492475
self._key_index[key] = partition
493476
return value
@@ -511,11 +494,9 @@ def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]:
511494
dbs = cast(Iterable[PartitionDB], self._dbs.items())
512495

513496
for partition, db in dbs:
514-
if USE_ROCKSDICT:
515-
try:
516-
value = db[key]
517-
except Exception:
518-
value = None
497+
if self.USE_ROCKSDICT:
498+
# TODO: Remove this once key_may_exist is added
499+
value = db.get(key)
519500
if value is not None:
520501
self._key_index[key] = partition
521502
return _DBValueTuple(db, value)
@@ -640,25 +621,19 @@ def _contains(self, key: bytes) -> bool:
640621
if partition_from_message:
641622
partition = event.message.partition
642623
db = self._db_for_partition(partition)
643-
if USE_ROCKSDICT:
644-
try:
645-
value = db[key]
646-
except Exception:
647-
value = None
648-
else:
649-
value = db.get(key)
624+
value = db.get(key)
650625
if value is not None:
651626
return True
652627
else:
653628
return False
654629
else:
655630
for db in self._dbs_for_key(key):
656631
# bloom filter: false positives possible, but not false negatives
657-
if USE_ROCKSDICT:
658-
try:
659-
_ = db[key]
632+
if self.USE_ROCKSDICT:
633+
# TODO: Remove once key_may_exist is added
634+
if db.get(key) is not None:
660635
return True
661-
except Exception:
636+
else:
662637
return False
663638
else:
664639
if db.key_may_exist(key)[0] and db.get(key) is not None:
@@ -687,7 +662,7 @@ def _size(self) -> int:
687662
return sum(self._size1(db) for db in self._dbs_for_actives())
688663

689664
def _visible_keys(self, db: DB) -> Iterator[bytes]:
690-
if USE_ROCKSDICT:
665+
if self.USE_ROCKSDICT:
691666
it = db.keys()
692667
iter = db.iter()
693668
iter.seek_to_first()
@@ -699,7 +674,7 @@ def _visible_keys(self, db: DB) -> Iterator[bytes]:
699674
yield key
700675

701676
def _visible_items(self, db: DB) -> Iterator[Tuple[bytes, bytes]]:
702-
if USE_ROCKSDICT:
677+
if self.USE_ROCKSDICT:
703678
it = db.items()
704679
else:
705680
it = db.iteritems() # noqa: B301

requirements/extras/rocksdict.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rocksdict>=0.3.7
1+
rocksdict>=0.3.8

0 commit comments

Comments
 (0)