Skip to content

Commit

Permalink
implement chunk caching
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed May 16, 2023
1 parent 546fb96 commit 1f47770
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 77 deletions.
1 change: 1 addition & 0 deletions cmd/slackdump/internal/convertcmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func chunk2export(ctx context.Context, src, trg string, cflg convertflags) error
if err != nil {
return err
}
defer cd.Close()
fsa, err := fsadapter.New(trg)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/slackdump/internal/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func dump(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, p dump
if err != nil {
return err
}
defer cd.Close()
// Create conversation processor.
proc, err := dirproc.NewConversation(cd, subproc, coord, dirproc.WithLogger(lg), dirproc.WithRecordFiles(false))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/slackdump/internal/export/v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func exportV3(ctx context.Context, sess *slackdump.Session, fsa fsadapter.FS, li
if err != nil {
return err
}
defer chunkdir.Close()
if !lg.IsDebug() {
defer chunkdir.RemoveAll()
}
Expand Down
15 changes: 15 additions & 0 deletions internal/chunk/chunk_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package chunk

import (
"bytes"
"encoding/json"
"io"
"testing"

"github.com/slack-go/slack"
Expand Down Expand Up @@ -127,3 +130,15 @@ func TestChunk_ID(t *testing.T) {
})
}
}

// marshalChunks serialises the given chunks as a stream of JSON values (one
// per Encode call, in argument order) and returns the encoded bytes as an
// in-memory io.ReadSeeker. It panics if any chunk fails to encode, which is
// acceptable because it is a test helper.
func marshalChunks(chunks ...Chunk) io.ReadSeeker {
	buf := new(bytes.Buffer)
	encoder := json.NewEncoder(buf)
	for i := range chunks {
		err := encoder.Encode(chunks[i])
		if err != nil {
			panic(err)
		}
	}
	// Hand out a reader over the accumulated bytes so callers can seek.
	return bytes.NewReader(buf.Bytes())
}
1 change: 1 addition & 0 deletions internal/chunk/chunktest/dirserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func NewDirServer(dir string) *DirServer {
if err != nil {
panic(err)
}
defer cd.Close()
ds := &DirServer{
cd: cd,
ptrs: make(map[string]*chunk.Player),
Expand Down
135 changes: 71 additions & 64 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package chunk

import (
"bytes"
"compress/gzip"
"encoding/gob"
"errors"
"fmt"
"io"
Expand All @@ -13,8 +13,6 @@ import (
"sync/atomic"

"github.com/slack-go/slack"

"github.com/rusq/slackdump/v2/internal/osext"
)

const ext = ".json.gz"
Expand All @@ -35,13 +33,19 @@ const (
// compressed with GZIP, unless stated otherwise.
type Directory struct {
// dir is the directory path as passed to OpenDir or CreateDir; it is what
// Name returns and what RemoveAll deletes.
dir string
// fm is the file manager created by newFileMgr. Chunk files are opened
// through it, and Close tears it down via fm.Destroy.
fm *filemgr
// cache holds values cached for this directory (see dcache).
cache dcache
}

// dcache groups the values cached by a Directory.
type dcache struct {
// channels holds the cached channel list; per the trailing note the stored
// value is a []slack.Channel. NOTE(review): the code that loads and stores
// it is outside this view - confirm before relying on the dynamic type.
channels atomic.Value // []slack.Channel
}

// filewrapper is a seekable reader that also knows the name of the file it
// was opened from. cachedFromReader uses Name to derive the path of the
// on-disk index that sits next to the chunk file.
type filewrapper interface {
io.ReadSeeker
// Name returns the file name; cachedFromReader appends extIdx to it to
// locate the index file.
Name() string
}

// OpenDir "opens" an existing directory for read and write operations.
// It expects the directory to exist and to be a directory, otherwise it will
// return an error.
Expand All @@ -51,7 +55,11 @@ func OpenDir(dir string) (*Directory, error) {
} else if !fi.IsDir() {
return nil, fmt.Errorf("not a directory: %s", dir)
}
return &Directory{dir: dir}, nil
fm, err := newFileMgr()
if err != nil {
return nil, err
}
return &Directory{dir: dir, fm: fm}, nil
}

// CreateDir creates and opens a directory. It will create all parent
Expand All @@ -60,15 +68,25 @@ func CreateDir(dir string) (*Directory, error) {
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, err
}
return &Directory{dir: dir}, nil
fm, err := newFileMgr()
if err != nil {
return nil, err
}
return &Directory{dir: dir, fm: fm}, nil
}

// RemoveAll closes the directory and then deletes it together with all of
// its contents. Make sure all files are closed. The returned error is the one
// from os.RemoveAll; an error from Close is not reported.
func (d *Directory) RemoveAll() error {
// Best effort: a failure to close must not prevent the removal below.
_ = d.Close()
return os.RemoveAll(d.dir)
}

// Close closes the directory and all open files by destroying the
// directory's file manager. It returns whatever error fm.Destroy reports.
func (d *Directory) Close() error {
return d.fm.Destroy()
}

var errNoChannelInfo = errors.New("no channel info")

// Channels collects all channels from the chunk directory. First, it
Expand All @@ -81,21 +99,21 @@ func (d *Directory) Channels() ([]slack.Channel, error) {
}
// try to open the channels file
if fi, err := os.Stat(d.filename(FChannels)); err == nil && !fi.IsDir() {
return loadChannelsJSON(d.filename(FChannels))
return d.loadChannelsJSON(d.filename(FChannels))
}
// channel files not found, try to get channel info from the conversation
// files.
var ch []slack.Channel
if err := filepath.WalkDir(d.dir, func(path string, d fs.DirEntry, err error) error {
if err := filepath.WalkDir(d.dir, func(path string, de fs.DirEntry, err error) error {
if err != nil {
return err
}
if !strings.HasSuffix(path, ext) {
return nil
} else if d.IsDir() {
} else if de.IsDir() {
return nil
}
chs, err := loadChanInfo(path)
chs, err := d.loadChanInfo(path)
if err != nil {
if errors.Is(err, errNoChannelInfo) {
return nil
Expand All @@ -116,9 +134,9 @@ func (d *Directory) Name() string {
return d.dir
}

func loadChanInfo(fullpath string) ([]slack.Channel, error) {
func (d *Directory) loadChanInfo(fullpath string) ([]slack.Channel, error) {
// try to get from cache
f, err := openChunks(fullpath)
f, err := d.fm.Open(fullpath)
if err != nil {
return nil, err
}
Expand All @@ -133,8 +151,8 @@ func loadChanInfo(fullpath string) ([]slack.Channel, error) {

// readChanInfo returns the Channels from all the ChannelInfo chunks in the
// file.
func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {
cf, err := FromReader(rs)
func readChanInfo(wf filewrapper) ([]slack.Channel, error) {
cf, err := cachedFromReader(wf)
if err != nil {
return nil, err
}
Expand All @@ -143,61 +161,17 @@ func readChanInfo(rs io.ReadSeeker) ([]slack.Channel, error) {

// loadChannelsJSON loads channels json file and returns a slice of
// slack.Channel. It expects it to be GZIP compressed.
func loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
	// Decompress the channels file into a seekable in-memory reader.
	rs, err := openChunksBuf(fullpath)
	if err != nil {
		return nil, err
	}

	// Index the chunks, then collect every channel they describe.
	chunkFile, err := FromReader(rs)
	if err != nil {
		return nil, err
	}
	return chunkFile.AllChannels()
}

// openChunks opens the existing chunk file filename, which is expected to be
// gzip-compressed, and returns a ReadSeekCloser over its decompressed
// contents. The compressed source file is closed before returning.
// NOTE(review): osext.UnGZIP presumably inflates into a temporary file that
// osext.RemoveOnClose deletes when the result is closed - confirm in osext.
func openChunks(filename string) (io.ReadSeekCloser, error) {
	src, err := openfile(filename)
	if err != nil {
		return nil, err
	}
	// The compressed handle is only needed while inflating.
	defer src.Close()

	inflated, err := osext.UnGZIP(src)
	if err != nil {
		return nil, err
	}
	return osext.RemoveOnClose(inflated), nil
}

// openfile opens filename for reading after checking that it is usable as a
// chunk file. It returns the os.Stat error if the file cannot be inspected,
// and a plain error if the path is a directory or the file is empty.
func openfile(filename string) (*os.File, error) {
	info, err := os.Stat(filename)
	switch {
	case err != nil:
		return nil, err
	case info.IsDir():
		return nil, errors.New("chunk file is a directory")
	case info.Size() == 0:
		return nil, errors.New("chunk file is empty")
	}
	return os.Open(filename)
}

func openChunksBuf(filename string) (io.ReadSeeker, error) {
f, err := openfile(filename)
func (d *Directory) loadChannelsJSON(fullpath string) ([]slack.Channel, error) {
f, err := d.fm.Open(fullpath)
if err != nil {
return nil, err
}
defer f.Close()
gzr, err := gzip.NewReader(f)
cf, err := cachedFromReader(f)
if err != nil {
return nil, err
}
var buf bytes.Buffer
if _, err := io.Copy(&buf, gzr); err != nil {
return nil, err
}
return bytes.NewReader(buf.Bytes()), nil
return cf.AllChannels()
}

func (d *Directory) Stat(id FileID) (fs.FileInfo, error) {
Expand All @@ -221,18 +195,22 @@ func (d *Directory) Users() ([]slack.User, error) {
// Open opens a chunk file with the given name. Extension is appended
// automatically.
func (d *Directory) Open(id FileID) (*File, error) {
f, err := d.OpenRAW(d.filename(id))
f, err := d.openRAW(d.filename(id))
if err != nil {
return nil, err
}
return FromReader(f)
return cachedFromReader(f)
}

// OpenRAW opens a compressed chunk file with filename within the directory,
// and returns a ReadSeekCloser. filename is the full name of the file with
// extension.
func (d *Directory) OpenRAW(filename string) (io.ReadSeekCloser, error) {
return openChunks(filepath.Join(filename))
return d.openRAW(filename)
}

// openRAW opens filename through the directory's file manager and returns
// the resulting *wrappedfile. filename is handed to fm.Open unchanged, so it
// must already be the full name of the file, extension included.
func (d *Directory) openRAW(filename string) (*wrappedfile, error) {
return d.fm.Open(filename)
}

// filename returns the full path of the chunk file with the given fileID.
Expand Down Expand Up @@ -298,3 +276,32 @@ func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
}
return nil, errors.New("no workspace info found")
}

// cachedFromReader returns the chunk File for wf, using an on-disk index
// stored next to the chunk file (wf.Name() + extIdx) to avoid re-indexing.
//
// If the index file can be opened and decoded, the File is built from it with
// fromReaderWithIndex. Otherwise, whether the index is missing, unreadable or
// corrupt, the index is rebuilt with FromReader and written back to disk.
//
// An error is returned if wf cannot be indexed or if the fresh index cannot
// be written. A partially written index is removed, so a failed write cannot
// poison later calls.
func cachedFromReader(wf filewrapper) (*File, error) {
	idxName := wf.Name() + extIdx

	// Fast path: reuse a previously stored index.
	if r, err := os.Open(idxName); err == nil {
		var idx index
		decErr := gob.NewDecoder(r).Decode(&idx)
		// Read-only handle: a close error carries no information here.
		_ = r.Close()
		if decErr == nil {
			return fromReaderWithIndex(wf, idx)
		}
		// The index is truncated or corrupt (e.g. an interrupted write).
		// It is only a cache, so fall through and rebuild it instead of
		// making the chunk file permanently unreadable.
	}

	// Slow path: build the index from the data, then persist it.
	cf, err := FromReader(wf)
	if err != nil {
		return nil, err
	}
	w, err := os.Create(idxName)
	if err != nil {
		return nil, err
	}
	if err := gob.NewEncoder(w).Encode(cf.idx); err != nil {
		// Do not leave a half-written index behind.
		_ = w.Close()
		_ = os.Remove(idxName)
		return nil, fmt.Errorf("writing chunk index %q: %w", idxName, err)
	}
	// Close explicitly: on a written file Close may report a deferred write
	// error, in which case the index on disk cannot be trusted.
	if err := w.Close(); err != nil {
		_ = os.Remove(idxName)
		return nil, fmt.Errorf("closing chunk index %q: %w", idxName, err)
	}
	return cf, nil
}
29 changes: 16 additions & 13 deletions internal/chunk/directory_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package chunk

import (
"bytes"
"encoding/json"
"io"
"path/filepath"
"reflect"
"testing"

Expand Down Expand Up @@ -59,8 +58,9 @@ var (
)

func Test_readChanInfo(t *testing.T) {
dir := t.TempDir()
type args struct {
r io.ReadSeeker
r filewrapper
}
tests := []struct {
name string
Expand All @@ -71,7 +71,8 @@ func Test_readChanInfo(t *testing.T) {
{
name: "test",
args: args{
r: marshalChunks(
r: testfilewrapper(
filepath.Join(dir, "unit"),
TestPublicChannelInfo,
TestPublicChannelMessages,
),
Expand All @@ -96,14 +97,16 @@ func Test_readChanInfo(t *testing.T) {
}
}

// marshalChunks turns chunks into io.ReadSeeker
func marshalChunks(chunks ...Chunk) io.ReadSeeker {
var b bytes.Buffer
enc := json.NewEncoder(&b)
for _, c := range chunks {
if err := enc.Encode(c); err != nil {
panic(err)
}
func testfilewrapper(name string, chunks ...Chunk) filewrapper {
return nopCloser{
ReadSeeker: marshalChunks(chunks...),
name: name,
}
return bytes.NewReader(b.Bytes())
}

// nopCloser is a test helper that pairs an arbitrary io.ReadSeeker with a
// file name; together with its Name method this is what testfilewrapper
// returns as a filewrapper. Despite the type's name, no Close method is
// declared for it in the code visible here.
type nopCloser struct {
// name is the value reported by Name.
name string
// ReadSeeker is embedded, so its Read and Seek methods are promoted.
io.ReadSeeker
}

// Name returns the name the nopCloser was constructed with.
func (n nopCloser) Name() string { return n.name }
Loading

0 comments on commit 1f47770

Please sign in to comment.