Skip to content

Commit

Permalink
fix parameter parsing in test server
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 8, 2023
1 parent 5e9ae7b commit d180329
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 35 deletions.
3 changes: 3 additions & 0 deletions internal/processors/proctest/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func NewPlayer(rs io.ReadSeeker) (*Player, error) {
return &Player{
rs: rs,
idx: idx,
current: state{
threadReq: make(map[string]int),
},
}, nil
}

Expand Down
11 changes: 5 additions & 6 deletions internal/processors/proctest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,18 @@ func router(p *Player) *http.ServeMux {
}
})
mux.HandleFunc("/api/conversations.replies", func(w http.ResponseWriter, r *http.Request) {
req := slack.GetConversationRepliesParameters{}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
timestamp := r.FormValue("ts")
if timestamp == "" {
http.Error(w, "ts is required", http.StatusBadRequest)
return
}
msg, err := p.Thread(req.Timestamp)
msg, err := p.Thread(timestamp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := GetConversationRepliesResponse{
HasMore: p.HasMoreThreads(req.Timestamp),
HasMore: p.HasMoreThreads(timestamp),
Messages: msg,
SlackResponse: slack.SlackResponse{
Ok: true,
Expand Down
38 changes: 9 additions & 29 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,13 @@ import (
"github.com/rusq/slackdump/v2/internal/processors"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/slack-go/slack"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
streamThreadGoroutines = 10
streamFilesGoroutines = 10
)

type channelStream struct {
oldest, latest time.Time
client clienter
limits rateLimits

egThread errgroup.Group
egFiles errgroup.Group
}

type rateLimits struct {
Expand All @@ -46,8 +37,6 @@ func newChannelStream(cl clienter, limits *Limits, oldest, latest time.Time) *ch
tier: limits,
},
}
cs.egThread.SetLimit(streamThreadGoroutines)
cs.egFiles.SetLimit(streamFilesGoroutines)
return cs
}

Expand All @@ -68,12 +57,6 @@ func (cs *channelStream) Stream(ctx context.Context, link string, proc processor
return err
}
}
if err := cs.egThread.Wait(); err != nil {
return err
}
if err := cs.egFiles.Wait(); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -107,19 +90,16 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processors
for i := range resp.Messages {
idx := i
if resp.Messages[idx].Msg.ThreadTimestamp != "" {
cs.egThread.Go(func() error {
return cs.thread(ctx, id, resp.Messages[idx].Msg.ThreadTimestamp, proc)
})
if err := cs.thread(ctx, id, resp.Messages[idx].Msg.ThreadTimestamp, proc); err != nil {
return err
}
}
if resp.Messages[idx].Files != nil && len(resp.Messages[idx].Files) > 0 {
cs.egFiles.Go(func() error {
return proc.Files(id, resp.Messages[idx], false, resp.Messages[idx].Files)
})
if err := proc.Files(id, resp.Messages[idx], false, resp.Messages[idx].Files); err != nil {
return err
}
}
}
if err := cs.egThread.Wait(); err != nil {
return err
}
if !resp.HasMore {
break
}
Expand Down Expand Up @@ -165,9 +145,9 @@ func (cs *channelStream) thread(ctx context.Context, id string, threadTS string,
for i := range msgs[1:] {
idx := i
if msgs[idx].Files != nil && len(msgs[idx].Files) > 0 {
cs.egFiles.Go(func() error {
return proc.Files(id, msgs[idx], true, msgs[idx].Files)
})
if err := proc.Files(id, msgs[idx], true, msgs[idx].Files); err != nil {
return err
}
}
}
if !hasmore {
Expand Down

0 comments on commit d180329

Please sign in to comment.