From 4d4b50b3bca21ac7d707bcc2d9a3ac3eb2568cf6 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Mon, 24 Apr 2023 12:33:12 +1000 Subject: [PATCH] extract user related code to a user worker --- cmd/slackdump/internal/export/v3.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/cmd/slackdump/internal/export/v3.go b/cmd/slackdump/internal/export/v3.go index e9dfc2f4..e10b5f50 100644 --- a/cmd/slackdump/internal/export/v3.go +++ b/cmd/slackdump/internal/export/v3.go @@ -101,19 +101,10 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li wg.Add(1) go func() { defer wg.Done() - if err := userWorker(ctx, s, tmpdir); err != nil { + if err := userWorker(ctx, s, tmpdir, chunkdir, tf); err != nil { errC <- ExportError{"user", "worker", err} return } - users, err := chunkdir.Users() // load users from chunks - if err != nil { - errC <- ExportError{"user", "load users", err} - return - } - if err := tf.StartWithUsers(ctx, users); err != nil { - errC <- ExportError{"user", "start transformer", err} - return - } }() } // conversations goroutine @@ -225,7 +216,7 @@ func genAPIChannel(s *slackdump.Stream, tmpdir string, memberOnly bool) linkFeed } } -func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error { +func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string, chunkdir *chunk.Directory, tf *expproc.Transform) error { userproc, err := expproc.NewUsers(tmpdir) if err != nil { return err @@ -239,6 +230,13 @@ func userWorker(ctx context.Context, s *slackdump.Stream, tmpdir string) error { return fmt.Errorf("error closing user processor: %w", err) } logger.FromContext(ctx).Debug("users done") + users, err := chunkdir.Users() // load users from chunks + if err != nil { + return fmt.Errorf("error loading users: %w", err) + } + if err := tf.StartWithUsers(ctx, users); err != nil { + return fmt.Errorf("error starting the transformer: %w", err) + } return nil }