diff --git a/src/millipds/app_util.py b/src/millipds/app_util.py
index 268ea502..84d16a7e 100644
--- a/src/millipds/app_util.py
+++ b/src/millipds/app_util.py
@@ -23,7 +23,7 @@
 
 
 # these helpers are useful for conciseness and type hinting
-def get_db(req: web.Request):
+def get_db(req: web.Request) -> database.Database:
     return req.app[MILLIPDS_DB]
 
 
diff --git a/src/millipds/atproto_sync.py b/src/millipds/atproto_sync.py
index 0dc4326c..4bb0bf74 100644
--- a/src/millipds/atproto_sync.py
+++ b/src/millipds/atproto_sync.py
@@ -1,14 +1,19 @@
-from typing import Optional, Tuple
+from typing import Optional, Tuple, cast
 import logging
 import asyncio
 
 from aiohttp import web
 import cbrrr
+from apsw import Connection
+from atmst.blockstore import BlockStore
+from atmst.mst.node import MSTNode
+from atmst.mst.node_store import NodeStore
 
 from . import static_config
 from . import repo_ops
 from . import util
 from .app_util import *
+from .database import DBBlockStore
 
 logger = logging.getLogger(__name__)
 
@@ -152,6 +157,7 @@ async def sync_get_blocks(request: web.Request):
     await res.write(util.serialize_car_header())
     for cid in cids:
         # we don't use executemany so as not to hog the db
+        # XXX: there are no indexes on user(id, head) or record(repo, cid)!!!
         row = db.con.execute(
             """
                 SELECT commit_bytes FROM user WHERE head=? AND id=?
@@ -220,6 +226,52 @@ async def sync_get_repo_status(request: web.Request):
     return web.json_response({"did": did, "active": True, "rev": row[0]})
 
 
+async def _visit_mst_node(
+    res: web.StreamResponse, bs: DBBlockStore, node_cid: Optional[cbrrr.CID]
+):
+    """
+    Write the MST subtree rooted at node_cid to res as CAR entries.
+
+    Emission is depth-first: the node's own block, then its leftmost
+    subtree, then each record followed by the subtree to its right.
+    A node_cid of None (no subtree) writes nothing.
+
+    Raises RuntimeError if a record named by a node cannot be read back
+    from the record table as bytes.
+    """
+    if node_cid is None:
+        return
+    node_cid_bytes = bytes(node_cid)
+    node_bytes = bs.get_block(node_cid_bytes)
+
+    # emit this node
+    await res.write(util.serialize_car_entry(node_cid_bytes, node_bytes))
+
+    node = MSTNode.deserialise(node_bytes)
+    await _visit_mst_node(res, bs, node.subtrees[0])
+    for record_path, record_cid, subtree_cid in zip(
+        node.keys, node.vals, node.subtrees[1:]
+    ):
+        record_cid_bytes = bytes(record_cid)
+        collection, rkey = util.split_path(record_path)
+        # this is a bit of a hack to reuse the blockstore's db connection.
+        # we lookup records by repo/collection/rkey because there is no index on CID!
+        record_bytes = bs.db.execute(
+            "SELECT value FROM record WHERE repo=? AND nsid=? AND rkey=?",
+            (bs.user_id, collection, rkey),
+        ).get
+        # explicit check instead of assert: asserts are stripped under -O, and
+        # a lookup that did not yield bytes must not reach the CAR serializer.
+        if not isinstance(record_bytes, bytes):
+            raise RuntimeError(
+                f"record {record_path!r} is missing from the db or is not bytes"
+            )
+        await res.write(
+            util.serialize_car_entry(record_cid_bytes, record_bytes)
+        )
+        await _visit_mst_node(res, bs, subtree_cid)
+
+
 @routes.get("/xrpc/com.atproto.sync.getRepo")
 async def sync_get_repo(request: web.Request):
     did = request.query.get("did")
@@ -245,21 +297,26 @@ async def sync_get_repo(request: web.Request):
         await res.write(util.serialize_car_header(head))
         await res.write(util.serialize_car_entry(head, commit_bytes))
 
-        for mst_cid, mst_value in con.execute(
-            "SELECT cid, value FROM mst WHERE repo=? AND since>?",
-            (user_id, since),
-        ):
-            await res.write(
-                util.serialize_car_entry(mst_cid, mst_value)
-            )
-
-        for record_cid, record_value in con.execute(
-            "SELECT cid, value FROM record WHERE repo=? AND since>?",
-            (user_id, since),
-        ):
-            await res.write(
-                util.serialize_car_entry(record_cid, record_value)
-            )
+        if since:  # "since" is deprecated in sync1.1 - we support it but the order of returned blocks is undefined
+            for mst_cid, mst_value in con.execute(
+                "SELECT cid, value FROM mst WHERE repo=? AND since>?",
+                (user_id, since),
+            ):
+                await res.write(
+                    util.serialize_car_entry(mst_cid, mst_value)
+                )
+
+            for record_cid, record_value in con.execute(
+                "SELECT cid, value FROM record WHERE repo=? AND since>?",
+                (user_id, since),
+            ):
+                await res.write(
+                    util.serialize_car_entry(record_cid, record_value)
+                )
+        else:  # sync1.1 ordering
+            bs = DBBlockStore(con, did)
+            commit = cast(dict, cbrrr.decode_dag_cbor(commit_bytes))
+            await _visit_mst_node(res, bs, commit["data"])
 
         await res.write_eof()
         return res