Skip to content

Commit

Permalink
introduce chunk.FileID
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Apr 29, 2023
1 parent 3ea946a commit 257b6b8
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 68 deletions.
2 changes: 1 addition & 1 deletion cmd/slackdump/internal/export/expproc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type baseproc struct {
*chunk.Recorder
}

func newBaseProc(dir string, name string) (*baseproc, error) {
func newBaseProc(dir string, name chunk.FileID) (*baseproc, error) {
cd, err := chunk.OpenDir(dir)
if err != nil {
return nil, err
Expand Down
49 changes: 26 additions & 23 deletions cmd/slackdump/internal/export/expproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type Transformer interface {
// for the channel id becomes zero (meaning that there are no more chunks
// to process). It should return [transform.ErrClosed] if the transformer
// is closed.
Transform(ctx context.Context, id string) error
Transform(ctx context.Context, id chunk.FileID) error
}

// Conversations is a processor that writes the channel and thread messages.
// Zero value is unusable. Use [NewConversation] to create a new instance.
type Conversations struct {
dir string
cw map[string]*channelproc
cw map[chunk.FileID]*channelproc
mu sync.RWMutex
lg logger.Interface

Expand Down Expand Up @@ -85,7 +85,7 @@ func NewConversation(dir string, filesSubproc processor.Filer, tf Transformer, o
c := &Conversations{
dir: dir,
lg: logger.Default,
cw: make(map[string]*channelproc),
cw: make(map[chunk.FileID]*channelproc),
fileSubproc: filesSubproc,
tf: tf,
}
Expand All @@ -97,7 +97,7 @@ func NewConversation(dir string, filesSubproc processor.Filer, tf Transformer, o

// ensure ensures that the channel file is open and the recorder is
// initialized.
func (cv *Conversations) ensure(id string) error {
func (cv *Conversations) ensure(id chunk.FileID) error {
cv.mu.Lock()
defer cv.mu.Unlock()
if _, ok := cv.cw[id]; ok {
Expand All @@ -116,14 +116,14 @@ func (cv *Conversations) ensure(id string) error {

// ChannelInfo is called for each channel that is retrieved.
func (cv *Conversations) ChannelInfo(ctx context.Context, ci *slack.Channel, threadTS string) error {
r, err := cv.recorder(chunk.FileID(ci.ID, threadTS, threadTS != ""))
r, err := cv.recorder(chunk.ToFileID(ci.ID, threadTS, threadTS != ""))
if err != nil {
return err
}
return r.ChannelInfo(ctx, ci, threadTS)
}

func (cv *Conversations) recorder(id string) (*baseproc, error) {
func (cv *Conversations) recorder(id chunk.FileID) (*baseproc, error) {
cv.mu.RLock()
r, ok := cv.cw[id]
cv.mu.RUnlock()
Expand All @@ -140,31 +140,31 @@ func (cv *Conversations) recorder(id string) (*baseproc, error) {

// refcount returns the number of references that are expected to be
// processed for the given channel.
func (cv *Conversations) refcount(channelID string) int {
func (cv *Conversations) refcount(id chunk.FileID) int {
cv.mu.RLock()
defer cv.mu.RUnlock()
if _, ok := cv.cw[channelID]; !ok {
if _, ok := cv.cw[id]; !ok {
return 0
}
return cv.cw[channelID].refcnt
return cv.cw[id].refcnt
}

func (cv *Conversations) incRefN(channelID string, n int) {
func (cv *Conversations) incRefN(id chunk.FileID, n int) {
cv.mu.Lock()
defer cv.mu.Unlock()
if _, ok := cv.cw[channelID]; !ok {
if _, ok := cv.cw[id]; !ok {
return
}
cv.cw[channelID].refcnt += n
cv.cw[id].refcnt += n
}

func (cv *Conversations) decRef(channelID string) {
func (cv *Conversations) decRef(id chunk.FileID) {
cv.mu.Lock()
defer cv.mu.Unlock()
if _, ok := cv.cw[channelID]; !ok {
if _, ok := cv.cw[id]; !ok {
return
}
cv.cw[channelID].refcnt--
cv.cw[id].refcnt--
}

// Messages is called for each message that is retrieved.
Expand All @@ -174,22 +174,24 @@ func (cv *Conversations) Messages(ctx context.Context, channelID string, numThre

lg := logger.FromContext(ctx)
lg.Debugf("processor: channelID=%s, numThreads=%d, isLast=%t, len(mm)=%d", channelID, numThreads, isLast, len(mm))
r, err := cv.recorder(channelID)

id := chunk.ToFileID(channelID, "", false)
r, err := cv.recorder(id)
if err != nil {
return err
}
if numThreads > 0 {
cv.incRefN(channelID, numThreads) // one for each thread
cv.incRefN(id, numThreads) // one for each thread
trace.Logf(ctx, "ref", "added %d", numThreads)
lg.Debugf("processor: increased ref count for %q to %d", channelID, cv.refcount(channelID))
lg.Debugf("processor: increased ref count for %q to %d", channelID, cv.refcount(id))
}
if err := r.Messages(ctx, channelID, numThreads, isLast, mm); err != nil {
return err
}
if isLast {
trace.Log(ctx, "isLast", "true, decrease ref count")
cv.decRef(channelID)
return cv.finalise(ctx, channelID)
cv.decRef(id)
return cv.finalise(ctx, id)
}
return nil
}
Expand All @@ -201,7 +203,8 @@ func (cv *Conversations) Files(ctx context.Context, channel *slack.Channel, pare
return err
}
if cv.recordFiles {
r, err := cv.recorder(channel.ID)
id := chunk.ToFileID(channel.ID, parent.ThreadTimestamp, false) // we don't do files for threads in export
r, err := cv.recorder(id)
if err != nil {
return err
}
Expand All @@ -219,7 +222,7 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p
defer task.End()
lg := logger.FromContext(ctx)

id := chunk.FileID(channelID, parent.ThreadTimestamp, threadOnly)
id := chunk.ToFileID(channelID, parent.ThreadTimestamp, threadOnly)
r, err := cv.recorder(id)
if err != nil {
return err
Expand All @@ -240,7 +243,7 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p
}

// finalise closes the channel file if there are no more threads to process.
func (cv *Conversations) finalise(ctx context.Context, id string) error {
func (cv *Conversations) finalise(ctx context.Context, id chunk.FileID) error {
lg := logger.FromContext(ctx)
if tc := cv.refcount(id); tc > 0 {
trace.Logf(ctx, "ref", "not finalising %q because ref count = %d", id, tc)
Expand Down
5 changes: 3 additions & 2 deletions internal/chunk/chunktest/dirserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http
s.mu.Lock()
p, ok := s.ptrs[channel]
s.mu.Unlock()
id := chunk.ToFileID(channel, "", false)
if !ok {
cf, err := s.cd.Open(channel)
cf, err := s.cd.Open(id)
if err != nil {
if os.IsNotExist(err) {
http.NotFound(w, r)
Expand All @@ -85,7 +86,7 @@ func (s *DirServer) chunkWrapper(fn func(p *chunk.Player) http.HandlerFunc) http
})
}

func (s *DirServer) chunkfileWrapper(name string, fn func(p *chunk.Player) http.HandlerFunc) http.Handler {
func (s *DirServer) chunkfileWrapper(name chunk.FileID, fn func(p *chunk.Player) http.HandlerFunc) http.Handler {
rs, err := s.cd.Open(name)
if err != nil {
panic(err)
Expand Down
45 changes: 26 additions & 19 deletions internal/chunk/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ const ext = ".json.gz"

// common filenames
const (
FChannels = "channels"
FUsers = "users"
FWorkspace = "workspace"
FChannels FileID = "channels"
FUsers FileID = "users"
FWorkspace FileID = "workspace"
)

// Directory is an abstraction over the directory with chunk files. It
Expand Down Expand Up @@ -56,20 +56,27 @@ func CreateDir(dir string) (*Directory, error) {
return &Directory{dir: dir}, nil
}

// FileID returns the file ID for the given channel and thread timestamp.
// FileID is the ID of the file within the directory (it's basically the
// file name without extension). If includeThread is true and threadTS is
// not empty, the thread timestamp will be appended to the channel ID.
// Otherwise, only the channel ID will be returned.
func FileID(channelID, threadTS string, includeThread bool) string {
// FileID is the ID of the file within the directory (it's basically the file
// name without an extension). ToFileID builds one from a channel ID and an
// optional thread timestamp, and Split takes it apart again.
type FileID string

// chanThreadSep is the separator placed between the channel ID and the
// thread timestamp in a file ID. ToFileID inserts it and FileID.Split cuts
// on its first occurrence.
const chanThreadSep = "-"

// ToFileID returns the file ID for the given channel and thread timestamp.
// If includeThread is true and threadTS is not empty, the thread timestamp
// will be appended to the channel ID. Otherwise, only the channel ID will be
// returned.
func ToFileID(channelID, threadTS string, includeThread bool) FileID {
if includeThread && threadTS != "" {
return channelID + "-" + threadTS
return FileID(channelID + chanThreadSep + threadTS)
}
return channelID
return FileID(channelID)
}

func SplitFileID(fileID string) (channelID, threadTS string) {
channelID, threadTS, _ = strings.Cut(fileID, "-")
// Split cuts the file ID at the first chanThreadSep and returns the part
// before it as channelID and the part after it as threadTS. If the ID
// contains no separator, channelID is the whole ID and threadTS is empty.
func (id FileID) Split() (channelID, threadTS string) {
channelID, threadTS, _ = strings.Cut(string(id), chanThreadSep)
return
}

Expand Down Expand Up @@ -190,8 +197,8 @@ func (d *Directory) Users() ([]slack.User, error) {

// Open opens a chunk file with the given name. Extension is appended
// automatically.
func (d *Directory) Open(fileID string) (*File, error) {
f, err := d.OpenRAW(d.filename(fileID))
func (d *Directory) Open(id FileID) (*File, error) {
f, err := d.OpenRAW(d.filename(id))
if err != nil {
return nil, err
}
Expand All @@ -206,8 +213,8 @@ func (d *Directory) OpenRAW(filename string) (io.ReadSeekCloser, error) {
}

// filename returns the full path of the chunk file with the given fileID.
func (d *Directory) filename(fileID string) string {
return filepath.Join(d.dir, fileID+ext)
func (d *Directory) filename(id FileID) string {
return filepath.Join(d.dir, string(id)+ext)
}

// Create creates the chunk file with the given name. Extension is appended
Expand All @@ -220,7 +227,7 @@ func (d *Directory) filename(fileID string) string {
//
// It will NOT overwrite an existing file and will return an error if the file
// exists.
func (d *Directory) Create(fileID string) (io.WriteCloser, error) {
func (d *Directory) Create(fileID FileID) (io.WriteCloser, error) {
filename := d.filename(fileID)
if fi, err := os.Stat(filename); err == nil {
if fi.IsDir() {
Expand Down Expand Up @@ -254,7 +261,7 @@ func (c *closewrapper) Close() error {
// to find the workspace.json.gz file, if not found, it tries to get the info
// from users.json.gz and channels.json.gz.
func (d *Directory) WorkspaceInfo() (*slack.AuthTestResponse, error) {
for _, name := range []string{FWorkspace, FUsers, FChannels} {
for _, name := range []FileID{FWorkspace, FUsers, FChannels} {
f, err := d.Open(name)
if err != nil {
continue
Expand Down
27 changes: 14 additions & 13 deletions internal/chunk/transform/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type Export struct {

start chan struct{}
done chan struct{}
err chan error // error channel used to propagate errors to the main thread.
ids chan string // channel used to pass channel IDs to the worker.
err chan error // error channel used to propagate errors to the main thread.
ids chan chunk.FileID // channel used to pass channel IDs to the worker.
}

// bufferSz is the default size of the channel IDs buffer. This is the number
Expand All @@ -73,7 +73,7 @@ func WithBufferSize(n int) ExpOption {
if n < 1 {
n = bufferSz
}
t.ids = make(chan string, n)
t.ids = make(chan chunk.FileID, n)
}
}

Expand Down Expand Up @@ -108,7 +108,7 @@ func NewExport(ctx context.Context, fsa fsadapter.FS, chunkdir string, tfopt ...
lg: lg,
start: make(chan struct{}),
done: make(chan struct{}),
ids: make(chan string, bufferSz),
ids: make(chan chunk.FileID, bufferSz),
err: make(chan error, 1),
}
for _, opt := range tfopt {
Expand Down Expand Up @@ -180,17 +180,17 @@ func (t *Export) Start(ctx context.Context) error {
// even if the processor is not started, in which case the channel ID will
// be queued for processing once the processor is started. If the export
// worker is closed, it will return ErrClosed.
func (t *Export) Transform(ctx context.Context, id string) error {
if t.closed.Load() {
return ErrClosed
}
t.lg.Debugln("transform: placing channel in the queue", id)
func (t *Export) Transform(ctx context.Context, id chunk.FileID) error {
select {
case err := <-t.err:
return err
default:
t.ids <- id
}
if t.closed.Load() {
return ErrClosed
}
t.lg.Debugln("transform: placing channel in the queue", id)
t.ids <- id
return nil
}

Expand All @@ -202,7 +202,7 @@ func (t *Export) worker(ctx context.Context) {
t.lg.Debugln("transform: worker started")
for id := range t.ids {
t.lg.Debugf("transform: transforming channel %s", id)
if err := transform(ctx, t.fsa, t.srcdir, id, t.users, t.msgUpdFn); err != nil {
if err := transform(ctx, t.fsa, t.srcdir, chunk.FileID(id), t.users, t.msgUpdFn); err != nil {
t.lg.Debugf("transform: error transforming channel %s: %s", id, err)
t.err <- err
continue
Expand Down Expand Up @@ -255,7 +255,7 @@ func (t *Export) Close() error {
// into the relevant directory. It expects the chunk file to be in the
// srcdir/id.json.gz file, and the attachments to be in the srcdir/id
// directory.
func transform(ctx context.Context, fsa fsadapter.FS, cd *chunk.Directory, id string, users []slack.User, msgFn []msgUpdFunc) error {
func transform(ctx context.Context, fsa fsadapter.FS, cd *chunk.Directory, id chunk.FileID, users []slack.User, msgFn []msgUpdFunc) error {
ctx, task := trace.NewTask(ctx, "transform")
defer task.End()

Expand All @@ -270,7 +270,8 @@ func transform(ctx context.Context, fsa fsadapter.FS, cd *chunk.Directory, id st
}
defer cf.Close()

ci, err := cf.ChannelInfo(id)
channelID, _ := id.Split()
ci, err := cf.ChannelInfo(channelID)
if err != nil {
return fmt.Errorf("error reading channel info for %q: %w", id, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/chunk/transform/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test_transform(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := transform(tt.args.ctx, tt.args.fsa, cd, tt.args.id, nil, nil); (err != nil) != tt.wantErr {
if err := transform(tt.args.ctx, tt.args.fsa, cd, chunk.FileID(tt.args.id), nil, nil); (err != nil) != tt.wantErr {
t.Errorf("transform() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
Loading

0 comments on commit 257b6b8

Please sign in to comment.