Skip to content

Commit 0bb2685

Browse files
wbarnha and patkivikram authored
Add method to RocksDB for backing up partitions (#304)
* create method for backing up partition
* if we're not flushing, just directly read the db
* annotate backup method
* Define backup_partition in StoreT baseclass and derivatives
* change partition to tp
* change partition to union tp or int since all we care about is partition index
* fix error log
* add method to restore backups
* add forgotten ellipses
* remove misleading docstring
* Check if backup path is directory and make paths
* Convert partition paths used in restoration to str
* dedicate backup path by tablename
* update backup docstring
* dont import BackupEngine to fix linting
* commit lint changes
* reformat docstrings
* add general Exception
* add backup_partition and restore_backup to MyStore test class
* add backup_partition and restore_backup to MySerializedStore test class
* check permissions to create dirs and write to backup dir before spawning backupengine
* remove redundant exception handle
* add backup methods to ChangeloggedObjectManager

Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
1 parent a887571 commit 0bb2685

File tree

6 files changed

+174
-1
lines changed

6 files changed

+174
-1
lines changed

faust/stores/aerospike.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,3 +262,23 @@ def aerospike_fun_call_with_retry(self, fun, *args, **kwargs):
262262
ex
263263
) # crash the app to prevent the offset from progressing
264264
raise ex
265+
266+
async def backup_partition(
    self,
    tp: Union[TP, int],
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Back up a single partition of this store.

    Backups are not available for the Aerospike store, so every call
    fails regardless of the arguments given.

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError("Not yet implemented for Aerospike.")
275+
276+
def restore_backup(
    self,
    tp: Union[TP, int],
    latest: bool = True,
    backup_id: int = 0,
) -> None:
    """Restore a single partition of this store from a backup.

    Restoring is not available for the Aerospike store, so every call
    fails regardless of the arguments given.

    Raises:
        NotImplementedError: Always.

    """
    raise NotImplementedError("Not yet implemented for Aerospike.")

faust/stores/memory.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""In-memory table storage."""
2-
from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple
2+
from typing import Any, Callable, Iterable, MutableMapping, Optional, Set, Tuple, Union
33

44
from faust.types import TP, EventT
55
from faust.types.stores import KT, VT
@@ -82,3 +82,23 @@ def reset_state(self) -> None:
8282
8383
"""
8484
...
85+
86+
async def backup_partition(
    self,
    tp: Union[TP, int],
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Back up a single partition of this store.

    The in-memory store has nothing to back up: the call is accepted
    and ignored.

    """
    return None
95+
96+
def restore_backup(
    self,
    tp: Union[TP, int],
    latest: bool = True,
    backup_id: int = 0,
) -> None:
    """Restore a single partition of this store from a backup.

    The in-memory store keeps no backups: the call is accepted and
    ignored.

    """
    return None

faust/stores/rocksdb.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import asyncio
33
import gc
44
import math
5+
import os
56
import shutil
7+
import tempfile
68
import typing
79
from collections import defaultdict
810
from contextlib import suppress
@@ -183,6 +185,83 @@ def __init__(
183185
self._key_index = LRUCache(limit=self.key_index_size)
184186
self.db_lock = asyncio.Lock()
185187
self.rebalance_ack = False
188+
self._backup_path = os.path.join(self.path, f"{str(self.basename)}-backups")
189+
try:
190+
self._backup_engine = None
191+
if not os.path.isdir(self._backup_path):
192+
os.makedirs(self._backup_path, exist_ok=True)
193+
testfile = tempfile.TemporaryFile(dir=self._backup_path)
194+
testfile.close()
195+
except PermissionError:
196+
self.log.warning(
197+
f'Unable to make directory for path "{self._backup_path}",'
198+
f"disabling backups."
199+
)
200+
except OSError:
201+
self.log.warning(
202+
f'Unable to create files in "{self._backup_path}",' f"disabling backups"
203+
)
204+
else:
205+
self._backup_engine = rocksdb.BackupEngine(self._backup_path)
206+
207+
async def backup_partition(
    self, tp: Union[TP, int], flush: bool = True, purge: bool = False, keep: int = 1
) -> None:
    """Backup partition from this store.

    This will be saved in a separate directory in the data directory called
    '{table-name}-backups'.

    Arguments:
        tp: Partition to backup, either a ``TP`` or the bare partition index.
        flush: Flush the memtable before backing up the state of the table.
            When false, the partition database is opened read-only instead.
        purge: Purge old backups in the process
        keep: How many backups to keep after purging

    This is only supported in newer versions of python-rocksdb which can read
    the RocksDB database using multi-process read access.
    See https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB to know more.

    The call is a no-op when no backup engine is available. Backups are
    best-effort: any failure is logged with its traceback and not re-raised.
    """
    if not self._backup_engine:
        return
    partition = tp.partition if isinstance(tp, TP) else tp
    try:
        if flush:
            db = await self._try_open_db_for_partition(partition)
        else:
            db = self.rocksdb_options.open(
                self.partition_path(partition), read_only=True
            )
        self._backup_engine.create_backup(db, flush_before_backup=flush)
        if purge:
            self._backup_engine.purge_old_backups(keep)
    except Exception:
        # Log at error level with the traceback; an info-level message
        # without the exception hides why the backup failed.
        self.log.exception(f"Unable to backup partition {partition}.")
241+
242+
def restore_backup(
    self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
) -> None:
    """Restore a partition of this store from a previously taken backup.

    Arguments:
        tp: Partition to restore, either a ``TP`` or the bare partition index.
        latest: Restore the most recent backup; when true ``backup_id`` is
            not used.
        backup_id: ID of the backup to restore when ``latest`` is false.

    The call is a no-op when no backup engine is available.
    """
    engine = self._backup_engine
    if not engine:
        return
    partition = tp.partition if isinstance(tp, TP) else tp
    db_dir = str(self.partition_path(partition))
    # NOTE(review): the backup directory is handed to the engine as the
    # second path argument; presumably that slot is the WAL directory --
    # confirm against the python-rocksdb BackupEngine API.
    if latest:
        engine.restore_latest_backup(db_dir, self._backup_path)
    else:
        engine.restore_backup(backup_id, db_dir, self._backup_path)
186265

187266
def persisted_offset(self, tp: TP) -> Optional[int]:
188267
"""Return the last persisted offset.

faust/tables/objects.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,16 @@ def apply_changelog_batch(
183183

184184
for tp, offset in tp_offsets.items():
185185
self.set_persisted_offset(tp, offset)
186+
187+
async def backup_partition(
    self,
    tp,
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Back up a partition; not supported by this object.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
191+
192+
def restore_backup(
    self, tp, latest: bool = True, backup_id: int = 0
) -> None:
    """Restore a partition backup; not supported by this object.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()

faust/types/stores.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,18 @@ async def on_recovery_completed(
101101
self, active_tps: Set[TP], standby_tps: Set[TP]
102102
) -> None:
103103
...
104+
105+
@abc.abstractmethod
async def backup_partition(
    self,
    tp: Union[TP, int],
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Back up one partition of the store.

    Abstract: the body is empty and implementations supply the behavior.
    """
    ...
110+
111+
@abc.abstractmethod
def restore_backup(
    self, tp: Union[TP, int], latest: bool = True, backup_id: int = 0
) -> None:
    """Restore one partition of the store from a backup.

    Abstract: the body is empty and implementations supply the behavior.
    """
    ...

tests/unit/stores/test_base.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ def apply_changelog_batch(self, *args, **kwargs):
2929
def reset_state(self):
    """Do nothing; placeholder implementation for the test store."""
    return None
3131

32+
async def backup_partition(
    self,
    tp,
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Do nothing; placeholder backup hook for the test store."""
    return None
36+
37+
def restore_backup(
    self, tp, latest: bool = True, backup_id: int = 0
) -> None:
    """Do nothing; placeholder restore hook for the test store."""
    return None
44+
3245

3346
class Test_Store:
3447
@pytest.fixture
@@ -120,6 +133,19 @@ def _clear(self):
120133
def reset_state(self):
    """Do nothing; placeholder implementation for the test store."""
    return None
122135

136+
async def backup_partition(
    self,
    tp,
    flush: bool = True,
    purge: bool = False,
    keep: int = 1,
) -> None:
    """Do nothing; placeholder backup hook for the test store."""
    return None
140+
141+
def restore_backup(
    self, tp, latest: bool = True, backup_id: int = 0
) -> None:
    """Do nothing; placeholder restore hook for the test store."""
    return None
148+
123149

124150
class Test_SerializedStore:
125151
@pytest.fixture

0 commit comments

Comments
 (0)