Skip to content

Commit

Permalink
Change UpdateRepoIndex api to include watchers (go-gitea#7012)
Browse files Browse the repository at this point in the history
* Change UpdateRepoIndex api to include watchers

* Add timeout
  • Loading branch information
zeripath authored and jeffliu27 committed Jul 18, 2019
1 parent 036c9de commit 4a5d8d3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 26 deletions.
23 changes: 7 additions & 16 deletions integrations/repo_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package integrations

import (
"log"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -34,22 +33,14 @@ func TestSearchRepo(t *testing.T) {
repo, err := models.GetRepositoryByOwnerAndName("user2", "repo1")
assert.NoError(t, err)

models.UpdateRepoIndexer(repo)
waiter := make(chan error, 1)
models.UpdateRepoIndexer(repo, waiter)

log.Printf("Waiting for indexing\n")

i := 0
for i < 60 {
if repo.IndexerStatus != nil && len(repo.IndexerStatus.CommitSha) != 0 {
break
}
time.Sleep(1 * time.Second)
i++
}
if i < 60 {
log.Printf("Indexing took: %ds\n", i)
} else {
log.Printf("Waited the limit: %ds for indexing, continuing\n", i)
select {
case err := <-waiter:
assert.NoError(t, err)
case <-time.After(1 * time.Minute):
assert.Fail(t, "UpdateRepoIndexer took too long")
}

req := NewRequestf(t, "GET", "/user2/repo1/search?q=Description&page=1")
Expand Down
70 changes: 60 additions & 10 deletions modules/indexer/codes/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type Indexer interface {
Search(repoIDs []int64, keyword string, page, pageSize int) (*SearchResult, error)
}

type repoIndexerOperation struct {
repo *Repository
deleted bool
watchers []chan<- error
}

var (
// codesIndexerQueue queue of issue ids to be updated
codesIndexerQueue Queue
Expand Down Expand Up @@ -302,18 +308,62 @@ func nonGenesisChanges(repo *models.Repository, revision string) (*repoChanges,
return &changes, err
}

// // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
// func DeleteRepoFromIndexer(repo *models.Repository) {
// codesIndexerQueue.Push(&IndexerData{
// RepoID: repo.ID,
// IsDelete: true,
// })
// }

// // UpdateRepoIndexer update a repository's entries in the indexer
// func UpdateRepoIndexer(repo *models.Repository) {
// codesIndexerQueue.Push(&IndexerData{
// RepoID: repo.ID,
// IsDelete: false,
// })
// }

// Currently not being used?
func processRepoIndexerOperationQueue() {
for {
op := <-repoIndexerOperationQueue
var err error
if op.deleted {
if err = indexer.DeleteRepoFromIndexer(op.repo.ID); err != nil {
log.Error("DeleteRepoFromIndexer: %v", err)
}
} else {
if err = updateRepoIndexer(op.repo); err != nil {
log.Error("updateRepoIndexer: %v", err)
}
}
for _, watcher := range op.watchers {
watcher <- err
}
}
}

func addOperationToQueue(op repoIndexerOperation) {
if !setting.Indexer.RepoIndexerEnabled {
return
}
select {
case repoIndexerOperationQueue <- op:
break
default:
go func() {
repoIndexerOperationQueue <- op
}()
}
}

// DeleteRepoFromIndexer remove all of a repository's entries from the indexer
func DeleteRepoFromIndexer(repo *models.Repository) {
codesIndexerQueue.Push(&IndexerData{
RepoID: repo.ID,
IsDelete: true,
})
func DeleteRepoFromIndexer(repo *Repository, watchers ...chan<- error) {
addOperationToQueue(repoIndexerOperation{repo: repo, deleted: true, watchers: watchers})
}

// UpdateRepoIndexer update a repository's entries in the indexer
func UpdateRepoIndexer(repo *models.Repository) {
codesIndexerQueue.Push(&IndexerData{
RepoID: repo.ID,
IsDelete: false,
})
func UpdateRepoIndexer(repo *Repository, watchers ...chan<- error) {
addOperationToQueue(repoIndexerOperation{repo: repo, deleted: false, watchers: watchers})
}

0 comments on commit 4a5d8d3

Please sign in to comment.