Skip to content

Commit 4040bca

Browse files
authored
Merge pull request #6 from rusq/refactor
Refactor spaghetti file downloader. Make it lasagna.
2 parents f3e49da + 9302a81 commit 4040bca

File tree

4 files changed

+116
-162
lines changed

4 files changed

+116
-162
lines changed

files.go

Lines changed: 61 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (sd *SlackDumper) filesFromMessages(m []Message) []slack.File {
4141
return files
4242
}
4343

44-
// SaveFileTo saves file to the specified directory
44+
// SaveFileTo saves file to the specified directory.
4545
func (sd *SlackDumper) SaveFileTo(dir string, f *slack.File) (int64, error) {
4646
filePath := filepath.Join(dir, filename(f))
4747
file, err := os.Create(filePath)
@@ -62,68 +62,78 @@ func filename(f *slack.File) string {
6262
return fmt.Sprintf("%s-%s", f.ID, f.Name)
6363
}
6464

65-
func (sd *SlackDumper) fileDownloader(dir string, files <-chan *slack.File, done chan<- bool) error {
66-
const parallelDls = 4
65+
// fileDownloader starts sd.options.workers goroutines to
66+
// download files in parallel. It will download any files that were received on toDownload channel,
67+
// and will close "done" once all downloads are complete.
68+
func (sd *SlackDumper) fileDownloader(dir string, toDownload <-chan *slack.File) (chan struct{}, error) {
69+
done := make(chan struct{})
6770

68-
var wg sync.WaitGroup
69-
dlQ := make(chan *slack.File)
70-
stop := make(chan bool)
71-
72-
// downloaded contains file ids that already been downloaded,
73-
// so we don't download the same file twice
74-
downloaded := make(map[string]bool)
75-
76-
defer close(done)
71+
if !sd.options.dumpfiles {
72+
// terminating if dumpfiles is not enabled.
73+
close(done)
74+
return done, nil
75+
}
7776

7877
if err := os.Mkdir(dir, 0777); err != nil {
7978
if !os.IsExist(err) {
8079
// done is not closed on this error path; the caller receives err instead
81-
return err
80+
return done, err
8281
}
8382
}
8483

85-
worker := func(fs <-chan *slack.File) {
86-
LOOP:
87-
for {
88-
select {
89-
case file := <-fs:
90-
// download file
91-
log.Printf("saving %s, size: %d", filename(file), file.Size)
92-
n, err := sd.SaveFileTo(dir, file)
93-
if err != nil {
94-
log.Printf("error saving %q: %s", filename(file), err)
95-
}
96-
log.Printf("file %s saved: %d bytes written", filename(file), n)
97-
case <-stop:
98-
break LOOP
99-
}
84+
var wg sync.WaitGroup
85+
go func() {
86+
// create workers
87+
for i := 0; i < sd.options.workers; i++ {
88+
wg.Add(1)
89+
go func() {
90+
sd.worker(dir, seenFilter(toDownload))
91+
wg.Done()
92+
}()
10093
}
101-
wg.Done()
102-
}
94+
}()
10395

104-
// create workers
105-
for i := 0; i < parallelDls; i++ {
106-
wg.Add(1)
107-
go worker(dlQ)
108-
}
96+
// sentinel
97+
go func() {
98+
wg.Wait()
99+
close(done)
100+
}()
101+
102+
return done, nil
103+
}
109104

110-
// files queue must be closed on the sender side (see DumpToDir.(1))
111-
for f := range files {
112-
_, ok := downloaded[f.ID]
113-
if ok {
114-
log.Printf("already seen %s, skipping", filename(f))
115-
continue
105+
func (sd *SlackDumper) worker(dir string, filesC <-chan *slack.File) {
106+
for file := range filesC {
107+
// download file
108+
log.Printf("saving %s, size: %d", filename(file), file.Size)
109+
n, err := sd.SaveFileTo(dir, file)
110+
if err != nil {
111+
log.Printf("error saving %q: %s", filename(file), err)
116112
}
117-
dlQ <- f
118-
downloaded[f.ID] = true
113+
log.Printf("file %s saved: %d bytes written", filename(file), n)
119114
}
115+
}
120116

121-
// closing stop will terminate all workers (1)
122-
close(stop)
123-
// workers mark all WorkGroups as done (2)
124-
wg.Wait()
125-
// we send the signal to caller that we're done too
126-
done <- true
127-
128-
return nil
117+
// seenFilter filters the files from filesC to ensure that no duplicates
118+
// are downloaded.
119+
func seenFilter(filesC <-chan *slack.File) <-chan *slack.File {
120+
dlQ := make(chan *slack.File)
121+
go func() {
122+
// closing dlQ makes the worker ranging over it terminate.
123+
defer close(dlQ)
124+
125+
// seen contains file IDs that have already been seen,
126+
// so we don't download the same file twice
127+
seen := make(map[string]bool)
128+
// files queue must be closed by the caller (see DumpToDir.(1))
129+
for f := range filesC {
130+
if _, ok := seen[f.ID]; ok {
131+
log.Printf("already seen %s, skipping", filename(f))
132+
continue
133+
}
134+
seen[f.ID] = true
135+
dlQ <- f
136+
}
137+
}()
138+
return dlQ
129139
}

go.mod

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
module github.com/rusq/slackdump
22

3-
go 1.13
3+
go 1.17
44

55
require (
66
github.com/joho/godotenv v1.4.0
77
github.com/pkg/errors v0.9.1
88
github.com/slack-go/slack v0.9.5
9-
github.com/stretchr/testify v1.4.0 // indirect
9+
github.com/stretchr/testify v1.4.0
1010
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11
1111
)
1212

13+
require (
14+
github.com/davecgh/go-spew v1.1.1 // indirect
15+
github.com/gorilla/websocket v1.4.2 // indirect
16+
github.com/pmezard/go-difflib v1.0.0 // indirect
17+
gopkg.in/yaml.v2 v2.2.2 // indirect
18+
)
19+
1320
replace github.com/slack-go/slack => github.com/rusq/slack v0.9.6

slackdump.go

Lines changed: 46 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"log"
8+
"runtime"
89
"sort"
910

1011
"github.com/slack-go/slack"
@@ -25,6 +26,7 @@ type SlackDumper struct {
2526

2627
type options struct {
2728
dumpfiles bool
29+
workers int
2830
}
2931

3032
var allChanTypes = []string{"mpim", "im", "public_channel", "private_channel"}
@@ -42,12 +44,28 @@ func DumpFiles(b bool) Option {
4244
}
4345
}
4446

47+
const defNumWorkers = 4 // seems reasonable
48+
49+
// NumWorkers sets the number of file download workers. n should be in
50+
// range [1, NumCPU]. If not in range, it is reset to defNumWorkers,
51+
// which seems reasonable.
52+
func NumWorkers(n int) Option {
53+
return func(sd *SlackDumper) {
54+
if n < 1 || runtime.NumCPU() < n {
55+
n = defNumWorkers
56+
}
57+
sd.options.workers = n
58+
}
59+
}
60+
4561
// New creates new client and populates the internal cache of users and channels
4662
// for lookups.
4763
func New(ctx context.Context, token string, cookie string, opts ...Option) (*SlackDumper, error) {
48-
var err error
4964
sd := &SlackDumper{
5065
client: slack.New(token, slack.OptionCookie(cookie)),
66+
options: options{
67+
workers: defNumWorkers,
68+
},
5169
}
5270
for _, opt := range opts {
5371
opt(sd)
@@ -59,6 +77,7 @@ func New(ctx context.Context, token string, cookie string, opts ...Option) (*Sla
5977

6078
go func() {
6179
defer close(errC)
80+
6281
var err error
6382
chanTypes := allChanTypes
6483
log.Println("> caching channels, might take a while...")
@@ -73,7 +92,7 @@ func New(ctx context.Context, token string, cookie string, opts ...Option) (*Sla
7392
return nil, fmt.Errorf("error fetching users: %s", err)
7493
}
7594

76-
if err = <-errC; err != nil {
95+
if err := <-errC; err != nil {
7796
return nil, fmt.Errorf("error fetching channels: %s", err)
7897
}
7998

@@ -93,43 +112,36 @@ func (sd *SlackDumper) IsDeletedUser(id string) bool {
93112

94113
// DumpMessages fetches messages from the conversation identified by channelID.
95114
func (sd *SlackDumper) DumpMessages(ctx context.Context, channelID string) (*Channel, error) {
115+
var filesC = make(chan *slack.File, 20)
96116

97-
params := &slack.GetConversationHistoryParameters{
98-
ChannelID: channelID,
99-
}
100-
101-
filesC := make(chan *slack.File, 20)
102-
done := make(chan bool)
103-
errC := make(chan error, 1)
104-
105-
if sd.options.dumpfiles {
106-
go func() {
107-
errC <- sd.fileDownloader(channelID, filesC, done)
108-
}()
117+
dlDoneC, err := sd.fileDownloader(channelID, filesC)
118+
if err != nil {
119+
return nil, err
109120
}
110121

111122
limiter := newLimiter(tier3)
112-
var messages []Message
113123

124+
var (
125+
messages []Message
126+
cursor string
127+
)
114128
for i := 1; ; i++ {
115-
select {
116-
case err := <-errC:
117-
// stop the goroutine gracefully if it's running
118-
close(filesC)
119-
<-done
120-
return nil, err
121-
default:
122-
}
123-
resp, err := sd.client.GetConversationHistoryContext(ctx, params)
129+
resp, err := sd.client.GetConversationHistoryContext(
130+
ctx,
131+
&slack.GetConversationHistoryParameters{
132+
ChannelID: channelID,
133+
Cursor: cursor,
134+
},
135+
)
124136
if err != nil {
125137
return nil, err
126138
}
127-
chunk := sd.convertMsgs(resp.Messages)
128139

140+
chunk := sd.convertMsgs(resp.Messages)
129141
if err := sd.populateThreads(ctx, chunk, channelID, limiter); err != nil {
130142
return nil, err
131143
}
132-
sd.extractFiles(filesC, chunk)
144+
sd.pipeFiles(filesC, chunk)
133145
messages = append(messages, chunk...)
134146

135147
log.Printf("request #%d, fetched: %d, total: %d\n",
@@ -139,7 +151,7 @@ func (sd *SlackDumper) DumpMessages(ctx context.Context, channelID string) (*Cha
139151
break
140152
}
141153

142-
params.Cursor = resp.ResponseMetaData.NextCursor
154+
cursor = resp.ResponseMetaData.NextCursor
143155

144156
limiter.Wait(ctx)
145157
}
@@ -150,7 +162,7 @@ func (sd *SlackDumper) DumpMessages(ctx context.Context, channelID string) (*Cha
150162

151163
if sd.options.dumpfiles {
152164
close(filesC)
153-
<-done
165+
<-dlDoneC
154166
}
155167

156168
return &Channel{Messages: messages, ID: channelID}, nil
@@ -165,21 +177,22 @@ func (sd *SlackDumper) convertMsgs(sm []slack.Message) []Message {
165177
return msgs
166178
}
167179

168-
// extractFiles scans the messages and sends all the files discovered to the filesC.
169-
func (sd *SlackDumper) extractFiles(filesC chan<- *slack.File, msgs []Message) {
180+
// pipeFiles scans the messages and sends all the files discovered to the filesC.
181+
func (sd *SlackDumper) pipeFiles(filesC chan<- *slack.File, msgs []Message) {
170182
if !sd.options.dumpfiles {
171183
return
172184
}
173185
// place files in download queue
174-
chunk := sd.filesFromMessages(msgs)
175-
for i := range chunk {
176-
filesC <- &chunk[i]
186+
fileChunk := sd.filesFromMessages(msgs)
187+
for i := range fileChunk {
188+
filesC <- &fileChunk[i]
177189
}
178190
}
179191

180192
// populateThreads scans the message slice for threads, if and when it
181193
// discovers the message with ThreadTimestamp, it fetches all messages in that
182194
// thread updating them to the msgs slice.
195+
//
183196
// ref: https://api.slack.com/messaging/retrieving
184197
func (sd *SlackDumper) populateThreads(ctx context.Context, msgs []Message, channelID string, l *rate.Limiter) error {
185198
for i := range msgs {

0 commit comments

Comments
 (0)