Skip to content

Commit

Permalink
async stream
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 16, 2023
1 parent ab926b2 commit fa68959
Showing 1 changed file with 184 additions and 1 deletion.
185 changes: 184 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"runtime/trace"
"sync"
"time"

"github.com/rusq/dlog"
Expand Down Expand Up @@ -207,7 +208,7 @@ func (cs *Stream) thread(ctx context.Context, id string, threadTS string, proc p
}

// slack returns the thread starter as the first message with every
// call so we use it as a parent message.
// call, so we use it as a parent message.
if err := proc.ThreadMessages(ctx, id, msgs[0], msgs[1:]); err != nil {
return fmt.Errorf("failed to process message id=%s, thread_ts=%s: %w", msgs[0].Msg.ClientMsgID, threadTS, err)
}
Expand Down Expand Up @@ -269,3 +270,185 @@ func (cs *Stream) Channels(ctx context.Context, types []string, proc processor.C
}
return nil
}

const chanSz = 100

func (cs *Stream) AsyncConversations(ctx context.Context, proc processor.Conversations, links <-chan string) error {
ctx, task := trace.NewTask(ctx, "AsyncConversations")
defer task.End()

// create channels
chans := make(chan channelRequest, chanSz)
defer close(chans)
threads := make(chan threadRequest, chanSz)
defer close(threads)
errorC := make(chan error, 2)

var wg sync.WaitGroup
wg.Add(2)
go func() {
cs.channelWorker(ctx, 0, proc, errorC, chans, threads)
wg.Done()
}()
go func() {
go cs.threadWorker(ctx, 0, proc, errorC, threads)
wg.Done()
}()
go func() {
wg.Wait()
close(errorC)
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
case link, more := <-links:
if !more {
return nil
}
if err := cs.processLink(chans, threads, link); err != nil {
return err
}
}
}
}

// processLink parses the link and sends it to the appropriate worker.
func (cs *Stream) processLink(chans chan<- channelRequest, threads chan<- threadRequest, link string) error {
sl, err := structures.ParseLink(link)
if err != nil {
return err
}
if !sl.IsValid() {
return errors.New("invalid slack link: " + link)
}
if sl.IsThread() {
threads <- threadRequest{channelID: sl.Channel, threadTS: sl.ThreadTS}
} else {
chans <- channelRequest{channelID: sl.Channel}
}
return nil
}

type channelRequest struct {
channelID string
}

type threadRequest struct {
channelID string
threadTS string
}

type WorkerError struct {
Type string
Worker int
Err error
}

func (we WorkerError) Error() string {
return fmt.Sprintf("%s worker %d: %v", we.Type, we.Worker, we.Err)
}

func (we WorkerError) Unwrap() error {
return we.Err
}

func (cs *Stream) channelWorker(ctx context.Context, id int, proc processor.Conversations, errors chan<- error, reqs <-chan channelRequest, threadC chan<- threadRequest) {
ctx, task := trace.NewTask(ctx, "channelWorker")
defer task.End()
trace.Logf(ctx, "id", "%d", id)

for {
select {
case <-ctx.Done():
errors <- WorkerError{Type: "channel", Worker: id, Err: ctx.Err()}
return
case req, more := <-reqs:
if !more {
return // channel closed
}
if err := cs.channelInfo(ctx, req.channelID, false, proc); err != nil {
errors <- WorkerError{Type: "channel", Worker: id, Err: err}
}
if err := cs.asyncChannel(ctx, req.channelID, func(mm []slack.Message) error {
if err := proc.Messages(ctx, req.channelID, mm); err != nil {
return fmt.Errorf("failed to process message chunk starting with id=%s (size=%d): %w", mm[0].Msg.ClientMsgID, len(mm), err)
}
for i := range mm {
if mm[i].Msg.ThreadTimestamp != "" && mm[i].Msg.SubType != "thread_broadcast" {
dlog.Debugf("- message #%d/thread: id=%s, thread_ts=%s", i, mm[i].ClientMsgID, mm[i].Msg.ThreadTimestamp)
threadC <- threadRequest{channelID: req.channelID, threadTS: mm[i].Msg.ThreadTimestamp}
}
if len(mm[i].Files) > 0 {
if err := proc.Files(ctx, req.channelID, mm[i], false, mm[i].Files); err != nil {
return err
}
}
}
return nil
}); err != nil {
errors <- WorkerError{Type: "channel", Worker: id, Err: err}
}
}
}
}

func (cs *Stream) asyncChannel(ctx context.Context, id string, fn func(mm []slack.Message) error) error {
ctx, task := trace.NewTask(ctx, "asyncChannel")
defer task.End()

cursor := ""
for {
var resp *slack.GetConversationHistoryResponse
if err := network.WithRetry(ctx, cs.limits.channels, cs.limits.tier.Tier3.Retries, func() error {
var apiErr error
rgn := trace.StartRegion(ctx, "GetConversationHistoryContext")
resp, apiErr = cs.client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
ChannelID: id,
Cursor: cursor,
Limit: cs.limits.tier.Request.Conversations,
Oldest: structures.FormatSlackTS(cs.oldest),
Latest: structures.FormatSlackTS(cs.latest),
Inclusive: true,
})
rgn.End()
return apiErr
}); err != nil {
return err
}
if !resp.Ok {
trace.Logf(ctx, "error", "not ok, api error=%s", resp.Error)
return fmt.Errorf("response not ok, slack error: %s", resp.Error)
}
if err := fn(resp.Messages); err != nil {
return fmt.Errorf("failed to process message chunk starting with id=%s (size=%d): %w", resp.Messages[0].Msg.ClientMsgID, len(resp.Messages), err)
}
if !resp.HasMore {
break
}
cursor = resp.ResponseMetaData.NextCursor
}
return nil
}

func (cs *Stream) threadWorker(ctx context.Context, id int, proc processor.Conversations, errors chan<- error, reqs <-chan threadRequest) {
ctx, task := trace.NewTask(ctx, "threadWorker")
defer task.End()
trace.Logf(ctx, "id", "%d", id)

for {
select {
case <-ctx.Done():
errors <- WorkerError{Type: "thread", Worker: id, Err: ctx.Err()}
return
case req, more := <-reqs:
if !more {
return // channel closed
}
if err := cs.thread(ctx, req.channelID, req.threadTS, proc); err != nil {
errors <- WorkerError{Type: "thread", Worker: id, Err: err}
}
}
}
}

0 comments on commit fa68959

Please sign in to comment.