-
-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add redis task queue support and improve docs
- Loading branch information
Showing
11 changed files
with
157 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,3 +73,4 @@ prime/ | |
*.snap | ||
*.snap-build | ||
*_source.tar.bz2 | ||
.DS_Store |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// Copyright 2019 The Gitea Authors. All rights reserved. | ||
// Use of this source code is governed by a MIT-style | ||
// license that can be found in the LICENSE file. | ||
|
||
package task | ||
|
||
import ( | ||
"encoding/json" | ||
"errors" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"code.gitea.io/gitea/models" | ||
"code.gitea.io/gitea/modules/log" | ||
|
||
"github.com/go-redis/redis" | ||
) | ||
|
||
var ( | ||
_ Queue = &RedisQueue{} | ||
) | ||
|
||
type redisClient interface { | ||
RPush(key string, args ...interface{}) *redis.IntCmd | ||
LPop(key string) *redis.StringCmd | ||
Ping() *redis.StatusCmd | ||
} | ||
|
||
// RedisQueue redis queue | ||
type RedisQueue struct { | ||
client redisClient | ||
queueName string | ||
} | ||
|
||
func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) { | ||
fields := strings.Fields(connStr) | ||
for _, f := range fields { | ||
items := strings.SplitN(f, "=", 2) | ||
if len(items) < 2 { | ||
continue | ||
} | ||
switch strings.ToLower(items[0]) { | ||
case "addrs": | ||
addrs = items[1] | ||
case "password": | ||
password = items[1] | ||
case "db": | ||
dbIdx, err = strconv.Atoi(items[1]) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
} | ||
return | ||
} | ||
|
||
// NewRedisQueue creates single redis or cluster redis queue | ||
func NewRedisQueue(addrs string, password string, dbIdx int) (*RedisQueue, error) { | ||
dbs := strings.Split(addrs, ",") | ||
var queue = RedisQueue{ | ||
queueName: "task_queue", | ||
} | ||
if len(dbs) == 0 { | ||
return nil, errors.New("no redis host found") | ||
} else if len(dbs) == 1 { | ||
queue.client = redis.NewClient(&redis.Options{ | ||
Addr: strings.TrimSpace(dbs[0]), // use default Addr | ||
Password: password, // no password set | ||
DB: dbIdx, // use default DB | ||
}) | ||
} else { | ||
// cluster will ignore db | ||
queue.client = redis.NewClusterClient(&redis.ClusterOptions{ | ||
Addrs: dbs, | ||
Password: password, | ||
}) | ||
} | ||
if err := queue.client.Ping().Err(); err != nil { | ||
return nil, err | ||
} | ||
return &queue, nil | ||
} | ||
|
||
func (r *RedisQueue) Run() error { | ||
for { | ||
bs, err := r.client.LPop(r.queueName).Bytes() | ||
if err != nil { | ||
if err != redis.Nil { | ||
log.Error(4, "LPop failed: %v", err) | ||
} | ||
time.Sleep(time.Millisecond * 100) | ||
continue | ||
} | ||
|
||
var task models.Task | ||
err = json.Unmarshal(bs, &task) | ||
if err != nil { | ||
log.Error(4, "Unmarshal task failed: %s", err.Error()) | ||
} else { | ||
err = Run(&task) | ||
if err != nil { | ||
log.Error(4, "Run task failed: %s", err.Error()) | ||
} | ||
} | ||
|
||
time.Sleep(time.Millisecond * 100) | ||
} | ||
return nil | ||
} | ||
|
||
// Push implements Queue | ||
func (r *RedisQueue) Push(task *models.Task) error { | ||
bs, err := json.Marshal(task) | ||
if err != nil { | ||
return err | ||
} | ||
return r.client.RPush(r.queueName, bs).Err() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters