diff --git a/server/config/config.go b/server/config/config.go index b3a2e22f23c1..5322bc5a53bb 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -340,10 +340,10 @@ func DefaultConfig() *Config { Streamers: StreamersConfig{ File: FileStreamerConfig{ Keys: []string{"*"}, - WriteDir: "data/file_streamer", + WriteDir: "", OutputMetadata: true, StopNodeOnError: true, - // NOTICE: the default config don't protect the streamer data integrity + // NOTICE: The default config doesn't protect the streamer data integrity // in face of system crash. Fsync: false, }, diff --git a/server/config/toml.go b/server/config/toml.go index 2f1a3b5d3e4e..ca275f399412 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -248,11 +248,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] write_dir = "{{ .Streamers.File.WriteDir }}" prefix = "{{ .Streamers.File.Prefix }}" + # output-metadata specifies if output the metadata file which includes the abci request/responses # during processing the block. output-metadata = "{{ .Streamers.File.OutputMetadata }}" + # stop-node-on-error specifies if propagate the file streamer errors to consensus state machine. stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}" + # fsync specifies if call fsync after writing the files. fsync = "{{ .Streamers.File.Fsync }}" @@ -265,7 +268,6 @@ fsync = "{{ .Streamers.File.Fsync }}" # Setting max_txs to negative 1 (-1) will disable transactions from being inserted into the mempool. # Setting max_txs to a positive number (> 0) will limit the number of transactions in the mempool, by the specified amount. max-txs = "{{ .Mempool.MaxTxs }}" - ` var configTemplate *template.Template diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index e98e678352cb..71cb26ae5192 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -22,7 +22,8 @@ import ( var _ baseapp.StreamingService = &StreamingService{} -// StreamingService is a concrete implementation of StreamingService that writes state changes out to files +// StreamingService is a concrete implementation of StreamingService that writes +// state changes out to files. type StreamingService struct { storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore filePrefix string // optional prefix for each of the generated files @@ -32,37 +33,50 @@ type StreamingService struct { currentBlockNumber int64 blockMetadata types.BlockMetadata - // if write the metadata file, otherwise only data file is outputted. + + // outputMetadata, if true, writes additional metadata to file per block outputMetadata bool - // if true, when commit failed it will panic and stop the consensus state machine to ensure the - // eventual consistency of the output, otherwise the error is ignored and have the risk of lossing data. + + // stopNodeOnErr, if true, will panic and stop the node during ABCI Commit + // to ensure eventual consistency of the output, otherwise, any errors are + // logged and ignored which could yield data loss in streamed output. stopNodeOnErr bool - // if true, the file.Sync() is called to make sure the data is persisted onto disk, otherwise it risks lossing data when system crash. + + // fsync, if true, will execute file Sync to make sure the data is persisted + // onto disk, otherwise there is a risk of data loss during any crash. fsync bool } -// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys -func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) { +func NewStreamingService( + writeDir, filePrefix string, + storeKeys []types.StoreKey, + cdc codec.BinaryCodec, + logger log.Logger, + outputMetadata, stopNodeOnErr, fsync bool, +) (*StreamingService, error) { // sort storeKeys for deterministic output sort.SliceStable(storeKeys, func(i, j int) bool { return storeKeys[i].Name() < storeKeys[j].Name() }) + // NOTE: We use the same listener for each store. listeners := make([]*types.MemoryListener, len(storeKeys)) - // in this case, we are using the same listener for each Store for i, key := range storeKeys { listeners[i] = types.NewMemoryListener(key) } - // check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not - // we don't open a dstFile until we receive our first ABCI message + + // Check that the writeDir exists and is writable so that we can catch the + // error here at initialization. If it is not we don't open a dstFile until we + // receive our first ABCI message. if err := isDirWriteable(writeDir); err != nil { return nil, err } + return &StreamingService{ storeListeners: listeners, filePrefix: filePrefix, writeDir: writeDir, - codec: c, + codec: cdc, logger: logger, outputMetadata: outputMetadata, stopNodeOnErr: stopNodeOnErr, @@ -70,65 +84,74 @@ func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey }, nil } -// Listeners satisfies the baseapp.StreamingService interface -// It returns the StreamingService's underlying WriteListeners -// Use for registering the underlying WriteListeners with the BaseApp +// Listeners satisfies the StreamingService interface. It returns the +// StreamingService's underlying WriteListeners. Use for registering the +// underlying WriteListeners with the BaseApp. func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners)) for _, listener := range fss.storeListeners { listeners[listener.StoreKey()] = []types.WriteListener{listener} } + return listeners } -// ListenBeginBlock satisfies the baseapp.ABCIListener interface -// It writes the received BeginBlock request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) { +// ListenBeginBlock satisfies the ABCIListener interface. It sets the received +// BeginBlock request, response and the current block number. Note, these are +// not written to file until ListenCommit is executed and outputMetadata is set, +// after which it will be reset again on the next block. +func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { fss.blockMetadata.RequestBeginBlock = &req fss.blockMetadata.ResponseBeginBlock = &res fss.currentBlockNumber = req.Header.Height return nil } -// ListenDeliverTx satisfies the baseapp.ABCIListener interface -// It writes the received DeliverTx request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) { +// ListenDeliverTx satisfies the ABCIListener interface. It appends the received +// DeliverTx request and response to a list of DeliverTxs objects. Note, these +// are not written to file until ListenCommit is executed and outputMetadata is +// set, after which it will be reset again on the next block. +func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{ Request: &req, Response: &res, }) + return nil } -// ListenEndBlock satisfies the baseapp.ABCIListener interface -// It writes the received EndBlock request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) { +// ListenEndBlock satisfies the ABCIListener interface. It sets the received +// EndBlock request, response and the current block number. Note, these are +// not written to file until ListenCommit is executed and outputMetadata is set, +// after which it will be reset again on the next block. +func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { fss.blockMetadata.RequestEndBlock = &req fss.blockMetadata.ResponseEndBlock = &res return nil } -// ListenEndBlock satisfies the baseapp.ABCIListener interface +// ListenCommit satisfies the ABCIListener interface. It is executed during the +// ABCI Commit request and is responsible for writing all staged data to files. +// It will only return a non-nil error when stopNodeOnErr is set. func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error { - err := fss.doListenCommit(ctx, res) - if err != nil { - fss.logger.Error("Commit listening hook failed", "height", fss.currentBlockNumber, "err", err) + if err := fss.doListenCommit(ctx, res); err != nil { + fss.logger.Error("Listen commit failed", "height", fss.currentBlockNumber, "err", err) if fss.stopNodeOnErr { return err } } + return nil } func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) { fss.blockMetadata.ResponseCommit = &res - // write to target files, the file size is written at the beginning, which can be used to detect completeness. + // Write to target files, the file size is written at the beginning, which can + // be used to detect completeness. metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber) dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber) + if fss.filePrefix != "" { metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName) dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName) @@ -139,6 +162,7 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err != nil { return err } + if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil { return err } @@ -148,42 +172,44 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err := fss.writeBlockData(&buf); err != nil { return err } + return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync) } func (fss *StreamingService) writeBlockData(writer io.Writer) error { for _, listener := range fss.storeListeners { cache := listener.PopStateCache() + for i := range cache { bz, err := fss.codec.MarshalLengthPrefixed(&cache[i]) if err != nil { return err } - if _, err = writer.Write(bz); err != nil { + + if _, err := writer.Write(bz); err != nil { return err } } } - return nil -} -// Stream satisfies the baseapp.StreamingService interface -func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil } -// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface -func (fss *StreamingService) Close() error { - return nil -} +// Stream satisfies the StreamingService interface. It performs a no-op. +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil } + +// Close satisfies the StreamingService interface. It performs a no-op. +func (fss *StreamingService) Close() error { return nil } // isDirWriteable checks if dir is writable by writing and removing a file -// to dir. It returns nil if dir is writable. +// to dir. It returns nil if dir is writable. We have to do this as there is no +// platform-independent way of determining if a directory is writeable. func isDirWriteable(dir string) error { f := path.Join(dir, ".touch") if err := os.WriteFile(f, []byte(""), 0o600); err != nil { return err } + return os.Remove(f) } @@ -193,25 +219,30 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) { if err != nil { return sdkerrors.Wrapf(err, "open file failed: %s", path) } + defer func() { // avoid overriding the real error with file close error if err1 := f.Close(); err1 != nil && err == nil { err = sdkerrors.Wrapf(err, "close file failed: %s", path) } }() + _, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data)))) if err != nil { return sdkerrors.Wrapf(err, "write length prefix failed: %s", path) } + _, err = f.Write(data) if err != nil { return sdkerrors.Wrapf(err, "write block data failed: %s", path) } + if fsync { err = f.Sync() if err != nil { return sdkerrors.Wrapf(err, "fsync failed: %s", path) } } - return + + return err } diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go index f2297292e0a8..7ff49aa58f6d 100644 --- a/store/streaming/file/service_test.go +++ b/store/streaming/file/service_test.go @@ -114,20 +114,23 @@ func TestFileStreamingService(t *testing.T) { if os.Getenv("CI") != "" { t.Skip("Skipping TestFileStreamingService in CI environment") } - err := os.Mkdir(testDir, 0o700) - require.Nil(t, err) + + require.Nil(t, os.Mkdir(testDir, 0o700)) defer os.RemoveAll(testDir) testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} - testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false) + testStreamingService, err := NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false) require.Nil(t, err) require.IsType(t, &StreamingService{}, testStreamingService) require.Equal(t, testPrefix, testStreamingService.filePrefix) require.Equal(t, testDir, testStreamingService.writeDir) require.Equal(t, testMarshaller, testStreamingService.codec) + testListener1 = testStreamingService.storeListeners[0] testListener2 = testStreamingService.storeListeners[1] + wg := new(sync.WaitGroup) + testStreamingService.Stream(wg) testListenBlock(t) testStreamingService.Close() @@ -135,7 +138,10 @@ func TestFileStreamingService(t *testing.T) { } func testListenBlock(t *testing.T) { - var expectKVPairsStore1, expectKVPairsStore2 [][]byte + var ( + expectKVPairsStore1 [][]byte + expectKVPairsStore2 [][]byte + ) // write state changes testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) @@ -150,6 +156,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, @@ -157,6 +164,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey3, @@ -164,6 +172,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair3) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2) @@ -184,6 +193,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, @@ -191,6 +201,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -198,6 +209,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2, expectedKVPair3) @@ -218,6 +230,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -225,6 +238,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -232,6 +246,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair2) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair1, expectedKVPair3) @@ -252,6 +267,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -259,6 +275,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -314,37 +331,46 @@ func readInFile(name string) ([]byte, error) { if err != nil { return nil, err } + size := sdk.BigEndianToUint64(bz[:8]) if len(bz) != int(size)+8 { return nil, errors.New("incomplete file ") } + return bz[8:], nil } -// segmentBytes returns all of the protobuf messages contained in the byte array as an array of byte arrays -// The messages have their length prefix removed +// segmentBytes returns all of the protobuf messages contained in the byte array +// as an array of byte arrays. The messages have their length prefix removed. func segmentBytes(bz []byte) ([][]byte, error) { var err error + segments := make([][]byte, 0) for len(bz) > 0 { var segment []byte + segment, bz, err = getHeadSegment(bz) if err != nil { return nil, err } + segments = append(segments, segment) } + return segments, nil } -// getHeadSegment returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array +// getHeadSegment returns the bytes for the leading protobuf object in the byte +// array (removing the length prefix) and returns the remainder of the byte array. func getHeadSegment(bz []byte) ([]byte, []byte, error) { size, prefixSize := binary.Uvarint(bz) if prefixSize < 0 { return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize) } + if size > uint64(len(bz)-prefixSize) { return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize) } + return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil } diff --git a/store/types/listening.go b/store/types/listening.go index 5f21689449fd..7f7f50ea0ead 100644 --- a/store/types/listening.go +++ b/store/types/listening.go @@ -6,7 +6,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" ) -// WriteListener interface for streaming data out from a listenkv.Store +// WriteListener interface for streaming data out from a KVStore type WriteListener interface { // if value is nil then it was deleted // storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores @@ -14,35 +14,42 @@ type WriteListener interface { OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } -// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed -// protobuf encoded StoreKVPairs to an underlying io.Writer +// StoreKVPairWriteListener is used to configure listening to a KVStore by +// writing out length-prefixed Protobuf encoded StoreKVPairs to an underlying +// io.Writer object. type StoreKVPairWriteListener struct { writer io.Writer marshaller codec.BinaryCodec } -// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec -func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener { +// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a +// provided io.Writer and codec.BinaryCodec. +func NewStoreKVPairWriteListener(w io.Writer, m codec.Codec) *StoreKVPairWriteListener { return &StoreKVPairWriteListener{ writer: w, marshaller: m, } } -// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs +// OnWrite satisfies the WriteListener interface by writing length-prefixed +// Protobuf encoded StoreKVPairs. func (wl *StoreKVPairWriteListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { - kvPair := new(StoreKVPair) - kvPair.StoreKey = storeKey.Name() - kvPair.Delete = delete - kvPair.Key = key - kvPair.Value = value + kvPair := &StoreKVPair{ + StoreKey: storeKey.Name(), + Key: key, + Value: value, + Delete: delete, + } + by, err := wl.marshaller.MarshalLengthPrefixed(kvPair) if err != nil { return err } + if _, err := wl.writer.Write(by); err != nil { return err } + return nil } @@ -57,7 +64,7 @@ func NewMemoryListener(key StoreKey) *MemoryListener { return &MemoryListener{key: key} } -// OnWrite implements WriteListener interface +// OnWrite implements WriteListener interface. func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { fl.stateCache = append(fl.stateCache, StoreKVPair{ StoreKey: storeKey.Name(), @@ -65,17 +72,19 @@ func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, d Key: key, Value: value, }) + return nil } -// PopStateCache returns the current state caches and set to nil +// PopStateCache returns the current state caches and set to nil. func (fl *MemoryListener) PopStateCache() []StoreKVPair { res := fl.stateCache fl.stateCache = nil + return res } -// StoreKey returns the storeKey it listens to +// StoreKey returns the storeKey it listens to. func (fl *MemoryListener) StoreKey() StoreKey { return fl.key }