Skip to content

Commit

Permalink
propagating the slack channel to Files processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 201f529 commit ed0c2a4
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 49 deletions.
35 changes: 20 additions & 15 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ import (

// Conversations is a processor that writes the channel and thread messages.
type Conversations struct {
dir string
cw map[string]*channelproc
mu sync.RWMutex
lg logger.Interface
filer processor.Filer
dir string
cw map[string]*channelproc
mu sync.RWMutex
lg logger.Interface

// flags
// fileSubproc is the files subprocessor, it is called by the Files method
// in addition to recording the files in the chunk file (if recordFiles is
// set). It is useful when one needs to download the files directly into
// a final archive/directory, avoiding the intermediate step of
// downloading files into the temporary directory, and then using
// transform to download the files.
fileSubproc processor.Filer // files sub-processor
recordFiles bool

onFinalise func(ctx context.Context, channelID string) error
Expand Down Expand Up @@ -61,12 +66,12 @@ type channelproc struct {

// NewConversation returns the new conversation processor. The Files method of
// the supplied files sub-processor will be called for each file chunk.
func NewConversation(dir string, filer processor.Filer, opts ...ConvOption) (*Conversations, error) {
func NewConversation(dir string, filesSubproc processor.Filer, opts ...ConvOption) (*Conversations, error) {
c := &Conversations{
dir: dir,
lg: logger.Default,
cw: make(map[string]*channelproc),
filer: filer,
dir: dir,
lg: logger.Default,
cw: make(map[string]*channelproc),
fileSubproc: filesSubproc,
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -173,16 +178,16 @@ func (cv *Conversations) Messages(ctx context.Context, channelID string, numThre

// Files is called for each file that is retrieved. The parent message is
// passed in as well.
func (cv *Conversations) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
if err := cv.filer.Files(ctx, channelID, parent, isThread, ff); err != nil {
func (cv *Conversations) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error {
if err := cv.fileSubproc.Files(ctx, channel, parent, isThread, ff); err != nil {
return err
}
if cv.recordFiles {
r, err := cv.recorder(channelID)
r, err := cv.recorder(channel.ID)
if err != nil {
return err
}
if err := r.Files(ctx, channelID, parent, isThread, ff); err != nil {
if err := r.Files(ctx, channel, parent, isThread, ff); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/slackdump/internal/export/expproc/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type mmfiler struct {
basefiler
}

func (mm mmfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
func (mm mmfiler) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error {
const baseDir = "__uploads"
for _, f := range ff {
if err := mm.dcl.Download(filepath.Join(baseDir, f.ID, f.Name), f.URLPrivateDownload); err != nil {
Expand All @@ -53,7 +53,7 @@ func (mm mmfiler) Files(ctx context.Context, channelID string, parent slack.Mess

type nopfiler struct{}

func (nopfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
func (nopfiler) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error {
return nil
}

Expand All @@ -62,14 +62,14 @@ type stdfiler struct {
}

// TODO: the channel name is not available, need to think how to pass it tho.
func (mm stdfiler) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
func (mm stdfiler) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error {
const baseDir = "attachments"
for _, f := range ff {
if err := mm.dcl.Download(
// TODO: this should be channel name, not id for public. Maybe
// there's no choice but to pass the channel name to the
// processor, or post-process files in transform.
filepath.Join(channelID, baseDir, fmt.Sprintf("%s-%s", f.ID, f.Name)),
filepath.Join(channelName(channel), baseDir, fmt.Sprintf("%s-%s", f.ID, f.Name)),
f.URLPrivateDownload,
); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type Chunk struct {
NumThreads int `json:"nt,omitempty"`

// Channel contains the channel information. It may not be immediately
// followed by messages from the channel. Populated by ChannelInfo method.
// followed by messages from the channel. Populated by ChannelInfo and
// Files methods.
Channel *slack.Channel `json:"ci,omitempty"`

// Parent is populated in case the chunk is a thread, or a file. Populated
Expand Down
2 changes: 1 addition & 1 deletion internal/chunk/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Conversations interface {
type Filer interface {
// Files is called for each file that is retrieved. The parent message is
// passed in as well.
Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error
Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error
}

var _ Conversations = new(chunk.Recorder)
Expand Down
6 changes: 3 additions & 3 deletions internal/chunk/processor/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewStandard(ctx context.Context, w io.Writer, sess downloader.Downloader, d

// Files implements the Processor interface. It will download files if the
// dumpFiles option is enabled.
func (s *Standard) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, ff []slack.File) error {
func (s *Standard) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, ff []slack.File) error {
if !s.opts.dumpFiles {
// ignore files if requested
return nil
Expand All @@ -57,11 +57,11 @@ func (s *Standard) Files(ctx context.Context, channelID string, parent slack.Mes
trace.Logf(ctx, "skip", "unfetchable file type: %q", ff[i].ID)
continue
}
filename, err := s.dl.DownloadFile(channelID, ff[i])
filename, err := s.dl.DownloadFile(channel.ID, ff[i])
if err != nil {
return err
}
st.AddFile(channelID, ff[i].ID, filename)
st.AddFile(channel.ID, ff[i].ID, filename)
}
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions internal/chunk/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (rec *Recorder) Messages(ctx context.Context, channelID string, numThreads

// Files is called for each file chunk that is retrieved. The parent message is
// passed in as well.
func (rec *Recorder) Files(ctx context.Context, channelID string, parent slack.Message, isThread bool, f []slack.File) error {
func (rec *Recorder) Files(ctx context.Context, channel *slack.Channel, parent slack.Message, isThread bool, f []slack.File) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -106,14 +106,15 @@ func (rec *Recorder) Files(ctx context.Context, channelID string, parent slack.M
case rec.chunks <- Chunk{
Type: CFiles,
Timestamp: time.Now().UnixNano(),
ChannelID: channelID,
ChannelID: channel.ID,
Channel: channel,
Parent: &parent,
IsThread: isThread,
Count: len(f),
Files: f,
}: // ok
for i := range f {
rec.state.AddFile(channelID, f[i].ID, "")
rec.state.AddFile(channel.ID, f[i].ID, "")
}
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions internal/chunk/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestRecorder_Files(t *testing.T) {
}
if err := rec.Files(
ctx,
"C123",
&slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "C123"}}},
slack.Message{Msg: slack.Msg{Text: "parent"}},
true,
[]slack.File{{ID: "F123", Name: "file.txt"}},
Expand Down Expand Up @@ -342,7 +342,7 @@ func TestRecorder_Files(t *testing.T) {
rec.errC <- errors.New("test error")
gotErr := rec.Files(
ctx,
"C123",
&slack.Channel{GroupConversation: slack.GroupConversation{Conversation: slack.Conversation{ID: "C123"}}},
slack.Message{Msg: slack.Msg{Text: "parent"}},
true,
[]slack.File{{ID: "F123", Name: "file.txt"}},
Expand Down
10 changes: 10 additions & 0 deletions internal/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"io"
"os"

"github.com/slack-go/slack"
)

// Load loads a json data into T, or panics.
Expand Down Expand Up @@ -38,3 +40,11 @@ func FilledFile(sz int) *os.File {
f.Seek(0, io.SeekStart)
return f
}

// DummyChannel builds a minimal slack.Channel for use in tests: only the ID
// field is set to id, every other field keeps its zero value. A pointer to
// the freshly allocated channel is returned.
func DummyChannel(id string) *slack.Channel {
	ch := new(slack.Channel)
	ch.ID = id
	return ch
}
2 changes: 1 addition & 1 deletion internal/mocks/mock_processor/mock_processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 22 additions & 18 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,20 @@ func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Convers
// channel worker
wg.Add(1)
go func() {
defer wg.Done()
cs.channelWorker(ctx, proc, resultsC, threadsC, chansC)
// we close threads here, instead of the main loop, because we want to
// close it after all the thread workers are done.
close(threadsC)
wg.Done()
trace.Log(ctx, "async", "channel worker done")
}()
}
{
// thread worker
wg.Add(1)
go func() {
defer wg.Done()
cs.threadWorker(ctx, proc, resultsC, threadsC)
wg.Done()
trace.Log(ctx, "async", "thread worker done")
}()
}
Expand Down Expand Up @@ -273,14 +273,15 @@ func (cs *Stream) channelWorker(ctx context.Context, proc processor.Conversation
if !more {
return // channel closed
}
if err := cs.channelInfo(ctx, proc, req.channelID, false); err != nil {
channel, err := cs.channelInfo(ctx, proc, req.channelID, false)
if err != nil {
results <- StreamResult{Type: RTChannel, ChannelID: req.channelID, Err: err}
}
last := false
threadCount := 0
if err := cs.channel(ctx, req.channelID, func(mm []slack.Message, isLast bool) error {
last = isLast
n, err := processChannelMessages(ctx, proc, threadC, req.channelID, isLast, mm)
n, err := processChannelMessages(ctx, proc, threadC, channel, isLast, mm)
threadCount = n
return err
}); err != nil {
Expand Down Expand Up @@ -351,15 +352,18 @@ func (cs *Stream) threadWorker(ctx context.Context, proc processor.Conversations
if !more {
return // channel closed
}
var channel = new(slack.Channel)
if req.needChanInfo {
if err := cs.channelInfo(ctx, proc, req.channelID, true); err != nil {
if _, err := cs.channelInfo(ctx, proc, req.channelID, true); err != nil {
results <- StreamResult{Type: RTThread, ChannelID: req.channelID, ThreadTS: req.threadTS, Err: err}
}
} else {
channel.ID = req.channelID
}
var last bool
if err := cs.thread(ctx, req.channelID, req.threadTS, func(msgs []slack.Message, isLast bool) error {
last = isLast
return processThreadMessages(ctx, proc, req.channelID, req.threadTS, isLast, msgs)
return processThreadMessages(ctx, proc, channel, req.threadTS, isLast, msgs)
}); err != nil {
results <- StreamResult{Type: RTThread, ChannelID: req.channelID, ThreadTS: req.threadTS, Err: err}
}
Expand Down Expand Up @@ -419,7 +423,7 @@ func (cs *Stream) thread(ctx context.Context, id string, threadTS string, fn fun
// processChannelMessages processes the messages in the channel and sends
// thread requests for the threads in the channel, if it discovers messages
// with threads. It returns thread count in the mm and error if any.
func processChannelMessages(ctx context.Context, proc processor.Conversations, threadC chan<- threadRequest, channelID string, isLast bool, mm []slack.Message) (int, error) {
func processChannelMessages(ctx context.Context, proc processor.Conversations, threadC chan<- threadRequest, channel *slack.Channel, isLast bool, mm []slack.Message) (int, error) {
lg := logger.FromContext(ctx)

var trs = make([]threadRequest, 0, len(mm))
Expand All @@ -430,16 +434,16 @@ func processChannelMessages(ctx context.Context, proc processor.Conversations, t
// start processing the channel and will have the initial reference
// count, if it needs it.
if mm[i].Msg.ThreadTimestamp != "" && mm[i].Msg.SubType != "thread_broadcast" && mm[i].LatestReply != structures.NoRepliesLatestReply {
lg.Debugf("- message #%d/channel=%s,thread: id=%s, thread_ts=%s", i, channelID, mm[i].Timestamp, mm[i].Msg.ThreadTimestamp)
trs = append(trs, threadRequest{channelID: channelID, threadTS: mm[i].Msg.ThreadTimestamp})
lg.Debugf("- message #%d/channel=%s,thread: id=%s, thread_ts=%s", i, channel.ID, mm[i].Timestamp, mm[i].Msg.ThreadTimestamp)
trs = append(trs, threadRequest{channelID: channel.ID, threadTS: mm[i].Msg.ThreadTimestamp})
}
if len(mm[i].Files) > 0 {
if err := proc.Files(ctx, channelID, mm[i], false, mm[i].Files); err != nil {
if err := proc.Files(ctx, channel, mm[i], false, mm[i].Files); err != nil {
return len(trs), err
}
}
}
if err := proc.Messages(ctx, channelID, len(trs), isLast, mm); err != nil {
if err := proc.Messages(ctx, channel.ID, len(trs), isLast, mm); err != nil {
return 0, fmt.Errorf("failed to process message chunk starting with id=%s (size=%d): %w", mm[0].Msg.Timestamp, len(mm), err)
}
for _, tr := range trs {
Expand All @@ -448,25 +452,25 @@ func processChannelMessages(ctx context.Context, proc processor.Conversations, t
return len(trs), nil
}

func processThreadMessages(ctx context.Context, proc processor.Conversations, channelID, threadTS string, isLast bool, msgs []slack.Message) error {
func processThreadMessages(ctx context.Context, proc processor.Conversations, channel *slack.Channel, threadTS string, isLast bool, msgs []slack.Message) error {
// extract files from thread messages
for _, m := range msgs[1:] {
if len(m.Files) > 0 {
if err := proc.Files(ctx, channelID, m, true, m.Files); err != nil {
if err := proc.Files(ctx, channel, m, true, m.Files); err != nil {
return err
}
}
}
// slack returns the thread starter as the first message with every
// call, so we use it as a parent message.
if err := proc.ThreadMessages(ctx, channelID, msgs[0], isLast, msgs[1:]); err != nil {
if err := proc.ThreadMessages(ctx, channel.ID, msgs[0], isLast, msgs[1:]); err != nil {
return fmt.Errorf("failed to process thread message id=%s, thread_ts=%s: %w", msgs[0].Msg.Timestamp, threadTS, err)
}
return nil
}

// channelInfo fetches the channel info and passes it to the processor.
func (cs *Stream) channelInfo(ctx context.Context, proc processor.Conversations, channelID string, isThread bool) error {
func (cs *Stream) channelInfo(ctx context.Context, proc processor.Conversations, channelID string, isThread bool) (*slack.Channel, error) {
ctx, task := trace.NewTask(ctx, "channelInfo")
defer task.End()

Expand All @@ -480,12 +484,12 @@ func (cs *Stream) channelInfo(ctx context.Context, proc processor.Conversations,
})
return err
}); err != nil {
return err
return nil, err
}
if err := proc.ChannelInfo(ctx, info, isThread); err != nil {
return err
return nil, err
}
return nil
return info, nil
}

// WorkspaceInfo fetches the workspace info and passes it to the processor.
Expand Down
2 changes: 1 addition & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func Test_processThreadMessages(t *testing.T) {
Files(gomock.Any(), "CTM1", testThread[2], true, testThread[2].Files).
Return(nil)

if err := processThreadMessages(context.Background(), mproc, "CTM1", testThread[0].ThreadTimestamp, true, testThread); err != nil {
if err := processThreadMessages(context.Background(), mproc, fixtures.DummyChannel("CTM1"), testThread[0].ThreadTimestamp, true, testThread); err != nil {
t.Fatal(err)
}
})
Expand Down

0 comments on commit ed0c2a4

Please sign in to comment.