-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathHttpPusher.go
74 lines (61 loc) · 1.41 KB
/
HttpPusher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package main
import (
"fmt"
"github.com/astaxie/beego/logs"
"github.com/nahid/gohttp"
"os"
)
// SendData groups a token, a message body and a topic name.
// Nothing in the code visible here references this type.
type SendData struct {
	Token, Msg, Topic string
}
// HttpSender is the package-level push client. It is nil until
// InitHttpPusher assigns it.
var HttpSender *PushClient
// ThreadNum is the number of Pusher goroutines NewHttpPusher starts for each
// client it creates.
var ThreadNum = 4
// MessageHttp is one queued message: line is sent as the request body and
// topic as the value of the "Topic" request header.
type MessageHttp struct {
	line, topic string
}
// PushClient posts queued messages to a single HTTP endpoint.
type PushClient struct {
	// url is the address every message is POSTed to.
	url string

	// lineHttpChan is the queue between addMessage (producer) and the
	// Pusher goroutines (consumers).
	lineHttpChan chan *MessageHttp
}
// NewHttpPusher creates a PushClient that posts to Address, backed by a queue
// of 1024 messages, and starts ThreadNum goroutines running Pusher to drain
// that queue.
//
// The error result is the named value err. Nothing in this function assigns
// it, so as written the failure branch below is never taken and the returned
// error is always nil.
func NewHttpPusher(Address string) (afk *PushClient, err error) {
	queue := make(chan *MessageHttp, 1024)
	afk = &PushClient{url: Address, lineHttpChan: queue}

	if err != nil {
		fmt.Printf("Failed to create Connetcion: %s\n", err)
		os.Exit(1)
	}

	// Start the configured number of sender goroutines; they all consume
	// from the same queue.
	for started := 0; started < ThreadNum; started++ {
		go afk.Pusher()
	}

	return afk, err
}
// InitHttpPusher creates the client for the fixed message endpoint and stores
// it in the package-level HttpSender. HttpSender is assigned whatever
// NewHttpPusher returned, and NewHttpPusher's error is passed back unchanged.
func InitHttpPusher() (err error) {
	sender, err := NewHttpPusher("http://kafka.wd.cn/rest/api/msg")
	HttpSender = sender
	return err
}
// Pusher reads queued messages from k.lineHttpChan and POSTs each one to
// k.url, one request at a time. The message line is the request body; the
// "Topic" header carries the message topic and the "Token" header a fixed
// value.
//
// Failed requests are logged and the message is dropped; there is no retry.
// The loop only ends when lineHttpChan is closed, which nothing in the code
// visible here does.
func (k *PushClient) Pusher() {
	// Read log content from the channel and forward it to the message queue.
	logs.Info("[start pusher]")

	req := gohttp.NewRequest()
	ch := make(chan *gohttp.AsyncResponse)

	for v := range k.lineHttpChan {
		headerVals := map[string]string{
			"Token": "bigdata.hive",
			"Topic": v.topic,
		}
		req.Body([]byte(v.line)).Headers(headerVals).AsyncPost(k.url, ch)

		// ch is unbuffered, so the result must be received here; otherwise
		// whatever delivers it would stay blocked on the send.
		res := <-ch
		if res == nil {
			continue
		}
		if res.Err != nil {
			logs.Error("[pusher] post to %s failed (topic %s): %v", k.url, v.topic, res.Err)
			continue
		}
		if res.Resp == nil {
			continue
		}
		httpResp := res.Resp.GetResp()
		if httpResp == nil {
			continue
		}
		if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
			logs.Warn("[pusher] post to %s returned status %d (topic %s)", k.url, httpResp.StatusCode, v.topic)
		}
		// The body is never read; close it so the underlying connection is
		// released instead of leaking with every message.
		if httpResp.Body != nil {
			_ = httpResp.Body.Close()
		}
	}
}
// addMessage queues line for delivery under topic. The send blocks while the
// queue is full. The returned error is always nil.
func (k *PushClient) addMessage(line string, topic string) (err error) {
	msg := &MessageHttp{topic: topic, line: line}
	k.lineHttpChan <- msg
	return nil
}