Skip to content

Commit

Permalink
parallel channel processing in converter
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed May 12, 2023
1 parent f38bd9f commit 0ad3b70
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions internal/convert/chunkexp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"runtime/trace"
"sync"

"github.com/rusq/fsadapter"
"github.com/rusq/slackdump/v2/internal/chunk"
Expand All @@ -22,7 +23,8 @@ var (
ErrNoLocFunction = errors.New("missing location function")
)

// ChunkToExport is a converter between Chunk and Export formats.
// ChunkToExport is a converter between Chunk and Export formats. Zero value
// is not usable.
type ChunkToExport struct {
// src is the source directory with chunks
src *chunk.Directory
Expand All @@ -36,6 +38,8 @@ type ChunkToExport struct {

lg logger.Interface

workers int // number of workers to use to convert channels

request chan filereq
result chan copyresult
}
Expand Down Expand Up @@ -86,6 +90,7 @@ func NewChunkToExport(src *chunk.Directory, trg fsadapter.FS, opt ...C2EOption)
lg: logger.Default,
request: make(chan filereq, 1),
result: make(chan copyresult, 1),
workers: 8,
}
for _, o := range opt {
o(c)
Expand Down Expand Up @@ -161,20 +166,48 @@ func (c *ChunkToExport) Convert(ctx context.Context) error {
go c.copyworker(c.result, c.request)
}

conv := transform.NewExpConverter(c.src, c.trg, tfopts...)

errC := make(chan error, 1)
// 1. generator
var chC = make(chan slack.Channel)
go func() {
defer close(c.result)
defer close(chC)
for _, ch := range channels {
lg.Debugf("processing channel %q", ch.ID)
if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil {
errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err)
return
chC <- ch
}
}()

// 2. workers
conv := transform.NewExpConverter(c.src, c.trg, tfopts...)
errC := make(chan error, c.workers)
var wg sync.WaitGroup
for i := 0; i < c.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for ch := range chC {
lg.Debugf("processing channel %q", ch.ID)
if err := conv.Convert(ctx, chunk.ToFileID(ch.ID, "", false)); err != nil {
errC <- fmt.Errorf("converter: failed to process %q: %w", ch.ID, err)
return
}
}
}()
}
// 2.1 index writer
wg.Add(1)
go func() {
defer wg.Done()
lg.Debugf("writing index for %s", c.src.Name())
if err := conv.WriteIndex(); err != nil {
errC <- err
}
}()
// 3. sentinel
go func() {
wg.Wait()
close(c.result)
}()

// 4. result processor
LOOP:
for {
select {
Expand All @@ -192,10 +225,6 @@ LOOP:
}
}

lg.Debugf("writing index for %s", c.src.Name())
if err := conv.WriteIndex(); err != nil {
return err
}
return nil
}

Expand Down

0 comments on commit 0ad3b70

Please sign in to comment.