From f720e1c43eca1cc9c8b191c07812e0d4898da175 Mon Sep 17 00:00:00 2001 From: Rustam Gilyazov <16064414+rusq@users.noreply.github.com> Date: Tue, 28 Feb 2023 21:55:38 +1000 Subject: [PATCH] where did that file go? --- .gitignore | 3 +- cmd/slackdump/internal/dump/dump.go | 24 ----- cmd/slackdump/internal/dump/namer.go | 32 +++++++ internal/chunk/player.go | 48 +++++++++- internal/chunk/state/chunk.go | 68 ++++++++++++++ internal/chunk/state/state.go | 23 +++++ internal/osext/move.go | 54 +++++++++++ internal/osext/osext.go | 2 + internal/transform/standard.go | 134 ++++++++++++++++++++------- internal/transform/standard_test.go | 27 ++++++ stream.go | 16 ++-- {contrib => utils}/record_stats.py | 0 utils/visualise.py | 68 ++++++++++++++ 13 files changed, 432 insertions(+), 67 deletions(-) create mode 100644 cmd/slackdump/internal/dump/namer.go create mode 100644 internal/chunk/state/chunk.go create mode 100644 internal/osext/move.go create mode 100644 internal/osext/osext.go create mode 100644 internal/transform/standard_test.go rename {contrib => utils}/record_stats.py (100%) create mode 100755 utils/visualise.py diff --git a/.gitignore b/.gitignore index 88c355a0..d300d292 100644 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,5 @@ dist/ *.state # sundry junk used for testing and other fuckery -/tmp \ No newline at end of file +/tmp +*.dot diff --git a/cmd/slackdump/internal/dump/dump.go b/cmd/slackdump/internal/dump/dump.go index 058388ea..8c0b1332 100644 --- a/cmd/slackdump/internal/dump/dump.go +++ b/cmd/slackdump/internal/dump/dump.go @@ -150,30 +150,6 @@ func dumpv2(ctx context.Context, sess *slackdump.Session, list *structures.Entit return nil } -// namer is a helper type to generate filenames for conversations. -type namer struct { - t *template.Template - ext string -} - -// newNamer returns a new namer. It must be called with a valid template. 
-func newNamer(tmpl string, ext string) (namer, error) { - t, err := template.New("name").Parse(tmpl) - if err != nil { - return namer{}, err - } - return namer{t: t, ext: ext}, nil -} - -// Filename returns the filename for the given conversation. -func (n namer) Filename(conv *types.Conversation) string { - var buf strings.Builder - if err := n.t.Execute(&buf, conv); err != nil { - panic(err) - } - return buf.String() + "." + n.ext -} - func save(ctx context.Context, fs fsadapter.FS, filename string, conv *types.Conversation) error { _, task := trace.NewTask(ctx, "saveData") defer task.End() diff --git a/cmd/slackdump/internal/dump/namer.go b/cmd/slackdump/internal/dump/namer.go new file mode 100644 index 00000000..bb69a9cb --- /dev/null +++ b/cmd/slackdump/internal/dump/namer.go @@ -0,0 +1,32 @@ +package dump + +import ( + "strings" + "text/template" + + "github.com/rusq/slackdump/v2/types" +) + +// namer is a helper type to generate filenames for conversations. +type namer struct { + t *template.Template + ext string +} + +// newNamer returns a new namer. It must be called with a valid template. +func newNamer(tmpl string, ext string) (namer, error) { + t, err := template.New("name").Parse(tmpl) + if err != nil { + return namer{}, err + } + return namer{t: t, ext: ext}, nil +} + +// Filename returns the filename for the given conversation. +func (n namer) Filename(conv *types.Conversation) string { + var buf strings.Builder + if err := n.t.Execute(&buf, conv); err != nil { + panic(err) + } + return buf.String() + "." 
+ n.ext +} diff --git a/internal/chunk/player.go b/internal/chunk/player.go index 5243b497..a989414e 100644 --- a/internal/chunk/player.go +++ b/internal/chunk/player.go @@ -5,6 +5,7 @@ import ( "errors" "io" "path/filepath" + "strings" "sync/atomic" "github.com/slack-go/slack" @@ -157,6 +158,7 @@ func (p *Player) HasMoreThreads(channelID string, threadTS string) bool { return p.hasMore(threadID(channelID, threadTS)) } +// Reset resets the state of the Player. func (p *Player) Reset() error { p.pointer = make(offsets) _, err := p.rs.Seek(0, io.SeekStart) @@ -193,7 +195,8 @@ type namer interface { Name() string } -// State returns the state of the player. +// State generates and returns the state of the player. It does not include +// the path to the downloaded files. func (p *Player) State() (*state.State, error) { var name string if file, ok := p.rs.(namer); ok { @@ -206,6 +209,8 @@ func (p *Player) State() (*state.State, error) { } if ev.Type == CFiles { for _, f := range ev.Files { + // we are adding the files with the empty path as we + // have no way of knowing if the file was downloaded or not. s.AddFile(ev.ChannelID, f.ID, "") } } @@ -225,3 +230,44 @@ func (p *Player) State() (*state.State, error) { } return s, nil } + +// allMessagesForID returns all the messages for the given id. It will reset +// the Player prior to execution. +func (p *Player) allMessagesForID(id string) ([]slack.Message, error) { + if err := p.Reset(); err != nil { + return nil, err + } + var m []slack.Message + for { + chunk, err := p.tryGetChunk(id) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + m = append(m, chunk.Messages...) + } + return m, nil +} + +// AllMessages returns all the messages for the given channel. +func (p *Player) AllMessages(channelID string) ([]slack.Message, error) { + return p.allMessagesForID(channelID) +} + +// AllThreadMessages returns all the messages for the given thread. 
+func (p *Player) AllThreadMessages(channelID, threadTS string) ([]slack.Message, error) { + return p.allMessagesForID(threadID(channelID, threadTS)) +} + +// AllChannels returns all the channels in the chunkfile. +func (p *Player) AllChannels() []string { + var ids []string + for id := range p.idx { + if !strings.Contains(id, ":") { + ids = append(ids, id) + } + } + return ids +} diff --git a/internal/chunk/state/chunk.go b/internal/chunk/state/chunk.go new file mode 100644 index 00000000..2e729ba8 --- /dev/null +++ b/internal/chunk/state/chunk.go @@ -0,0 +1,68 @@ +package state + +import ( + "compress/gzip" + "io" + "os" + "path/filepath" +) + +// OpenChunks attempts to open the chunk file linked in the State. If the +// chunk is compressed, it will be decompressed and a temporary file will be +// created. The temporary file will be removed when the OpenChunks is +// closed. +func (st *State) OpenChunks(basePath string) (io.ReadSeekCloser, error) { + f, err := os.Open(filepath.Join(basePath, st.Filename)) + if err != nil { + return nil, err + } + if st.IsCompressed { + tf, err := uncompress(f) + if err != nil { + return nil, err + } + return removeOnClose(tf.Name(), tf), nil + } + return f, nil +} + +func removeOnClose(name string, r io.ReadSeekCloser) io.ReadSeekCloser { + return removeWrapper{filename: name, ReadSeekCloser: r} +} + +type removeWrapper struct { + io.ReadSeekCloser + + filename string +} + +func (r removeWrapper) Close() error { + err := r.ReadSeekCloser.Close() + if err != nil { + return err + } + return os.Remove(r.filename) +} + +// uncompress decompresses a gzip file and returns a temporary file handler. +// it must be removed after use. 
// uncompress decompresses the gzip stream r into a newly created temporary
// file and returns that file positioned at its start, ready for reading.
//
// The caller owns the returned file and must close and remove it after use.
// If any step fails after the temporary file has been created, the file is
// closed and removed before the error is returned, so nothing is left behind.
func uncompress(r io.Reader) (*os.File, error) {
	gr, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	defer gr.Close()

	f, err := os.CreateTemp("", "fsadapter-*")
	if err != nil {
		return nil, err
	}
	// discard releases the temporary file on the error paths. Its own errors
	// are ignored on purpose: the caller needs the original failure, and the
	// cleanup is best effort.
	discard := func() {
		f.Close()
		os.Remove(f.Name())
	}

	if _, err := io.Copy(f, gr); err != nil {
		discard()
		return nil, err
	}
	// reset temporary file position to prepare it for reading.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		discard()
		return nil, err
	}
	return f, nil
}
We should use os.Rename() instead, but +// that doesn't work across filesystems, see the above link. +func MoveFile(src string, fs fsadapter.FS, dst string) error { + in, err := os.Open(src) + if err != nil { + return fmt.Errorf("unable to open source file: %s", err) + } + + out, err := fs.Create(dst) + if err != nil { + in.Close() + return fmt.Errorf("unable to open destination file: %s", err) + } + defer out.Close() + + _, err = io.Copy(out, in) + in.Close() + if err != nil { + return fmt.Errorf("error writing output: %s", err) + } + + // sync is not supported by fsadapter. + // if err := out.Sync(); err != nil { + // return fmt.Errorf("sync: %s", err) + // } + + if _, err := os.Stat(src); err != nil { + return fmt.Errorf("stat: %s", err) + } else { + // Chmod not yet supported. + // if err := fs.Chmod(dst, si.Mode()); err != nil { + // return fmt.Errorf("chmod: %s", err) + // } + } + + if err := os.Remove(src); err != nil { + return fmt.Errorf("failed removing source: %s", err) + } + return nil +} diff --git a/internal/osext/osext.go b/internal/osext/osext.go new file mode 100644 index 00000000..cf1d3091 --- /dev/null +++ b/internal/osext/osext.go @@ -0,0 +1,2 @@ +// Package osext provides some extended functionality for the os package. 
+package osext diff --git a/internal/transform/standard.go b/internal/transform/standard.go index 5a6df516..75bc39ed 100644 --- a/internal/transform/standard.go +++ b/internal/transform/standard.go @@ -1,70 +1,140 @@ package transform import ( - "compress/gzip" + "encoding/json" "fmt" - "io" - "os" + "path/filepath" + "github.com/rusq/dlog" "github.com/rusq/fsadapter" "github.com/rusq/slackdump/v2/internal/chunk" "github.com/rusq/slackdump/v2/internal/chunk/state" + "github.com/rusq/slackdump/v2/internal/osext" + "github.com/rusq/slackdump/v2/internal/structures/files" + "github.com/rusq/slackdump/v2/types" ) type Standard struct { - fs fsadapter.FS + fs fsadapter.FS + nameFn func(*types.Conversation) string + updateFileLink bool } -func NewStandard(fs fsadapter.FS) *Standard { - return &Standard{fs: fs} +// NewStandard returns a new Standard transformer, nameFn should return the +// filename for a given conversation. This is the name that the conversation +// will be written to the filesystem. 
+func NewStandard(fs fsadapter.FS, nameFn func(*types.Conversation) string) *Standard { + return &Standard{fs: fs, nameFn: nameFn, updateFileLink: true} } -func (s *Standard) Transform(st *state.State) error { +func (s *Standard) Transform(st *state.State, basePath string) error { if st == nil { return fmt.Errorf("nil state") } - var rs io.ReadSeeker - f, err := os.Open(st.Filename) + rsc, err := st.OpenChunks(basePath) if err != nil { return err } - defer f.Close() - if st.IsCompressed { - tf, err := uncompress(f) + defer rsc.Close() + + pl, err := chunk.NewPlayer(rsc) + if err != nil { + return err + } + + allCh := pl.AllChannels() + for _, ch := range allCh { + conv, err := s.conversation(pl, st, basePath, ch) if err != nil { return err } - defer os.Remove(tf.Name()) - defer tf.Close() - rs = tf - } else { - rs = f - } - pl, err := chunk.NewPlayer(rs) - if err != nil { - return err + if err := s.saveConversation(conv); err != nil { + return err + } } - _ = pl return nil } -// uncompress decompresses a gzip file and returns a temporary file handler. -// it must be removed after use. -func uncompress(r io.Reader) (*os.File, error) { - gr, err := gzip.NewReader(r) +func (s *Standard) conversation(pl *chunk.Player, st *state.State, basePath string, ch string) (*types.Conversation, error) { + mm, err := pl.AllMessages(ch) if err != nil { return nil, err } - defer gr.Close() - f, err := os.CreateTemp("", "fsadapter-*") - if err != nil { + conv := &types.Conversation{ + ID: ch, + Messages: make([]types.Message, 0, len(mm)), + } + for i := range mm { + if mm[i].SubType == "thread_broadcast" { + // this we don't eat. + // skip thread broadcasts, they're not useful + continue + } + var sdm types.Message + sdm.Message = mm[i] + if mm[i].ThreadTimestamp != "" { + // if there's a thread timestamp, we need to find and add it. 
+ thread, err := pl.AllThreadMessages(ch, mm[i].ThreadTimestamp) + if err != nil { + return nil, err + } + sdm.ThreadReplies = types.ConvertMsgs(thread) + // update the file links, if requested + if err := s.transferFiles(st, basePath, sdm.ThreadReplies, ch); err != nil { + return nil, err + } + } + conv.Messages = append(conv.Messages, sdm) + } + // update the file links, if requested + if err := s.transferFiles(st, basePath, conv.Messages, ch); err != nil { return nil, err } - _, err = io.Copy(f, gr) + return conv, nil +} + +func (s *Standard) transferFiles(st *state.State, basePath string, mm []types.Message, ch string) error { + for i := range mm { + if mm[i].Files == nil { + continue + } + for j := range mm[i].Files { + fp := st.FilePath(ch, mm[i].Files[j].ID) + if fp == "" { + return fmt.Errorf("unable to generate the filename for: %v", mm[i].Files[j]) + } + srcPath := filepath.Join(basePath, fp) + fsTrgPath := filepath.Join(ch, "attachments", filepath.Base(srcPath)) + if err := osext.MoveFile(srcPath, s.fs, fsTrgPath); err != nil { + dlog.Printf("file missing: %q", srcPath) + return fmt.Errorf("error moving %q to %q", srcPath, fsTrgPath) + } + // TODO: simplify this + if s.updateFileLink { + if err := files.UpdateFileLinksAll(&mm[i].Files[j], func(ptrS *string) error { + *ptrS = fsTrgPath + return nil + }); err != nil { + return err + } + } + } + } + return nil +} + +func (s *Standard) saveConversation(conv *types.Conversation) error { + if conv == nil { + return fmt.Errorf("nil conversation") + } + f, err := s.fs.Create(s.nameFn(conv)) if err != nil { - return nil, err + return err } - return f, nil + defer f.Close() + enc := json.NewEncoder(f) + enc.SetIndent("", " ") + return enc.Encode(conv) } diff --git a/internal/transform/standard_test.go b/internal/transform/standard_test.go new file mode 100644 index 00000000..3bab0ac0 --- /dev/null +++ b/internal/transform/standard_test.go @@ -0,0 +1,27 @@ +package transform + +import ( + "path/filepath" + 
"testing" + + "github.com/rusq/fsadapter" + "github.com/rusq/slackdump/v2/internal/chunk/state" + "github.com/rusq/slackdump/v2/types" +) + +const whereTheTempIsAt = "../../tmp" + +func TestStandard_Transform(t *testing.T) { + // MANUAL + fs := fsadapter.NewDirectory(filepath.Join(whereTheTempIsAt, "manual")) + s := NewStandard(fs, func(c *types.Conversation) string { + return c.ID + ".json" + }) + st, err := state.Load(filepath.Join(whereTheTempIsAt, "C01SPFM1KNY.state")) + if err != nil { + t.Fatalf("state.Load(): %s", err) + } + if err := s.Transform(st, whereTheTempIsAt); err != nil { + t.Fatal(err) + } +} diff --git a/stream.go b/stream.go index ac8c42a2..5c6ab841 100644 --- a/stream.go +++ b/stream.go @@ -97,15 +97,14 @@ func (cs *channelStream) channel(ctx context.Context, id string, proc processor. return fmt.Errorf("failed to process message chunk starting with id=%s (size=%d): %w", resp.Messages[0].Msg.ClientMsgID, len(resp.Messages), err) } for i := range resp.Messages { - idx := i - if resp.Messages[idx].Msg.ThreadTimestamp != "" && resp.Messages[idx].Msg.SubType != "thread_broadcast" { - dlog.Debugf("- message #%d/thread: id=%s, thread_ts=%s, cursor=%s", i, resp.Messages[idx].ClientMsgID, resp.Messages[idx].Msg.ThreadTimestamp, cursor) - if err := cs.thread(ctx, id, resp.Messages[idx].Msg.ThreadTimestamp, proc); err != nil { + if resp.Messages[i].Msg.ThreadTimestamp != "" && resp.Messages[i].Msg.SubType != "thread_broadcast" { + dlog.Debugf("- message #%d/thread: id=%s, thread_ts=%s, cursor=%s", i, resp.Messages[i].ClientMsgID, resp.Messages[i].Msg.ThreadTimestamp, cursor) + if err := cs.thread(ctx, id, resp.Messages[i].Msg.ThreadTimestamp, proc); err != nil { return err } } - if resp.Messages[idx].Files != nil && len(resp.Messages[idx].Files) > 0 { - if err := proc.Files(ctx, id, resp.Messages[idx], false, resp.Messages[idx].Files); err != nil { + if len(resp.Messages[i].Files) > 0 { + if err := proc.Files(ctx, id, resp.Messages[i], false, 
resp.Messages[i].Files); err != nil { return err } } @@ -154,9 +153,8 @@ func (cs *channelStream) thread(ctx context.Context, id string, threadTS string, } // extract files from thread messages for i := range msgs[1:] { - idx := i - if msgs[idx].Files != nil && len(msgs[idx].Files) > 0 { - if err := proc.Files(ctx, id, msgs[idx], true, msgs[idx].Files); err != nil { + if len(msgs[i].Files) > 0 { + if err := proc.Files(ctx, id, msgs[i], true, msgs[i].Files); err != nil { return err } } diff --git a/contrib/record_stats.py b/utils/record_stats.py similarity index 100% rename from contrib/record_stats.py rename to utils/record_stats.py diff --git a/utils/visualise.py b/utils/visualise.py new file mode 100755 index 00000000..da78057c --- /dev/null +++ b/utils/visualise.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Visualises the chunks. + +Usage: visualise.py + +Example: + python3 visualise.py ../data/2021-01-01.jsonl | dot -Tpng -o graph.png + +It will generate a graph of the messages, threads and files. Files, that have +an underscore prefix, are files that are attached to a message. Files, that +have no prefix, are files that are in the files chunk, so, if each message +that has file attachments must have two nodes for each file linked to it. 
import sys
import json

# Chunk type identifiers, compared against the "_t" field of each chunk record.
CHUNK_MESSAGE = 0
CHUNK_THREAD = 1
CHUNK_FILE = 2

# Fill colours of the graph nodes, one per node kind.
COLOR_MSG = "#54AEA6"
COLOR_MSG_FILE = "#00FFFF"
COLOR_THREAD = "#E0CA87"
COLOR_FILE = "#C4B7D5"


def _print_attachments(msg):
    """
    Prints one underscore-prefixed node per file attached to msg, plus an edge
    from the message node to it. Messages without a "files" entry, or with an
    empty one, print nothing.
    """
    for attachment in msg.get("files") or []:
        print(f"_{attachment['id']}[fillcolor=\"{COLOR_MSG_FILE}\"; style=filled];")
        print(f"{msg['ts']} -> _{attachment['id']};")


def main(args: list[str]):
    """
    Reads the chunk file named by the single element of args, one JSON
    document per line, and prints a Graphviz "digraph" describing it to
    standard output.

    Exits with status 1 after printing the usage text when args does not hold
    exactly one element. Raises ValueError when a chunk has a "_t" value that
    is none of the known chunk types.
    """
    if len(args) != 1:
        print("Usage: visualise.py ")
        print("Example: python3 visualise.py ../data/2021-01-01.jsonl | dot -Tpng -o graph.png")
        sys.exit(1)

    # The handle has its own name so that no loop variable below can rebind it.
    with open(args[0], "r") as chunkfile:
        print("digraph {")
        print("rankdir=LR;")
        print("node [shape=box];")
        for line in chunkfile:
            chunk = json.loads(line)
            kind = chunk["_t"]
            if kind == CHUNK_MESSAGE:
                for msg in chunk["_m"]:
                    print(f"{msg['ts']} [fillcolor=\"{COLOR_MSG}\"; style=filled];")
                    _print_attachments(msg)
            elif kind == CHUNK_THREAD:
                for msg in chunk["_m"]:
                    print(f"{msg['ts']}[fillcolor=\"{COLOR_THREAD}\"; style=filled];")
                    print(f"{chunk['_p']['ts']} -> {msg['ts']};")
                    _print_attachments(msg)
            elif kind == CHUNK_FILE:
                for item in chunk["_f"]:
                    print(f"{item['id']}[fillcolor=\"{COLOR_FILE}\"; style=filled];")
                    print(f"{chunk['_p']['ts']} -> {item['id']};")
            else:
                # raising a bare string is itself a TypeError in Python 3; use
                # a real exception so the message reaches the user.
                raise ValueError("Unknown chunk type: " + str(kind))
        print("}")


if __name__ == '__main__':
    main(sys.argv[1:])