Skip to content

Commit

Permalink
my life is so successful, i got everything a man could ever need
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 18, 2023
1 parent 747993e commit 6443647
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func (cv *Conversations) finalise(ctx context.Context, channelID string) error {
return err
}
cv.mu.Lock()
defer cv.mu.Unlock()
delete(cv.cw, channelID)
cv.mu.Unlock()
if cv.onFinalise != nil {
return cv.onFinalise(channelID)
}
Expand Down
2 changes: 0 additions & 2 deletions cmd/slackdump/internal/export/expproc/expproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,3 @@
//
// GOOD LUCK DEBUGGING THIS.
package expproc

const ext = ".jsonl.gz"
140 changes: 109 additions & 31 deletions cmd/slackdump/internal/export/expproc/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,52 @@ import (
"path/filepath"
"runtime/trace"
"sort"
"sync/atomic"
"sync"
"time"

"github.com/rusq/dlog"
"github.com/rusq/fsadapter"
"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/export"
"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/osext"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/types"
"github.com/slack-go/slack"
)

// Transform is a trasnformer that takes the chunks produced by the processor
// and transforms them into a Slack Export format. It is sutable for async
// processing, in which case, OnFinalise function is passed to the processor,
// and the finalisation requests will be queued (up to a certain limit) and
// will be processed once Start or StartWithUsers is called.
//
// Please note, that transform requires users to be passed either through
// options or through StartWithUsers. If users are not passed, the transform
// will fail.
//
// The asynchronous pattern to run the transform is as follows:
//
// 1. Create the transform instance.
// 2. Defer its Close method.
// 3. In goroutine: Start user processing, and in the same goroutine, after
// all users are fetched, call [Transform.StartWithUsers], passing the
// fetched users slice.
// 4. In another goroutine, start the Export Conversation processor, passsing
// the transformer's OnFinalise function as the finaliser option. It will
// be called by export processor for each channel that was completed.
//
// TODO: Chunk channels and index generation here.
type Transform struct {
srcdir string // source directory with chunks.
fsa fsadapter.FS // target file system adapter.
ids chan string // channel used to pass channel IDs to the worker.
done chan struct{}
srcdir string // source directory with chunks.
fsa fsadapter.FS // target file system adapter.
ids chan string // channel used to pass channel IDs to the worker.
done chan struct{}
err chan error // error channel used to propagate errors to the main thread.

mu sync.RWMutex // protects the following fields.
users []slack.User // list of users.
err chan error // error channel used to propagate errors to the main thread.
started atomic.Bool
started bool
}

type TfOption func(*Transform)
Expand All @@ -40,7 +65,7 @@ func WithBufferSize(n int) TfOption {
}
}

// WithUsers allows to pass a list of users to the transformer.
// WithUsers allows to pass a list of users to the transform.
func WithUsers(users []slack.User) TfOption {
return func(t *Transform) {
t.users = users
Expand Down Expand Up @@ -72,51 +97,68 @@ func (t *Transform) WriteUsers(users []slack.User) error {
return t.writeUsers(users)
}

// writeUsers writes the list of users to the file system adapter.
func (t *Transform) writeUsers(users []slack.User) error {
f, err := t.fsa.Create("users.json")
if err != nil {
return err
}
defer f.Close()
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
return enc.Encode(users)
}

// Start starts the Transform processor with the provided list of users.
// Users are used to populate each message with the user profile, as per Slack
// original export format.
func (t *Transform) StartWithUsers(ctx context.Context, users []slack.User) error {
if users == nil {
return errors.New("users list is nil")
}
t.mu.Lock()
t.users = users
t.mu.Unlock()
return t.Start(ctx)
}

// writeUsers writes the list of users to the file system adapter.
func (t *Transform) writeUsers(users []slack.User) error {
f, err := t.fsa.Create("users.json")
if err != nil {
return err
}
defer f.Close()
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
return enc.Encode(users)
func (t *Transform) hasUsers() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.users != nil
}

// Start starts the Transform processor, the users must have been initialised
// with the WithUsers option. Otherwise, use StartWithUsers method.
// If the processor is already started, it will return nil.
func (t *Transform) Start(ctx context.Context) error {
if t.users == nil {
if t.IsRunning() {
return nil
}
dlog.Debugln("transform: starting transform")
if !t.hasUsers() {
return errors.New("internal error: users not initialised")
}

t.mu.Lock()
defer t.mu.Unlock()

t.ids = make(chan string)
t.done = make(chan struct{})
t.err = make(chan error, 1)

t.started.Store(false)
t.started = true
go t.worker(ctx)
return nil
}

// OnFinalise is the function that should be passed to the Channel processor.
// It will not block if the internal buffer is full. Buffer size can be
// set with the WithBufferSize option.
// set with the WithBufferSize option. The caller is allowed to call OnFinalise
// even if the processor is not started, in which case the channel ID will
// be queued for processing once the processor is started.
func (t *Transform) OnFinalise(channelID string) error {
if !t.started.Load() {
return errors.New("transformer not started")
}
dlog.Debugln("transform: placing channel in the queue", channelID)
select {
case err := <-t.err:
return err
Expand All @@ -127,8 +169,11 @@ func (t *Transform) OnFinalise(channelID string) error {
}

func (t *Transform) worker(ctx context.Context) {
dlog.Debugln("transform: worker started")
for id := range t.ids {
dlog.Debugf("transform: transforming channel %s", id)
if err := transform(ctx, t.fsa, t.srcdir, id, t.users); err != nil {
dlog.Debugf("transform: error transforming channel %s: %s", id, err)
t.err <- err
continue
}
Expand All @@ -140,6 +185,7 @@ func (t *Transform) worker(ctx context.Context) {
// guaranteed that OnFinish will not be called anymore, otherwise the
// call to OnFinish will panic.
func (t *Transform) Close() error {
dlog.Debugln("transform: closing transform")
t.Stop()
return nil
}
Expand All @@ -151,15 +197,28 @@ func (t *Transform) Close() error {
// Stop MUST be called before writing the user list and other things for it
// not to interfere with the worker.
func (t *Transform) Stop() {
if !t.started.Load() {
if !t.IsRunning() {
return
}

dlog.Debugln("transform: stopping transform")

t.mu.Lock()
defer t.mu.Unlock()

close(t.ids)
dlog.Debugln("transform: waiting for workers to finish")
<-t.done
t.started.Store(false)
t.started = false
}

// transform is the chunk file transformer. It transforms the chunk file for
func (t *Transform) IsRunning() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.started
}

// transform is the chunk file transform. It transforms the chunk file for
// the channel with ID into a slack export format, and attachments are placed
// into the relevant directory. It expects the chunk file to be in the
// srcdir/id.json.gz file, and the attachments to be in the srcdir/id
Expand All @@ -171,7 +230,10 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
trace.Logf(ctx, "input", "len(users)=%d", len(users))
lg.Debugf("transforming channel %s, user len=%d", id, len(users))

cd := chunk.OpenDir(srcdir)
cd, err := chunk.OpenDir(srcdir)
if err != nil {
return err
}

// load the chunk file
cf, err := cd.Open(id)
Expand All @@ -192,13 +254,15 @@ func transform(ctx context.Context, fsa fsadapter.FS, srcdir string, id string,
return nil
}

// channelName returns the channel name, or the channel ID if it is a DM.
func channelName(ch *slack.Channel) string {
if ch.IsIM {
return ch.ID
}
return ch.Name
}

// writeMessages writes the messages to the file system adapter.
func writeMessages(ctx context.Context, fsa fsadapter.FS, pl *chunk.File, ci *slack.Channel, users []slack.User) error {
uidx := types.Users(users).IndexByID()
trgdir := channelName(ci)
Expand All @@ -210,6 +274,7 @@ func writeMessages(ctx context.Context, fsa fsadapter.FS, pl *chunk.File, ci *sl
if err := pl.Sorted(ctx, false, func(ts time.Time, m *slack.Message) error {
date := ts.Format("2006-01-02")
if date != prevDt || prevDt == "" {
// if we have advanced to the next date, switch to a new file.
if wc != nil {
if err := writeJSONFooter(wc); err != nil {
return err
Expand Down Expand Up @@ -261,13 +326,23 @@ func writeMessages(ctx context.Context, fsa fsadapter.FS, pl *chunk.File, ci *sl
return nil
}

// toExportMessage converts a slack message to an export message.
// toExportMessage converts a slack message m to an export message, populating
// the fields that are not present in the original message. To populate the
// count of replies and reply users on a lead message of a thread, it needs
// "thread". If m is not a parent message (m.ts != m.thread_ts) for the
// thread or thread is nil, it is ignored (this follows the original Slack
// Export logic). user is the poster's user information. Original Slack
// Export adds the "profile" field on each message with basic profile
// information about the poster.
func toExportMessage(m *slack.Message, thread []slack.Message, user *slack.User) *export.ExportMessage {
// export message
em := export.ExportMessage{
Msg: &m.Msg,
UserTeam: m.Team,
SourceTeam: m.Team,
}

// add user profile
if user != nil && !user.IsBot {
em.UserProfile = &export.ExportUserProfile{
AvatarHash: "",
Expand All @@ -281,7 +356,9 @@ func toExportMessage(m *slack.Message, thread []slack.Message, user *slack.User)
IsUltraRestricted: user.IsUltraRestricted,
}
}
if len(thread) > 0 {

// add thread information if it is a lead message of a thread.
if m.Timestamp == m.ThreadTimestamp && len(thread) > 0 {
em.Replies = make([]slack.Reply, 0, len(thread))
for _, rm := range thread {
em.Replies = append(em.Replies, slack.Reply{
Expand All @@ -304,6 +381,7 @@ func toExportMessage(m *slack.Message, thread []slack.Message, user *slack.User)
makeUniqueStrings(&em.ReplyUsers)
em.ReplyUsersCount = len(em.ReplyUsers)
}

return &em
}

Expand Down
1 change: 1 addition & 0 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
go func() {
defer wg.Done()
defer pb.Finish()
defer conv.Close()
errC <- conversationWorker(ctx, s, conv, pb, linkC)
}()
}
Expand Down

0 comments on commit 6443647

Please sign in to comment.