Skip to content

Commit

Permalink
get rid of UserIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Jan 29, 2023
1 parent fcc7b87 commit db5efca
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 193 deletions.
22 changes: 11 additions & 11 deletions channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
// type of messages to fetch. See github.com/rusq/slack docs for possible
// values. If large number of channels is to be returned, consider using
// StreamChannels.
func (sd *Session) GetChannels(ctx context.Context, chanTypes ...string) (types.Channels, error) {
func (s *Session) GetChannels(ctx context.Context, chanTypes ...string) (types.Channels, error) {
var allChannels types.Channels
if err := sd.getChannels(ctx, chanTypes, func(cc types.Channels) error {
if err := s.getChannels(ctx, chanTypes, func(cc types.Channels) error {
allChannels = append(allChannels, cc...)
return nil
}); err != nil {
Expand All @@ -30,8 +30,8 @@ func (sd *Session) GetChannels(ctx context.Context, chanTypes ...string) (types.

// StreamChannels requests the channels from the API and calls the callback
// function cb for each.
func (sd *Session) StreamChannels(ctx context.Context, chanTypes []string, cb func(ch slack.Channel) error) error {
return sd.getChannels(ctx, chanTypes, func(chans types.Channels) error {
func (s *Session) StreamChannels(ctx context.Context, chanTypes []string, cb func(ch slack.Channel) error) error {
return s.getChannels(ctx, chanTypes, func(chans types.Channels) error {
for _, ch := range chans {
if err := cb(ch); err != nil {
return err
Expand All @@ -44,17 +44,17 @@ func (sd *Session) StreamChannels(ctx context.Context, chanTypes []string, cb fu
// getChannels list all conversations for a user. `chanTypes` specifies
// the type of messages to fetch. See github.com/rusq/slack docs for possible
// values
func (sd *Session) getChannels(ctx context.Context, chanTypes []string, cb func(types.Channels) error) error {
func (s *Session) getChannels(ctx context.Context, chanTypes []string, cb func(types.Channels) error) error {
ctx, task := trace.NewTask(ctx, "getChannels")
defer task.End()

limiter := network.NewLimiter(network.Tier2, sd.cfg.Limits.Tier2.Burst, int(sd.cfg.Limits.Tier2.Boost))
limiter := network.NewLimiter(network.Tier2, s.cfg.Limits.Tier2.Burst, int(s.cfg.Limits.Tier2.Boost))

if chanTypes == nil {
chanTypes = AllChanTypes
}

params := &slack.GetConversationsParameters{Types: chanTypes, Limit: sd.cfg.Limits.Request.Channels}
params := &slack.GetConversationsParameters{Types: chanTypes, Limit: s.cfg.Limits.Request.Channels}
fetchStart := time.Now()
var total int
for i := 1; ; i++ {
Expand All @@ -63,10 +63,10 @@ func (sd *Session) getChannels(ctx context.Context, chanTypes []string, cb func(
nextcur string
)
reqStart := time.Now()
if err := withRetry(ctx, limiter, sd.cfg.Limits.Tier3.Retries, func() error {
if err := withRetry(ctx, limiter, s.cfg.Limits.Tier3.Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationsContext", func() {
chans, nextcur, err = sd.client.GetConversationsContext(ctx, params)
chans, nextcur, err = s.client.GetConversationsContext(ctx, params)
})
return err

Expand All @@ -79,14 +79,14 @@ func (sd *Session) getChannels(ctx context.Context, chanTypes []string, cb func(
}
total += len(chans)

sd.l().Printf("channels request #%5d, fetched: %4d, total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
s.l().Printf("channels request #%5d, fetched: %4d, total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
i, len(chans), total,
float64(len(chans))/float64(time.Since(reqStart).Seconds()),
float64(total)/float64(time.Since(fetchStart).Seconds()),
)

if nextcur == "" {
sd.l().Printf("channels fetch complete, total: %d channels", total)
s.l().Printf("channels fetch complete, total: %d channels", total)
break
}

Expand Down
27 changes: 11 additions & 16 deletions channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import (
"github.com/slack-go/slack"
"github.com/stretchr/testify/assert"

"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/types"
)

func TestSession_getChannels(t *testing.T) {
type fields struct {
Users types.Users
UserIndex structures.UserIndex
options Config
Users types.Users
options Config
}
type args struct {
ctx context.Context
Expand Down Expand Up @@ -79,10 +77,9 @@ func TestSession_getChannels(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mc := newmockClienter(gomock.NewController(t))
sd := &Session{
client: mc,
Users: tt.fields.Users,
UserIndex: tt.fields.UserIndex,
cfg: tt.fields.options,
client: mc,
Users: tt.fields.Users,
cfg: tt.fields.options,
}

if tt.expectFn != nil {
Expand All @@ -105,10 +102,9 @@ func TestSession_getChannels(t *testing.T) {

func TestSession_GetChannels(t *testing.T) {
type fields struct {
client clienter
Users types.Users
UserIndex structures.UserIndex
options Config
client clienter
Users types.Users
options Config
}
type args struct {
ctx context.Context
Expand All @@ -126,10 +122,9 @@ func TestSession_GetChannels(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sd := &Session{
client: tt.fields.client,
Users: tt.fields.Users,
UserIndex: tt.fields.UserIndex,
cfg: tt.fields.options,
client: tt.fields.client,
Users: tt.fields.Users,
cfg: tt.fields.options,
}
got, err := sd.GetChannels(tt.args.ctx, tt.args.chanTypes...)
if (err != nil) != tt.wantErr {
Expand Down
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ type RequestLimit struct {
// DefOptions is the default options used when initialising slackdump instance.
var DefOptions = Config{
Limits: Limits{
Workers: 4, // number of workers doing the file download
Workers: 4, // number of parallel goroutines downloading files.
DownloadRetries: 3, // this shouldn't even happen, as we have no limiter on files download.
Tier2: TierLimits{
Boost: 20, // seems to work fine with this boost
Burst: 1, // limiter will wait indefinitely if it is less than 1.
Retries: 20, // see #28, sometimes slack is being difficult
Retries: 20, // see issue #28, sometimes slack is being difficult
},
Tier3: TierLimits{
Boost: 120, // playing safe there, but generally value of 120 is fine.
Expand Down
4 changes: 2 additions & 2 deletions internal/app/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (app *dump) writeText(fs fsadapter.FS, filename string, m *types.Conversati
}
defer f.Close()

return m.ToText(f, app.sess.UserIndex)
return m.ToText(f, app.sess.Users.IndexByID())
}

// reporter is an interface defining output functions
Expand Down Expand Up @@ -225,7 +225,7 @@ func (dm *dump) fetchEntity(ctx context.Context, listFlags config.ListFlags) (re
func (app *dump) formatEntity(w io.Writer, rep reporter, output config.Output) error {
switch output.Format {
case config.OutputTypeText:
return rep.ToText(w, app.sess.UserIndex)
return rep.ToText(w, app.sess.Users.IndexByID())
case config.OutputTypeJSON:
enc := json.NewEncoder(w)
return enc.Encode(rep)
Expand Down
48 changes: 24 additions & 24 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,41 +27,41 @@ import (
//
// oldest and latest timestamps set a timeframe within which the messages
// should be retrieved, also one can provide process functions.
func (sd *Session) Dump(ctx context.Context, link string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
func (s *Session) Dump(ctx context.Context, link string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
sl, err := structures.ParseLink(link)
if err != nil {
return nil, err
}
if sd.cfg.DumpFiles {
fn, cancelFn, err := sd.newFileProcessFn(ctx, sl.Channel, sd.limiter(network.NoTier))
if s.cfg.DumpFiles {
fn, cancelFn, err := s.newFileProcessFn(ctx, sl.Channel, s.limiter(network.NoTier))
if err != nil {
return nil, err
}
defer cancelFn()
processFn = append(processFn, fn)
}

return sd.dump(ctx, sl, oldest, latest, processFn...)
return s.dump(ctx, sl, oldest, latest, processFn...)
}

// DumpAll dumps all messages. See description of Dump for what can be provided
// in link.
func (sd *Session) DumpAll(ctx context.Context, link string) (*types.Conversation, error) {
return sd.Dump(ctx, link, time.Time{}, time.Time{})
func (s *Session) DumpAll(ctx context.Context, link string) (*types.Conversation, error) {
return s.Dump(ctx, link, time.Time{}, time.Time{})
}

// DumpRaw dumps all messages, but does not account for any options
// defined, such as DumpFiles, instead, the caller must hassle about any
// processFns they want to apply.
func (sd *Session) DumpRaw(ctx context.Context, link string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
func (s *Session) DumpRaw(ctx context.Context, link string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
sl, err := structures.ParseLink(link)
if err != nil {
return nil, err
}
return sd.dump(ctx, sl, oldest, latest, processFn...)
return s.dump(ctx, sl, oldest, latest, processFn...)
}

func (sd *Session) dump(ctx context.Context, sl structures.SlackLink, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
func (s *Session) dump(ctx context.Context, sl structures.SlackLink, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
ctx, task := trace.NewTask(ctx, "dump")
defer task.End()
trace.Logf(ctx, "info", "sl: %q", sl)
Expand All @@ -70,15 +70,15 @@ func (sd *Session) dump(ctx context.Context, sl structures.SlackLink, oldest, la
}

if sl.IsThread() {
return sd.dumpThreadAsConversation(ctx, sl, oldest, latest, processFn...)
return s.dumpThreadAsConversation(ctx, sl, oldest, latest, processFn...)
} else {
return sd.dumpChannel(ctx, sl.Channel, oldest, latest, processFn...)
return s.dumpChannel(ctx, sl.Channel, oldest, latest, processFn...)
}
}

// dumpChannel fetches messages from the conversation identified by channelID.
// processFn will be called on each batch of messages returned from API.
func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
func (s *Session) dumpChannel(ctx context.Context, channelID string, oldest, latest time.Time, processFn ...ProcessFunc) (*types.Conversation, error) {
ctx, task := trace.NewTask(ctx, "dumpMessages")
defer task.End()

Expand All @@ -90,13 +90,13 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la

var (
// slack rate limits are per method, so we're safe to use different limiters for different mehtods.
convLimiter = sd.limiter(network.Tier3)
threadLimiter = sd.limiter(network.Tier3)
convLimiter = s.limiter(network.Tier3)
threadLimiter = s.limiter(network.Tier3)
)

// add thread dumper. It should go first, because it populates message
// chunk with thread messages.
pfns := append([]ProcessFunc{sd.newThreadProcessFn(ctx, threadLimiter, oldest, latest)}, processFn...)
pfns := append([]ProcessFunc{s.newThreadProcessFn(ctx, threadLimiter, oldest, latest)}, processFn...)

var (
messages []types.Message
Expand All @@ -108,13 +108,13 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la
resp *slack.GetConversationHistoryResponse
)
reqStart := time.Now()
if err := withRetry(ctx, convLimiter, sd.cfg.Limits.Tier3.Retries, func() error {
if err := withRetry(ctx, convLimiter, s.cfg.Limits.Tier3.Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationHistoryContext", func() {
resp, err = sd.client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
resp, err = s.client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
ChannelID: channelID,
Cursor: cursor,
Limit: sd.cfg.Limits.Request.Conversations,
Limit: s.cfg.Limits.Request.Conversations,
Oldest: structures.FormatSlackTS(oldest),
Latest: structures.FormatSlackTS(latest),
Inclusive: true,
Expand All @@ -141,14 +141,14 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la

messages = append(messages, chunk...)

sd.l().Printf("messages request #%5d, fetched: %4d (%s), total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
s.l().Printf("messages request #%5d, fetched: %4d (%s), total: %8d (speed: %6.2f/sec, avg: %6.2f/sec)\n",
i, len(resp.Messages), results, len(messages),
float64(len(resp.Messages))/float64(time.Since(reqStart).Seconds()),
float64(len(messages))/float64(time.Since(fetchStart).Seconds()),
)

if !resp.HasMore {
sd.l().Printf("messages fetch complete, total: %d", len(messages))
s.l().Printf("messages fetch complete, total: %d", len(messages))
break
}

Expand All @@ -157,20 +157,20 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la

types.SortMessages(messages)

name, err := sd.getChannelName(ctx, sd.limiter(network.Tier3), channelID)
name, err := s.getChannelName(ctx, s.limiter(network.Tier3), channelID)
if err != nil {
return nil, err
}

return &types.Conversation{Name: name, Messages: messages, ID: channelID}, nil
}

func (sd *Session) getChannelName(ctx context.Context, l *rate.Limiter, channelID string) (string, error) {
func (s *Session) getChannelName(ctx context.Context, l *rate.Limiter, channelID string) (string, error) {
// get channel name
var ci *slack.Channel
if err := withRetry(ctx, l, sd.cfg.Limits.Tier3.Retries, func() error {
if err := withRetry(ctx, l, s.cfg.Limits.Tier3.Retries, func() error {
var err error
ci, err = sd.client.GetConversationInfoContext(ctx, &slack.GetConversationInfoInput{ChannelID: channelID})
ci, err = s.client.GetConversationInfoContext(ctx, &slack.GetConversationInfoInput{ChannelID: channelID})
return err
}); err != nil {
return "", err
Expand Down
37 changes: 15 additions & 22 deletions messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/rusq/slackdump/v2/internal/fixtures"
"github.com/rusq/slackdump/v2/internal/network"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/types"
)

Expand Down Expand Up @@ -64,9 +63,8 @@ var (

func TestSession_DumpMessages(t *testing.T) {
type fields struct {
Users types.Users
UserIndex structures.UserIndex
options Config
Users types.Users
options Config
}
type args struct {
ctx context.Context
Expand Down Expand Up @@ -219,10 +217,9 @@ func TestSession_DumpMessages(t *testing.T) {
tt.expectFn(mc)

sd := &Session{
client: mc,
Users: tt.fields.Users,
UserIndex: tt.fields.UserIndex,
cfg: tt.fields.options,
client: mc,
Users: tt.fields.Users,
cfg: tt.fields.options,
}
got, err := sd.DumpAll(tt.args.ctx, tt.args.channelID)
if (err != nil) != tt.wantErr {
Expand All @@ -237,9 +234,8 @@ func TestSession_DumpMessages(t *testing.T) {
func TestSession_DumpAll(t *testing.T) {
t.Parallel()
type fields struct {
Users types.Users
UserIndex structures.UserIndex
options Config
Users types.Users
options Config
}
type args struct {
ctx context.Context
Expand Down Expand Up @@ -303,10 +299,9 @@ func TestSession_DumpAll(t *testing.T) {
}

sd := &Session{
client: mc,
Users: tt.fields.Users,
UserIndex: tt.fields.UserIndex,
cfg: tt.fields.options,
client: mc,
Users: tt.fields.Users,
cfg: tt.fields.options,
}
got, err := sd.DumpAll(tt.args.ctx, tt.args.slackURL)
if (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -365,9 +360,8 @@ func TestConversation_String(t *testing.T) {

func TestSession_getChannelName(t *testing.T) {
type fields struct {
Users types.Users
UserIndex structures.UserIndex
options Config
Users types.Users
options Config
}
type args struct {
ctx context.Context
Expand Down Expand Up @@ -422,10 +416,9 @@ func TestSession_getChannelName(t *testing.T) {

tt.expectFn(mc)
sd := &Session{
client: mc,
Users: tt.fields.Users,
UserIndex: tt.fields.UserIndex,
cfg: tt.fields.options,
client: mc,
Users: tt.fields.Users,
cfg: tt.fields.options,
}
got, err := sd.getChannelName(tt.args.ctx, tt.args.l, tt.args.channelID)
if (err != nil) != tt.wantErr {
Expand Down
Loading

0 comments on commit db5efca

Please sign in to comment.