-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathpush_stream.go
76 lines (64 loc) · 1.68 KB
/
push_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package output
import (
"context"
"github.com/ab180/lrmr/cluster/node"
"github.com/ab180/lrmr/lrdd"
"github.com/ab180/lrmr/lrmrpb"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"
)
// PushStream is an output that forwards rows over a PushData client stream.
type PushStream struct {
	// stream is the client side of the PushData RPC used by Write and Close.
	stream lrmrpb.Node_PushDataClient

	// reqCache is a single request message that Write fills, sends and then
	// resets, so it is reused across calls instead of being allocated each time.
	reqCache *lrmrpb.PushDataRequest
}
// OpenPushStream starts a PushData stream on rpc and returns it wrapped as an
// Output.
//
// The task ID and the sender's host are serialized to JSON and attached to the
// outgoing context metadata under the "dataHeader" key. When n is nil the
// sender is reported as "master"; otherwise n.Host is used. host is only used
// to label returned errors.
//
// An error is returned if the header cannot be serialized or if the stream
// cannot be opened.
func OpenPushStream(ctx context.Context, rpc lrmrpb.NodeClient, n *node.Node, host, taskID string,
) (Output, error) {
	fromHost := "master"
	if n != nil {
		fromHost = n.Host
	}
	header := &lrmrpb.DataHeader{
		TaskID:   taskID,
		FromHost: fromHost,
	}

	// Surface serialization failures instead of silently sending an empty
	// header in the metadata.
	rawHead, err := jsoniter.MarshalToString(header)
	if err != nil {
		return nil, errors.Wrapf(err, "marshal data header for stream to %s", host)
	}

	runCtx := metadata.AppendToOutgoingContext(ctx, "dataHeader", rawHead)
	stream, err := rpc.PushData(runCtx)
	if err != nil {
		return nil, errors.Wrapf(err, "open stream to %s", host)
	}
	return &PushStream{
		stream:   stream,
		reqCache: &lrmrpb.PushDataRequest{},
	}, nil
}
// Write encodes rows into the reusable request message and sends them to the
// stream as one PushData message.
//
// For every row a RawRow slot is taken from the cached request (reusing a
// previously allocated slot when spare capacity exists), the key is copied,
// the value is encoded with MarshalMsg into the slot's existing Value buffer,
// and the row's value is handed back with ReturnToPool.
//
// It returns the first encoding error, or the error reported by Send. The
// cached request is reset on every exit path, so a failed Write cannot leave
// half-filled or stale rows behind for the next call.
func (p *PushStream) Write(rows []lrdd.Row) error {
	req := p.reqCache

	// Reset on error paths as well as on success: the cache must be empty when
	// the next Write starts filling it.
	// NOTE(review): RemainCapicityReset presumably truncates Data while keeping
	// the allocated slots and buffers — confirm against its definition.
	defer req.RemainCapicityReset()

	for _, row := range rows {
		if n := len(req.Data); n < cap(req.Data) {
			// Spare capacity: extend the slice by one and reuse whatever slot
			// is already stored there, allocating only if it was never set.
			req.Data = req.Data[:n+1]
			if req.Data[n] == nil {
				req.Data[n] = &lrdd.RawRow{}
			}
		} else {
			req.Data = append(req.Data, &lrdd.RawRow{})
		}

		// Always address the slot that was just claimed rather than the loop
		// index, so correctness does not depend on the cache starting empty.
		slot := req.Data[len(req.Data)-1]
		slot.Key = row.Key

		var err error
		slot.Value, err = row.Value.MarshalMsg(slot.Value)
		if err != nil {
			return err
		}
		row.Value.ReturnToPool()
	}

	return p.stream.Send(req)
}
// Close finishes the stream by calling CloseAndRecv on it. The response
// message is discarded; only the error, if any, is returned.
func (p *PushStream) Close() error {
	if _, err := p.stream.CloseAndRecv(); err != nil {
		return err
	}
	return nil
}