diff --git a/cmd/slackdump/internal/convertcmd/convert.go b/cmd/slackdump/internal/convertcmd/convert.go
index 26cc5b19..9ab746ea 100644
--- a/cmd/slackdump/internal/convertcmd/convert.go
+++ b/cmd/slackdump/internal/convertcmd/convert.go
@@ -106,6 +106,7 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
 	if err != nil {
 		return err
 	}
+	defer cd.Close()
 	fsa, err := fsadapter.New(trg)
 	if err != nil {
 		return err
diff --git a/cmd/slackdump/internal/dump/dump.go b/cmd/slackdump/internal/dump/dump.go
index 1b1cfa37..e2495bf6 100644
--- a/cmd/slackdump/internal/dump/dump.go
+++ b/cmd/slackdump/internal/dump/dump.go
@@ -216,6 +216,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
 	if err != nil {
 		return err
 	}
+	defer cd.Close()
 	// Create conversation processor.
 	proc, err := dirproc.NewConversation(cd, subproc, coord, dirproc.WithLogger(lg), dirproc.WithRecordFiles(false))
 	if err != nil {
diff --git a/cmd/slackdump/internal/export/v3.go b/cmd/slackdump/internal/export/v3.go
index 9b872171..8cd880bd 100644
--- a/cmd/slackdump/internal/export/v3.go
+++ b/cmd/slackdump/internal/export/v3.go
@@ -35,6 +35,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
 	if err != nil {
 		return err
 	}
+	defer chunkdir.Close()
 	if !lg.IsDebug() {
 		defer chunkdir.RemoveAll()
 	}
diff --git a/internal/chunk/chunk_test.go b/internal/chunk/chunk_test.go
index 22bc4a60..4e9081da 100644
--- a/internal/chunk/chunk_test.go
+++ b/internal/chunk/chunk_test.go
@@ -1,6 +1,9 @@
 package chunk
 
 import (
+	"bytes"
+	"encoding/json"
+	"io"
 	"testing"
 
 	"github.com/slack-go/slack"
@@ -127,3 +130,15 @@ func TestChunk_ID(t *testing.T) {
 		})
 	}
 }
+
+// marshalChunks turns chunks into io.ReadSeeker
+func marshalChunks(chunks ...Chunk) io.ReadSeeker {
+	var b bytes.Buffer
+	enc := json.NewEncoder(&b)
+	for _, c := range chunks {
+		if err := enc.Encode(c); err != nil {
+			panic(err)
+		}
+	}
+	return bytes.NewReader(b.Bytes())
+}
diff --git a/internal/chunk/chunktest/dirserver.go b/internal/chunk/chunktest/dirserver.go
index f41f740a..617beb1f 100644
--- a/internal/chunk/chunktest/dirserver.go
+++ b/internal/chunk/chunktest/dirserver.go
@@ -23,6 +23,7 @@ func NewDirServer(dir string) *DirServer {
 	if err != nil {
 		panic(err)
 	}
+	defer cd.Close()
 	ds := &DirServer{
 		cd:   cd,
 		ptrs: make(map[string]*chunk.Player),
diff --git a/internal/chunk/directory.go b/internal/chunk/directory.go
index 6d271c09..fa950ab9 100644
--- a/internal/chunk/directory.go
+++ b/internal/chunk/directory.go
@@ -1,8 +1,8 @@
 package chunk
 
 import (
-	"bytes"
 	"compress/gzip"
+	"encoding/gob"
 	"errors"
 	"fmt"
 	"io"
@@ -13,8 +13,6 @@ import (
 	"sync/atomic"
 
 	"github.com/slack-go/slack"
-
-	"github.com/rusq/slackdump/v2/internal/osext"
 )
 
 const ext = ".json.gz"
@@ -35,6 +33,7 @@ const (
 // compressed with GZIP, unless stated otherwise.
 type Directory struct {
 	dir   string
+	fm    *filemgr
 	cache dcache
 }
 
@@ -42,6 +41,11 @@ type dcache struct {
 	channels atomic.Value // []slack.Channel
 }
 
+type filewrapper interface {
+	io.ReadSeeker
+	Name() string
+}
+
 // OpenDir "opens" an existing directory for read and write operations.
 // It expects the directory to exist and to be a directory, otherwise it will
 // return an error.
@@ -51,7 +55,11 @@ func OpenDir(dir string) (*Directory, error) {
 	} else if !fi.IsDir() {
 		return nil, fmt.Errorf("not a directory: %s", dir)
 	}
-	return &Directory{dir: dir}, nil
+	fm, err := newFileMgr()
+	if err != nil {
+		return nil, err
+	}
+	return &Directory{dir: dir, fm: fm}, nil
 }
 
 // CreateDir creates and opens a directory. It will create all parent
@@ -60,15 +68,25 @@ func CreateDir(dir string) (*Directory, error) {
 	if err := os.MkdirAll(dir, 0o755); err != nil {
 		return nil, err
 	}
-	return &Directory{dir: dir}, nil
+	fm, err := newFileMgr()
+	if err != nil {
+		return nil, err
+	}
+	return &Directory{dir: dir, fm: fm}, nil
 }
 
 // RemoveAll deletes the directory and all its contents. Make sure all files
 // are closed.
func (d *Directory) RemoveAll() error { + _ = d.Close() return os.RemoveAll(d.dir) } +// Close closes the directory and all open files. +func (d *Directory) Close() error { + return d.fm.Destroy() +} + var errNoChannelInfo = errors.New("no channel info") // Channels collects all channels from the chunk directory. First, it @@ -81,21 +99,21 @@ func (d *Directory) Channels() ([]slack.Channel, error) { } // try to open the channels file if fi, err := os.Stat(d.filename(FChannels)); err == nil && !fi.IsDir() { - return loadChannelsJSON(d.filename(FChannels)) + return d.loadChannelsJSON(d.filename(FChannels)) } // channel files not found, try to get channel info from the conversation // files. var ch []slack.Channel - if err := filepath.WalkDir(d.dir, func(path string, d fs.DirEntry, err error) error { + if err := filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error { if err != nil { return err } if !strings.HasSuffix(path, ext) { return nil - } else if d.IsDir() { + } else if de.IsDir() { return nil } - chs, err := loadChanInfo(path) + chs, err := d.loadChanInfo(path) if err != nil { if errors.Is(err, errNoChannelInfo) { return nil @@ -116,9 +134,9 @@ func (d *Directory) Name() string { return d.dir } -func loadChanInfo(fullpath string) ([]slack.Channel, error) { +func (d *Directory) loadChanInfo(fullpath string) ([]slack.Channel, error) { // try to get from cache - f, err := openChunks(fullpath) + f, err := d.fm.Open(fullpath) if err != nil { return nil, err } @@ -133,8 +151,8 @@ func loadChanInfo(fullpath string) ([]slack.Channel, error) { // readChanInfo returns the Channels from all the ChannelInfo chunks in the // file. 
-func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {
-	cf, err := FromReader(rs)
+func readChanInfo(wf filewrapper) ([]slack.Channel, error) {
+	cf, err := cachedFromReader(wf)
 	if err != nil {
 		return nil, err
 	}
@@ -143,61 +161,17 @@ func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {
 
 // loadChannelsJSON loads channels json file and returns a slice of
 // slack.Channel. It expects it to be GZIP compressed.
-func loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
-	f, err := openChunksBuf(fullpath)
-	if err != nil {
-		return nil, err
-	}
-	cf, err := FromReader(f)
-	if err != nil {
-		return nil, err
-	}
-	return cf.AllChannels()
-}
-
-// openChunks opens an existing chunk file and returns a ReadSeekCloser. It
-// expects a chunkfile to be a gzip-compressed file.
-func openChunks(filename string) (io.ReadSeekCloser, error) {
-	f, err := openfile(filename)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
-
-	tf, err := osext.UnGZIP(f)
-	if err != nil {
-		return nil, err
-	}
-
-	return osext.RemoveOnClose(tf), nil
-}
-
-func openfile(filename string) (*os.File, error) {
-	if fi, err := os.Stat(filename); err != nil {
-		return nil, err
-	} else if fi.IsDir() {
-		return nil, errors.New("chunk file is a directory")
-	} else if fi.Size() == 0 {
-		return nil, errors.New("chunk file is empty")
-	}
-	return os.Open(filename)
-}
-
-func openChunksBuf(filename string) (io.ReadSeeker, error) {
-	f, err := openfile(filename)
+func (d *Directory) loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
+	f, err := d.fm.Open(fullpath)
 	if err != nil {
 		return nil, err
 	}
 	defer f.Close()
-	gzr, err := gzip.NewReader(f)
+	cf, err := cachedFromReader(f)
 	if err != nil {
 		return nil, err
 	}
-	var buf bytes.Buffer
-	if _, err := io.Copy(&buf, gzr); err != nil {
-		return nil, err
-	}
-	return bytes.NewReader(buf.Bytes()), nil
+	return cf.AllChannels()
 }
 
 func (d *Directory) Stat(id FileID) (fs.FileInfo, error) {
@@ -221,18 +195,22 @@ func (d
*Directory) Users() ([]slack.User, error) {
 // Open opens a chunk file with the given name. Extension is appended
 // automatically.
 func (d *Directory) Open(id FileID) (*File, error) {
-	f, err := d.OpenRAW(d.filename(id))
+	f, err := d.openRAW(d.filename(id))
 	if err != nil {
 		return nil, err
 	}
-	return FromReader(f)
+	return cachedFromReader(f)
 }
 
 // OpenRAW opens a compressed chunk file with filename within the directory,
 // and returns a ReadSeekCloser. filename is the full name of the file with
 // extension.
 func (d *Directory) OpenRAW(filename string) (io.ReadSeekCloser, error) {
-	return openChunks(filepath.Join(filename))
+	return d.openRAW(filename)
+}
+
+func (d *Directory) openRAW(filename string) (*wrappedfile, error) {
+	return d.fm.Open(filename)
 }
 
 // filename returns the full path of the chunk file with the given fileID.
@@ -298,3 +276,57 @@ func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
 	}
 	return nil, errors.New("no workspace info found")
 }
+
+// cachedFromReader returns a chunk File for wf. It reuses the index stored in
+// the file wf.Name() + extIdx when that file can be read and decoded,
+// otherwise it indexes wf and stores the index there for the next call.
+func cachedFromReader(wf filewrapper) (*File, error) {
+	idxname := wf.Name() + extIdx
+	// a missing or undecodable index is a cache miss, not an error: the index
+	// is rebuilt from wf below and the index file is overwritten.
+	if idx, err := loadIndex(idxname); err == nil {
+		return fromReaderWithIndex(wf, idx)
+	}
+	cf, err := FromReader(wf)
+	if err != nil {
+		return nil, err
+	}
+	if err := saveIndex(idxname, cf.idx); err != nil {
+		return nil, err
+	}
+	return cf, nil
+}
+
+// loadIndex reads the gob-encoded index from the file name.
+func loadIndex(name string) (index, error) {
+	var idx index
+	f, err := os.Open(name)
+	if err != nil {
+		return idx, err
+	}
+	defer f.Close()
+	if err := gob.NewDecoder(f).Decode(&idx); err != nil {
+		return idx, err
+	}
+	return idx, nil
+}
+
+// saveIndex writes the gob-encoded idx to the file name. The file is removed
+// again if it could not be written completely, so that no truncated index is
+// left behind.
+func saveIndex(name string, idx index) error {
+	f, err := os.Create(name)
+	if err != nil {
+		return err
+	}
+	if err := gob.NewEncoder(f).Encode(idx); err != nil {
+		f.Close()
+		os.Remove(name) // best effort
+		return err
+	}
+	if err := f.Close(); err != nil {
+		os.Remove(name) // best effort
+		return err
+	}
+	return nil
+}
diff --git a/internal/chunk/directory_test.go b/internal/chunk/directory_test.go
index 57166dec..19d0df4e 100644
--- a/internal/chunk/directory_test.go
+++ b/internal/chunk/directory_test.go
@@ -1,9 +1,8 @@
 package chunk
 
 import (
-	"bytes"
-	"encoding/json"
 	"io"
+	"path/filepath"
 	"reflect"
 	"testing"
 
@@ -59,8 +58,9 @@ var (
 )
 
 func Test_readChanInfo(t *testing.T) {
+	dir := t.TempDir()
 	type args struct {
-		r io.ReadSeeker
+		r filewrapper
 	}
 	tests := []struct {
 		name    string
@@ -71,7 +71,8 @@ func Test_readChanInfo(t *testing.T) {
 		{
 			name: "test",
 			args: args{
-				r: marshalChunks(
+				r: testfilewrapper(
+					filepath.Join(dir, "unit"),
 					TestPublicChannelInfo,
 					TestPublicChannelMessages,
 				),
@@ -96,14 +97,16 @@ func Test_readChanInfo(t *testing.T) {
 	}
 }
 
-// marshalChunks turns chunks into io.ReadSeeker
-func marshalChunks(chunks ...Chunk) io.ReadSeeker {
-	var b bytes.Buffer
-	enc := json.NewEncoder(&b)
-	for _, c := range chunks {
-		if err := enc.Encode(c); err != nil {
-			panic(err)
-		}
+func testfilewrapper(name string, chunks ...Chunk) filewrapper {
+	return nopCloser{
+		ReadSeeker: marshalChunks(chunks...),
+		name:       name,
 	}
-	return bytes.NewReader(b.Bytes())
 }
+
+type nopCloser struct {
+	name string
+	io.ReadSeeker
+}
+
+func (n nopCloser) Name() string { return n.name }
diff --git a/internal/chunk/file.go b/internal/chunk/file.go
index 259e853d..7be7e4f4 100644
--- a/internal/chunk/file.go
+++ b/internal/chunk/file.go
@@ -64,6 +64,20 @@ func FromReader(rs
io.ReadSeeker) (*File, error) {
 	}, nil
 }
 
+// fromReaderWithIndex creates a new chunk File from the io.ReadSeeker and
+// index.
+//
+// USE WITH CAUTION: It does not check if the file corresponds to the index.
+func fromReaderWithIndex(rs io.ReadSeeker, idx index) (*File, error) {
+	if _, err := rs.Seek(0, io.SeekStart); err != nil { // reset offset
+		return nil, err
+	}
+	return &File{
+		rs:  rs,
+		idx: idx,
+	}, nil
+}
+
 // Close closes the underlying reader if it implements io.Closer.
 func (f *File) Close() error {
 	if c, ok := f.rs.(io.Closer); ok {
@@ -98,6 +112,10 @@ func indexChunks(dec decoder) (index, error) {
 	return idx, nil
 }
 
+func (f *File) Index() index {
+	return f.idx
+}
+
 func caller(steps int) string {
 	name := "?"
 	if pc, _, _, ok := runtime.Caller(steps + 1); ok {
diff --git a/internal/chunk/filemgr.go b/internal/chunk/filemgr.go
new file mode 100644
index 00000000..4c78d54b
--- /dev/null
+++ b/internal/chunk/filemgr.go
@@ -0,0 +1,141 @@
+package chunk
+
+import (
+	"compress/gzip"
+	"io"
+	"os"
+	"sync"
+
+	"github.com/rusq/slackdump/v2/logger"
+)
+
+const extIdx = ".idx"
+
+// filemgr unpacks gzip-compressed files into a temporary directory and keeps
+// track of the handles it has handed out.
+type filemgr struct {
+	tmpdir string // temporary storage directory
+
+	mu       sync.Mutex            // protects the following
+	existing map[string]string     // map of unpacked files (real name to the temporary file name)
+	handles  map[*os.File]struct{} // set of the temporary file handles that are still open
+}
+
+func newFileMgr() (*filemgr, error) {
+	tmpdir, err := os.MkdirTemp("", "slackdump-*")
+	if err != nil {
+		return nil, err
+	}
+	logger.Default.Debugf("created temporary directory: %s", tmpdir)
+	return &filemgr{
+		tmpdir:   tmpdir,
+		existing: make(map[string]string),
+		handles:  make(map[*os.File]struct{}),
+	}, nil
+}
+
+// Destroy closes all handles that are still open and removes the temporary
+// directory together with the unpacked files. It returns the first error
+// encountered. Open may still be called afterwards, it recreates the
+// directory on demand.
+func (dp *filemgr) Destroy() error {
+	dp.mu.Lock()
+	defer dp.mu.Unlock()
+	var first error
+	for f := range dp.handles {
+		if err := f.Close(); err != nil && first == nil {
+			first = err
+		}
+		delete(dp.handles, f)
+	}
+	// the unpacked copies are removed below, forget about them.
+	dp.existing = make(map[string]string)
+	if err := os.RemoveAll(dp.tmpdir); err != nil && first == nil {
+		first = err
+	}
+	return first
+}
+
+// Open returns a handle to the unpacked contents of the gzip-compressed file
+// name. The file is unpacked into the temporary directory on the first call,
+// subsequent calls reopen the unpacked copy. The caller must Close the
+// returned file.
+func (dp *filemgr) Open(name string) (*wrappedfile, error) {
+	// create the directory if it doesn't exist
+	if err := os.MkdirAll(dp.tmpdir, 0o755); err != nil {
+		return nil, err
+	}
+	dp.mu.Lock()
+	defer dp.mu.Unlock()
+	// check if the file is already unpacked
+	if tempfile, ok := dp.existing[name]; ok {
+		etf, err := os.Open(tempfile) // existing temporary file
+		if err != nil {
+			return nil, err
+		}
+		dp.handles[etf] = struct{}{}
+		return &wrappedfile{etf, dp}, nil
+	}
+	tf, err := dp.unpack(name)
+	if err != nil {
+		return nil, err
+	}
+	dp.existing[name] = tf.Name()
+	dp.handles[tf] = struct{}{}
+	return &wrappedfile{tf, dp}, nil
+}
+
+// unpack decompresses the gzip file name into a new temporary file and
+// returns it positioned at the start. The temporary file is removed if
+// unpacking fails.
+func (dp *filemgr) unpack(name string) (*os.File, error) {
+	cf, err := os.Open(name) // compressed file
+	if err != nil {
+		return nil, err
+	}
+	defer cf.Close()
+	gz, err := gzip.NewReader(cf)
+	if err != nil {
+		return nil, err
+	}
+	defer gz.Close()
+	tf, err := os.CreateTemp(dp.tmpdir, "filemgr-*")
+	if err != nil {
+		return nil, err
+	}
+	if err := copyAndRewind(tf, gz); err != nil {
+		// do not leave a partially written file behind (best effort).
+		tf.Close()
+		os.Remove(tf.Name())
+		return nil, err
+	}
+	return tf, nil
+}
+
+// copyAndRewind copies r to f, flushes f to disk and seeks it back to the
+// start.
+func copyAndRewind(f *os.File, r io.Reader) error {
+	if _, err := io.Copy(f, r); err != nil {
+		return err
+	}
+	if err := f.Sync(); err != nil {
+		return err
+	}
+	_, err := f.Seek(0, io.SeekStart)
+	return err
+}
+
+// wrappedfile is a temporary file handed out by filemgr. Closing it also
+// removes it from the set of open handles of the manager.
+type wrappedfile struct {
+	*os.File
+	dp *filemgr
+}
+
+// Close closes the file and unregisters it from the file manager.
+func (wf *wrappedfile) Close() error {
+	wf.dp.mu.Lock()
+	defer wf.dp.mu.Unlock()
+	delete(wf.dp.handles, wf.File)
+	return wf.File.Close()
+}
diff --git a/internal/chunk/transform/export_test.go b/internal/chunk/transform/export_test.go
index 0cf751b5..93772d03 100644
--- a/internal/chunk/transform/export_test.go
+++ b/internal/chunk/transform/export_test.go
@@ -45,6 +45,7 @@ func Test_transform(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
+			defer cd.Close()
 			cvt := ExpConverter{
 				cd:  cd,
 				fsa: tt.args.fsa,
diff --git
a/internal/chunk/transform/standard.go b/internal/chunk/transform/standard.go
index edaca06f..21534ebe 100644
--- a/internal/chunk/transform/standard.go
+++ b/internal/chunk/transform/standard.go
@@ -47,6 +47,7 @@ func NewStandard(fsa fsadapter.FS, dir string, opts ...StdOption) (*StdConverter
 	if err != nil {
 		return nil, err
 	}
+	defer cd.Close()
 	std := &StdConverter{
 		cd:  cd,
 		fsa: fsa,
diff --git a/internal/chunk/transform/standard_test.go b/internal/chunk/transform/standard_test.go
index 2007f43d..5eb620e5 100644
--- a/internal/chunk/transform/standard_test.go
+++ b/internal/chunk/transform/standard_test.go
@@ -21,6 +21,7 @@ func Test_stdConvert(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer cd.Close()
 	fsa, err := fsadapter.New("output-dump.zip")
 	if err != nil {
 		t.Fatal(err)
diff --git a/internal/convert/chunkexp_test.go b/internal/convert/chunkexp_test.go
index 77f1b6e0..e0bdb83d 100644
--- a/internal/convert/chunkexp_test.go
+++ b/internal/convert/chunkexp_test.go
@@ -25,6 +25,7 @@ func TestChunkToExport_Validate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer srcDir.Close()
 	var testTrgDir = t.TempDir()
 
 	type fields struct {
@@ -98,6 +99,7 @@ func TestChunkToExport_Convert(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer cd.Close()
 	testTrgDir, err := os.MkdirTemp("", "slackdump")
 	if err != nil {
 		t.Fatal(err)
diff --git a/internal/osext/compression.go b/internal/osext/compression.go
index 7bb13c99..9d7c93b6 100644
--- a/internal/osext/compression.go
+++ b/internal/osext/compression.go
@@ -24,6 +24,9 @@ func UnGZIP(r io.Reader) (*os.File, error) {
 	if err != nil {
 		return nil, err
 	}
+	if err := f.Sync(); err != nil {
+		return nil, err
+	}
 	// reset temporary file position to prepare it for reading.
 	if _, err := f.Seek(0, io.SeekStart); err != nil {
 		return nil, err