Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/millipds/app_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


# these helpers are useful for conciseness and type hinting
def get_db(req: web.Request):
def get_db(req: web.Request) -> database.Database:
return req.app[MILLIPDS_DB]


Expand Down
74 changes: 58 additions & 16 deletions src/millipds/atproto_sync.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
from typing import Optional, Tuple
from typing import Optional, Tuple, cast
import logging
import asyncio

from aiohttp import web
import cbrrr
from apsw import Connection
from atmst.blockstore import BlockStore
from atmst.mst.node import MSTNode
from atmst.mst.node_store import NodeStore

from . import static_config
from . import repo_ops
from . import util
from .app_util import *
from .database import DBBlockStore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -152,6 +157,7 @@ async def sync_get_blocks(request: web.Request):
await res.write(util.serialize_car_header())
for cid in cids:
# we don't use executemany so as not to hog the db
# XXX: there are no indexes on user(id, head) or record(repo, cid)!!!
row = db.con.execute(
"""
SELECT commit_bytes FROM user WHERE head=? AND id=?
Expand Down Expand Up @@ -220,6 +226,37 @@ async def sync_get_repo_status(request: web.Request):
return web.json_response({"did": did, "active": True, "rev": row[0]})


async def _visit_mst_node(
res: web.StreamResponse, bs: DBBlockStore, node_cid: Optional[cbrrr.CID]
):
if node_cid is None:
return
node_cid_bytes = bytes(node_cid)
node_bytes = bs.get_block(node_cid_bytes)

# emit this node
await res.write(util.serialize_car_entry(node_cid_bytes, node_bytes))

node = MSTNode.deserialise(node_bytes)
await _visit_mst_node(res, bs, node.subtrees[0])
for record_path, record_cid, subtree_cid in zip(
node.keys, node.vals, node.subtrees[1:]
):
record_cid_bytes = bytes(record_cid)
collection, rkey = util.split_path(record_path)
# this is a bit of a hack to reuse the blockstore's db connection.
# we lookup records by repo/collection/rkey because there is no index on CID!
record_bytes = bs.db.execute(
"SELECT value FROM record WHERE repo=? AND nsid=? AND rkey=?",
(bs.user_id, collection, rkey),
).get
assert isinstance(record_bytes, bytes)
await res.write(
util.serialize_car_entry(record_cid_bytes, record_bytes)
)
await _visit_mst_node(res, bs, subtree_cid)


@routes.get("/xrpc/com.atproto.sync.getRepo")
async def sync_get_repo(request: web.Request):
did = request.query.get("did")
Expand All @@ -245,21 +282,26 @@ async def sync_get_repo(request: web.Request):
await res.write(util.serialize_car_header(head))
await res.write(util.serialize_car_entry(head, commit_bytes))

for mst_cid, mst_value in con.execute(
"SELECT cid, value FROM mst WHERE repo=? AND since>?",
(user_id, since),
):
await res.write(
util.serialize_car_entry(mst_cid, mst_value)
)

for record_cid, record_value in con.execute(
"SELECT cid, value FROM record WHERE repo=? AND since>?",
(user_id, since),
):
await res.write(
util.serialize_car_entry(record_cid, record_value)
)
if since: # "since" is deprecated in sync1.1 - we support it but the order of returned blocks is undefined
for mst_cid, mst_value in con.execute(
"SELECT cid, value FROM mst WHERE repo=? AND since>?",
(user_id, since),
):
await res.write(
util.serialize_car_entry(mst_cid, mst_value)
)

for record_cid, record_value in con.execute(
"SELECT cid, value FROM record WHERE repo=? AND since>?",
(user_id, since),
):
await res.write(
util.serialize_car_entry(record_cid, record_value)
)
else: # sync1.1 ordering
bs = DBBlockStore(con, did)
commit = cast(dict, cbrrr.decode_dag_cbor(commit_bytes))
await _visit_mst_node(res, bs, commit["data"])

await res.write_eof()
return res
Expand Down