Skip to content

Commit

Permalink
extract user related code to a user worker
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 27, 2023
1 parent 3d96406 commit 4d4b50b
Showing 1 changed file with 9 additions and 11 deletions.
20 changes: 9 additions & 11 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,10 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
wg.Add(1)
go func() {
defer wg.Done()
if err := userWorker(ctx, s, tmpdir); err != nil {
if err := userWorker(ctx, s, tmpdir, chunkdir, tf); err != nil {
errC <- ExportError{"user", "worker", err}
return
}
users, err := chunkdir.Users() // load users from chunks
if err != nil {
errC <- ExportError{"user", "load users", err}
return
}
if err := tf.StartWithUsers(ctx, users); err != nil {
errC <- ExportError{"user", "start transformer", err}
return
}
}()
}
// conversations goroutine
Expand Down Expand Up @@ -225,7 +216,7 @@ func genAPIChannel(s *slackdump.Stream, tmpdir string, memberOnly bool) linkFeed
}
}

func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error {
func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string, chunkdir *chunk.Directory, tf *expproc.Transform) error {
userproc, err := expproc.NewUsers(tmpdir)
if err != nil {
return err
Expand All @@ -239,6 +230,13 @@ func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error {
return fmt.Errorf("error closing user processor: %w", err)
}
logger.FromContext(ctx).Debug("users done")
users, err := chunkdir.Users() // load users from chunks
if err != nil {
return fmt.Errorf("error loading users: %w", err)
}
if err := tf.StartWithUsers(ctx, users); err != nil {
return fmt.Errorf("error starting the transformer: %w", err)
}
return nil
}

Expand Down

0 comments on commit 4d4b50b

Please sign in to comment.