Producer.Close() hangs in syscall to rd_kafka_destroy #463

Open
4 of 7 tasks
ekoutanov opened this issue Apr 30, 2020 · 7 comments

@ekoutanov

Description

The problem occurred during volume testing of goneli with aggressive injection of random faults, against a Kafka broker that had been (intentionally) subjected to very high load.

The test code frequently creates and destroys consumer and producer instances.

On one such occasion, Producer.Close() was left in a blocked state.

The stack trace shows this:

goroutine 6567 [syscall, 58 minutes]:
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka._Cfunc_rd_kafka_destroy(0x7ff638024670)
	_cgo_gotypes.go:1554 +0x63
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka.(*Producer).Close.func1(0xc005b4a370)
	/home/emil/go/pkg/mod/gopkg.in/confluentinc/[email protected]/kafka/producer.go:366 +0x78
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka.(*Producer).Close(0xc005b4a370)
	/home/emil/go/pkg/mod/gopkg.in/confluentinc/[email protected]/kafka/producer.go:366 +0xd3
github.com/obsidiandynamics/goneli.(*neli).Close(0xc005b6c200, 0x0, 0x0)
	/home/emil/go/pkg/mod/github.com/obsidiandynamics/[email protected]/neli.go:363 +0x1ad

How to reproduce

Rapidly cycle through producer instances: open a producer, produce some messages, then close the producer while some messages are still in flight, without flushing or waiting for delivery confirmations. (I suspect this last point is key to reproducing the issue.)

The specific test that caused the issue is here; however, it took several days of uninterrupted running for the issue to surface.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 1.4
  • Apache Kafka broker version: 2.4.0
  • Client configuration: ConfigMap{...}
  • Operating system: Fedora 31 and 32
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts: Broker logs are clean
  • Critical issue

@ekoutanov
Author

Looking at Close():

func (p *Producer) Close() {
	// Wait for poller() (signaled by closing pollerTermChan)
	// and channel_producer() (signaled by closing ProduceChannel)
	close(p.pollerTermChan)
	close(p.produceChannel)
	p.handle.waitGroup.Wait()

	close(p.events)

	p.handle.cleanup()

	C.rd_kafka_destroy(p.handle.rk)
}

Control flow makes it through most of the method and stalls on the last line. So the channels have already been closed by the time rd_kafka_destroy() is called, which means no delivery notifications will be processed from that point on.

I'm wondering what happens when we close a producer that has in-flight messages with outstanding delivery notifications. What if a notification arrives while we are halfway through Close()? Is it possible that a message is instantiated in the C code and is still referenced, causing the destructor to block indefinitely until the reference is released? @edenhill

This is pure speculation, of course, as I'm unfamiliar with the workings of librdkafka.
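Since the quoted Close() closes its channels before calling rd_kafka_destroy(), one workaround worth trying is to flush before closing, so that fewer deliveries are outstanding at teardown. Whether this actually avoids the hang is not established in this thread. The sketch assumes an interface mirroring the `Flush(timeoutMs int) int` and `Close()` methods of confluent-kafka-go's `*kafka.Producer`; `flushThenClose` and `fakeProducer` are hypothetical names. As far as I can tell from the v1 source, `Producer.Len()` also counts events still buffered in `Events()`, so `Flush()` only reaches zero if a goroutine keeps draining that channel.

```go
package main

import "fmt"

// flushCloser mirrors the Flush and Close method signatures of
// confluent-kafka-go's *kafka.Producer, so the real type satisfies it.
type flushCloser interface {
	Flush(timeoutMs int) int
	Close()
}

// flushThenClose waits up to timeoutMs for outstanding messages and
// delivery reports, then closes the producer. It returns the number of
// events Flush reported as still outstanding when it gave up (0 if none).
func flushThenClose(p flushCloser, timeoutMs int) int {
	remaining := p.Flush(timeoutMs)
	p.Close()
	return remaining
}

// fakeProducer is a stand-in used only to exercise the helper.
type fakeProducer struct {
	outstanding int
	closed      bool
	calls       []string
}

func (f *fakeProducer) Flush(timeoutMs int) int {
	f.calls = append(f.calls, "flush")
	if timeoutMs > 0 {
		f.outstanding = 0 // pretend everything was delivered in time
	}
	return f.outstanding
}

func (f *fakeProducer) Close() {
	f.calls = append(f.calls, "close")
	f.closed = true
}

func main() {
	p := &fakeProducer{outstanding: 7}
	fmt.Println(flushThenClose(p, 5000), p.calls) // prints "0 [flush close]"
}
```

A non-zero return is worth logging before the process continues, since it means Close() is about to run with deliveries still pending, which is the situation suspected above.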

@ekoutanov
Author

Some more data: it appears the issue is not limited to producers; it can also affect consumers. Here is a trace showing a consumer hung in Close():

goroutine 47494 [syscall, 50 minutes]:
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka._Cfunc_rd_kafka_destroy(0x7fe108078110)
	_cgo_gotypes.go:1554 +0x63
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka.(*Consumer).Close.func3(0xc0001da000)
	/home/emil/go/pkg/mod/gopkg.in/confluentinc/[email protected]/kafka/consumer.go:350 +0x78
gopkg.in/confluentinc/confluent-kafka-go.v1/kafka.(*Consumer).Close(0xc0001da000, 0xc72880, 0xd9e148)
	/home/emil/go/pkg/mod/gopkg.in/confluentinc/[email protected]/kafka/consumer.go:350 +0x168
github.com/obsidiandynamics/goneli.(*neli).Close(0xc0001f8fc0, 0x0, 0x0)
	/home/emil/go/pkg/mod/github.com/obsidiandynamics/[email protected]/neli.go:378 +0x232

@agaurav

agaurav commented May 15, 2020

I am seeing this as well with v1.4.0.

@edenhill edenhill self-assigned this May 18, 2020
@edenhill
Contributor

Are all other references to the consumer or producer released before calling Close()?
That is, is there no other goroutine that uses the instance or reads from or writes to a channel that the producer or consumer is using?

@ekoutanov
Author

For the consumer, yes. For the producer, there is just a goroutine consuming from the events channel.
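The question above amounts to a shutdown-ordering rule: stop every goroutine that touches the client and wait for it to exit before calling Close(). A minimal sketch of that ordering, using a hypothetical `pollCloser` interface (its `Poll` stands in for whatever a worker does with the client, such as polling, producing, or reading `Events()`; it is not the confluent-kafka-go signature). `runAndShutdown` and `recorder` are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// pollCloser is a hypothetical interface for this sketch, not the
// confluent-kafka-go API.
type pollCloser interface {
	Poll(timeoutMs int)
	Close()
}

// runAndShutdown starts a poll loop in its own goroutine and returns a
// shutdown function that stops the loop, waits for it to exit, and only
// then calls Close, so nothing else touches the client during Close.
func runAndShutdown(c pollCloser) (shutdown func()) {
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stop:
				return
			default:
				c.Poll(100)
			}
		}
	}()
	return func() {
		close(stop)
		wg.Wait() // the poll loop has returned; no more calls into c
		c.Close()
	}
}

// recorder counts any Poll call that happens after Close.
type recorder struct {
	mu         sync.Mutex
	closed     bool
	pollsAfter int
}

func (r *recorder) Poll(timeoutMs int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		r.pollsAfter++
	}
}

func (r *recorder) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.closed = true
}

func main() {
	r := &recorder{}
	shutdown := runAndShutdown(r)
	shutdown()
	fmt.Println(r.closed, r.pollsAfter) // prints "true 0"
}
```

In the producer case described in the reply, the events goroutine does not need an explicit stop signal, because the quoted Close() closes the events channel itself and a `range` over it then ends.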

@edenhill
Contributor

Is it possible to attach gdb or pstack to get a backtrace of the client while it is in a stalled state?

@loganrosen

loganrosen commented Aug 8, 2023

I'm also seeing this frequently with my consumer running 2.2.0. I ran gdb and looked at the backtraces for all threads; these seem to be the relevant ones. @edenhill is this helpful at all?

Thread 11 (Thread 0x7fe36cff9700 (LWP 114)):
#0  0x00007fe45596d017 in pthread_join () from /lib64/libpthread.so.0
#1  0x00000000008635a2 in thrd_join ()
#2  0x00000000008e627c in rd_kafka_destroy_app ()
#3  0x00000000004fd324 in runtime.asmcgocall.abi0 ()
#4  0x0000000000000001 in ?? ()
#5  0x000000c002726900 in ?? ()
#6  0x00007fe36cff5200 in ?? ()
#7  0x00000000004bf765 in runtime.mProf_Malloc.func1 ()
#8  0x000000000149dbb0 in runtime.memstats ()
#9  0x0000000000000298 in ?? ()
#10 0x000000c00024c820 in ?? ()
#11 0x00000000004fb4a9 in runtime.systemstack.abi0 ()
#12 0x0000000000000000 in ?? ()
Thread 67 (Thread 0x7fe417fff700 (LWP 36)):
#0  0x00007fe45596d017 in pthread_join () from /lib64/libpthread.so.0
#1  0x00000000008635a2 in thrd_join ()
#2  0x00000000008ebd9e in rd_kafka_destroy_internal ()
#3  0x00000000008ec8e0 in rd_kafka_thread_main ()
#4  0x00000000008632e7 in _thrd_wrapper_function ()
#5  0x00007fe45596bea5 in start_thread () from /lib64/libpthread.so.0
#6  0x00007fe455272b0d in clone () from /lib64/libc.so.6

Full backtrace log: output.log
