Skip to content

Commit

Permalink
dirprocessor tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Mar 30, 2024
1 parent f70bb00 commit 8bd47f2
Show file tree
Hide file tree
Showing 7 changed files with 1,226 additions and 39 deletions.
45 changes: 38 additions & 7 deletions internal/chunk/dirproc/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dirproc
import (
"context"
"errors"
"io"
"runtime/trace"

"github.com/rusq/slack"
Expand All @@ -11,6 +12,8 @@ import (
"github.com/rusq/slackdump/v3/processor"
)

//go:generate mockgen -source=conversations.go -destination=dirproc_mock_test.go -package=dirproc

// Transformer is an interface that is called when the processor is finished
// processing a channel or thread.
type Transformer interface {
Expand All @@ -25,9 +28,8 @@ type Transformer interface {
// Conversations is a processor that writes the channel and thread messages.
// Zero value is unusable. Use [NewConversation] to create a new instance.
type Conversations struct {
dir *chunk.Directory
t *filetracker
lg logger.Interface
t tracker
lg logger.Interface

// subproc is the files subprocessor, it is called by the Files method
// in addition to recording the files in the chunk file (if recordFiles is
Expand All @@ -42,6 +44,31 @@ type Conversations struct {
tf Transformer
}

// tracker is an interface for a recorder of data.

type tracker interface {
Recorder(id chunk.FileID) (datahandler, error)
RefCount(id chunk.FileID) int
Unregister(id chunk.FileID) error
CloseAll() error
}

// datahandler is an interface for the data processor
type datahandler interface {
processor.ChannelInformer
processor.Messenger
processor.Filer
counter
io.Closer
}

type counter interface {
Inc() int
Dec() int
Add(int) int
N() int
}

// ConvOption is a function that configures the Conversations processor.
type ConvOption func(*Conversations)

Expand All @@ -59,21 +86,25 @@ func WithRecordFiles(b bool) ConvOption {
}
}

var (
errNilSubproc = errors.New("internal error: files subprocessor is nil")
errNilTransformer = errors.New("internal error: transformer is nil")
)

// NewConversation returns the new conversation processor. filesSubproc will
// be called for each file chunk, tf will be called for each completed channel
// or thread, when the reference count becomes zero.
// Reference count is increased with each call to Channel processing functions.
func NewConversation(cd *chunk.Directory, filesSubproc processor.Filer, tf Transformer, opts ...ConvOption) (*Conversations, error) {
// validation
if filesSubproc == nil {
return nil, errors.New("internal error: files subprocessor is nil")
return nil, errNilSubproc
} else if tf == nil {
return nil, errors.New("internal error: transformer is nil")
return nil, errNilTransformer
}

c := &Conversations{
dir: cd,
t: newTracker(cd),
t: newFileTracker(cd),
lg: logger.Default,
subproc: filesSubproc,
tf: tf,
Expand Down
Loading

0 comments on commit 8bd47f2

Please sign in to comment.