diff --git a/internal/chunk/control/workers.go b/internal/chunk/control/workers.go index d0e614f8..16254e52 100644 --- a/internal/chunk/control/workers.go +++ b/internal/chunk/control/workers.go @@ -11,14 +11,18 @@ import ( "github.com/rusq/slackdump/v2/internal/chunk/processor/dirproc" "github.com/rusq/slackdump/v2/internal/chunk/transform" "github.com/rusq/slackdump/v2/logger" + "github.com/slack-go/slack" ) func userWorker(ctx context.Context, s Streamer, chunkdir *chunk.Directory, tf TransformStarter) error { - userproc, err := dirproc.NewUsers(chunkdir) + var users = make([]slack.User, 0, 100) + userproc, err := dirproc.NewUsers(chunkdir, dirproc.WithUsers(func(us []slack.User) error { + users = append(users, us...) + return nil + })) if err != nil { return err } - if err := s.Users(ctx, userproc); err != nil { if err2 := userproc.Close(); err2 != nil { err = errors.Join(err2) @@ -29,9 +33,8 @@ func userWorker(ctx context.Context, s Streamer, chunkdir *chunk.Directory, tf T return fmt.Errorf("error closing user processor: %w", err) } logger.FromContext(ctx).Debug("users done") - users, err := chunkdir.Users() // load users from chunks - if err != nil { - return fmt.Errorf("error loading users: %w", err) + if len(users) == 0 { + return fmt.Errorf("unable to proceed, no users found") } if err := tf.StartWithUsers(ctx, users); err != nil { return fmt.Errorf("error starting the transformer: %w", err) diff --git a/internal/chunk/processor/dirproc/users.go b/internal/chunk/processor/dirproc/users.go index a3ae6311..b31ee9ee 100644 --- a/internal/chunk/processor/dirproc/users.go +++ b/internal/chunk/processor/dirproc/users.go @@ -1,17 +1,52 @@ package dirproc -import "github.com/rusq/slackdump/v2/internal/chunk" +import ( + "context" + "fmt" + + "github.com/rusq/slackdump/v2/internal/chunk" + "github.com/rusq/slackdump/v2/internal/chunk/processor" + "github.com/slack-go/slack" +) // Users is a users processor. type Users struct { *baseproc + cb func([]slack.User) error +} + +var _ processor.Users = &Users{} + +type UserOption func(*Users) + +// WithUsers sets the users callback. +func WithUsers(cb func([]slack.User) error) UserOption { + return func(u *Users) { + u.cb = cb + } } // NewUsers creates a new Users processor. -func NewUsers(cd *chunk.Directory) (*Users, error) { +func NewUsers(cd *chunk.Directory, opt ...UserOption) (*Users, error) { p, err := newBaseProc(cd, "users") if err != nil { return nil, err } - return &Users{baseproc: p}, nil + u := &Users{baseproc: p} + for _, o := range opt { + o(u) + } + return u, nil +} + +func (u *Users) Users(ctx context.Context, users []slack.User) error { + if err := u.baseproc.Users(ctx, users); err != nil { + return err + } + if u.cb != nil { + if err := u.cb(users); err != nil { + return fmt.Errorf("users callback: %w", err) + } + } + return nil }