Skip to content

Commit

Permalink
transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 25, 2023
1 parent 893fc0b commit 7082f05
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/diag/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func runRecord(ctx context.Context, _ *base.Command, args []string) error {
rec := event.NewRecorder(w)
for _, ch := range args {
cfg.Log.Printf("streaming channel %q", ch)
if err := sess.Stream(ctx, ch, rec, time.Time{}, time.Time{}); err != nil {
if err := sess.Stream(ctx, rec, ch, time.Time{}, time.Time{}); err != nil {
if err2 := rec.Close(); err2 != nil {
return fmt.Errorf("error streaming channel %q: %w; error closing recorder: %v", ch, err, err2)
}
Expand Down
96 changes: 67 additions & 29 deletions cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"errors"
"flag"
"fmt"
"io/fs"
"os"
"path/filepath"
"runtime/trace"
"strings"
"text/template"
Expand All @@ -18,11 +20,12 @@ import (
"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/auth"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/cfg"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/fetch"
"github.com/rusq/slackdump/v2/cmd/slackdump/internal/golang/base"
"github.com/rusq/slackdump/v2/internal/app/config"
"github.com/rusq/slackdump/v2/internal/app/nametmpl"
"github.com/rusq/slackdump/v2/internal/event/processor"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/rusq/slackdump/v2/internal/transform"
"github.com/rusq/slackdump/v2/types"
)

Expand Down Expand Up @@ -109,9 +112,25 @@ func RunDump(ctx context.Context, cmd *base.Command, args []string) error {
}
defer sess.Close()

// Dump conversations.
tmpdir, err := os.MkdirTemp("", "slackdump-*")
if err != nil {
return err
}

return dumpv3(ctx, sess, list, namer)
// Dump conversations.
p := &fetch.Parameters{
Oldest: opts.Oldest,
Latest: opts.Latest,
List: list,
DumpFiles: cfg.SlackConfig.DumpFiles,
}
if err := fetch.Fetch(ctx, sess, tmpdir, p); err != nil {
return err
}
if err := reconstruct(ctx, sess.Filesystem(), tmpdir, namer); err != nil {
return err
}
return nil
}

func dumpv2(ctx context.Context, sess *slackdump.Session, list *structures.EntityList, namer namer) error {
Expand All @@ -128,32 +147,6 @@ func dumpv2(ctx context.Context, sess *slackdump.Session, list *structures.Entit
return nil
}

// dumpv3 streams every conversation listed in list.Include through a
// standard processor. Events are recorded into a single "events-*.jsonl"
// file that is created inside a fresh temporary directory; the same
// directory is handed to the processor.
func dumpv3(ctx context.Context, sess *slackdump.Session, list *structures.EntityList, namer namer) error {
	workdir, err := os.MkdirTemp("", "slackdump-*")
	if err != nil {
		return err
	}
	dlog.Printf("using %s as temporary directory", workdir)

	events, err := os.CreateTemp(workdir, "events-*.jsonl")
	if err != nil {
		return err
	}
	defer events.Close()

	proc, err := processor.NewStandard(events, sess.Client(), workdir)
	if err != nil {
		return err
	}
	// Deferred after the file, so the processor is closed before the file.
	defer proc.Close()

	for i := range list.Include {
		if err := sess.Stream(ctx, list.Include[i], proc, opts.Oldest, opts.Latest); err != nil {
			return err
		}
	}
	return nil
}

// namer is a helper type to generate filenames for conversations.
type namer struct {
t *template.Template
Expand Down Expand Up @@ -201,3 +194,48 @@ func HelpDump(cmd *base.Command) string {
}
return buf.String()
}

// reconstruct walks the top level of tmpdir looking for compressed event
// files (names ending in "jsonl.gz"). For each one it derives the channel ID
// from the part of the file name before the first "-", builds the
// transform.EventsInfo describing the file, and logs it. Sub-directories of
// tmpdir are not descended into. If a directory named after the channel ID
// exists in tmpdir, it is recorded as the files directory.
//
// It returns an error if the walk fails or if an event file name contains
// no "-" separator.
func reconstruct(ctx context.Context, fsa fsadapter.FS, tmpdir string, namer namer) error {
	_, task := trace.NewTask(ctx, "reconstruct")
	defer task.End()

	visit := func(path string, d fs.DirEntry, err error) error {
		switch {
		case err != nil:
			return err
		case d.IsDir() && path == tmpdir:
			// the root itself: keep walking its entries.
			return nil
		case d.IsDir():
			// any other directory is not descended into.
			return filepath.SkipDir
		case !strings.HasSuffix(path, "jsonl.gz"):
			// not an event file.
			return nil
		}

		name := d.Name()
		channelID, _, ok := strings.Cut(name, "-")
		if !ok {
			return fmt.Errorf("invalid filename: %q", name)
		}

		info := transform.EventsInfo{
			ChannelID:    channelID,
			IsCompressed: true,
			File:         path,
		}
		filesdir := filepath.Join(tmpdir, channelID)
		if dirExists(filesdir) {
			info.FilesDir = filesdir
		}

		dlog.Printf("reconstructing %s", info)
		return nil
	}

	return filepath.WalkDir(tmpdir, visit)
}

// dirExists reports whether path can be stat'ed and refers to a directory.
// Any error from os.Stat (including "not exist") yields false.
func dirExists(path string) bool {
	if info, err := os.Stat(path); err == nil {
		return info.IsDir()
	}
	return false
}
14 changes: 14 additions & 0 deletions cmd/slackdump/internal/dump/dump_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dump

import (
"context"
_ "embed"
"testing"
)

func Test_reconstruct(t *testing.T) {
if err := reconstruct(context.Background(), nil, "../../../../tmp", namer{}); err != nil {
t.Fatal(err)
}
t.Fatal("x")
}
68 changes: 68 additions & 0 deletions cmd/slackdump/internal/fetch/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package fetch

import (
"compress/gzip"
"context"
"fmt"
"os"
"strings"
"time"

"github.com/rusq/dlog"
"github.com/rusq/slackdump/v2"
"github.com/rusq/slackdump/v2/internal/event/processor"
"github.com/rusq/slackdump/v2/internal/structures"
"github.com/slack-go/slack"
)

// Parameters holds the settings for a Fetch call.
type Parameters struct {
	// Oldest and Latest are passed to the streamer together with each
	// link.
	Oldest time.Time
	Latest time.Time
	// List supplies the links to fetch; Fetch iterates over List.Include.
	List *structures.EntityList
	// DumpFiles is forwarded to the processor via processor.DumpFiles.
	DumpFiles bool
}

// Fetch dumps every link from p.List.Include, one at a time, using sess.
// The per-link event files are created in dir, and dir is also handed to
// the processor.
//
// It returns an error if p or p.List is nil, or the first error encountered
// while dumping a link (wrapped with the link it occurred on).
func Fetch(ctx context.Context, sess *slackdump.Session, dir string, p *Parameters) error {
	if p == nil {
		return fmt.Errorf("nil parameters")
	}
	// Guard the list as well, otherwise ranging over p.List.Include panics.
	if p.List == nil {
		return fmt.Errorf("nil entity list")
	}

	dlog.Printf("using %s as temporary directory", dir)

	for _, link := range p.List.Include {
		if err := dumpOne(ctx, sess, dir, link, p); err != nil {
			return fmt.Errorf("fetching %q: %w", link, err)
		}
	}
	return nil
}

// streamer is the subset of session behaviour that dumpOne relies on.
type streamer interface {
	// Client returns the Slack client; dumpOne passes it to
	// processor.NewStandard.
	Client() *slack.Client
	// Stream is called by dumpOne with the processor, the link and the
	// Oldest/Latest times from Parameters.
	Stream(context.Context, processor.Processor, string, time.Time, time.Time) error
}

// replacer substitutes "/" and ":" with "-". dumpOne applies it to the link
// before using it as the prefix of the temporary file name pattern.
var replacer = strings.NewReplacer("/", "-", ":", "-")

// dumpOne streams a single link into a gzip-compressed event file created in
// dir. The file name starts with the link (with "/" and ":" replaced by
// "-"), followed by a random part and the ".jsonl.gz" extension.
//
// The processor, the gzip writer and the file are closed in that order when
// the function returns. If streaming succeeded but one of the Close calls
// failed, that failure is returned: a failed gzip flush or file close means
// the output is incomplete and must not be reported as success.
func dumpOne(ctx context.Context, sess streamer, dir string, link string, p *Parameters) (err error) {
	pattern := fmt.Sprintf("%s-*.jsonl.gz", replacer.Replace(link))
	f, err := os.CreateTemp(dir, pattern)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	gz := gzip.NewWriter(f)
	defer func() {
		// Close flushes the remaining compressed data to f.
		if cerr := gz.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	pr, err := processor.NewStandard(ctx, gz, sess.Client(), dir, processor.DumpFiles(p.DumpFiles))
	if err != nil {
		return err
	}
	defer func() {
		if cerr := pr.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	return sess.Stream(ctx, pr, link, p.Oldest, p.Latest)
}
11 changes: 7 additions & 4 deletions internal/event/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Player struct {
// ID, value is the list of offsets for that id in the file.
type index map[string][]int64

// offsets holds the index of the current offset for each event id.
// offsets holds the index of the current offset in the index for each event
// id.
type offsets map[string]int

// NewPlayer creates a new event player from the io.ReadSeeker.
Expand Down Expand Up @@ -75,7 +76,7 @@ func indexRecords(dec decodeOffsetter) (index, error) {
return idx, nil
}

// Offset returns the last read offset of the record ReadSeeker.
// Offset returns the last read offset of the record in ReadSeeker.
func (p *Player) Offset() int64 {
return p.lastOffset.Load()
}
Expand Down Expand Up @@ -194,7 +195,7 @@ func (p *Player) State() (*state.State, error) {
name = file.Name()
}
s := state.New(name)
p.ForEach(func(ev *Event) error {
if err := p.ForEach(func(ev *Event) error {
if ev == nil {
return nil
}
Expand All @@ -214,6 +215,8 @@ func (p *Player) State() (*state.State, error) {
}
}
return nil
})
}); err != nil {
return nil, err
}
return s, nil
}
17 changes: 17 additions & 0 deletions internal/event/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,20 @@ type Processor interface {

io.Closer
}

// options holds the processor settings that can be changed through Option
// values.
type options struct {
	// dumpFiles controls whether the processor downloads files.
	dumpFiles bool
}

// Option is a functional option for the processor.
type Option func(*options)

// DumpFiles enables (true) or disables (false) the file processing. File
// processing is disabled unless DumpFiles(true) is passed to NewStandard.
// Keeping it disabled may be useful on enterprise workspaces where the file
// download may be monitored.
// See:
// https://github.com/rusq/slackdump/discussions/191#discussioncomment-4953235
func DumpFiles(b bool) Option {
	return func(cfg *options) {
		cfg.dumpFiles = b
	}
}
33 changes: 22 additions & 11 deletions internal/event/processor/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,38 @@ import (
type Standard struct {
	// Recorder is embedded, so Standard exposes the recorder's methods.
	*event.Recorder
	// dl is the downloader client used by Files to download files.
	dl *downloader.Client

	// opts holds the settings applied through Option values; Files
	// consults opts.dumpFiles.
	opts options
}

func NewStandard(w io.Writer, sess downloader.Downloader, dir string) (*Standard, error) {
r := event.NewRecorder(w)
// NewStandard creates a new standard processor. It will write the output to
// the given writer. The downloader is used to download files. The directory
// is the directory where the files will be downloaded to. The options are
// functional options. See the DumpFiles option.
func NewStandard(ctx context.Context, w io.Writer, sess downloader.Downloader, dir string, opts ...Option) (*Standard, error) {
opt := options{dumpFiles: false}
for _, o := range opts {
o(&opt)
}

dl := downloader.New(sess, fsadapter.NewDirectory(dir))
dl.Start(context.Background())
dl.Start(ctx)

r := event.NewRecorder(w)
return &Standard{
Recorder: r,
dl: dl,
opts: opt,
}, nil
}

// Files implements the Processor interface. It will download files if the
// dumpFiles option is enabled.
func (s *Standard) Files(channelID string, parent slack.Message, isThread bool, m []slack.File) error {
if !s.opts.dumpFiles {
// ignore files if requested
return nil
}
// custom file processor, because we need to download those files
for i := range m {
if _, err := s.dl.DownloadFile(channelID, m[i]); err != nil {
Expand All @@ -35,14 +54,6 @@ func (s *Standard) Files(channelID string, parent slack.Message, isThread bool,
return nil
}

// fileUrls returns the URLPrivate value of every file in ff, in the same
// order. The result is always non-nil and has exactly len(ff) elements.
func fileUrls(ff []slack.File) []string {
	urls := make([]string, len(ff))
	for i := range ff {
		urls[i] = ff[i].URLPrivate
	}
	return urls
}

func (s *Standard) Close() error {
// reconstruct the final json file
s.dl.Stop()
Expand Down
1 change: 1 addition & 0 deletions internal/structures/entity_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (el *EntityList) HasExcludes() bool {
return len(el.Exclude) > 0
}

// IsEmpty reports whether both the Include and the Exclude lists have no
// entries.
func (el *EntityList) IsEmpty() bool {
	return len(el.Include) == 0 && len(el.Exclude) == 0
}
Expand Down
Loading

0 comments on commit 7082f05

Please sign in to comment.