-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathstreaming.go
139 lines (123 loc) · 4.43 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package baseapp
import (
"fmt"
"github.com/spf13/cast"
"github.com/cosmos/cosmos-sdk/codec"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/types"
)
// ABCIListener is the interface that a streaming service implements so that
// ABCI messages and store key-value writes can be pushed to it.
// Every method receives the block height alongside the raw payload bytes and
// returns an error.
type ABCIListener interface {
	// ListenBeginBlock updates the streaming service with the latest BeginBlock messages.
	ListenBeginBlock(blockHeight int64, req []byte, res []byte) error
	// ListenEndBlock updates the streaming service with the latest EndBlock messages.
	ListenEndBlock(blockHeight int64, req []byte, res []byte) error
	// ListenDeliverTx updates the streaming service with the latest DeliverTx messages.
	ListenDeliverTx(blockHeight int64, req []byte, res []byte) error
	// ListenStoreKVPair updates the streaming service with the latest StoreKVPair messages.
	ListenStoreKVPair(blockHeight int64, data []byte) error
}
// StreamingService groups what the BaseApp needs from a streaming plugin:
// the store WriteListeners to register and the ABCIListener that is updated
// with the ABCI messages through the hooks.
type StreamingService struct {
	// Listeners holds, per store key, the streaming service's write listeners
	// for the BaseApp to register.
	Listeners map[types.StoreKey][]types.WriteListener
	// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp.
	ABCIListener ABCIListener
	// StopNodeOnErr stops the node when true.
	StopNodeOnErr bool
}
// KVStoreListener is used so that we do not need to update the underlying
// io.Writer inside the StoreKVPairWriteListener every time we begin writing:
// the block height is obtained through the BlockHeight callback on each Write
// rather than being stored in the writer.
type KVStoreListener struct {
	// BlockHeight returns the height passed to the listener with each write.
	BlockHeight func() int64
	// listener receives the written bytes through ListenStoreKVPair.
	listener ABCIListener
	// stopNodeOnErr makes Write panic on a listener error instead of returning it.
	stopNodeOnErr bool
}
// NewKVStoreListener builds a KVStoreListener that sends StoreKVPair data to
// the given listening service.
//
// listener is the service that receives each write, stopNodeOnErr selects
// whether a listener error panics (true) or is returned (false) from Write,
// and blockHeight supplies the height reported with each write.
func NewKVStoreListener(
	listener ABCIListener,
	stopNodeOnErr bool,
	blockHeight func() int64,
) *KVStoreListener {
	kvl := new(KVStoreListener)
	kvl.BlockHeight = blockHeight
	kvl.listener = listener
	kvl.stopNodeOnErr = stopNodeOnErr
	return kvl
}
// Write satisfies io.Writer. It forwards b, together with the current block
// height, to the listener's ListenStoreKVPair.
//
// On success it returns len(b) and a nil error. If the listener fails, Write
// panics with that error when stopNodeOnErr is set; otherwise it returns 0
// and the error.
func (iw *KVStoreListener) Write(b []byte) (int, error) {
	// Resolve the height first so the callback always runs before the listener.
	height := iw.BlockHeight()
	err := iw.listener.ListenStoreKVPair(height, b)

	switch {
	case err == nil:
		return len(b), nil
	case iw.stopNodeOnErr:
		panic(err)
	default:
		return 0, err
	}
}
// Option keys for the streaming configuration section. The section name and
// an option name are joined with a dot when looked up, e.g. "streaming.keys".
const (
	// StreamingTomlKey is the name of the streaming section and the prefix of
	// the option keys below.
	StreamingTomlKey = "streaming"
	// StreamingEnableTomlKey presumably toggles streaming on or off; it is not
	// read in this file, so confirm against the caller.
	StreamingEnableTomlKey = "enable"
	// StreamingPluginTomlKey presumably names the streaming plugin to load; it
	// is not read in this file, so confirm against the caller.
	StreamingPluginTomlKey = "plugin"
	// StreamingKeysTomlKey is the option listing the store keys to expose;
	// the entry "*" exposes every store.
	StreamingKeysTomlKey = "keys"
	// StreamingStopNodeOnErrTomlKey is the boolean option stored as
	// StopNodeOnErr on the registered streaming service.
	StreamingStopNodeOnErrTomlKey = "stop-node-on-err"
)
// RegisterStreamingService registers the ABCI streaming service provided by
// the streaming plugin with the BaseApp.
//
// streamingService must implement ABCIListener; otherwise an error is
// returned and nothing is registered. The stores to expose are read from the
// "streaming.keys" option and resolved against keys; the
// "streaming.stop-node-on-err" option controls whether a listener error
// panics. A single store write listener, encoded with kodec, is shared by
// every exposed store.
func RegisterStreamingService(
	bApp *BaseApp,
	appOpts servertypes.AppOptions,
	kodec codec.BinaryCodec,
	keys map[string]*types.KVStoreKey,
	streamingService interface{},
) error {
	// The plugin arrives untyped, so make sure it really is an ABCIListener.
	abci, implements := streamingService.(ABCIListener)
	if !implements {
		return fmt.Errorf("failed to register streaming service: failed type check %v", streamingService)
	}

	// Resolve the configured key names to the store keys that get exposed.
	rawKeys := appOpts.Get(StreamingTomlKey + "." + StreamingKeysTomlKey)
	exposed := exposeStoreKeys(cast.ToStringSlice(rawKeys), keys)

	rawStop := appOpts.Get(StreamingTomlKey + "." + StreamingStopNodeOnErrTomlKey)
	haltOnErr := cast.ToBool(rawStop)

	// The height is read from the deliver state at write time, not captured now.
	kvWriter := NewKVStoreListener(abci, haltOnErr, func() int64 {
		return bApp.deliverState.ctx.BlockHeight()
	})
	shared := types.NewStoreKVPairWriteListener(kvWriter, kodec)

	byStore := make(map[types.StoreKey][]types.WriteListener, len(exposed))
	for i := range exposed {
		byStore[exposed[i]] = append(byStore[exposed[i]], shared)
	}

	service := StreamingService{
		Listeners:     byStore,
		ABCIListener:  abci,
		StopNodeOnErr: haltOnErr,
	}
	bApp.SetStreamingService(service)

	return nil
}
// exposeAll reports whether list contains the wildcard entry "*", which
// requests that every store be exposed. The match is exact: no trimming or
// pattern matching is applied.
func exposeAll(list []string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == "*" {
			return true
		}
	}
	return false
}
// exposeStoreKeys resolves the configured key names in keysStr to the store
// keys registered in keys.
//
// If keysStr contains the wildcard "*", every store key in keys is returned,
// in map iteration order. Otherwise the result follows the order of keysStr:
// names missing from keys are skipped and repeated names produce repeated
// entries.
func exposeStoreKeys(keysStr []string, keys map[string]*types.KVStoreKey) []types.StoreKey {
	if exposeAll(keysStr) {
		all := make([]types.StoreKey, 0, len(keys))
		for name := range keys {
			all = append(all, keys[name])
		}
		return all
	}

	selected := make([]types.StoreKey, 0, len(keysStr))
	for _, name := range keysStr {
		storeKey, known := keys[name]
		if !known {
			continue
		}
		selected = append(selected, storeKey)
	}
	return selected
}