Error: Failed to send message: AlreadyClosed #78
Is there any activity toward solving this issue?
$ docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v $PWD/data:/pulsar/data \
  apachepulsar/pulsar:2.0.0-rc1-incubating \
  bin/pulsar standalone
And here are the Pulsar server logs:
18:33:03.492 [pulsar-io-50-4] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:55506
18:33:03.607 [pulsar-io-50-4] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506][persistent://public/default/my-topic] Creating producer. producerId=0
18:33:03.722 [Thread-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506] persistent://public/default/my-topic configured with schema false
18:33:03.728 [Thread-5] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-topic}, client=/172.17.0.1:55506, producerName=standalone-8-2, producerId=0}
18:33:19.431 [pulsar-io-50-4] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:55506
18:33:20.480 [pulsar-web-57-1] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [04/Mar/2020:18:33:20 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 882 "-" "Pulsar-Java-v2.5.0" 9
I see on log |
I have the same problem. Sending a message immediately after initializing the producer works fine, so it can't be a TLS handshake issue. The connection seems to time out after a second or so. |
@alijmlzd did you try rolling back to an older version? Any luck finding one? |
@alexyans No, but I will try that and share my result. |
I tried version 2.4 of the client, as it was the one recommended by Kafkaesque, but the problem was still there for me. I didn't bother going further back, as this is unspecified behavior and most likely caused by the Pulsar server, and I have no control over what version Kafkaesque uses. It appears that the server explicitly closes the connection for some reason. For my use case, I worked around the problem by always creating the producer right before I emit a message. Flushing and closing after use should prevent leaks. |
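The workaround described in the comment above (create the producer right before emitting, then flush and close it) could be wrapped in a helper along these lines. This is only a minimal sketch: `sendOnce` is a hypothetical name, not part of pulsar-client, and `client` is assumed to be an already-constructed `Pulsar.Client` exposing `createProducer`, as in the other examples in this thread.

```javascript
// Hypothetical helper: sendOnce is NOT part of pulsar-client.
// `client` is assumed to be a Pulsar.Client instance created elsewhere.
async function sendOnce(client, topic, payload) {
  // Create a short-lived producer immediately before sending.
  const producer = await client.createProducer({ topic });
  try {
    // Await the send so a failure surfaces here instead of being dropped.
    const messageId = await producer.send({ data: Buffer.from(payload) });
    // Flush before closing, as suggested in the comment above.
    await producer.flush();
    return messageId;
  } finally {
    // Always close the short-lived producer so it does not leak.
    await producer.close();
  }
}
```

Whether this is cheap enough for high message rates is not established in this thread, so treat it as a stopgap rather than a recommended pattern.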
Oh, as a quick fix it's a good idea. But what if I need to send more than 1000 messages per second? Is it OK to create a producer, send, and then flush and close the producer each time? |
That would be hard to tell without looking at the code. We know the client uses a connection pool for the original client connection, but it's not clear to me whether the producers/consumers create separate connections or piggyback on the one client connection. I'm not exactly sure what happens when you create a producer, it could be cheaper than it looks. But yeah, I share your frustration. Hope it gets resolved soon. |
It seems that Here is an example.
const Pulsar = require("pulsar-client");

async function produceTest(timeout) {
  const client = new Pulsar.Client({
    serviceUrl: "pulsar://localhost:6650"
  });
  const producer = await client.createProducer({
    topic: "persistent://public/default/my-topic"
  });
  await new Promise((resolve, reject) => {
    setTimeout(() => {
      producer.send({
        data: Buffer.from("My Message")
      });
      resolve();
    }, timeout);
  });
  await client.close();
}

produceTest(1000);
produceTest(9000); |
@equanz your example handles the rejection, sure. But there shouldn't be a rejection in the first place. A producer closing after some timeout is unspecified behavior. It also drifts from the convention: a producer connection should remain open as long as you need it open. Otherwise there would be no need to explicitly close it as per the docs. If it closes itself no matter what you do, it is safe from leaks anyway. |
As above, it seems that the client has already been destructed by the time the message is sent. The destructor is implemented in the C++ client (the Node.js client wraps the C++ client). I think you need to avoid client destruction before producing. |
"AlreadyClosed" indicates that the client is already closed. You need to avoid client destruction. See a producer example: http://pulsar.apache.org/docs/en/client-libraries-node/#producer-example |
How do I avoid client destruction? In the following example, I create a producer and produce a message every second.
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
  });
  // Create a producer
  const producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
  });
  let i = 0;
  // Send one message, then schedule the next send one second later
  async function produceMessage() {
    i = i + 1;
    const msg = `my-message-${i}`;
    const ret = await producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
    console.log(ret);
    setTimeout(produceMessage, 1000);
  }
  produceMessage();
})();
Output:
|
This is a playground example though. In a real system you can't necessarily control the rate of messages produced, as they correspond to real-time events. You could send mock keep-alive messages, but it's ugly and a waste of resources. |
@alexyans yeah, this is an example, but in a real system I want to use the same producer and publish messages every now and then. We have a system that publishes a message every minute; in this case I would need to create a new producer every time. I think no reference to the producer is maintained, and this is why Node.js decides to close the producer. I think we need to fix this. |
There are two sides to this, because I experienced it! 2 - It might be that you created a producer with one connection and are sending messages via another connection that is not meant for that producer. You will get an AlreadyClosed exception even if the producer is alive! |
Hey! I see what you are writing here, but this is a bug in the client. Here is the Go client doing the same thing I did with my Node.js example (#78 (comment)) - https://gist.github.com/yosiat/97460fb4dee41c5d0d0f6873a31c77d9 The Go code produces all messages, while the Node client dies after some time. The problem, in my opinion, is that my function maintains a reference to the producer but not to the client, which causes Node.js/V8 to garbage collect the client since it is not being referenced. @eaba my example has only one client and one producer. |
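If the diagnosis in the comment above is right, one application-side way to sidestep it would be to keep the client reachable for as long as the producer is in use. The sketch below shows one way to do that, assuming (not verified here) that holding a JavaScript reference to the client is enough to stop it from being collected. `Publisher` is a hypothetical wrapper, not part of pulsar-client, and `client` is assumed to be a `Pulsar.Client` created elsewhere.

```javascript
// Hypothetical wrapper: Publisher is NOT part of pulsar-client.
// It stores the client next to the producer so that anything holding
// the Publisher also keeps the client reachable.
class Publisher {
  constructor(client, producer) {
    this.client = client;     // kept only so the client stays referenced
    this.producer = producer;
  }

  // `client` is assumed to be a Pulsar.Client instance.
  static async create(client, topic) {
    const producer = await client.createProducer({ topic });
    return new Publisher(client, producer);
  }

  // Resolves with whatever producer.send resolves with.
  async publish(text) {
    return this.producer.send({ data: Buffer.from(text) });
  }

  // Flush and close the producer first, then close the client.
  async close() {
    await this.producer.flush();
    await this.producer.close();
    await this.client.close();
  }
}
```

A long-lived module-level variable holding the `Publisher` would then keep both objects alive between sends, however far apart those sends are.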
okay if that's true, i think you're right about the root cause. @sijie how do you want to proceed? |
@sijie sounds like a good approach, I tried my approach of creating napi ref, but couldn't find a way to implement it. I will implement your approach and create a pull request for that. |
@sijie I tried creating a reference to the client from the producer, but it didn't help (tried with InstanceAccesor), and it created a weird dependency in the code that I didn't feel comfortable with. But I found out how to create a reference for a client and un-reference it when the client closes the connection. This PR, #85, fixes the code I wrote above :) |
@yosiat thank you!! |
@alexyans happy to help :) |
I'm new to Pulsar and I'm having the exact same issue, when running the official examples using:
Official
const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });
  // Create a producer
  const producer = await client.createProducer({
    topic: 'my-topic',
  });
  // Send messages
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();
  await producer.close();
  await client.close();
})();
Output of running
Environment info: Output of
I installed the C++ client with the installation instructions here: https://pulsar.apache.org/docs/client-libraries-cpp/#compilation Let me know if I should open another issue. I don't know C++, but it appears that the patch added in #85 isn't in the current code. Edit: 11f28a6#diff-4c2ad2d5dd2477f5e7deebe4d839fbde8c00e36b38321cdb1d92c39c07fb070dL154 possibly removed the fix introduced by #85, though the fix appears to still be in the pulsar-client version currently in my node_modules. Thanks. |
@jbmusso |
@jbmusso To fix it, for example, simply await Producer#send. For more detail, please see: apache/pulsar-client-cpp#51 |
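The suggestion above (await `Producer#send`) could look roughly like the following. It is a sketch under assumptions: `sendAll` is a hypothetical helper, not part of pulsar-client, and `producer` is assumed to be an object returned by `client.createProducer`, whose `send` and `flush` return promises as in the examples in this thread.

```javascript
// Hypothetical helper: sendAll is NOT part of pulsar-client.
// `producer` is assumed to come from client.createProducer(...).
async function sendAll(producer, messages) {
  const ids = [];
  for (const msg of messages) {
    // Await each send so it has settled before the next one starts,
    // and so a rejection is thrown here rather than left unhandled.
    ids.push(await producer.send({ data: Buffer.from(msg) }));
  }
  // Flush only after every send has resolved.
  await producer.flush();
  return ids;
}
```

Awaiting sequentially keeps ordering simple at the cost of throughput. Collecting the promises and awaiting them together with `Promise.all` before flushing is an alternative if ordering of completion does not matter.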
Hi there,
Why does this example with a 1-second timeout work fine:
But this example with a timeout a few seconds longer produces an error?
Full error message: