Merged

Commits (18)
2483157
perf: use streaming json.Decoder and lite structs for checkpoint meta…
pjbgf Mar 11, 2026
c9b8db9
feat: sparse metadata fetch with on-demand blob resolution
pjbgf Mar 11, 2026
36db444
perf: use shallow treeless fetch for metadata branch
pjbgf Mar 11, 2026
e29ae2e
fix: resume fetches metadata branch and checks actual remote for bran…
pjbgf Mar 12, 2026
9691be1
fix: test failures from global core.hooksPath and remote metadata tre…
pjbgf Mar 12, 2026
b92b37b
refactor: use FetchingTree for on-demand blob resolution in checkpoin…
pjbgf Mar 13, 2026
9ca78e1
refactor: remove unused hasBaseFile and simplify chunk collection in …
pjbgf Mar 17, 2026
6e36076
fix: correct stale nolint comment in CollectTranscriptBlobHashes
Soph Mar 18, 2026
b99e812
fix: log fetch-pack failure before falling back in FetchBlobsByHash
Soph Mar 18, 2026
a80bcea
fix: use git fetch instead of fetch-pack for blob fetching
Soph Mar 18, 2026
c72a3f7
refactor: extract shared decodeCheckpointInfo to deduplicate metadata…
Soph Mar 18, 2026
f92fdb6
docs: clarify why getMetadataTree opens the repo multiple times
Soph Mar 18, 2026
36176f8
docs: clarify freshRepo is always set before use in resumeFromCurrent…
Soph Mar 18, 2026
5af5ace
fix: avoid leaking git fetch output in logs and error messages
Soph Mar 18, 2026
db99d03
Merge pull request #721 from entireio/soph/jsonl-data2-feedback
Soph Mar 18, 2026
88b924a
Merge branch 'main' into pjbgf/jsonl-data2
pjbgf Mar 19, 2026
3005c7a
Remove unused resolver field and add batch blob prefetching
pjbgf Mar 19, 2026
cf64045
fix: enable blob fetching on strategy so resume works after treeless …
pjbgf Mar 19, 2026
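
The commit sequence above combines two git-level techniques: a shallow, treeless fetch of the metadata branch (only commits and trees are downloaded) and on-demand resolution of transcript blobs when they are actually read. As a rough sketch of the fetch side (not code from this PR; the repo path, remote, and ref names are placeholders), a treeless fetch can be issued via git's partial-clone filter:

package main

import (
	"fmt"
	"os/exec"
)

// fetchMetadataTreeless runs a shallow, treeless fetch: --depth=1 limits
// history, and --filter=tree:0 asks a promisor-capable remote to omit
// trees and blobs until they are requested. Sketch only; remote and ref
// are placeholder arguments, not names from this PR.
func fetchMetadataTreeless(repoDir, remote, ref string) error {
	cmd := exec.Command("git", "fetch", "--depth=1", "--filter=tree:0", remote, ref)
	cmd.Dir = repoDir
	if out, err := cmd.CombinedOutput(); err != nil {
		// Per commit 5af5ace, real code avoids leaking raw fetch output
		// into logs and errors; this sketch reports only its size.
		return fmt.Errorf("treeless fetch failed: %w (%d bytes of output suppressed)", err, len(out))
	}
	return nil
}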
126 changes: 126 additions & 0 deletions cmd/entire/cli/checkpoint/blob_resolver.go
@@ -0,0 +1,126 @@
package checkpoint

import (
	"fmt"
	"io"
	"strconv"
	"strings"

	"github.com/entireio/cli/cmd/entire/cli/agent"
	"github.com/entireio/cli/cmd/entire/cli/checkpoint/id"
	"github.com/entireio/cli/cmd/entire/cli/paths"

	"github.com/go-git/go-git/v6/plumbing"
	"github.com/go-git/go-git/v6/plumbing/object"
	"github.com/go-git/go-git/v6/plumbing/storer"
)

// TranscriptBlobRef identifies a blob within a checkpoint tree on the metadata branch.
// It captures the blob hash from the tree entry without requiring the blob itself to be local.
type TranscriptBlobRef struct {
	// SessionIndex is the 0-based session index within the checkpoint.
	SessionIndex int

	// Hash is the blob's SHA-1 hash from the tree entry.
	Hash plumbing.Hash

	// Path is the blob's path relative to the checkpoint directory,
	// e.g. "0/full.jsonl" or "0/full.jsonl.001".
	Path string
}

// BlobResolver checks blob existence and reads blobs from go-git's local
// object store (loose objects + packfiles). It performs no remote operations.
type BlobResolver struct {
	storer storer.EncodedObjectStorer
}

// NewBlobResolver creates a BlobResolver backed by the given object store.
func NewBlobResolver(s storer.EncodedObjectStorer) *BlobResolver {
	return &BlobResolver{storer: s}
}

// HasBlob returns true if the blob exists in the local object store.
// It checks both loose objects and packfile indices without reading blob content.
func (r *BlobResolver) HasBlob(hash plumbing.Hash) bool {
	return r.storer.HasEncodedObject(hash) == nil
}

// ReadBlob reads a blob's content from the local object store.
// It returns plumbing.ErrObjectNotFound if the blob is not present locally.
func (r *BlobResolver) ReadBlob(hash plumbing.Hash) ([]byte, error) {
	obj, err := r.storer.EncodedObject(plumbing.BlobObject, hash)
	if err != nil {
		return nil, err //nolint:wrapcheck // Propagating plumbing.ErrObjectNotFound
	}

	reader, err := obj.Reader()
	if err != nil {
		return nil, fmt.Errorf("blob reader %s: %w", hash, err)
	}
	defer reader.Close()

	data, err := io.ReadAll(reader)
	if err != nil {
		return nil, fmt.Errorf("read blob %s: %w", hash, err)
	}
	return data, nil
}

// CollectTranscriptBlobHashes walks the metadata branch tree for a checkpoint
// and returns blob hashes for all transcript files (full.jsonl and chunks)
// across all sessions. It only reads tree objects — it works after a treeless
// fetch where blobs have not been downloaded.
//
// The function navigates the sharded checkpoint directory structure:
//
//	<id[:2]>/<id[2:]>/
//	├── 0/
//	│   ├── full.jsonl      ← collected
//	│   ├── full.jsonl.001  ← collected (chunk)
//	│   └── metadata.json
//	├── 1/
//	│   └── full.jsonl      ← collected
//	└── metadata.json
func CollectTranscriptBlobHashes(tree *object.Tree, checkpointID id.CheckpointID) ([]TranscriptBlobRef, error) {
	checkpointTree, err := tree.Tree(checkpointID.Path())
	if err != nil {
		return nil, fmt.Errorf("checkpoint tree %s: %w", checkpointID.Path(), err)
	}

	var refs []TranscriptBlobRef

	// Enumerate session subdirectories (0, 1, 2, ...)
	for i := 0; ; i++ {
		sessionDir := strconv.Itoa(i)
		sessionTree, treeErr := checkpointTree.Tree(sessionDir)
		if treeErr != nil {
			break // no more sessions
		}

		// Collect transcript blob hashes from tree entries.
		// tree.Entries contains the direct children — no blob reads needed.
		for _, entry := range sessionTree.Entries {
			if entry.Name == paths.TranscriptFileName || entry.Name == paths.TranscriptFileNameLegacy {
				refs = append(refs, TranscriptBlobRef{
					SessionIndex: i,
					Hash:         entry.Hash,
					Path:         sessionDir + "/" + entry.Name,
				})
			}
			// Check for chunk files (full.jsonl.001, full.jsonl.002, etc.)
			if strings.HasPrefix(entry.Name, paths.TranscriptFileName+".") {
				idx := agent.ParseChunkIndex(entry.Name, paths.TranscriptFileName)
				if idx > 0 {
					refs = append(refs, TranscriptBlobRef{
						SessionIndex: i,
						Hash:         entry.Hash,
						Path:         sessionDir + "/" + entry.Name,
					})
				}
			}
		}
	}

	return refs, nil //nolint:nilerr // treeErr from session enumeration loop is used to break, not propagated
}
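
Putting the pieces of blob_resolver.go together, the intended flow is: collect transcript blob hashes from tree entries (no blob reads), check which blobs are already local, fetch the rest, then read. A minimal usage sketch under that reading follows; metadataTree, cpID, and repo are placeholders for values the surrounding checkpoint store would supply, not names from this PR:

// Sketch, not code from this PR. metadataTree (*object.Tree), cpID
// (id.CheckpointID), and repo (*git.Repository) stand in for values
// the surrounding checkpoint store would provide.
refs, err := CollectTranscriptBlobHashes(metadataTree, cpID)
if err != nil {
	return err
}

resolver := NewBlobResolver(repo.Storer)
var missing []plumbing.Hash
for _, ref := range refs {
	if !resolver.HasBlob(ref.Hash) {
		// After a treeless fetch these blobs are still remote; they would
		// be fetched in bulk (e.g. via the PR's FetchBlobsByHash) and then
		// re-read through the same resolver.
		missing = append(missing, ref.Hash)
		continue
	}
	data, err := resolver.ReadBlob(ref.Hash)
	if err != nil {
		return err
	}
	log.Printf("session %d: %s (%d bytes)", ref.SessionIndex, ref.Path, len(data))
}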
200 changes: 200 additions & 0 deletions cmd/entire/cli/checkpoint/blob_resolver_test.go
@@ -0,0 +1,200 @@
package checkpoint

import (
	"context"
	"testing"

	"github.com/entireio/cli/cmd/entire/cli/checkpoint/id"

	"github.com/go-git/go-git/v6/plumbing"
)

func TestBlobResolver_HasBlob_Present(t *testing.T) {
	t.Parallel()

	repo, store, cpID := setupRepoForUpdate(t)

	// Get the metadata branch tree
	tree, err := store.getSessionsBranchTree()
	if err != nil {
		t.Fatalf("getSessionsBranchTree() error = %v", err)
	}

	// Navigate to the transcript blob via tree entries
	refs, err := CollectTranscriptBlobHashes(tree, cpID)
	if err != nil {
		t.Fatalf("CollectTranscriptBlobHashes() error = %v", err)
	}
	if len(refs) == 0 {
		t.Fatal("expected at least one transcript blob ref")
	}

	resolver := NewBlobResolver(repo.Storer)

	// Blob should exist — it was written by WriteCommitted
	if !resolver.HasBlob(refs[0].Hash) {
		t.Errorf("HasBlob(%s) = false, want true (blob was written locally)", refs[0].Hash)
	}
}

func TestBlobResolver_HasBlob_Missing(t *testing.T) {
	t.Parallel()

	repo, _, _ := setupRepoForUpdate(t)
	resolver := NewBlobResolver(repo.Storer)

	// Random hash that doesn't exist
	fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
	if resolver.HasBlob(fakeHash) {
		t.Error("HasBlob(fake) = true, want false")
	}
}

func TestBlobResolver_ReadBlob(t *testing.T) {
	t.Parallel()

	repo, store, cpID := setupRepoForUpdate(t)

	tree, err := store.getSessionsBranchTree()
	if err != nil {
		t.Fatalf("getSessionsBranchTree() error = %v", err)
	}

	refs, err := CollectTranscriptBlobHashes(tree, cpID)
	if err != nil {
		t.Fatalf("CollectTranscriptBlobHashes() error = %v", err)
	}
	if len(refs) == 0 {
		t.Fatal("expected at least one transcript blob ref")
	}

	resolver := NewBlobResolver(repo.Storer)

	data, err := resolver.ReadBlob(refs[0].Hash)
	if err != nil {
		t.Fatalf("ReadBlob() error = %v", err)
	}
	if len(data) == 0 {
		t.Error("ReadBlob() returned empty data")
	}
	// The transcript content from setupRepoForUpdate
	if string(data) != "provisional transcript line 1\n" {
		t.Errorf("ReadBlob() = %q, want %q", string(data), "provisional transcript line 1\n")
	}
}

func TestBlobResolver_ReadBlob_Missing(t *testing.T) {
	t.Parallel()

	repo, _, _ := setupRepoForUpdate(t)
	resolver := NewBlobResolver(repo.Storer)

	fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
	_, err := resolver.ReadBlob(fakeHash)
	if err == nil {
		t.Error("ReadBlob(fake) should return error")
	}
}

func TestCollectTranscriptBlobHashes_SingleSession(t *testing.T) {
	t.Parallel()

	_, store, cpID := setupRepoForUpdate(t)

	tree, err := store.getSessionsBranchTree()
	if err != nil {
		t.Fatalf("getSessionsBranchTree() error = %v", err)
	}

	refs, err := CollectTranscriptBlobHashes(tree, cpID)
	if err != nil {
		t.Fatalf("CollectTranscriptBlobHashes() error = %v", err)
	}

	if len(refs) != 1 {
		t.Fatalf("expected 1 transcript ref, got %d", len(refs))
	}

	ref := refs[0]
	if ref.SessionIndex != 0 {
		t.Errorf("SessionIndex = %d, want 0", ref.SessionIndex)
	}
	if ref.Hash.IsZero() {
		t.Error("Hash should not be zero")
	}
	if ref.Path != "0/full.jsonl" {
		t.Errorf("Path = %q, want %q", ref.Path, "0/full.jsonl")
	}
}

func TestCollectTranscriptBlobHashes_MultiSession(t *testing.T) {
	t.Parallel()

	repo, store, cpID := setupRepoForUpdate(t)

	// Write a second session to the same checkpoint
	err := store.WriteCommitted(context.Background(), WriteCommittedOptions{
		CheckpointID: cpID,
		SessionID:    "session-002",
		Strategy:     "manual-commit",
		Transcript:   []byte("second session transcript\n"),
		Prompts:      []string{"second prompt"},
		AuthorName:   "Test",
		AuthorEmail:  "test@test.com",
	})
	if err != nil {
		t.Fatalf("WriteCommitted() for second session error = %v", err)
	}

	tree, err := store.getSessionsBranchTree()
	if err != nil {
		t.Fatalf("getSessionsBranchTree() error = %v", err)
	}

	refs, err := CollectTranscriptBlobHashes(tree, cpID)
	if err != nil {
		t.Fatalf("CollectTranscriptBlobHashes() error = %v", err)
	}

	if len(refs) != 2 {
		t.Fatalf("expected 2 transcript refs, got %d", len(refs))
	}

	// Verify session indices
	if refs[0].SessionIndex != 0 {
		t.Errorf("refs[0].SessionIndex = %d, want 0", refs[0].SessionIndex)
	}
	if refs[1].SessionIndex != 1 {
		t.Errorf("refs[1].SessionIndex = %d, want 1", refs[1].SessionIndex)
	}

	// Verify they have different hashes (different transcript content)
	if refs[0].Hash == refs[1].Hash {
		t.Error("multi-session refs should have different blob hashes")
	}

	// Verify all blobs exist locally
	resolver := NewBlobResolver(repo.Storer)
	for i, ref := range refs {
		if !resolver.HasBlob(ref.Hash) {
			t.Errorf("session %d blob %s should be present locally", i, ref.Hash)
		}
	}
}

func TestCollectTranscriptBlobHashes_NonexistentCheckpoint(t *testing.T) {
	t.Parallel()

	_, store, _ := setupRepoForUpdate(t)

	tree, err := store.getSessionsBranchTree()
	if err != nil {
		t.Fatalf("getSessionsBranchTree() error = %v", err)
	}

	fakeID := id.MustCheckpointID("ffffffffffff")
	_, err = CollectTranscriptBlobHashes(tree, fakeID)
	if err == nil {
		t.Error("expected error for nonexistent checkpoint")
	}
}