Skip to content

Commit

Permalink
Fix locking and correct context usage
Browse files Browse the repository at this point in the history
  • Loading branch information
lafriks committed Jan 25, 2022
1 parent 09ed40d commit f6f7beb
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 68 deletions.
6 changes: 3 additions & 3 deletions models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,7 @@ func GetRepoIssueStats(repoID, uid int64, filterMode int, isPull bool) (numOpen,
}

// SearchIssueIDsByKeyword search issues on database
func SearchIssueIDsByKeyword(kw string, repoIDs []int64, limit, start int) (int64, []int64, error) {
func SearchIssueIDsByKeyword(ctx context.Context, kw string, repoIDs []int64, limit, start int) (int64, []int64, error) {
repoCond := builder.In("repo_id", repoIDs)
subQuery := builder.Select("id").From("issue").Where(repoCond)
kw = strings.ToUpper(kw)
Expand All @@ -1884,7 +1884,7 @@ func SearchIssueIDsByKeyword(kw string, repoIDs []int64, limit, start int) (int6
ID int64
UpdatedUnix int64
}, 0, limit)
err := db.GetEngine(db.DefaultContext).Distinct("id", "updated_unix").Table("issue").Where(cond).
err := db.GetEngine(ctx).Distinct("id", "updated_unix").Table("issue").Where(cond).
OrderBy("`updated_unix` DESC").Limit(limit, start).
Find(&res)
if err != nil {
Expand All @@ -1894,7 +1894,7 @@ func SearchIssueIDsByKeyword(kw string, repoIDs []int64, limit, start int) (int6
ids = append(ids, r.ID)
}

total, err := db.GetEngine(db.DefaultContext).Distinct("id").Table("issue").Where(cond).Count()
total, err := db.GetEngine(ctx).Distinct("id").Table("issue").Where(cond).Count()
if err != nil {
return 0, nil, err
}
Expand Down
11 changes: 6 additions & 5 deletions models/issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package models

import (
"context"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -303,23 +304,23 @@ func TestIssue_loadTotalTimes(t *testing.T) {

func TestIssue_SearchIssueIDsByKeyword(t *testing.T) {
assert.NoError(t, unittest.PrepareTestDatabase())
total, ids, err := SearchIssueIDsByKeyword("issue2", []int64{1}, 10, 0)
total, ids, err := SearchIssueIDsByKeyword(context.TODO(), "issue2", []int64{1}, 10, 0)
assert.NoError(t, err)
assert.EqualValues(t, 1, total)
assert.EqualValues(t, []int64{2}, ids)

total, ids, err = SearchIssueIDsByKeyword("first", []int64{1}, 10, 0)
total, ids, err = SearchIssueIDsByKeyword(context.TODO(), "first", []int64{1}, 10, 0)
assert.NoError(t, err)
assert.EqualValues(t, 1, total)
assert.EqualValues(t, []int64{1}, ids)

total, ids, err = SearchIssueIDsByKeyword("for", []int64{1}, 10, 0)
total, ids, err = SearchIssueIDsByKeyword(context.TODO(), "for", []int64{1}, 10, 0)
assert.NoError(t, err)
assert.EqualValues(t, 5, total)
assert.ElementsMatch(t, []int64{1, 2, 3, 5, 11}, ids)

// issue1's comment id 2
total, ids, err = SearchIssueIDsByKeyword("good", []int64{1}, 10, 0)
total, ids, err = SearchIssueIDsByKeyword(context.TODO(), "good", []int64{1}, 10, 0)
assert.NoError(t, err)
assert.EqualValues(t, 1, total)
assert.EqualValues(t, []int64{1}, ids)
Expand Down Expand Up @@ -464,7 +465,7 @@ func TestCorrectIssueStats(t *testing.T) {
wg.Wait()

// Now we will get all issueID's that match the "Bugs are nasty" query.
total, ids, err := SearchIssueIDsByKeyword("Bugs are nasty", []int64{1}, issueAmount, 0)
total, ids, err := SearchIssueIDsByKeyword(context.TODO(), "Bugs are nasty", []int64{1}, issueAmount, 0)

// Just to be sure.
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions modules/indexer/code/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {

// Search searches for files in the specified repo.
// Returns the matching file-paths
func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
func (b *BleveIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
var (
indexerQuery query.Query
keywordQuery query.Query
Expand Down Expand Up @@ -381,7 +381,7 @@ func (b *BleveIndexer) Search(repoIDs []int64, language, keyword string, page, p
searchRequest.AddFacet("languages", bleve.NewFacetRequest("Language", 10))
}

result, err := b.indexer.Search(searchRequest)
result, err := b.indexer.SearchInContext(ctx, searchRequest)
if err != nil {
return 0, nil, nil, err
}
Expand Down
51 changes: 36 additions & 15 deletions modules/indexer/code/elastic_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"

repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/analyze"
"code.gitea.io/gitea/modules/charset"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
Expand Down Expand Up @@ -46,6 +48,7 @@ type ElasticSearchIndexer struct {
available bool
availabilityCallback func(bool)
stopTimer chan struct{}
lock sync.RWMutex
}

type elasticLogger struct {
Expand Down Expand Up @@ -144,7 +147,7 @@ func (b *ElasticSearchIndexer) realIndexerName() string {

// Init will initialize the indexer
func (b *ElasticSearchIndexer) init() (bool, error) {
ctx := context.Background()
ctx := graceful.GetManager().HammerContext()
exists, err := b.client.IndexExists(b.realIndexerName()).Do(ctx)
if err != nil {
return false, b.checkError(err)
Expand Down Expand Up @@ -198,11 +201,15 @@ func (b *ElasticSearchIndexer) init() (bool, error) {

// SetAvailabilityChangeCallback sets callback that will be triggered when availability changes
func (b *ElasticSearchIndexer) SetAvailabilityChangeCallback(callback func(bool)) {
b.lock.Lock()
defer b.lock.Unlock()
b.availabilityCallback = callback
}

// Ping checks if elastic is available
func (b *ElasticSearchIndexer) Ping() bool {
b.lock.RLock()
defer b.lock.RUnlock()
return b.available
}

Expand Down Expand Up @@ -305,7 +312,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
_, err := b.client.Bulk().
Index(b.indexerAliasName).
Add(reqs...).
Do(context.Background())
Do(ctx)
return b.checkError(err)
}
return nil
Expand All @@ -315,7 +322,7 @@ func (b *ElasticSearchIndexer) Index(ctx context.Context, repo *repo_model.Repos
func (b *ElasticSearchIndexer) Delete(repoID int64) error {
_, err := b.client.DeleteByQuery(b.indexerAliasName).
Query(elastic.NewTermsQuery("repo_id", repoID)).
Do(context.Background())
Do(graceful.GetManager().HammerContext())
return b.checkError(err)
}

Expand Down Expand Up @@ -397,7 +404,7 @@ func extractAggs(searchResult *elastic.SearchResult) []*SearchResultLanguages {
}

// Search searches for codes and language stats by given conditions.
func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
func (b *ElasticSearchIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
searchType := esMultiMatchTypeBestFields
if isMatch {
searchType = esMultiMatchTypePhrasePrefix
Expand Down Expand Up @@ -438,7 +445,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
).
Sort("repo_id", true).
From(start).Size(pageSize).
Do(context.Background())
Do(ctx)
if err != nil {
return 0, nil, nil, b.checkError(err)
}
Expand All @@ -452,7 +459,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
Aggregation("language", aggregation).
Query(query).
Size(0). // We only needs stats information
Do(context.Background())
Do(ctx)
if err != nil {
return 0, nil, nil, b.checkError(err)
}
Expand All @@ -469,7 +476,7 @@ func (b *ElasticSearchIndexer) Search(repoIDs []int64, language, keyword string,
).
Sort("repo_id", true).
From(start).Size(pageSize).
Do(context.Background())
Do(ctx)
if err != nil {
return 0, nil, nil, b.checkError(err)
}
Expand All @@ -486,27 +493,41 @@ func (b *ElasticSearchIndexer) Close() {

func (b *ElasticSearchIndexer) checkError(err error) error {
var opErr *net.OpError
if b.available && (elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
b.available = false
if b.availabilityCallback != nil {
b.availabilityCallback(b.available)
}
if !(elastic.IsConnErr(err) || (errors.As(err, &opErr) && (opErr.Op == "dial" || opErr.Op == "read"))) {
return err
}

b.setAvailability(false)

return err
}

func (b *ElasticSearchIndexer) checkAvailability() {
if b.available {
if b.Ping() {
return
}

// Request cluster state to check if elastic is available again
_, err := b.client.ClusterState().Do(context.Background())
_, err := b.client.ClusterState().Do(graceful.GetManager().ShutdownContext())
if err != nil {
b.setAvailability(false)
return
}

b.setAvailability(true)
}

func (b *ElasticSearchIndexer) setAvailability(available bool) {
b.lock.Lock()
defer b.lock.Unlock()

if b.available == available {
return
}
b.available = true

b.available = available
if b.availabilityCallback != nil {
// Call the callback from within the lock to ensure that the ordering remains correct
b.availabilityCallback(b.available)
}
}
2 changes: 1 addition & 1 deletion modules/indexer/code/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Indexer interface {
SetAvailabilityChangeCallback(callback func(bool))
Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *repoChanges) error
Delete(repoID int64) error
Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error)
Close()
}

Expand Down
3 changes: 2 additions & 1 deletion modules/indexer/code/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package code

import (
"context"
"path/filepath"
"testing"

Expand Down Expand Up @@ -65,7 +66,7 @@ func testIndexer(name string, t *testing.T, indexer Indexer) {

for _, kw := range keywords {
t.Run(kw.Keyword, func(t *testing.T) {
total, res, langs, err := indexer.Search(kw.RepoIDs, "", kw.Keyword, 1, 10, false)
total, res, langs, err := indexer.Search(context.TODO(), kw.RepoIDs, "", kw.Keyword, 1, 10, false)
assert.NoError(t, err)
assert.EqualValues(t, len(kw.IDs), total)
assert.Len(t, langs, kw.Langs)
Expand Down
5 changes: 3 additions & 2 deletions modules/indexer/code/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package code

import (
"bytes"
"context"
"strings"

"code.gitea.io/gitea/modules/highlight"
Expand Down Expand Up @@ -106,12 +107,12 @@ func searchResult(result *SearchResult, startIndex, endIndex int) (*Result, erro
}

// PerformSearch perform a search on a repository
func PerformSearch(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) {
func PerformSearch(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int, []*Result, []*SearchResultLanguages, error) {
if len(keyword) == 0 {
return 0, nil, nil, nil
}

total, results, resultLanguages, err := indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch)
total, results, resultLanguages, err := indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch)
if err != nil {
return 0, nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions modules/indexer/code/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (w *wrappedIndexer) Delete(repoID int64) error {
return indexer.Delete(repoID)
}

func (w *wrappedIndexer) Search(repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
func (w *wrappedIndexer) Search(ctx context.Context, repoIDs []int64, language, keyword string, page, pageSize int, isMatch bool) (int64, []*SearchResult, []*SearchResultLanguages, error) {
indexer, err := w.get()
if err != nil {
return 0, nil, nil, err
}
return indexer.Search(repoIDs, language, keyword, page, pageSize, isMatch)
return indexer.Search(ctx, repoIDs, language, keyword, page, pageSize, isMatch)
}

func (w *wrappedIndexer) Close() {
Expand Down
5 changes: 3 additions & 2 deletions modules/indexer/issues/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package issues

import (
"context"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (b *BleveIndexer) Delete(ids ...int64) error {

// Search searches for issues by given conditions.
// Returns the matching issue IDs
func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
func (b *BleveIndexer) Search(ctx context.Context, keyword string, repoIDs []int64, limit, start int) (*SearchResult, error) {
var repoQueriesP []*query.NumericRangeQuery
for _, repoID := range repoIDs {
repoQueriesP = append(repoQueriesP, numericEqualityQuery(repoID, "RepoID"))
Expand All @@ -258,7 +259,7 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int)
search := bleve.NewSearchRequestOptions(indexerQuery, limit, start, false)
search.SortBy([]string{"-_score"})

result, err := b.indexer.Search(search)
result, err := b.indexer.SearchInContext(ctx, search)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion modules/indexer/issues/bleve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package issues

import (
"context"
"os"
"testing"

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestBleveIndexAndSearch(t *testing.T) {
}

for _, kw := range keywords {
res, err := indexer.Search(kw.Keyword, []int64{2}, 10, 0)
res, err := indexer.Search(context.TODO(), kw.Keyword, []int64{2}, 10, 0)
assert.NoError(t, err)

ids := make([]int64, 0, len(res.Hits))
Expand Down
6 changes: 4 additions & 2 deletions modules/indexer/issues/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package issues

import (
"context"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/models/db"
)
Expand Down Expand Up @@ -41,8 +43,8 @@ func (i *DBIndexer) Close() {
}

// Search dummy function
func (i *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
func (i *DBIndexer) Search(ctx context.Context, kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
total, ids, err := models.SearchIssueIDsByKeyword(ctx, kw, repoIDs, limit, start)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f6f7beb

Please sign in to comment.