This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

Asynchronous processing #31

Closed
tomtau opened this issue Nov 30, 2018 · 21 comments

Comments

@tomtau
Contributor

tomtau commented Nov 30, 2018

As mentioned in: #21 (comment)
Currently, there's one mutex and synchronous I/O, but there isn't much reason for that. Mempool/info connections may not need to mutate the app state. In addition, the Tendermint ABCI specs say:

Thus, DeliverTx and CheckTx messages are sent asynchronously, while all other messages are sent synchronously.
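
One way to picture "relaxing that mutex" is a read-write lock: handlers that only inspect the state take a shared lock, while handlers that mutate it take an exclusive one. The following is a minimal sketch under that assumption, not rust-abci's actual design; `AppState`, `handle_info`, `handle_deliver_tx`, `handle_commit` and `run_demo` are hypothetical names introduced only for illustration.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical application state; not rust-abci's actual types.
#[derive(Default)]
struct AppState {
    height: u64,
    count: u64,
}

// Read-only handler, standing in for info/mempool-style requests
// (assumed here not to mutate the state).
fn handle_info(state: &RwLock<AppState>) -> (u64, u64) {
    let guard = state.read().unwrap(); // shared lock: many readers at once
    (guard.height, guard.count)
}

// Mutating handler, standing in for DeliverTx on the consensus connection.
fn handle_deliver_tx(state: &RwLock<AppState>) {
    let mut guard = state.write().unwrap(); // exclusive lock
    guard.count += 1;
}

// Mutating handler, standing in for Commit.
fn handle_commit(state: &RwLock<AppState>) {
    let mut guard = state.write().unwrap();
    guard.height += 1;
}

// Runs a few concurrent readers next to one writer and returns the final state.
fn run_demo() -> (u64, u64) {
    let state = Arc::new(RwLock::new(AppState::default()));

    let readers: Vec<_> = (0..4)
        .map(|_| {
            let shared = Arc::clone(&state);
            thread::spawn(move || handle_info(&shared))
        })
        .collect();

    for _ in 0..3 {
        handle_deliver_tx(&state);
    }
    handle_commit(&state);

    for reader in readers {
        reader.join().unwrap();
    }
    handle_info(&state)
}

fn main() {
    let (height, count) = run_demo();
    println!("height={} count={}", height, count);
}
```

Whether a given handler is really read-only depends on the application, so the split between `read()` and `write()` above is an assumption rather than something the ABCI spec guarantees.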

@tomtau
Contributor Author

tomtau commented Jan 17, 2019

btw, here's a bounty on this issue: https://www.bountysource.com/issues/66866776-asynchronous-processing

@enginespot
Contributor

enginespot commented May 18, 2019

Hi @tomtau
I used Tokio to support all asynchronous I/O operations, and I found several issues that still need to be solved for the application as a whole.

  • The ABCI-based communication protocol is not yet fully operational and stable.
    The original empty_app is stable. However, when I converted it to a fully asynchronous application, it became unstable. I checked the previous communication logic with Wireshark: it sent an extra data packet [4, 26, 0] after each response, and I do not know why.
    So I did the same thing as a workaround:
        .and_then(move |writer| {
            // Workaround for ABCI protocol
            let mut flush_response = Response::new();
            flush_response.set_flush(ResponseFlush::new());
            println!("Return Flush Response! {:?}", flush_response);
            writer.send(flush_response)
  • Add more error handling
  • Optimize the async architecture
  • Add more test cases

@tomtau
Contributor Author

tomtau commented May 20, 2019

Hi @enginespot , thanks for your contribution!

ad the first point -- Most of the message exchanges are synchronous: https://tendermint.com/docs/spec/abci/abci.html#messages so they need Flush to be called AFAIK.
The only exceptions to that are CheckTx and DeliverTx, which are sent asynchronously (but in order) -- so presumably rust-abci could split the receiving and responding parts to be concurrent in some way.

ad error handling -- #49 + feel free to open some more specific issues

ad optimization -- I think relaxing that mutex could help; Mempool/info may not need to mutate the state

ad test cases -- there are some rather vague issues here: #29 #36

@ebuchman @melekes @liamsi feel free to correct or clarify any of the synchronous / asynchronous assumptions between Tendermint and ABCI apps

@enginespot
Contributor

enginespot commented May 20, 2019

Hi @tomtau, thanks for your quick reply
For the first point, I changed the decoder/encoder logic, and the ABCI protocol should work well now. But another error occurred:

panic: Tendermint state.AppHash does not match AppHash after replay. Got 0000000000000000, expected 

goroutine 1 [running]:
github.com/tendermint/tendermint/consensus.checkAppHash(0xa, 0x0, 0xc000037310, 0x6, 0xc00039a440, 0x11, 0x285, 0x0, 0xc00039a460, 0x20, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:461 +0x1ec
github.com/tendermint/tendermint/consensus.(*Handshaker).replayBlocks(0xc000382180, 0xa, 0x0, 0xc000037310, 0x6, 0xc00039a440, 0x11, 0x285, 0x0, 0xc00039a460, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:437 +0x327
github.com/tendermint/tendermint/consensus.(*Handshaker).ReplayBlocks(0xc000382180, 0xa, 0x0, 0xc000037310, 0x6, 0xc00039a440, 0x11, 0x285, 0x0, 0xc00039a460, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:358 +0x9d4
github.com/tendermint/tendermint/consensus.(*Handshaker).Handshake(0xc000382180, 0x1d79c40, 0xc000158620, 0x2, 0x1d6af40)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:264 +0x6b8
github.com/tendermint/tendermint/node.NewNode(0xc0001943c0, 0x1d64500, 0xc0000c3d60, 0xc000957500, 0x1d509a0, 0xc000939bc0, 0xc000957790, 0x1c3d318, 0xc0009577a0, 0x1d6af40, ...)
	/go/src/github.com/tendermint/tendermint/node/node.go:266 +0xb4f
github.com/tendermint/tendermint/node.DefaultNewNode(0xc0001943c0, 0x1d6af40, 0xc0009574f0, 0xc0001ecb00, 0xc0008d1c90, 0xc00093f3c0)
	/go/src/github.com/tendermint/tendermint/node/node.go:109 +0x448
github.com/tendermint/tendermint/cmd/tendermint/commands.NewRunNodeCmd.func1(0xc00019c900, 0xc00036eba0, 0x0, 0x3, 0x0, 0x0)
	/go/src/github.com/tendermint/tendermint/cmd/tendermint/commands/run_node.go:53 +0x5b
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).execute(0xc00019c900, 0xc00036eb10, 0x3, 0x3, 0xc00019c900, 0xc00036eb10)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:698 +0x431
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).ExecuteC(0x24f0ea0, 0x2, 0xc00016b4e0, 0x1af4e59)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:783 +0x2ca
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).Execute(...)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:736
github.com/tendermint/tendermint/libs/cli.Executor.Execute(0x24f0ea0, 0x1c3f420, 0x2, 0xc00003b6e0)
	/go/src/github.com/tendermint/tendermint/libs/cli/setup.go:89 +0x3c
main.main()
	/go/src/github.com/tendermint/tendermint/cmd/tendermint/main.go:45 +0x24a

hmm... can you help me to check what's wrong or give some tips?

@tomtau
Contributor Author

tomtau commented May 20, 2019

@enginespot Which ABCI application are you testing it against? ABCI applications need to be deterministic -- i.e. given the same messages (in the same order), they should return the same responses... AppHash is a compact representation of the ABCI app state (that was derived after applying actions corresponding to ABCI messages) -- often, it's the root of some Merkle tree, but it could be anything... e.g. in that "counter" example app, it's count in BigEndian: https://github.com/tendermint/rust-abci/blob/master/examples/counter_app.rs#L63
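
To make the "count in BigEndian" idea concrete, here is a minimal sketch, assuming a counter-style app whose whole state is a single `u64`. The helpers `app_hash`, `to_hex` and `replay` are hypothetical names for illustration and are not taken from counter_app.rs.

```rust
// Hypothetical helper: the app hash of a counter-style app is taken to be
// the count encoded as 8 big-endian bytes.
fn app_hash(count: u64) -> [u8; 8] {
    count.to_be_bytes()
}

// Uppercase hex rendering, similar to how hashes appear in Tendermint logs.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02X}", b)).collect()
}

// Replaying the same number of transactions must always yield the same hash;
// that is what "deterministic" means for an ABCI app.
fn replay(tx_count: usize) -> [u8; 8] {
    let mut count = 0u64;
    for _ in 0..tx_count {
        count += 1; // each delivered transaction bumps the counter
    }
    app_hash(count)
}

fn main() {
    println!("hash at count 0: {}", to_hex(&app_hash(0)));
    println!("hash after 5 txs: {}", to_hex(&replay(5)));
}
```

Under this assumption, an app that has processed no transactions reports eight zero bytes, which renders as sixteen hex zeros.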

@enginespot
Contributor

@tomtau I use "./tendermint node" together with "empty_app" for testing. Hmm... so far I have only ported the logic to the Tokio framework and changed the ABCI decode/encode logic, so maybe it is unrelated to AppHash?

@enginespot
Contributor

for the first point, I tried to print the ResponseCommit data,

if response.has_commit() { println!("Encode Response Commit{:?}", &output[..]); }
and I found that the output data is different.
For the original version:
Encode Response Commit! [4, 98, 0]
but for the version I changed, the output data is:
Encode Response Commit! [24, 98, 10, 18, 8, 0, 0, 0, 0, 0, 0, 0, 0]

so I'm not sure if it's a problem here.
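
The byte dumps in this thread can be decoded by hand. The sketch below assumes each frame starts with a zigzag-encoded varint length (which is what `i64::encode_var_vec` produces for a signed value), followed by a protobuf message whose first byte is a field tag. It also assumes the field numbers of the ABCI Response message of that period (3 for flush, 12 for commit). `read_uvarint`, `zigzag_decode` and `describe_frame` are hypothetical helpers written for this illustration.

```rust
// Decode an unsigned LEB128 varint; returns (value, bytes consumed).
fn read_uvarint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value = 0u64;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None // truncated or over-long varint
}

// Undo zigzag encoding (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, 4 -> 2, ...).
fn zigzag_decode(n: u64) -> i64 {
    ((n >> 1) as i64) ^ -((n & 1) as i64)
}

// Summarize one frame as (payload length, protobuf field number, wire type).
fn describe_frame(frame: &[u8]) -> Option<(i64, u64, u64)> {
    let (raw_len, used) = read_uvarint(frame)?;
    let len = zigzag_decode(raw_len);
    let (tag, _) = read_uvarint(&frame[used..])?;
    Some((len, tag >> 3, tag & 0x7))
}

fn main() {
    let frames: [&[u8]; 3] = [
        &[4, 26, 0],
        &[4, 98, 0],
        &[24, 98, 10, 18, 8, 0, 0, 0, 0, 0, 0, 0, 0],
    ];
    for frame in frames.iter() {
        println!("{:?} -> {:?}", frame, describe_frame(frame));
    }
}
```

Read this way, [4, 26, 0] is a 2-byte message with field 3 (an empty flush response under the assumed numbering), and [4, 98, 0] is a 2-byte message with field 12 (an empty commit response). The longer dump is a 12-byte message with field 12 whose nested payload (10 bytes: tag 18, length 8, then eight zero bytes) is consistent with a commit response carrying eight zero bytes of data, matching the 0000000000000000 in the panic message.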

@tomtau
Contributor Author

tomtau commented May 20, 2019

@enginespot could you post the content of genesis.json in .tendermint/config/? when did you get that panic? Immediately after starting up the node?
If so, my guess would be that InitChainRequest/Response failed, because it returns 0000000000000000 as app hash ("data" field) in the block commit response, but the genesis had it as:

"app_hash": "",

@enginespot
Contributor

{
  "genesis_time": "2019-05-20T06:05:27.604801Z",
  "chain_id": "test-chain-FlpL8q",
  "consensus_params": {
    "block": {
      "max_bytes": "22020096",
      "max_gas": "-1",
      "time_iota_ms": "1000"
    },
    "evidence": {
      "max_age": "100000"
    },
    "validator": {
      "pub_key_types": [
        "ed25519"
      ]
    }
  },
  "validators": [
    {
      "address": "34A9A97D4D78A59A20AED4AED086DF3EB67311EB",
      "pub_key": {
        "type": "tendermint/PubKeyEd25519",
        "value": "t6hp5hXGAaadDT4RoWgHGd4bkX2I0P6BkRs/QDsGESU="
      },
      "power": "10",
      "name": ""
    }
  ],
  "app_hash": ""
}

@enginespot
Contributor

➜  ~ ./tendermint node --trace --log_level info
I[2019-05-20|15:02:21.639] Starting multiAppConn                        module=proxy impl=multiAppConn
I[2019-05-20|15:02:21.639] Starting socketClient                        module=abci-client connection=query impl=socketClient
E[2019-05-20|15:02:21.640] abci.socketClient failed to connect to tcp://127.0.0.1:26658.  Retrying... module=abci-client connection=query err="dial tcp 127.0.0.1:26658: connect: connection refused"
E[2019-05-20|15:02:24.640] abci.socketClient failed to connect to tcp://127.0.0.1:26658.  Retrying... module=abci-client connection=query err="dial tcp 127.0.0.1:26658: connect: connection refused"
E[2019-05-20|15:02:27.644] abci.socketClient failed to connect to tcp://127.0.0.1:26658.  Retrying... module=abci-client connection=query err="dial tcp 127.0.0.1:26658: connect: connection refused"
I[2019-05-20|15:02:30.646] Starting socketClient                        module=abci-client connection=mempool impl=socketClient
I[2019-05-20|15:02:30.647] Starting socketClient                        module=abci-client connection=consensus impl=socketClient
I[2019-05-20|15:02:30.647] Starting EventBus                            module=events impl=EventBus
I[2019-05-20|15:02:30.647] Starting PubSub                              module=pubsub impl=PubSub
I[2019-05-20|15:02:30.677] Starting IndexerService                      module=txindex impl=IndexerService
I[2019-05-20|15:02:30.685] ABCI Handshake App Info                      module=consensus height=0 hash= software-version= protocol-version=0
I[2019-05-20|15:02:30.691] ABCI Replay Blocks                           module=consensus appHeight=0 storeHeight=751 stateHeight=751
I[2019-05-20|15:02:30.696] Applying block                               module=consensus height=1
I[2019-05-20|15:02:30.702] Executed block                               module=consensus height=1 validTxs=0 invalidTxs=0
...
...
I[2019-05-20|15:02:32.682] Applying block                               module=consensus height=751
I[2019-05-20|15:02:32.684] Executed block                               module=consensus height=751 validTxs=0 invalidTxs=0
panic: Tendermint state.AppHash does not match AppHash after replay. Got 0000000000000000, expected 

goroutine 1 [running]:
github.com/tendermint/tendermint/consensus.checkAppHash(0xa, 0x0, 0xc000036dd0, 0x6, 0xc00003a940, 0x11, 0x2ef, 0x0, 0xc00003a960, 0x20, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:461 +0x1ec
github.com/tendermint/tendermint/consensus.(*Handshaker).replayBlocks(0xc00007d800, 0xa, 0x0, 0xc000036dd0, 0x6, 0xc00003a940, 0x11, 0x2ef, 0x0, 0xc00003a960, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:437 +0x327
github.com/tendermint/tendermint/consensus.(*Handshaker).ReplayBlocks(0xc00007d800, 0xa, 0x0, 0xc000036dd0, 0x6, 0xc00003a940, 0x11, 0x2ef, 0x0, 0xc00003a960, ...)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:358 +0x9d4
github.com/tendermint/tendermint/consensus.(*Handshaker).Handshake(0xc00007d800, 0x1d79c40, 0xc000112ee0, 0x2, 0x1d6af40)
	/go/src/github.com/tendermint/tendermint/consensus/replay.go:264 +0x6b8
github.com/tendermint/tendermint/node.NewNode(0xc000010280, 0x1d64500, 0xc000914a00, 0xc000366000, 0x1d509a0, 0xc0008d6450, 0xc0008e8630, 0x1c3d318, 0xc0008e8640, 0x1d6af40, ...)
	/go/src/github.com/tendermint/tendermint/node/node.go:266 +0xb4f
github.com/tendermint/tendermint/node.DefaultNewNode(0xc000010280, 0x1d6af40, 0xc0008e8490, 0xc000492600, 0xc0008e3c90, 0xc000150fa0)
	/go/src/github.com/tendermint/tendermint/node/node.go:109 +0x448
github.com/tendermint/tendermint/cmd/tendermint/commands.NewRunNodeCmd.func1(0xc0000befc0, 0xc0000e2990, 0x0, 0x3, 0x0, 0x0)
	/go/src/github.com/tendermint/tendermint/cmd/tendermint/commands/run_node.go:53 +0x5b
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).execute(0xc0000befc0, 0xc0000e2900, 0x3, 0x3, 0xc0000befc0, 0xc0000e2900)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:698 +0x431
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).ExecuteC(0x24f0ea0, 0x2, 0xc00000db80, 0x1af4e59)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:783 +0x2ca
github.com/tendermint/tendermint/vendor/github.com/spf13/cobra.(*Command).Execute(...)
	/go/src/github.com/tendermint/tendermint/vendor/github.com/spf13/cobra/command.go:736
github.com/tendermint/tendermint/libs/cli.Executor.Execute(0x24f0ea0, 0x1c3f420, 0x2, 0xc0000e51c0)
	/go/src/github.com/tendermint/tendermint/libs/cli/setup.go:89 +0x3c
main.main()
	/go/src/github.com/tendermint/tendermint/cmd/tendermint/main.go:45 +0x24a

@tomtau
Contributor Author

tomtau commented May 20, 2019

And what if you configure app_hash in genesis.json to something like the following?

...
"app_hash": "0000000000000000",
...

@enginespot
Contributor

enginespot commented May 20, 2019

hmm... the same error occurs.
I find that if I change the encode method

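fn encode(&mut self, msg: Response, buf: &mut BytesMut) -> io::Result<()> {
    // Serialize once to learn the message length for the prefix.
    let mut msg_to_vec = Vec::new();
    msg.write_to_vec(&mut msg_to_vec).unwrap();
    let msg_len: i64 = msg_to_vec.len() as i64;
    // Length prefix: the message length as a varint-encoded i64.
    let varint = i64::encode_var_vec(msg_len);
    buf.put(&varint);
    buf.reserve(msg_len as usize);
    // Note: this serializes the message a second time, straight into the buffer.
    msg.write_to_writer(&mut buf.writer()).unwrap();
    println!("Encode response! {:?}", &buf[..]);

    Ok(())
}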

to the following method:

fn encode(&mut self, msg: Response, buf: &mut BytesMut) -> io::Result<()> {
    if msg.has_commit() {
        // Hard-coded frame copied from the original version's output:
        // [4, 98, 0], i.e. a commit response with no data.
        buf.writer().write(b"\x04\x62\0");
    }
    else {
        let mut msg_to_vec = Vec::new();
        msg.write_to_vec(&mut msg_to_vec).unwrap();
        let msg_len: i64 = msg_to_vec.len() as i64;
        let varint = i64::encode_var_vec(msg_len);
        buf.put(&varint);
        buf.reserve(msg_len as usize);
        msg.write_to_writer(&mut buf.writer()).unwrap();
        println!("Encode response! {:?}", &buf[..]);
    }
    Ok(())
}

the empty_app will work well.

@tomtau
Contributor Author

tomtau commented May 20, 2019

@enginespot did you run tendermint unsafe_reset_all before starting the node?

@enginespot
Contributor

@tomtau Great!
After I ran the command

tendermint unsafe_reset_all

the error disappeared. :)

@enginespot
Contributor

Hi @tomtau
Currently, we support async processing via async I/O. Which parts do you think still need to be changed?

@tomtau
Contributor Author

tomtau commented May 20, 2019

@enginespot async I/O is a good start. The next part is what I mentioned above in #31 (comment):

presumably rust-abci could split the receiving and responding parts to be concurrent in some way.

For example, in the original Tendermint implementation:
https://github.com/tendermint/tendermint/blob/dfa9a9a30a666132425b29454e90a472aa579a48/abci/server/socket_server.go#L120

there's a channel of responses: the request-processing routine reads from the connection and writes to the response channel, while the response-writing routine pulls responses from the channel and writes them to the connection.

So, ideally, Rust ABCI would work in a similar fashion.
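
The channel-based split described above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the Tendermint or rust-abci implementation: a `Vec` stands in for the connection on both sides, and `Request`, `Response`, `respond` and `serve` are hypothetical names used only here.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical, heavily simplified message types.
#[derive(Debug, Clone, PartialEq)]
enum Request {
    CheckTx(u32),
    DeliverTx(u32),
    Flush,
}

#[derive(Debug, Clone, PartialEq)]
enum Response {
    CheckTx(u32),
    DeliverTx(u32),
    Flush,
}

// Stand-in for the application logic: echoes the request kind and payload.
fn respond(request: Request) -> Response {
    match request {
        Request::CheckTx(id) => Response::CheckTx(id),
        Request::DeliverTx(id) => Response::DeliverTx(id),
        Request::Flush => Response::Flush,
    }
}

// Processes `requests` on one thread and "writes" responses on another,
// connected by a channel. Returns what the writer side emitted, in order.
fn serve(requests: Vec<Request>) -> Vec<Response> {
    let (response_tx, response_rx) = mpsc::channel::<Response>();

    // Request-processing side: reads requests and pushes responses to the channel.
    let handler = thread::spawn(move || {
        for request in requests {
            if response_tx.send(respond(request)).is_err() {
                break; // writer side is gone
            }
        }
        // response_tx is dropped here, which closes the channel.
    });

    // Response-writing side: drains the channel until it is closed.
    let writer = thread::spawn(move || {
        let mut written = Vec::new();
        for response in response_rx {
            written.push(response); // a real server would write to the socket
        }
        written
    });

    handler.join().unwrap();
    writer.join().unwrap()
}

fn main() {
    let requests = vec![Request::CheckTx(1), Request::DeliverTx(2), Request::Flush];
    println!("{:?}", serve(requests));
}
```

Because a single handler feeds a single FIFO channel, responses come out in the same order the requests went in, while a slow write no longer stalls request processing.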

I'll let @ebuchman @melekes @liamsi comment if they have any thoughts about it.

@tomtau
Contributor Author

tomtau commented May 20, 2019

This splitting may actually be already done in the PR, with the ABCICodec: https://github.com/tendermint/rust-abci/pull/50/files#diff-8063a415c8305a1c847560858568db8fR11

@enginespot
Contributor

We use "spawn" to process each request so that it does not block; I think it can currently receive and respond to data concurrently.

https://tokio.rs/docs/futures/spawning/

            let responses = reader.map(move |request| {
                println!("Got Request! {:?}", request);
                let response = respond(&app_instance, &request);
                return response;
            });

            let writes = responses.fold(_writer, |writer, response| {
                println!("Return Response! {:?}", response);
                writer.send(response)
            });
            tokio::spawn(writes.then(|_| Ok(())))

@tomtau
Contributor Author

tomtau commented May 20, 2019

It may be fine. There are two parts I haven't tested / verified: 1) is the order preserved? 2) the respond call is potentially blocking -- would it affect the rest?

@enginespot
Contributor

enginespot commented May 20, 2019

Regarding 1) is the order preserved?
Since requests are processed one by one, the responses are also produced one by one, so the order is preserved.
Regarding 2) the respond call is potentially blocking -- would it affect the rest?
I updated the code and removed the following lines:

let mut guard = app.lock().unwrap();
let a = guard.deref_mut();

So by now it should be non-blocking.

Hmm... I have no simple code for testing this, so I think it also needs a test to confirm.

@tomtau
Contributor Author

tomtau commented May 21, 2019

@enginespot I read through the code and left a comment in the corresponding PR #50. It'll be better to move the code-related discussion there.
