-
-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Automatically pause queue if index service is unavailable #15066
Automatically pause queue if index service is unavailable #15066
Conversation
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs during the next 2 months. Thank you for your contributions. |
9aa2ac2
to
ac21844
Compare
62d56b0
to
c9b8da4
Compare
Ready for review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately there are a few races here. RCing to prevent merging.
Hi @lafriks Concurrency Fixing Patchdiff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go
index c1d4c29b8..a8b171194 100644
--- a/modules/indexer/code/elastic_search.go
+++ b/modules/indexer/code/elastic_search.go
@@ -13,12 +13,14 @@ import (
"net"
"strconv"
"strings"
+ "sync"
"time"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
@@ -46,6 +48,7 @@ type ElasticSearchIndexer struct {
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
+ lock sync.Mutex
}
type elasticLogger struct {
@@ -198,11 +201,15 @@ func (b *ElasticSearchIndexer) init() (bool, error) {
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
b.availabilityCallback = callback
}
// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
return b.available
}
@@ -486,27 +493,41 @@ func (b *ElasticSearchIndexer) Close() {
func (b *ElasticSearchIndexer) checkError(err error) error {
var opErr *net.OpError
- if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
- b.available = false
- if b.availabilityCallback != nil {
- b.availabilityCallback(b.available)
- }
+ if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+ return err
}
+
+ b.setAvailability(false)
+
return err
}
func (b *ElasticSearchIndexer) checkAvailability() {
- if b.available {
+ if b.Ping() {
return
}
// Request cluster state to check if elastic is available again
- _, err := b.client.ClusterState().Do(context.Background())
+ _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
if err != nil {
+ b.setAvailability(false)
+ return
+ }
+
+ b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.available == available {
return
}
- b.available = true
+
+ b.available = available
if b.availabilityCallback != nil {
+ // Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go
index 6ad061802..80f69d3b1 100644
--- a/modules/indexer/issues/elastic_search.go
+++ b/modules/indexer/issues/elastic_search.go
@@ -10,8 +10,10 @@ import (
"fmt"
"net"
"strconv"
+ "sync"
"time"
+ "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
@@ -26,6 +28,7 @@ type ElasticSearchIndexer struct {
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
+ lock sync.Mutex
}
type elasticLogger struct {
@@ -138,11 +141,15 @@ func (b *ElasticSearchIndexer) Init() (bool, error) {
// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
b.availabilityCallback = callback
}
// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
+ b.lock.Lock()
+ defer b.lock.Unlock()
return b.available
}
@@ -262,27 +269,41 @@ func (b *ElasticSearchIndexer) Close() {
func (b *ElasticSearchIndexer) checkError(err error) error {
var opErr *net.OpError
- if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
- b.available = false
- if b.availabilityCallback != nil {
- b.availabilityCallback(b.available)
- }
+ if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
+ return err
}
+
+ b.setAvailability(false)
+
return err
}
func (b *ElasticSearchIndexer) checkAvailability() {
- if b.available {
+ if b.Ping() {
return
}
// Request cluster state to check if elastic is available again
- _, err := b.client.ClusterState().Do(context.Background())
+ _, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
if err != nil {
+ b.setAvailability(false)
+ return
+ }
+
+ b.setAvailability(true)
+}
+
+func (b *ElasticSearchIndexer) setAvailability(available bool) {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+
+ if b.available == available {
return
}
- b.available = true
+
+ b.available = available
if b.availabilityCallback != nil {
+ // Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
|
f346c8f
to
f6f7beb
Compare
Thanks for patch, updated to use RWMutex as Ping could be called quite often and it needs only RLock |
🚀 |
* 'main' of https://github.com/go-gitea/gitea: Allow to filter repositories by language in explore, user and organization repositories lists (go-gitea#18430) Fix broken when no commits and default branch is not master (go-gitea#18422) [skip ci] Updated translations via Crowdin Automatically pause queue if index service is unavailable (go-gitea#15066)
…5066) * Handle keyword search error when issue indexer service is not available * Implement automatic disabling and resume of code indexer queue
Fixes that changes are not indexed and lost if external index services is not available
If indexer service is unavailable for search, display error:
If code search service is unavailable, display error:
Depends on #15928