Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor clean task #1805

Merged
merged 4 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/job/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ func (p *Progress) Increment() {
}
}

// AddProcessed increments the number of processed work units by the provided
// amount. This is used to calculate the percentage.
func (p *Progress) AddProcessed(v int) {
p.mutex.Lock()
defer p.mutex.Unlock()

newVal := v
if newVal > p.total {
newVal = p.total
}

p.processed = newVal
p.calculatePercent()
}

func (p *Progress) addTask(t *task) {
p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down
135 changes: 6 additions & 129 deletions pkg/manager/manager_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,136 +443,13 @@ func (s *singleton) AutoTag(ctx context.Context, input models.AutoTagMetadataInp
}

func (s *singleton) Clean(ctx context.Context, input models.CleanMetadataInput) int {
j := job.MakeJobExec(func(ctx context.Context, progress *job.Progress) {
var scenes []*models.Scene
var images []*models.Image
var galleries []*models.Gallery

if err := s.TxnManager.WithReadTxn(context.TODO(), func(r models.ReaderRepository) error {
qb := r.Scene()
iqb := r.Image()
gqb := r.Gallery()

logger.Infof("Starting cleaning of tracked files")
if input.DryRun {
logger.Infof("Running in Dry Mode")
}
var err error

scenes, err = qb.All()

if err != nil {
return errors.New("failed to fetch list of scenes for cleaning")
}

images, err = iqb.All()
if err != nil {
return errors.New("failed to fetch list of images for cleaning")
}

galleries, err = gqb.All()
if err != nil {
return errors.New("failed to fetch list of galleries for cleaning")
}

return nil
}); err != nil {
logger.Error(err.Error())
return
}

if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
}

var wg sync.WaitGroup
total := len(scenes) + len(images) + len(galleries)
progress.SetTotal(total)
fileNamingAlgo := config.GetInstance().GetVideoFileNamingAlgorithm()
for _, scene := range scenes {
progress.Increment()
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
}

if scene == nil {
logger.Errorf("nil scene, skipping Clean")
continue
}

wg.Add(1)

task := CleanTask{
ctx: ctx,
TxnManager: s.TxnManager,
Scene: scene,
fileNamingAlgorithm: fileNamingAlgo,
}
go progress.ExecuteTask(fmt.Sprintf("Assessing scene %s for clean", scene.Path), func() {
task.Start(&wg, input.DryRun)
})

wg.Wait()
}

for _, img := range images {
progress.Increment()
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
}

if img == nil {
logger.Errorf("nil image, skipping Clean")
continue
}

wg.Add(1)

task := CleanTask{
ctx: ctx,
TxnManager: s.TxnManager,
Image: img,
}
go progress.ExecuteTask(fmt.Sprintf("Assessing image %s for clean", img.Path), func() {
task.Start(&wg, input.DryRun)
})
wg.Wait()
}

for _, gallery := range galleries {
progress.Increment()
if job.IsCancelled(ctx) {
logger.Info("Stopping due to user request")
return
}

if gallery == nil {
logger.Errorf("nil gallery, skipping Clean")
continue
}

wg.Add(1)

task := CleanTask{
ctx: ctx,
TxnManager: s.TxnManager,
Gallery: gallery,
}
go progress.ExecuteTask(fmt.Sprintf("Assessing gallery %s for clean", gallery.GetTitle()), func() {
task.Start(&wg, input.DryRun)
})
wg.Wait()
}

logger.Info("Finished Cleaning")

s.scanSubs.notify()
})
j := cleanJob{
txnManager: s.TxnManager,
input: input,
scanSubs: s.scanSubs,
}

return s.JobManager.Add(ctx, "Cleaning...", j)
return s.JobManager.Add(ctx, "Cleaning...", &j)
}

func (s *singleton) MigrateHash(ctx context.Context) int {
Expand Down
14 changes: 3 additions & 11 deletions pkg/manager/task_autotag.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,22 +444,14 @@ func (t *autoTagFilesTask) getCount(r models.ReaderRepository) (int, error) {
return sceneCount + imageCount + galleryCount, nil
}

func (t *autoTagFilesTask) batchFindFilter(batchSize int) *models.FindFilterType {
page := 1
return &models.FindFilterType{
PerPage: &batchSize,
Page: &page,
}
}

func (t *autoTagFilesTask) processScenes(r models.ReaderRepository) error {
if job.IsCancelled(t.ctx) {
return nil
}

batchSize := 1000

findFilter := t.batchFindFilter(batchSize)
findFilter := models.BatchFindFilter(batchSize)
sceneFilter := t.makeSceneFilter()

more := true
Expand Down Expand Up @@ -507,7 +499,7 @@ func (t *autoTagFilesTask) processImages(r models.ReaderRepository) error {

batchSize := 1000

findFilter := t.batchFindFilter(batchSize)
findFilter := models.BatchFindFilter(batchSize)
imageFilter := t.makeImageFilter()

more := true
Expand Down Expand Up @@ -555,7 +547,7 @@ func (t *autoTagFilesTask) processGalleries(r models.ReaderRepository) error {

batchSize := 1000

findFilter := t.batchFindFilter(batchSize)
findFilter := models.BatchFindFilter(batchSize)
galleryFilter := t.makeGalleryFilter()

more := true
Expand Down
Loading