Skip to content

Commit

Permalink
split spaghetti and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 30, 2024
1 parent c827555 commit 1eb2a19
Show file tree
Hide file tree
Showing 5 changed files with 468 additions and 69 deletions.
23 changes: 9 additions & 14 deletions internal/chunk/dirproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewConversation(cd *chunk.Directory, filesSubproc processor.Filer, tf Trans

// ChannelInfo is called for each channel that is retrieved.
func (cv *Conversations) ChannelInfo(ctx context.Context, ci *slack.Channel, threadTS string) error {
r, err := cv.t.recorder(chunk.ToFileID(ci.ID, threadTS, threadTS != ""))
r, err := cv.t.Recorder(chunk.ToFileID(ci.ID, threadTS, threadTS != ""))
if err != nil {
return err
}
Expand All @@ -106,11 +106,11 @@ func (cv *Conversations) Messages(ctx context.Context, channelID string, numThre
cv.debugtrace(ctx, "%s: Messages: numThreads=%d, isLast=%t, len(mm)=%d", channelID, numThreads, isLast, len(mm))

id := chunk.ToFileID(channelID, "", false)
r, err := cv.t.recorder(id)
r, err := cv.t.Recorder(id)
if err != nil {
return err
}
n := r.AddN(numThreads)
n := r.Add(numThreads)
cv.debugtrace(ctx, "%s: Messages: increased by %d to %d", channelID, numThreads, n)

if err := r.Messages(ctx, channelID, numThreads, isLast, mm); err != nil {
Expand All @@ -134,7 +134,7 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p
cv.debugtrace(ctx, "%s: ThreadMessages: parent=%s, isLast=%t, len(tm)=%d", channelID, parent.ThreadTimestamp, isLast, len(tm))

id := chunk.ToFileID(channelID, parent.ThreadTimestamp, threadOnly)
r, err := cv.t.recorder(id)
r, err := cv.t.Recorder(id)
if err != nil {
return err
}
Expand All @@ -152,18 +152,13 @@ func (cv *Conversations) ThreadMessages(ctx context.Context, channelID string, p
// finalise closes the channel file if there are no more threads to process.
func (cv *Conversations) finalise(ctx context.Context, id chunk.FileID) error {
if tc := cv.t.RefCount(id); tc > 0 {
cv.debugtrace(ctx, "%s finalise: not finalising, ref count = %d", id, tc)
cv.debugtrace(ctx, "%s: finalise: not finalising, ref count = %d", id, tc)
return nil
}
cv.debugtrace(ctx, "%s finalise: ref count = 0, finalising...", id)
r, err := cv.t.recorder(id)
if err != nil {
return err
}
if err := r.Close(); err != nil {
cv.debugtrace(ctx, "%s: finalise: ref count = 0, finalising...", id)
if err := cv.t.Unregister(id); err != nil {
return err
}
cv.t.destroy(id)
if cv.tf != nil {
return cv.tf.Transform(ctx, id)
}
Expand All @@ -180,7 +175,7 @@ func (cv *Conversations) Files(ctx context.Context, channel *slack.Channel, pare
return nil
}
id := chunk.ToFileID(channel.ID, parent.ThreadTimestamp, false) // we don't do files for threads in export
r, err := cv.t.recorder(id)
r, err := cv.t.Recorder(id)
if err != nil {
return err
}
Expand All @@ -191,7 +186,7 @@ func (cv *Conversations) Files(ctx context.Context, channel *slack.Channel, pare
}

func (cv *Conversations) ChannelUsers(ctx context.Context, channelID string, threadTS string, cu []string) error {
r, err := cv.t.recorder(chunk.ToFileID(channelID, threadTS, threadTS != ""))
r, err := cv.t.Recorder(chunk.ToFileID(channelID, threadTS, threadTS != ""))
if err != nil {
return err
}
Expand Down
99 changes: 44 additions & 55 deletions internal/chunk/dirproc/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,36 @@ import (
"sync"

"github.com/rusq/slackdump/v3/internal/chunk"
"github.com/rusq/slackdump/v3/internal/primitive"
)

// tracker keeps track of the files and their processors.
type tracker struct {
dir *chunk.Directory

mu sync.Mutex // guards map operations
mu sync.RWMutex // guards map operations
files map[chunk.FileID]*entityproc // files holds open files along with their processors
}

// entityproc is a processor for a single entity, which can be a thread or
// a channel.
type entityproc struct {
*baseproc
// Counter holds the number of threads expected to be processed for the
// given channel. We keep track of the number of threads to ensure that we
// don't close the file until all threads are processed. The channel file
// can be closed when the Counter is zero.
primitive.Counter
}

// newTracker returns a tracker bound to the chunk directory cd, with an
// empty, ready-to-use map of open files.
func newTracker(cd *chunk.Directory) *tracker {
	t := new(tracker)
	t.dir = cd
	t.files = make(map[chunk.FileID]*entityproc)
	return t
}

// ensure ensures that the channel file is open and the recorder is
// create ensures that the channel file is open and the recorder is
// initialized.
func (t *tracker) create(id chunk.FileID) error {
if _, ok := t.files[id]; ok {
Expand All @@ -32,21 +45,39 @@ func (t *tracker) create(id chunk.FileID) error {
if err != nil {
return err
}
t.files[id] = &entityproc{
ep := &entityproc{
baseproc: bp,
refs: 1, // one for the channel
}
ep.Inc() // one for the initial call
t.files[id] = ep
return nil
}

func (t *tracker) destroy(id chunk.FileID) {
// Unregister closes and removes the file from tracking (file remains on the file
// system).
func (t *tracker) Unregister(id chunk.FileID) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.unregister(id)
}

// unregister closes the processor tracked under id and drops it from the
// files map. It does not lock the mutex itself. An id that is not tracked
// is a no-op. If Close fails, the error is returned and the entry is left
// in the map.
func (t *tracker) unregister(id chunk.FileID) error {
	ep, tracked := t.files[id]
	if tracked {
		if err := ep.Close(); err != nil {
			return err
		}
		delete(t.files, id)
	}
	return nil
}

func (t *tracker) recorder(id chunk.FileID) (*entityproc, error) {
// Recorder returns the processor for the given file. If the processor
// doesn't exist, it is created.
func (t *tracker) Recorder(id chunk.FileID) (*entityproc, error) {
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -60,69 +91,27 @@ func (t *tracker) recorder(id chunk.FileID) (*entityproc, error) {
return t.files[id], nil
}

// CloseAll closes all open files.
func (t *tracker) CloseAll() error {
t.mu.Lock()
defer t.mu.Unlock()

for id, f := range t.files {
if err := f.Close(); err != nil {
return fmt.Errorf("error closing %s: %w", id, err)
for id := range t.files {
if err := t.unregister(id); err != nil {
return fmt.Errorf("error closing file %s: %w", id, err)
}
delete(t.files, id)
}
return nil

}

// RefCount returns the reference count for the given file.
func (t *tracker) RefCount(id chunk.FileID) int {
t.mu.Lock()
defer t.mu.Unlock()
t.mu.RLock()
defer t.mu.RUnlock()

if f, ok := t.files[id]; ok {
return f.RefCount()
return f.N()
}
return 0
}

// entityproc is a processor for a single entity, which can be a thread or
// a channel.
type entityproc struct {
*baseproc
// refs is the number of references that are expected to be processed for
// the given channel. We keep track of the number of refs to ensure
// that we don't close the file until all refs are processed.
// The channel file can be closed when the number of refs is zero.
refs int
mu sync.Mutex // guards refs
}

// AddN adds n to the reference count and returns the updated count.
func (ep *entityproc) AddN(n int) int {
	ep.mu.Lock()
	defer ep.mu.Unlock()

	updated := ep.refs + n
	ep.refs = updated
	return updated
}

// Add increments the reference count by one and returns the updated count.
func (ep *entityproc) Add() int {
	return ep.AddN(1)
}

// DecN subtracts n from the reference count and returns the updated count.
func (ep *entityproc) DecN(n int) int {
	ep.mu.Lock()
	defer ep.mu.Unlock()

	updated := ep.refs - n
	ep.refs = updated
	return updated
}

// Dec decrements the reference count by one and returns the updated count.
func (ep *entityproc) Dec() int {
	return ep.DecN(1)
}

// RefCount returns the current reference count.
func (ep *entityproc) RefCount() int {
	ep.mu.Lock()
	current := ep.refs
	ep.mu.Unlock()
	return current
}
Loading

0 comments on commit 1eb2a19

Please sign in to comment.