Skip to content

Commit b87858b

Browse files
authored
Merge pull request #564 from rusq/i560-export-source
Export source fixes
2 parents efbd7b7 + 53da54f commit b87858b

File tree

9 files changed

+733
-109
lines changed

9 files changed

+733
-109
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ require (
8888
github.com/muesli/cancelreader v0.2.2 // indirect
8989
github.com/muesli/termenv v0.16.0 // indirect
9090
github.com/ncruces/go-strftime v0.1.9 // indirect
91+
github.com/phuslu/lru v1.0.18 // indirect
9192
github.com/pmezard/go-difflib v1.0.0 // indirect
9293
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
9394
github.com/rivo/uniseg v0.4.7 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ github.com/muesli/termenv v0.16.0 h1:S5AlUN9dENB57rsbnkPyfdGuWIlkmzJjbFf0Tf5FWUc
177177
github.com/muesli/termenv v0.16.0/go.mod h1:ZRfOIKPFDYQoDFF4Olj7/QJbW60Ol/kL1pU3VfY/Cnk=
178178
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
179179
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
180+
github.com/phuslu/lru v1.0.18 h1:ioKRYLym7nv6UmaKHXSR0Z8s2KCEra+mcWcn9zXQnlM=
181+
github.com/phuslu/lru v1.0.18/go.mod h1:ci5hb8dRIa+2I+KcPl4958OWCg09FxwZCP8InU1L1ME=
180182
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
181183
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
182184
github.com/playwright-community/playwright-go v0.5200.0 h1:z/5LGuX2tBrg3ug1HupMXLjIG93f1d2MWdDsNhkMQ9c=

internal/convert/src2enc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/rusq/slackdump/v3/source"
1717
)
1818

19-
// Source encoder allows to convert any source to a chunked format.
19+
// SourceEncoder allows to convert any source to a chunked format.
2020
type SourceEncoder struct {
2121
src source.Sourcer
2222
enc chunk.Encoder

internal/structures/conversation.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package structures
22

33
import "github.com/rusq/slack"
44

5-
// IsThreadStart check if the message is a lead message of a thread.
5+
// IsThreadStart check if the message is a lead message of a thread and the
6+
// thread is a non-empty thread.
67
func IsThreadStart(m *slack.Message) bool {
7-
return m.ThreadTimestamp != "" && m.Timestamp == m.ThreadTimestamp
8+
return m.ThreadTimestamp != "" && m.Timestamp == m.ThreadTimestamp && !IsEmptyThread(m)
89
}
910

1011
// IsEmptyThread checks if the message is a thread with no replies.

notes.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
Issue # 560
2+
-----------
3+
4+
Convert from database to export.
5+
Convert from export to database.
6+
Run comparison.
7+
8+
Run convert of the tmp/reduced to database.
9+
```
10+
$ sd convert -f database tmp/reduced
11+
```
12+
13+
in tmp/compare, start sqlite
14+
run
15+
```sql
16+
attach database 'conv.sqlite' as conv;
17+
attach database 'orig.sqlite' as orig;
18+
```
19+
20+
Output of the missing data:
21+
```sql
22+
with missing as (select id from orig.message where id not in (select id from conv.message))
23+
select id,chunk_id,channel_id,ts,parent_id,thread_ts,is_parent from orig.message where id in (select * from missing);
24+
```
25+
26+
comparison of IDs:
27+
```sql
28+
select o.id,o.cnt,c.id,c.cnt from (select id,count(1) cnt from orig.message group by id) o left join (select id,count(1) cnt from conv.message group by id) c on o.id=c.id where o.cnt <> coalesce(c.cnt,-1);
29+
```

source/export.go

Lines changed: 190 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ import (
99
"iter"
1010
"log/slog"
1111
"path"
12+
"runtime/trace"
1213
"time"
1314

14-
"github.com/rusq/slackdump/v3/internal/chunk"
15-
1615
"github.com/rusq/slack"
1716

1817
"github.com/rusq/slackdump/v3/export"
18+
"github.com/rusq/slackdump/v3/internal/chunk"
1919
"github.com/rusq/slackdump/v3/internal/structures"
2020
)
2121

@@ -28,8 +28,13 @@ type Export struct {
2828
idx structures.ExportIndex
2929
files Storage
3030
avatars Storage
31+
cache *threadCache
3132
}
3233

34+
const cacheSz = 1 << 20
35+
36+
// OpenExport opens a Slack export with the given name from the filesystem
37+
// fsys.
3338
func OpenExport(fsys fs.FS, name string) (*Export, error) {
3439
var idx structures.ExportIndex
3540
if err := idx.Unmarshal(fsys); err != nil {
@@ -44,6 +49,7 @@ func OpenExport(fsys fs.FS, name string) (*Export, error) {
4449
chanNames: make(map[string]string, len(chans)),
4550
files: NoStorage{},
4651
avatars: NoStorage{},
52+
cache: newThreadCache(cacheSz),
4753
}
4854
// initialise channels for quick lookup
4955
for _, ch := range z.channels {
@@ -95,15 +101,46 @@ func (e *Export) Type() Flags {
95101
return FExport
96102
}
97103

104+
// buildThreadCache walks all messages in the channel with the given name and
105+
// indexes all threads for faster lookup.
106+
func (e *Export) buildThreadCache(ctx context.Context, name string) error {
107+
lg := slog.With("channel_name", name)
108+
lg.Debug("building thread cache")
109+
var n int
110+
if err := walkDir(e.fs, name, func(file string) error {
111+
if err := yieldFileContents(ctx, e.fs, file, func(m slack.Message, err error) bool {
112+
if err != nil {
113+
return false
114+
}
115+
if (structures.IsThreadStart(&m) && !structures.IsEmptyThread(&m)) || structures.IsThreadMessage(&m.Msg) {
116+
if err := e.cache.Update(ctx, name, m.ThreadTimestamp, file); err != nil {
117+
slog.ErrorContext(ctx, "error updating cache", "error", err)
118+
}
119+
n++
120+
}
121+
return true
122+
}); err != nil {
123+
return err
124+
}
125+
return nil
126+
}); err != nil {
127+
return err
128+
}
129+
slog.DebugContext(ctx, "caching completed", "thread_count", n)
130+
return nil
131+
}
132+
98133
// AllMessages returns all channel messages without thread messages.
99-
func (e *Export) AllMessages(_ context.Context, channelID string) (iter.Seq2[slack.Message, error], error) {
100-
it, err := e.walkChannelMessages(channelID)
134+
func (e *Export) AllMessages(ctx context.Context, channelID string) (iter.Seq2[slack.Message, error], error) {
135+
name, err := e.nameByID(channelID)
101136
if err != nil {
102-
if errors.Is(err, fs.ErrNotExist) {
103-
return nil, ErrNotFound
104-
}
105137
return nil, err
106138
}
139+
140+
if err := e.buildThreadCache(ctx, name); err != nil {
141+
return nil, err
142+
}
143+
it := e.walkChannelMessages(ctx, name)
107144
return func(yield func(slack.Message, error) bool) {
108145
for m, err := range it {
109146
if err != nil {
@@ -122,63 +159,164 @@ func (e *Export) AllMessages(_ context.Context, channelID string) (iter.Seq2[sla
122159
}, nil
123160
}
124161

125-
func (e *Export) walkChannelMessages(channelID string) (iter.Seq2[slack.Message, error], error) {
126-
name, ok := e.chanNames[channelID]
127-
if !ok {
128-
return nil, fmt.Errorf("%w: %s", fs.ErrNotExist, channelID)
162+
// yieldFileContents is meant to work with export json files and will call
163+
// yield function for every message in the file. It expects to be called by
164+
// fs.WalkDir function, therefore when the yield function returns false (stop
165+
// iteration), it returns `fs.SkipAll` error. If calling this function not
166+
// from the Walk function, this error indicates that file iteration should
167+
// stop.
168+
func yieldFileContents(ctx context.Context, fsys fs.FS, file string, yield func(slack.Message, error) bool) error {
169+
em, err := unmarshal[[]export.ExportMessage](fsys, file)
170+
if err != nil {
171+
var jsonErr *json.SyntaxError
172+
if errors.As(err, &jsonErr) {
173+
slog.WarnContext(ctx, "skipping broken file", "pth", file, "err", err)
174+
return nil
175+
}
176+
return err
177+
}
178+
for i, m := range em {
179+
if m.Msg == nil {
180+
slog.DebugContext(ctx, "skipping an empty message", "pth", file, "index", i)
181+
continue
182+
}
183+
sm := slack.Message{Msg: *m.Msg}
184+
if !yield(sm, nil) {
185+
return fs.SkipAll
186+
}
129187
}
130-
_, err := fs.Stat(e.fs, name)
188+
return nil
189+
}
190+
191+
// fullScanIter is the message iterator that always scans all messages and
192+
// populates the cache with discovered threads.
193+
type fullScanIter struct {
194+
ctx context.Context
195+
name string
196+
fs fs.FS
197+
}
198+
199+
func newFullScanIter(ctx context.Context, fs fs.FS, chanName string) *fullScanIter {
200+
return &fullScanIter{
201+
ctx: ctx,
202+
name: chanName,
203+
fs: fs,
204+
}
205+
}
206+
207+
// Iter iterates through all messages for the given channel name. It
208+
// updates the cache with discovered threads.
209+
func (w *fullScanIter) Iter(yield func(slack.Message, error) bool) {
210+
ctx, task := trace.NewTask(w.ctx, "full_scan_iter")
211+
defer task.End()
212+
err := walkDir(w.fs, w.name, func(file string) error {
213+
if err := yieldFileContents(ctx, w.fs, file, yield); err != nil {
214+
return err
215+
}
216+
return nil
217+
})
131218
if err != nil {
132-
return nil, fmt.Errorf("%w: %s", fs.ErrNotExist, name)
219+
yield(slack.Message{}, err)
220+
return
133221
}
134-
iterFn := func(yield func(slack.Message, error) bool) {
135-
err := fs.WalkDir(e.fs, name, func(pth string, d fs.DirEntry, err error) error {
136-
if err != nil {
137-
return err
138-
}
139-
if d.IsDir() && pth != name {
140-
return fs.SkipDir
141-
}
142-
if path.Ext(pth) != ".json" {
143-
return nil
144-
}
145-
// read the file
146-
em, err := unmarshal[[]export.ExportMessage](e.fs, pth)
147-
if err != nil {
148-
var jsonErr *json.SyntaxError
149-
if errors.As(err, &jsonErr) {
150-
slog.Default().Debug("skipping a broken file", "pth", pth, "err", err)
151-
return nil
152-
}
153-
return err
154-
}
155-
for i, m := range em {
156-
if m.Msg == nil {
157-
slog.Default().Debug("skipping an empty message", "pth", pth, "index", i)
158-
continue
159-
}
160-
sm := slack.Message{Msg: *m.Msg}
161-
if !yield(sm, nil) {
162-
return fs.SkipAll
163-
}
164-
}
165-
return nil
166-
})
222+
}
223+
224+
// walkDir walks through the directory with given name on the filesystem fsys,
225+
// calling the callback function cb for every JSON file it encounters.
226+
func walkDir(fsys fs.FS, dirName string, cb func(file string) error) error {
227+
err := fs.WalkDir(fsys, dirName, func(file string, d fs.DirEntry, err error) error {
167228
if err != nil {
229+
return err
230+
}
231+
if d.IsDir() && file != dirName {
232+
return fs.SkipDir
233+
}
234+
if path.Ext(file) != ".json" {
235+
return nil
236+
}
237+
return cb(file)
238+
})
239+
return err
240+
}
241+
242+
// fileListIter is meant to reduce the scope of iteration to the given file
243+
// list.
244+
type fileListIter struct {
245+
ctx context.Context
246+
fs fs.FS
247+
files []string
248+
}
249+
250+
func (w *fileListIter) Iter(yield func(slack.Message, error) bool) {
251+
ctx, task := trace.NewTask(w.ctx, "file_list_iter")
252+
defer task.End()
253+
for _, file := range w.files {
254+
if err := yieldFileContents(ctx, w.fs, file, yield); err != nil {
255+
if errors.Is(err, fs.SkipAll) {
256+
// bail out if instructed
257+
return
258+
}
168259
yield(slack.Message{}, err)
260+
return
169261
}
170262
}
171-
return iterFn, nil
172263
}
173264

174-
func (e *Export) AllThreadMessages(_ context.Context, channelID, threadID string) (iter.Seq2[slack.Message, error], error) {
175-
it, err := e.walkChannelMessages(channelID)
265+
// nameByID returns a channel name (directory name) by the channelID.
266+
// It ensures that the directory exists. It will return ErrNotFound
267+
// if it doesn't find the channel or it's directory.
268+
func (e *Export) nameByID(channelID string) (string, error) {
269+
name, ok := e.chanNames[channelID]
270+
if !ok {
271+
return "", fmt.Errorf("%w: %s", ErrNotFound, channelID)
272+
}
273+
if fi, err := fs.Stat(e.fs, name); err != nil {
274+
return "", fmt.Errorf("%w: %s", ErrNotFound, name)
275+
} else if !fi.IsDir() {
276+
return "", fmt.Errorf("%s is not a directory", name)
277+
}
278+
return name, nil
279+
}
280+
281+
func (e *Export) walkChannelMessages(ctx context.Context, name string) iter.Seq2[slack.Message, error] {
282+
return newFullScanIter(ctx, e.fs, name).Iter
283+
}
284+
285+
var errNotInCache = errors.New("channel not in cache")
286+
287+
func (e *Export) walkCachedThreads(ctx context.Context, channelName, threadID string) (iter.Seq2[slack.Message, error], error) {
288+
if !e.cache.Exists(channelName) {
289+
return nil, fmt.Errorf("channel: %w", errNotInCache)
290+
}
291+
// get all files for the thread.
292+
files, ok := e.cache.Get(channelName, threadID)
293+
if !ok {
294+
return nil, fmt.Errorf("thread: %w", errNotInCache)
295+
}
296+
fli := fileListIter{ctx, e.fs, files}
297+
return fli.Iter, nil
298+
}
299+
300+
// AllThreadMessages returns all thread messages for the channelID:threadID. If the thread
301+
// is contained in the cache, it will iterate only through the files that contain the thread
302+
// messages, otherwise it will iterate through all messages in the channel and extract the thread
303+
// messages. Call [buildThreadCache] for the channelID, before calling this
304+
// method to speed up search.
305+
func (e *Export) AllThreadMessages(ctx context.Context, channelID, threadID string) (iter.Seq2[slack.Message, error], error) {
306+
// try cached first
307+
name, err := e.nameByID(channelID)
176308
if err != nil {
177-
if errors.Is(err, fs.ErrNotExist) {
178-
return nil, ErrNotFound
179-
}
180309
return nil, err
181310
}
311+
lg := slog.With("channel_name", name, "channel_id", channelID, "thread_ts", threadID)
312+
it, err := e.walkCachedThreads(ctx, name, threadID)
313+
if err != nil {
314+
if !errors.Is(err, errNotInCache) {
315+
return nil, err
316+
}
317+
lg.WarnContext(ctx, "cache not available, initiating full scan", "err", err)
318+
it = e.walkChannelMessages(ctx, name)
319+
}
182320
iterFn := func(yield func(slack.Message, error) bool) {
183321
for m, err := range it {
184322
if err != nil {

0 commit comments

Comments
 (0)