From 48f50a2cebfc3d9469338ec7a18aa8854d6b50f2 Mon Sep 17 00:00:00 2001 From: Rammiah Date: Thu, 22 Aug 2024 00:44:55 +0800 Subject: [PATCH] fix(search): BuildIndex concurrency error (#7035) --- internal/errs/search.go | 3 ++- internal/op/fs.go | 4 +-- internal/search/build.go | 52 ++++++++++++++++++++++++++++++--------- internal/search/search.go | 2 +- pkg/mq/mq.go | 2 ++ server/handles/index.go | 14 +++++++---- 6 files changed, 55 insertions(+), 22 deletions(-) diff --git a/internal/errs/search.go b/internal/errs/search.go index 9c864f4d241..3da92898e65 100644 --- a/internal/errs/search.go +++ b/internal/errs/search.go @@ -3,5 +3,6 @@ package errs import "fmt" var ( - SearchNotAvailable = fmt.Errorf("search not available") + SearchNotAvailable = fmt.Errorf("search not available") + BuildIndexIsRunning = fmt.Errorf("build index is running, please try later") ) diff --git a/internal/op/fs.go b/internal/op/fs.go index ed28039529f..e0153952638 100644 --- a/internal/op/fs.go +++ b/internal/op/fs.go @@ -136,9 +136,7 @@ func List(ctx context.Context, storage driver.Driver, path string, args model.Li model.WrapObjsName(files) // call hooks go func(reqPath string, files []model.Obj) { - for _, hook := range objsUpdateHooks { - hook(reqPath, files) - } + HandleObjsUpdateHook(reqPath, files) }(utils.GetFullPath(storage.GetStorage().MountPath, path), files) // sort objs diff --git a/internal/search/build.go b/internal/search/build.go index 1d3bfb7cd5d..9865b2988a1 100644 --- a/internal/search/build.go +++ b/internal/search/build.go @@ -5,10 +5,12 @@ import ( "path" "path/filepath" "strings" + "sync" "sync/atomic" "time" "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/fs" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/op" @@ -21,10 +23,13 @@ import ( ) var ( - Running = atomic.Bool{} - Quit chan struct{} + Quit = atomic.Pointer[chan struct{}]{} ) +func Running() bool { + return Quit.Load() != nil +} + func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth int, count bool) error { var ( err error @@ -33,11 +38,27 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth ) log.Infof("build index for: %+v", indexPaths) log.Infof("ignore paths: %+v", ignorePaths) - Running.Store(true) - Quit = make(chan struct{}, 1) - indexMQ := mq.NewInMemoryMQ[ObjWithParent]() + quit := make(chan struct{}, 1) + if !Quit.CompareAndSwap(nil, &quit) { + // other goroutine is running + return errs.BuildIndexIsRunning + } + var ( + indexMQ = mq.NewInMemoryMQ[ObjWithParent]() + running = atomic.Bool{} // current goroutine running + wg = &sync.WaitGroup{} + ) + running.Store(true) + wg.Add(1) go func() { ticker := time.NewTicker(time.Second) + defer func() { + Quit.Store(nil) + wg.Done() + // notify walk to exit when StopIndex api called + running.Store(false) + ticker.Stop() + }() tickCount := 0 for { select { @@ -70,9 +91,8 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth } }) - case <-Quit: - Running.Store(false) - ticker.Stop() + case <-quit: + log.Debugf("build index for %+v received quit", indexPaths) eMsg := "" now := time.Now() originErr := err @@ -100,14 +120,22 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth }) } }) + log.Debugf("build index for %+v quit success", indexPaths) return } } }() defer func() { - if Running.Load() { - Quit <- struct{}{} + if !running.Load() || Quit.Load() != &quit { + log.Debugf("build index for %+v stopped by StopIndex", indexPaths) + return + } + select { + // avoid goroutine leak + case quit <- struct{}{}: + default: } + wg.Wait() }() admin, err := op.GetAdmin() if err != nil { @@ -121,7 +149,7 @@ func BuildIndex(ctx context.Context, indexPaths, ignorePaths []string, maxDepth } for _, indexPath := range indexPaths { walkFn := func(indexPath string, info model.Obj) error { - if !Running.Load() { + if !running.Load() { return filepath.SkipDir } for _, avoidPath := range ignorePaths { @@ -167,7 +195,7 @@ func Config(ctx context.Context) searcher.Config { } func Update(parent string, objs []model.Obj) { - if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running.Load() { + if instance == nil || !instance.Config().AutoUpdate || !setting.GetBool(conf.AutoUpdateIndex) || Running() { return } if isIgnorePath(parent) { diff --git a/internal/search/search.go b/internal/search/search.go index c1a23b85ca8..d420eb16dd3 100644 --- a/internal/search/search.go +++ b/internal/search/search.go @@ -27,7 +27,7 @@ func Init(mode string) error { } instance = nil } - if Running.Load() { + if Running() { return fmt.Errorf("index is running") } if mode == "none" { diff --git a/pkg/mq/mq.go b/pkg/mq/mq.go index 35f5a159de4..cdae0425130 100644 --- a/pkg/mq/mq.go +++ b/pkg/mq/mq.go @@ -57,5 +57,7 @@ func (mq *inMemoryMQ[T]) Clear() { } func (mq *inMemoryMQ[T]) Len() int { + mq.Lock() + defer mq.Unlock() return mq.queue.Len() } diff --git a/server/handles/index.go b/server/handles/index.go index 0fa1fa0e9bf..5610688da1f 100644 --- a/server/handles/index.go +++ b/server/handles/index.go @@ -19,7 +19,7 @@ type UpdateIndexReq struct { } func BuildIndex(c *gin.Context) { - if search.Running.Load() { + if search.Running() { common.ErrorStrResp(c, "index is running", 400) return } @@ -45,7 +45,7 @@ func UpdateIndex(c *gin.Context) { common.ErrorResp(c, err, 400) return } - if search.Running.Load() { + if search.Running() { common.ErrorStrResp(c, "index is running", 400) return } @@ -72,16 +72,20 @@ func UpdateIndex(c *gin.Context) { } func StopIndex(c *gin.Context) { - if !search.Running.Load() { + quit := search.Quit.Load() + if quit == nil { common.ErrorStrResp(c, "index is not running", 400) return } - search.Quit <- struct{}{} + select { + case *quit <- struct{}{}: + default: + } common.SuccessResp(c) } func ClearIndex(c *gin.Context) { - if search.Running.Load() { + if search.Running() { common.ErrorStrResp(c, "index is running", 400) return }