Skip to content

Commit

Permalink
do not rely on chunks in user processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed May 1, 2023
1 parent fa4effc commit 5e8e672
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
13 changes: 8 additions & 5 deletions internal/chunk/control/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ import (
"github.com/rusq/slackdump/v2/internal/chunk/processor/dirproc"
"github.com/rusq/slackdump/v2/internal/chunk/transform"
"github.com/rusq/slackdump/v2/logger"
"github.com/slack-go/slack"
)

func userWorker(ctx context.Context, s Streamer, chunkdir *chunk.Directory, tf TransformStarter) error {
userproc, err := dirproc.NewUsers(chunkdir)
var users = make([]slack.User, 0, 100)
userproc, err := dirproc.NewUsers(chunkdir, dirproc.WithUsers(func(us []slack.User) error {
users = append(users, us...)
return nil
}))
if err != nil {
return err
}

if err := s.Users(ctx, userproc); err != nil {
if err2 := userproc.Close(); err2 != nil {
err = errors.Join(err2)
Expand All @@ -29,9 +33,8 @@ func userWorker(ctx context.Context, s Streamer, chunkdir *chunk.Directory, tf T
return fmt.Errorf("error closing user processor: %w", err)
}
logger.FromContext(ctx).Debug("users done")
users, err := chunkdir.Users() // load users from chunks
if err != nil {
return fmt.Errorf("error loading users: %w", err)
if len(users) == 0 {
return fmt.Errorf("unable to proceed, no users found")
}
if err := tf.StartWithUsers(ctx, users); err != nil {
return fmt.Errorf("error starting the transformer: %w", err)
Expand Down
41 changes: 38 additions & 3 deletions internal/chunk/processor/dirproc/users.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,52 @@
package dirproc

import "github.com/rusq/slackdump/v2/internal/chunk"
import (
"context"
"fmt"

"github.com/rusq/slackdump/v2/internal/chunk"
"github.com/rusq/slackdump/v2/internal/chunk/processor"
"github.com/slack-go/slack"
)

// Users is a users processor.
type Users struct {
*baseproc
cb func([]slack.User) error
}

var _ processor.Users = &Users{}

type UserOption func(*Users)

// WithUsers sets the users callback.
func WithUsers(cb func([]slack.User) error) UserOption {
return func(u *Users) {
u.cb = cb
}
}

// NewUsers creates a new Users processor.
func NewUsers(cd *chunk.Directory) (*Users, error) {
func NewUsers(cd *chunk.Directory, opt ...UserOption) (*Users, error) {
p, err := newBaseProc(cd, "users")
if err != nil {
return nil, err
}
return &Users{baseproc: p}, nil
u := &Users{baseproc: p}
for _, o := range opt {
o(u)
}
return u, nil
}

func (u *Users) Users(ctx context.Context, users []slack.User) error {
if err := u.baseproc.Users(ctx, users); err != nil {
return err
}
if u.cb != nil {
if err := u.cb(users); err != nil {
return fmt.Errorf("users callback: %w", err)
}
}
return nil
}

0 comments on commit 5e8e672

Please sign in to comment.