Skip to content

Commit 8e75c8e

Browse files
authored
Fix test case for test_apply_changelog_batch (#480)
1 parent 811e032 commit 8e75c8e

File tree

2 files changed

+1
-6
lines changed

2 files changed

+1
-6
lines changed

faust/stores/rocksdb.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ def apply_changelog_batch(
414414
"""
415415
batches: DefaultDict[int, WriteBatch]
416416
if self.use_rocksdict:
417-
batches = defaultdict(lambda: WriteBatch(raw_mode=True))
417+
batches = defaultdict(lambda: rocksdict.WriteBatch(raw_mode=True))
418418
else:
419419
batches = defaultdict(rocksdb.WriteBatch)
420420
tp_offsets: Dict[TP, int] = {}
@@ -424,13 +424,9 @@ def apply_changelog_batch(
424424
offset if tp not in tp_offsets else max(offset, tp_offsets[tp])
425425
)
426426
msg = event.message
427-
if self.use_rocksdict:
428-
msg.key = msg.key.encode()
429427
if msg.value is None:
430428
batches[msg.partition].delete(msg.key)
431429
else:
432-
if self.use_rocksdict:
433-
msg.value = msg.value.encode()
434430
batches[msg.partition].put(msg.key, msg.value)
435431

436432
for partition, batch in batches.items():

tests/unit/stores/test_rocksdb.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,6 @@ def test__itervalues(self, *, store):
836836
# TODO: seek_to_first() should be called once rocksdict is updated
837837
db.items.assert_called_once_with()
838838

839-
@pytest.mark.skip("Needs fixing")
840839
def test_apply_changelog_batch(self, *, store, rocksdict, db_for_partition):
841840
def new_event(name, tp: TP, offset, key, value) -> Mock:
842841
return Mock(

0 commit comments

Comments
 (0)