From 49396bac85d78adf79f2c684d2c1d667b70af01e Mon Sep 17 00:00:00 2001 From: Lauris BH Date: Fri, 21 Jan 2022 15:43:52 +0200 Subject: [PATCH 1/7] Automatically pause queue if issue index service is unavailable --- modules/indexer/issues/bleve.go | 9 +++ modules/indexer/issues/db.go | 9 +++ modules/indexer/issues/elastic_search.go | 82 ++++++++++++++++++++---- modules/indexer/issues/indexer.go | 38 ++++++++++- 4 files changed, 125 insertions(+), 13 deletions(-) diff --git a/modules/indexer/issues/bleve.go b/modules/indexer/issues/bleve.go index d986a0e55ef9c..0c23181fd74c0 100644 --- a/modules/indexer/issues/bleve.go +++ b/modules/indexer/issues/bleve.go @@ -186,6 +186,15 @@ func (b *BleveIndexer) Init() (bool, error) { return false, err } +// SetAvailabilityChangeCallback does nothing +func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping does nothing +func (b *BleveIndexer) Ping() bool { + return true +} + // Close will close the bleve indexer func (b *BleveIndexer) Close() { if b.indexer != nil { diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index f02cbddce8fb5..acc6443ca7031 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -14,6 +14,15 @@ func (db *DBIndexer) Init() (bool, error) { return false, nil } +// SetAvailabilityChangeCallback dummy function +func (db *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping checks if database is available +func (db *DBIndexer) Ping() bool { + return models.Ping() != nil +} + // Index dummy function func (db *DBIndexer) Index(issue []*IndexerData) error { return nil diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go index 187b69b74946b..9dae15a2b0500 100644 --- a/modules/indexer/issues/elastic_search.go +++ b/modules/indexer/issues/elastic_search.go @@ -20,8 +20,11 @@ var _ Indexer = &ElasticSearchIndexer{} // ElasticSearchIndexer implements Indexer interface type ElasticSearchIndexer struct { - client *elastic.Client - indexerName string + client *elastic.Client + indexerName string + available bool + availabilityCallback func(bool) + stopTimer chan struct{} } type elasticLogger struct { @@ -56,10 +59,27 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, er return nil, err } - return &ElasticSearchIndexer{ + indexer := &ElasticSearchIndexer{ client: client, indexerName: indexerName, - }, nil + available: true, + stopTimer: make(chan struct{}), + } + + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-ticker.C: + indexer.checkAvailability() + case <-indexer.stopTimer: + ticker.Stop() + return + } + } + }() + + return indexer, nil } const ( @@ -96,7 +116,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) { ctx := context.Background() exists, err := b.client.IndexExists(b.indexerName).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !exists { @@ -104,7 +124,7 @@ func (b *ElasticSearchIndexer) Init() (bool, error) { createIndex, err := b.client.CreateIndex(b.indexerName).BodyString(mapping).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !createIndex.Acknowledged { return false, errors.New("init failed") @@ -115,6 +135,16 @@ func (b *ElasticSearchIndexer) Init() (bool, error) { return true, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + b.availabilityCallback = callback +} + +// Ping checks if elastic is available +func (b *ElasticSearchIndexer) Ping() bool { + return b.available +} + // Index will save the index data func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { if len(issues) == 0 { @@ -132,7 +162,7 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { "comments": issue.Comments, }). Do(context.Background()) - return err + return b.checkError(err) } reqs := make([]elastic.BulkableRequest, 0) @@ -155,7 +185,7 @@ func (b *ElasticSearchIndexer) Index(issues []*IndexerData) error { Index(b.indexerName). Add(reqs...). Do(context.Background()) - return err + return b.checkError(err) } // Delete deletes indexes by ids @@ -167,7 +197,7 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error { Index(b.indexerName). Id(fmt.Sprintf("%d", ids[0])). Do(context.Background()) - return err + return b.checkError(err) } reqs := make([]elastic.BulkableRequest, 0) @@ -183,7 +213,7 @@ func (b *ElasticSearchIndexer) Delete(ids ...int64) error { Index(b.indexerName). Add(reqs...). Do(context.Background()) - return err + return b.checkError(err) } // Search searches for issues by given conditions. @@ -207,7 +237,7 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st From(start).Size(limit). Do(context.Background()) if err != nil { - return nil, err + return nil, b.checkError(err) } hits := make([]Match, 0, limit) @@ -225,4 +255,32 @@ func (b *ElasticSearchIndexer) Search(keyword string, repoIDs []int64, limit, st } // Close implements indexer -func (b *ElasticSearchIndexer) Close() {} +func (b *ElasticSearchIndexer) Close() { + close(b.stopTimer) +} + +func (b *ElasticSearchIndexer) checkError(err error) error { + if elastic.IsConnErr(err) && b.available { + b.available = false + if b.availabilityCallback != nil { + b.availabilityCallback(b.available) + } + } + return err +} + +func (b *ElasticSearchIndexer) checkAvailability() { + if b.available { + return + } + + // Request cluster state to check if elastic is available again + _, err := b.client.ClusterState().Do(context.Background()) + if err != nil { + return + } + b.available = true + if b.availabilityCallback != nil { + b.availabilityCallback(b.available) + } +} diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 729981ec71d2a..dbd7b02755436 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -47,6 +47,8 @@ type SearchResult struct { // Indexer defines an interface to indexer issues contents type Indexer interface { Init() (bool, error) + Ping() bool + SetAvailabilityChangeCallback(callback func(bool)) Index(issue []*IndexerData) error Delete(ids ...int64) error Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) @@ -111,6 +113,7 @@ func InitIssueIndexer(syncReindex bool) { } iData := make([]*IndexerData, 0, len(data)) + unhandled := make([]queue.Data, 0, len(data)) for _, datum := range data { indexerData, ok := datum.(*IndexerData) if !ok { @@ -119,13 +122,34 @@ func InitIssueIndexer(syncReindex bool) { } log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete) if indexerData.IsDelete { - _ = indexer.Delete(indexerData.IDs...) + if err := indexer.Delete(indexerData.IDs...); err != nil { + log.Error("Error whilst deleting from index: %v Error: %v", indexerData.IDs, err) + if indexer.Ping() { + continue + } + // Add back to queue + unhandled = append(unhandled, datum) + } continue } iData = append(iData, indexerData) } + if len(unhandled) > 0 { + for _, indexerData := range iData { + unhandled = append(unhandled, indexerData) + } + return unhandled + } if err := indexer.Index(iData); err != nil { log.Error("Error whilst indexing: %v Error: %v", iData, err) + if indexer.Ping() { + return nil + } + // Add back to queue + for _, indexerData := range iData { + unhandled = append(unhandled, indexerData) + } + return unhandled } return nil } @@ -193,6 +217,18 @@ func InitIssueIndexer(syncReindex bool) { log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType) } + if queue, ok := issueIndexerQueue.(queue.Pausable); ok { + holder.get().SetAvailabilityChangeCallback(func(available bool) { + if !available { + log.Info("Issue index queue paused") + queue.Pause() + } else { + log.Info("Issue index queue resumed") + queue.Resume() + } + }) + } + // Start processing the queue go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run) From 3acdb133b9441e3d01d1c6f981360dd2f60d9e0b Mon Sep 17 00:00:00 2001 From: Lauris BH Date: Fri, 21 Jan 2022 17:56:42 +0200 Subject: [PATCH 2/7] Handle keyword search error when issue indexer service is not available --- models/db/engine.go | 1 + modules/indexer/issues/db.go | 21 ++++++++++++--------- modules/indexer/issues/elastic_search.go | 4 +++- modules/indexer/issues/indexer.go | 11 +++++++++++ options/locale/locale_en-US.ini | 1 + routers/web/repo/issue.go | 7 +++++-- templates/shared/issuelist.tmpl | 5 +++++ 7 files changed, 38 insertions(+), 12 deletions(-) diff --git a/models/db/engine.go b/models/db/engine.go index 0604d939d3797..9f38af3c67060 100755 --- a/models/db/engine.go +++ b/models/db/engine.go @@ -65,6 +65,7 @@ type Engine interface { Query(...interface{}) ([]map[string][]byte, error) Cols(...string) *xorm.Session Context(ctx context.Context) *xorm.Session + Ping() error } // TableInfo returns table's information via an object diff --git a/modules/indexer/issues/db.go b/modules/indexer/issues/db.go index acc6443ca7031..b1bf8795f5db5 100644 --- a/modules/indexer/issues/db.go +++ b/modules/indexer/issues/db.go @@ -4,41 +4,44 @@ package issues -import "code.gitea.io/gitea/models" +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/models/db" +) // DBIndexer implements Indexer interface to use database's like search type DBIndexer struct{} // Init dummy function -func (db *DBIndexer) Init() (bool, error) { +func (i *DBIndexer) Init() (bool, error) { return false, nil } // SetAvailabilityChangeCallback dummy function -func (db *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +func (i *DBIndexer) SetAvailabilityChangeCallback(callback func(bool)) { } // Ping checks if database is available -func (db *DBIndexer) Ping() bool { - return models.Ping() != nil +func (i *DBIndexer) Ping() bool { + return db.GetEngine(db.DefaultContext).Ping() != nil } // Index dummy function -func (db *DBIndexer) Index(issue []*IndexerData) error { +func (i *DBIndexer) Index(issue []*IndexerData) error { return nil } // Delete dummy function -func (db *DBIndexer) Delete(ids ...int64) error { +func (i *DBIndexer) Delete(ids ...int64) error { return nil } // Close dummy function -func (db *DBIndexer) Close() { +func (i *DBIndexer) Close() { } // Search dummy function -func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { +func (i *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) { total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start) if err != nil { return nil, err diff --git a/modules/indexer/issues/elastic_search.go b/modules/indexer/issues/elastic_search.go index 9dae15a2b0500..6ad061802b80a 100644 --- a/modules/indexer/issues/elastic_search.go +++ b/modules/indexer/issues/elastic_search.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "net" "strconv" "time" @@ -260,7 +261,8 @@ func (b *ElasticSearchIndexer) Close() { } func (b *ElasticSearchIndexer) checkError(err error) error { - if elastic.IsConnErr(err) && b.available { + var opErr *net.OpError + if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { b.available = false if b.availabilityCallback != nil { b.availabilityCallback(b.available) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index dbd7b02755436..e9b0b9e5a26d4 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -387,3 +387,14 @@ func SearchIssuesByKeyword(repoIDs []int64, keyword string) ([]int64, error) { } return issueIDs, nil } + +// IsAvailable checks if issue indexer is available +func IsAvailable() bool { + indexer := holder.get() + if indexer == nil { + log.Error("IsAvailable(): unable to get indexer!") + return false + } + + return indexer.Ping() +} diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index 1f778d329bcde..d5b1768f6920c 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1262,6 +1262,7 @@ issues.filter_sort.moststars = Most stars issues.filter_sort.feweststars = Fewest stars issues.filter_sort.mostforks = Most forks issues.filter_sort.fewestforks = Fewest forks +issues.keyword_search_unavailable = Currently searhing by keyword is not available. Please contact your site administrator. issues.action_open = Open issues.action_close = Close issues.action_label = Label diff --git a/routers/web/repo/issue.go b/routers/web/repo/issue.go index 4f2716763acd3..df1b9876c079a 100644 --- a/routers/web/repo/issue.go +++ b/routers/web/repo/issue.go @@ -163,8 +163,11 @@ func issues(ctx *context.Context, milestoneID, projectID int64, isPullOption uti if len(keyword) > 0 { issueIDs, err = issue_indexer.SearchIssuesByKeyword([]int64{repo.ID}, keyword) if err != nil { - ctx.ServerError("issueIndexer.Search", err) - return + if issue_indexer.IsAvailable() { + ctx.ServerError("issueIndexer.Search", err) + return + } + ctx.Data["IssueIndexerUnavailable"] = true } if len(issueIDs) == 0 { forceEmpty = true diff --git a/templates/shared/issuelist.tmpl b/templates/shared/issuelist.tmpl index 3050107123cb6..687bbdee9476d 100644 --- a/templates/shared/issuelist.tmpl +++ b/templates/shared/issuelist.tmpl @@ -139,5 +139,10 @@ {{end}} + {{if .IssueIndexerUnavailable}} +
+

{{$.i18n.Tr "repo.issues.keyword_search_unavailable"}}

+
+ {{end}} {{template "base/paginate" .}} From 3b28eefc0fdd8ce11c348380c7e52bf3499ed239 Mon Sep 17 00:00:00 2001 From: Lauris BH Date: Mon, 24 Jan 2022 16:59:01 +0200 Subject: [PATCH 3/7] Implement automatic disabling and resume of code indexer queue --- .../doc/developers/hacking-on-gitea.en-us.md | 21 +++++ modules/context/repo.go | 4 + modules/indexer/code/bleve.go | 9 ++ modules/indexer/code/elastic_search.go | 92 +++++++++++++++---- modules/indexer/code/indexer.go | 34 ++++++- modules/indexer/code/wrapped.go | 21 +++++ options/locale/locale_en-US.ini | 3 + routers/web/explore/code.go | 18 +++- routers/web/repo/search.go | 9 +- templates/explore/code.tmpl | 14 ++- templates/repo/home.tmpl | 9 +- templates/repo/search.tmpl | 88 ++++++++++-------- 12 files changed, 250 insertions(+), 72 deletions(-) diff --git a/docs/content/doc/developers/hacking-on-gitea.en-us.md b/docs/content/doc/developers/hacking-on-gitea.en-us.md index 5481a2f1fef47..6e9a98e1264cb 100644 --- a/docs/content/doc/developers/hacking-on-gitea.en-us.md +++ b/docs/content/doc/developers/hacking-on-gitea.en-us.md @@ -187,6 +187,27 @@ make lint-frontend Note: When working on frontend code, set `USE_SERVICE_WORKER` to `false` in `app.ini` to prevent undesirable caching of frontend assets. +### Configuring local ElasticSearch instance + +Start local ElasticSearch instance using docker: + +```sh +mkdir -p $(pwd)/data/elasticsearch +sudo chown -R 1000:1000 $(pwd)/data/elasticsearch +docker run --rm -p 127.0.0.1:9200:9200 -p 127.0.0.1:9300:9300 -e "discovery.type=single-node" -v "$(pwd)/data/elasticsearch:/usr/share/elasticsearch/data" docker.elastic.co/elasticsearch/elasticsearch:7.16.3 +``` + +Configure `app.ini`: + +```ini +[indexer] +ISSUE_INDEXER_TYPE = elasticsearch +ISSUE_INDEXER_CONN_STR = http://elastic:changeme@localhost:9200 +REPO_INDEXER_ENABLED = true +REPO_INDEXER_TYPE = elasticsearch +REPO_INDEXER_CONN_STR = http://elastic:changeme@localhost:9200 +``` + ### Building and adding SVGs SVG icons are built using the `make svg` target which compiles the icon sources defined in `build/generate-svg.js` into the output directory `public/img/svg`. Custom icons can be added in the `web_src/svg` directory. diff --git a/modules/context/repo.go b/modules/context/repo.go index 97b417ffd165a..76fe1c5676a51 100644 --- a/modules/context/repo.go +++ b/modules/context/repo.go @@ -21,6 +21,7 @@ import ( user_model "code.gitea.io/gitea/models/user" "code.gitea.io/gitea/modules/cache" "code.gitea.io/gitea/modules/git" + code_indexer "code.gitea.io/gitea/modules/indexer/code" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/markup/markdown" "code.gitea.io/gitea/modules/setting" @@ -522,6 +523,9 @@ func RepoAssignment(ctx *Context) (cancel context.CancelFunc) { ctx.Data["ExposeAnonSSH"] = setting.SSH.ExposeAnonymous ctx.Data["DisableHTTP"] = setting.Repository.DisableHTTPGit ctx.Data["RepoSearchEnabled"] = setting.Indexer.RepoIndexerEnabled + if setting.Indexer.RepoIndexerEnabled { + ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable() + } ctx.Data["CloneLink"] = repo.CloneLink() ctx.Data["WikiCloneLink"] = repo.WikiCloneLink() diff --git a/modules/indexer/code/bleve.go b/modules/indexer/code/bleve.go index cfadcfebd8a2f..14419e77a0b3e 100644 --- a/modules/indexer/code/bleve.go +++ b/modules/indexer/code/bleve.go @@ -271,6 +271,15 @@ func (b *BleveIndexer) Close() { log.Info("PID: %d Repository Indexer closed", os.Getpid()) } +// SetAvailabilityChangeCallback does nothing +func (b *BleveIndexer) SetAvailabilityChangeCallback(callback func(bool)) { +} + +// Ping does nothing +func (b *BleveIndexer) Ping() bool { + return true +} + // Index indexes the data func (b *BleveIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize) diff --git a/modules/indexer/code/elastic_search.go b/modules/indexer/code/elastic_search.go index 9bd2fa301e875..c1d4c29b8a52f 100644 --- a/modules/indexer/code/elastic_search.go +++ b/modules/indexer/code/elastic_search.go @@ -7,8 +7,10 @@ package code import ( "bufio" "context" + "errors" "fmt" "io" + "net" "strconv" "strings" "time" @@ -39,8 +41,11 @@ var _ Indexer = &ElasticSearchIndexer{} // ElasticSearchIndexer implements Indexer interface type ElasticSearchIndexer struct { - client *elastic.Client - indexerAliasName string + client *elastic.Client + indexerAliasName string + available bool + availabilityCallback func(bool) + stopTimer chan struct{} } type elasticLogger struct { @@ -78,7 +83,23 @@ func NewElasticSearchIndexer(url, indexerName string) (*ElasticSearchIndexer, bo indexer := &ElasticSearchIndexer{ client: client, indexerAliasName: indexerName, + available: true, + stopTimer: make(chan struct{}), } + + ticker := time.NewTicker(10 * time.Second) + go func() { + for { + select { + case <-ticker.C: + indexer.checkAvailability() + case <-indexer.stopTimer: + ticker.Stop() + return + } + } + }() + exists, err := indexer.init() if err != nil { indexer.Close() @@ -126,14 +147,14 @@ func (b *ElasticSearchIndexer) init() (bool, error) { ctx := context.Background() exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !exists { mapping := defaultMapping createIndex, err := b.client.CreateIndex(b.realIndexerName()).BodyString(mapping).Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !createIndex.Acknowledged { return false, fmt.Errorf("create index %s with %s failed", b.realIndexerName(), mapping) @@ -143,7 +164,7 @@ func (b *ElasticSearchIndexer) init() (bool, error) { // check version r, err := b.client.Aliases().Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } realIndexerNames := r.IndicesByAlias(b.indexerAliasName) @@ -152,10 +173,10 @@ func (b *ElasticSearchIndexer) init() (bool, error) { Add(b.realIndexerName(), b.indexerAliasName). Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !res.Acknowledged { - return false, fmt.Errorf("") + return false, fmt.Errorf("create alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) } } else if len(realIndexerNames) >= 1 && realIndexerNames[0] < b.realIndexerName() { log.Warn("Found older gitea indexer named %s, but we will create a new one %s and keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", @@ -165,16 +186,26 @@ func (b *ElasticSearchIndexer) init() (bool, error) { Add(b.realIndexerName(), b.indexerAliasName). Do(ctx) if err != nil { - return false, err + return false, b.checkError(err) } if !res.Acknowledged { - return false, fmt.Errorf("") + return false, fmt.Errorf("change alias %s to index %s failed", b.indexerAliasName, b.realIndexerName()) } } return exists, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + b.availabilityCallback = callback +} + +// Ping checks if elastic is available +func (b *ElasticSearchIndexer) Ping() bool { + return b.available +} + func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.WriteCloserError, batchReader *bufio.Reader, sha string, update fileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { // Ignore vendored files in code search if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { @@ -190,7 +221,7 @@ func (b *ElasticSearchIndexer) addUpdate(ctx context.Context, batchWriter git.Wr return nil, err } if size, err = strconv.ParseInt(strings.TrimSpace(stdout), 10, 64); err != nil { - return nil, fmt.Errorf("Misformatted git cat-file output: %v", err) + return nil, fmt.Errorf("misformatted git cat-file output: %v", err) } } @@ -275,7 +306,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos Index(b.indexerAliasName). Add(reqs...). Do(context.Background()) - return err + return b.checkError(err) } return nil } @@ -285,7 +316,7 @@ func (b *ElasticSearchIndexer) Delete(repoID int64) error { _, err := b.client.DeleteByQuery(b.indexerAliasName). Query(elastic.NewTermsQuery("repo_id", repoID)). Do(context.Background()) - return err + return b.checkError(err) } // indexPos find words positions for start and the following end on content. It will @@ -409,7 +440,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, From(start).Size(pageSize). Do(context.Background()) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } return convertResult(searchResult, kw, pageSize) @@ -423,7 +454,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, Size(0). // We only needs stats information Do(context.Background()) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } query = query.Must(langQuery) @@ -440,7 +471,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, From(start).Size(pageSize). Do(context.Background()) if err != nil { - return 0, nil, nil, err + return 0, nil, nil, b.checkError(err) } total, hits, _, err := convertResult(searchResult, kw, pageSize) @@ -449,4 +480,33 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, } // Close implements indexer -func (b *ElasticSearchIndexer) Close() {} +func (b *ElasticSearchIndexer) Close() { + close(b.stopTimer) +} + +func (b *ElasticSearchIndexer) checkError(err error) error { + var opErr *net.OpError + if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) { + b.available = false + if b.availabilityCallback != nil { + b.availabilityCallback(b.available) + } + } + return err +} + +func (b *ElasticSearchIndexer) checkAvailability() { + if b.available { + return + } + + // Request cluster state to check if elastic is available again + _, err := b.client.ClusterState().Do(context.Background()) + if err != nil { + return + } + b.available = true + if b.availabilityCallback != nil { + b.availabilityCallback(b.available) + } +} diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index 9ae3abff60bab..f35afcc479004 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -42,6 +42,8 @@ type SearchResultLanguages struct { // Indexer defines an interface to index and search code contents type Indexer interface { + Ping() bool + SetAvailabilityChangeCallback(callback func(bool)) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error Delete(repoID int64) error Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) @@ -140,6 +142,7 @@ func Init() { return data } + unhandled := make([]queue.Data, 0, len(data)) for _, datum := range data { indexerData, ok := datum.(*IndexerData) if !ok { @@ -150,10 +153,14 @@ func Init() { if err := index(ctx, indexer, indexerData.RepoID); err != nil { log.Error("index: %v", err) - continue + if indexer.Ping() { + continue + } + // Add back to queue + unhandled = append(unhandled, datum) } } - return nil + return unhandled } indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{}) @@ -212,6 +219,18 @@ func Init() { indexer.set(rIndexer) + if queue, ok := indexerQueue.(queue.Pausable); ok { + rIndexer.SetAvailabilityChangeCallback(func(available bool) { + if !available { + log.Info("Code index queue paused") + queue.Pause() + } else { + log.Info("Code index queue resumed") + queue.Resume() + } + }) + } + // Start processing the queue go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run) @@ -262,6 +281,17 @@ func UpdateRepoIndexer(repo *repo_model.Repository) { } } +// IsAvailable checks if issue indexer is available +func IsAvailable() bool { + idx, err := indexer.get() + if err != nil { + log.Error("IsAvailable(): unable to get indexer: %v", err) + return false + } + + return idx.Ping() +} + // populateRepoIndexer populate the repo indexer with pre-existing data. This // should only be run when the indexer is created for the first time. func populateRepoIndexer(ctx context.Context) { diff --git a/modules/indexer/code/wrapped.go b/modules/indexer/code/wrapped.go index 56baadd6fc6f6..86407bbad7221 100644 --- a/modules/indexer/code/wrapped.go +++ b/modules/indexer/code/wrapped.go @@ -10,6 +10,7 @@ import ( "sync" repo_model "code.gitea.io/gitea/models/repo" + "code.gitea.io/gitea/modules/log" ) var indexer = newWrappedIndexer() @@ -56,6 +57,26 @@ func (w *wrappedIndexer) get() (Indexer, error) { return w.internal, nil } +// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes +func (w *wrappedIndexer) SetAvailabilityChangeCallback(callback func(bool)) { + indexer, err := w.get() + if err != nil { + log.Error("Failed to get indexer: %v", err) + return + } + indexer.SetAvailabilityChangeCallback(callback) +} + +// Ping checks if elastic is available +func (w *wrappedIndexer) Ping() bool { + indexer, err := w.get() + if err != nil { + log.Warn("Failed to get indexer: %v", err) + return false + } + return indexer.Ping() +} + func (w *wrappedIndexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error { indexer, err := w.get() if err != nil { diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index d5b1768f6920c..4923d8b7b72bb 100644 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -268,6 +268,7 @@ search = Search code = Code search.fuzzy = Fuzzy search.match = Match +code_search_unavailable = Currently code search is not available. Please contact your site administrator. repo_no_results = No matching repositories found. user_no_results = No matching users found. org_no_results = No matching organizations found. @@ -1708,6 +1709,8 @@ search.search_repo = Search repository search.fuzzy = Fuzzy search.match = Match search.results = Search results for "%s" in %s +search.code_no_results = No source code matching your search term found. +search.code_search_unavailable = Currently code search is not available. Please contact your site administrator. settings = Settings settings.desc = Settings is where you can manage the settings for the repository diff --git a/routers/web/explore/code.go b/routers/web/explore/code.go index d2acefde92793..ed1915d337ee7 100644 --- a/routers/web/explore/code.go +++ b/routers/web/explore/code.go @@ -89,15 +89,25 @@ func Code(ctx *context.Context) { total, searchResults, searchResultLanguages, err = code_indexer.PerformSearch(repoIDs, language, keyword, page, setting.UI.RepoSearchPagingNum, isMatch) if err != nil { - ctx.ServerError("SearchResults", err) - return + if code_indexer.IsAvailable() { + ctx.ServerError("SearchResults", err) + return + } + ctx.Data["CodeIndexerUnavailable"] = true + } else { + ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable() } // if non-login user or isAdmin, no need to check UnitTypeCode } else if (ctx.User == nil && len(repoIDs) > 0) || isAdmin { total, searchResults, searchResultLanguages, err = code_indexer.PerformSearch(repoIDs, language, keyword, page, setting.UI.RepoSearchPagingNum, isMatch) if err != nil { - ctx.ServerError("SearchResults", err) - return + if code_indexer.IsAvailable() { + ctx.ServerError("SearchResults", err) + return + } + ctx.Data["CodeIndexerUnavailable"] = true + } else { + ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable() } loadRepoIDs := make([]int64, 0, len(searchResults)) diff --git a/routers/web/repo/search.go b/routers/web/repo/search.go index 67539c3d7eb8a..04bb5b844a4b4 100644 --- a/routers/web/repo/search.go +++ b/routers/web/repo/search.go @@ -33,8 +33,13 @@ func Search(ctx *context.Context) { total, searchResults, searchResultLanguages, err := code_indexer.PerformSearch([]int64{ctx.Repo.Repository.ID}, language, keyword, page, setting.UI.RepoSearchPagingNum, isMatch) if err != nil { - ctx.ServerError("SearchResults", err) - return + if code_indexer.IsAvailable() { + ctx.ServerError("SearchResults", err) + return + } + ctx.Data["CodeIndexerUnavailable"] = true + } else { + ctx.Data["CodeIndexerUnavailable"] = !code_indexer.IsAvailable() } ctx.Data["Keyword"] = keyword ctx.Data["Language"] = language diff --git a/templates/explore/code.tmpl b/templates/explore/code.tmpl index 573c096f83f0e..e1056ab3129d7 100644 --- a/templates/explore/code.tmpl +++ b/templates/explore/code.tmpl @@ -5,21 +5,25 @@
- -
- {{if .SearchResults}} + {{if .CodeIndexerUnavailable }} +
+

{{$.i18n.Tr "explore.code_search_unavailable"}}

+
+ {{else if .SearchResults}}

{{.i18n.Tr "explore.code_search_results" (.Keyword|Escape) | Str2html }}

diff --git a/templates/repo/home.tmpl b/templates/repo/home.tmpl index 9924ae547dcf0..6d525c24da46f 100644 --- a/templates/repo/home.tmpl +++ b/templates/repo/home.tmpl @@ -13,9 +13,12 @@