
vertx doesn't send messages instantly while processing handlers! #43

Closed
tetv opened this issue Sep 4, 2016 · 17 comments

tetv commented Sep 4, 2016

It seems that vertx only sends cluster messages when it is idle (nothing left to process).
That means that while a vertx instance is very busy/active, no messages are dispatched to the event bus until it becomes completely idle, which could be a big limitation when there is a need to send messages immediately.

What is the solution to force vertx to send a message immediately, without waiting for it to become idle?

In order to demonstrate this issue, I have created two very simple verticles:

  • Verticle ping: sends 60 messages to pong in one burst and prints the reply answers.
  • Verticle pong: emulates a 50ms processing time and replies to each message.

Scenarios: (cluster mode)

  • Test1: pong is a worker verticle
  • Test2: pong is a multi-threaded worker verticle

java -cp 'target/*' io.vertx.core.Launcher start my.package.Pong -id pong -worker -cluster
java -cp 'target/*' io.vertx.core.Launcher start my.package.Ping -id ping -worker -cluster

For these tests the ping verticle is a worker verticle ([w-##] means vert.x-worker-thread-##); however, it is not relevant whether it is a standard verticle, because both configurations behave similarly. Both ping and pong suffer from the reported issue, and therefore the messages were stuck on the event bus until vertx became idle (nothing left to process).

Note that if ping used a multi-threaded worker, the 1 second of extra stuff was not noticeable and the entire run was 1 second quicker (see the ping code); however, pong still suffers from the reported issue.

ping verticle code:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.logging.Logger;   // assuming Vert.x 3's built-in logging facade
import io.vertx.core.logging.LoggerFactory;

public class Ping extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Ping.class);

    @Override
    public void start() {
        for (int i = 1; i <= 60; i++) {
            String ball = String.format("ball%02d", i);
            getVertx().eventBus().send("table", ball, response -> {
                if (response.succeeded()) {
                    LOG.info("ping> recv " + response.result().body());
                } else {
                    LOG.error("ping> fail " + response.cause());
                }
            });
        }
        LOG.info("ping> send ball01 to ball60");
        doExtraStuff(); // 1 second wait
    }

    private static void doExtraStuff() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}

pong verticle code:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.logging.Logger;   // assuming Vert.x 3's built-in logging facade
import io.vertx.core.logging.LoggerFactory;

public class Pong extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Pong.class);

    @Override
    public void start() {
        getVertx().eventBus().consumer("table", message -> {
            String ball = (String)message.body();
            LOG.info("pong> recv " + ball);
            process(); // 50ms wait
            LOG.info("pong> send " + ball);
            message.reply(ball);
        });
    }

    private static void process() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
  • Test1 results: ping and pong are worker verticles:
[2016-09-04 02:12:23.169] [w-02] ping> send ball01 to ball60 

# -> (ping is doing extra stuff for 1 second and the 60 messages aren't dispatched to pong)

[2016-09-04 02:12:24.294]                              [w-03] pong> recv ball01 
[2016-09-04 02:12:24.349]                              [w-03] pong> send ball01 
[2016-09-04 02:12:24.364]                              [w-03] pong> recv ball02 
[2016-09-04 02:12:24.415]                              [w-03] pong> send ball02 
#... (2x57 lines - recv and send for ball03 to ball59 - time between recv and send is ~50ms)
[2016-09-04 02:12:27.392]                              [w-03] pong> recv ball60 
[2016-09-04 02:12:27.443]                              [w-03] pong> send ball60 

# -> (pong is idle and only now are the 60 reply messages dispatched to ping)
# -> (ping didn't receive any answer for at least 3 seconds; pong's processing time is 50ms!)

[2016-09-04 02:12:27.463] [w-04] ping> recv ball01 
[2016-09-04 02:12:27.465] [w-04] ping> recv ball02 
#... (57 lines - recv for ball03 to ball59)
[2016-09-04 02:12:27.504] [w-04] ping> recv ball60 

# -> (ping received the 60 reply messages practically in one go!)
  • Test2 results: ping is a worker verticle and pong is a multi-threaded worker verticle:
[2016-09-04 02:12:37.623] [w-02] ping> send ball01 to ball60 

# -> (ping is doing extra stuff for 1 second and the 60 messages aren't dispatched to pong)

[2016-09-04 02:12:38.754]                              [w-12] pong> ball01 recv 
[2016-09-04 02:12:38.754]                              [w-13] pong> ball02 recv 
#... (17 lines - recv for random ball01 to ball20 except ball01, ball02 and ball18)
[2016-09-04 02:12:38.760]                              [w-09] pong> ball18 recv 

# -> (pong has no more idle threads - all 20 pool threads are busy processing for 50ms)

[2016-09-04 02:12:38.805]                              [w-13] pong> ball02 send 
[2016-09-04 02:12:38.805]                              [w-14] pong> ball03 send 
[2016-09-04 02:12:38.805]                              [w-13] pong> ball03 recv
#... (2x18 lines - with random send/recv for ball01 to ball20 except already displayed balls)
[2016-09-04 02:12:38.835]                              [w-12] pong> ball40 recv 

# -> (pong has no more idle threads - all 20 pool threads are busy processing for 50ms)

[2016-09-04 02:12:38.872]                              [w-16] pong> ball26 send 
[2016-09-04 02:12:38.872]                              [w-06] pong> ball27 send 
[2016-09-04 02:12:38.873]                              [w-16] pong> ball41 recv 
#... (2x17 lines - with random send/recv for ball21 to ball40 except already displayed balls)
[2016-09-04 02:12:38.882]                              [w-02] pong> ball59 recv 
[2016-09-04 02:12:38.887]                              [w-14] pong> ball40 send 
[2016-09-04 02:12:38.888]                              [w-14] pong> ball60 recv 

# -> (pong has no more idle threads - all 20 pool threads are busy processing for 50ms)

[2016-09-04 02:12:38.925]                              [w-16] pong> ball41 send 
[2016-09-04 02:12:38.926]                              [w-19] pong> ball44 send 
#... (17 lines - send for random ball41 to ball60 except ball41, ball44 and ball60)
[2016-09-04 02:12:38.938]                              [w-14] pong> ball60 send 

# -> (pong now has idle threads and only now are the 60 reply messages dispatched to ping)
# -> (ping didn't receive any answer for at least 150ms; pong's processing time is 50ms!)

[2016-09-04 02:12:38.945] [w-04] ping> recv ball09 
[2016-09-04 02:12:38.959] [w-04] ping> recv ball04 
#... (57 lines - recv for random ball01 to ball60 except ball09, ball04 and ball44)
[2016-09-04 02:12:39.004] [w-04] ping> recv ball44 

# -> (ping received the 60 reply messages practically in one go!)

What is the solution to force vertx to send a message immediately, without waiting for it to become idle?


tetv commented Sep 4, 2016

This issue was previously opened in the eclipse/vertx repository, and @vietj wrote:

Hi,

with a worker it happens because Hazelcast uses an executeBlocking method that cannot be executed until the current task (the worker) has been finished (as it uses ordering).

can you rather open this issue in vertx-hazelcast project ?
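
A minimal sketch of the ordering effect described here, assuming the Vert.x 3 executeBlocking(handler, ordered, resultHandler) API (the class and both tasks are illustrative, not vertx-hazelcast internals):

import io.vertx.core.AbstractVerticle;

// Illustrative sketch: ordered executeBlocking tasks submitted from the same
// context share one ordered queue, so a task submitted while an earlier one
// is still running cannot start until that one finishes.
public class OrderedQueueDemo extends AbstractVerticle {
    @Override
    public void start() {
        // Task A stands in for the long-running worker handler.
        vertx.executeBlocking(future -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            future.complete();
        }, true, res -> {});

        // Task B stands in for the cluster send that Hazelcast wraps in an
        // ordered executeBlocking: it queues behind task A instead of running
        // immediately.
        vertx.executeBlocking(future -> {
            System.out.println("runs only after task A has finished");
            future.complete();
        }, true, res -> {});
    }
}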


tetv commented Sep 6, 2016

A workaround, while the fix is not out, is to create another WorkerExecutor instead of using the default vertx.executeBlocking (optionally, you can add metrics support for it).

Note that this workaround is only needed for worker verticles. For standard verticles the default vertx.executeBlocking works well.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.logging.Logger;   // assuming Vert.x 3's built-in logging facade
import io.vertx.core.logging.LoggerFactory;

public class Pong extends AbstractVerticle {
    private static final Logger LOG = LoggerFactory.getLogger(Pong.class);
    private WorkerExecutor executor;

    @Override
    public void start() {
        executor = getVertx().createSharedWorkerExecutor("vert.x-new-internal-blocking", 20);

        getVertx().eventBus().consumer("table", message -> {
            // Whether to run ordered or not is up to you
            executor.executeBlocking(future -> {
                String ball = (String)message.body();
                LOG.info("pong> recv " + ball);
                process(); // 50ms wait
                LOG.info("pong> send " + ball);
                message.reply(ball);
                future.complete();
            }, result -> {});
        });
    }

    private static void process() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}


tetv commented Sep 6, 2016

It's also related to this issue: vert-x3/issues#75


vietj commented Sep 6, 2016

Ideally we should not need to use such a workaround.


vietj commented Sep 6, 2016

@tsegismont @cescoffier how about adding a test to the clustering TCK that checks that we can send an event bus message from a blocking worker?

tsegismont commented

@vietj will do



tetv commented Sep 7, 2016

Tests about this issue (vertx doesn't send messages instantly while it still has messages to process):

Test Type / Verticle Type              Standard   Worker    MT Worker
Direct code (no executeBlocking)       No         No        No
vertx.executeBlocking(ordered)         Yes        Partial   No
vertx.executeBlocking(not ordered)     Yes        No        No
executor.executeBlocking(ordered)      Yes        Yes       Yes
executor.executeBlocking(not ordered)  Yes        Yes       Yes

Legend:

  • Yes: vertx sends messages immediately, even if there are still messages to process
  • No: vertx only sends messages after it has no more messages to process (vertx is idle)
  • Partial: strange behaviour: the first 40 messages are not sent immediately, but after that it sends correctly

If the issue is caused by the call to executeBlocking (worker verticle) mentioned by @vietj, then one possible solution could be the second solution mentioned in the reported issue vert-x3/issues#75:

The Hazelcast implementation could use the current thread when it is a worker thread (doing a check on the current context and using executeBlocking only when on an event loop thread, quite tedious).

Or it could use another WorkerExecutor to execute the blocking call.
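
A hypothetical sketch of that check, assuming the Vert.x 3 Context.isOnWorkerThread() API (the helper class and method are illustrative, not actual vertx-hazelcast code):

import io.vertx.core.Context;
import io.vertx.core.Vertx;

// Illustrative helper: run a blocking cluster operation directly when already
// on a worker thread, and offload it only when on an event loop thread.
public class ClusterCallHelper {
    static void runClusterCall(Vertx vertx, Runnable hazelcastCall) {
        if (Context.isOnWorkerThread()) {
            // Blocking is allowed here, and the call does not queue behind
            // the worker handler that is currently running.
            hazelcastCall.run();
        } else {
            // On an event loop: offload the blocking call (unordered).
            vertx.executeBlocking(future -> {
                hazelcastCall.run();
                future.complete();
            }, false, res -> {});
        }
    }
}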


LoneRifle commented Sep 9, 2016

#29 was recently merged, which enables non-blocking execution through setting a system property. Some calls to executeBlocking are replaced with executeAsync, which delegates to async methods implemented by Hazelcast's data structures.

@tetv, could you perform the tests again with -Dvertx.hazelcast.async-api=true?

Background on the above-mentioned PR can be found in #28.
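
For reference, the property can be added to the launch commands from the issue description, e.g.:

java -Dvertx.hazelcast.async-api=true -cp 'target/*' io.vertx.core.Launcher start my.package.Pong -id pong -worker -cluster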


tetv commented Sep 12, 2016

Thank you @LoneRifle, I will run the tests again.
I will assume that merge is included in version 3.3.2.


tetv commented Sep 13, 2016

Tests created using vertx 3.3.2 (with and without executeAsync).

  • Ping sends 500 messages to Pong in one burst.
  • Pong emulates a 50ms processing time and replies to Ping.

Each cell tells whether the reply messages are sent to Ping immediately after processing.

Test Type / Pong Verticle Type               Standard   Worker     MT Worker
Direct code (no executeBlocking)             No         No         No
vertx.executeBlocking(ordered)               Yes        From-24    No *1
vertx.executeBlocking(not ordered)           Yes        No         From-115 *2
workerExecutor.executeBlocking(ordered)      Yes        Yes        Yes *1
workerExecutor.executeBlocking(not ordered)  Yes        Yes        Yes *2
javaExecutor.process(ordered)                Yes        Yes        Yes *1
javaExecutor.process(not ordered)            Yes        Yes        Yes

Using -Dvertx.hazelcast.async-api=true:

Test Type / Pong Verticle Type               Standard   Worker     MT Worker
Direct code (no executeBlocking)             No         No         From-337
vertx.executeBlocking(ordered)               Yes        From-69    No *1
vertx.executeBlocking(not ordered)           Yes        From-58    From-59 *2
workerExecutor.executeBlocking(ordered)      Yes        Yes        Yes *1
workerExecutor.executeBlocking(not ordered)  Yes        Yes        Yes *2
javaExecutor.process(ordered)                Yes        Yes        Yes *1
javaExecutor.process(not ordered)            Yes        Yes        Yes *2

Notes:
*1: Messages are not processed in ordinal order.
*2: The first 20 messages processed (the thread pool size) are not the first 20 messages sent by ping.

Source code is available here: https://github.com/tetv/vertx3-blocking

The tests were executed on:

  • Ubuntu 16.04 LTS
  • Java 1.8.0_101-b13


tetv commented Sep 19, 2016

Updated the test source code with JUnit tests (now using vertx 3.3.3).

Test Type / Pong Verticle Type               Standard   Worker     MT Worker
Direct code (no executeBlocking)             ✖️ ≥ 2     ✖️ ≥ 2     ✖️ ≥ 40
vertx.executeBlocking(ordered)               ✔️         ✖️ ≥ 2     ✖️ ≥ 2
vertx.executeBlocking(not ordered)           ✔️         ✖️ ≥ 40    ✖️ ≥ 40
workerExecutor.executeBlocking(ordered)      ✔️         ✔️         ✔️
workerExecutor.executeBlocking(not ordered)  ✔️         ✔️         ✔️
javaExecutor.process(ordered)                ✔️         ✔️         ✔️
javaExecutor.process(not ordered)            ✔️         ✔️         ✔️

Note: -Dvertx.hazelcast.async-api=true doesn't influence the results.

There is also an identical issue but for non clustered vertx: eclipse-vertx/vert.x#1631

tsegismont commented

Closed via #54


tetv commented Mar 11, 2017

I have updated to vertx 3.4.0, and the same issues described above are still happening.
By the way, I have also simplified and improved the logging in the JUnit test project.


vietj commented Mar 11, 2017

@tetv we have decided that having a message sent from a worker delayed until the blocking task has finished is normal, and we are not going to try to solve it. Blocking in Vert.x should be used to wrap blocking APIs, not to wait for asynchronous responses.
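
A minimal sketch of that recommendation, assuming the Vert.x 3 API (someBlockingCall and the address are illustrative): executeBlocking wraps only the genuinely blocking call, and the event bus reply is handled in its async callback instead of being waited for.

import io.vertx.core.AbstractVerticle;

public class RecommendedUsage extends AbstractVerticle {
    @Override
    public void start() {
        // Wrap only the genuinely blocking work; someBlockingCall() is a
        // hypothetical stand-in for e.g. a JDBC query.
        vertx.executeBlocking(future -> future.complete(someBlockingCall()), res -> {
            // Back on the context: send and let the reply arrive
            // asynchronously; no thread is parked waiting for it.
            vertx.eventBus().send("table", res.result(), reply -> {
                if (reply.succeeded()) {
                    System.out.println("recv " + reply.result().body());
                }
            });
        });
    }

    private static String someBlockingCall() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "ball01";
    }
}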


tetv commented Mar 11, 2017

Thank you @vietj, but it doesn't make much sense to me, at least at first glance.

Consider a scenario in which a worker executing blocking code sends a message to Y, but there are 1000 messages in the queue waiting for a worker; only after those 1000 messages have effectively been processed is the message to Y effectively sent. Does that make sense?


vietj commented Mar 11, 2017

Can you explain the sequence of operations in more detail?


tetv commented Mar 11, 2017

Sure @vietj, here it is (a sketch follows the list):

  • There are 1000 messages (M1 to M1000) in the queue waiting to be processed (all workers are busy)
  • Worker W1 is processing M0 and executing blocking code
  • Worker W1 sends a message to Y (but the message is not effectively sent yet)
  • Worker W1 finishes processing M0
  • Worker W1 starts processing M1 (the first of the 1000 queued messages)
  • ... several workers start processing M2...M1000
  • Worker WN is finally free and only now is the message to Y effectively sent, after a long time!
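
A minimal sketch of that sequence, assuming a clustered worker verticle (the addresses and the 50ms delay are illustrative):

import io.vertx.core.AbstractVerticle;

public class W1 extends AbstractVerticle {
    @Override
    public void start() {
        vertx.eventBus().consumer("work", msg -> {
            // W1 is processing M0 here.
            vertx.eventBus().send("Y", "notify"); // only queued: in cluster mode the
                                                  // send itself runs as an ordered
                                                  // blocking task behind this handler
            blockingWork();                       // W1 is still busy with M0
            // After this handler returns, the workers drain M1..M1000 from the
            // queue, and only then does the send to Y actually go out.
        });
    }

    private static void blockingWork() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}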
