Replies: 14 comments 4 replies
-
You shouldn’t need to make more than one producer for just one kafka cluster. |
Beta Was this translation helpful? Give feedback.
-
Thank you for chipping in. But the thing is, this transaction example in this library creates many producers rather than using a single instance which is confusing. I am lost on what is right and what is wrong for transactional batch producers. |
Beta Was this translation helpful? Give feedback.
-
Ah, in the case where you’re setting a It looks like it makes a producer with a transaction ID, publishes, and then puts it into a pool of transaction ids. This means that a transaction producer is only being used for one transaction at a time. This is a way of recycling transaction producers after they’ve been used, rather than throwing them away after they’ve been used. |
Beta Was this translation helpful? Give feedback.
-
So this is what I understand then: If there is a producer available in a pool for transactional usage, use that one. If it is already in-use, create a new producer which will come with a unique transaction id. |
Beta Was this translation helpful? Give feedback.
-
Well, this is a way of avoiding making unnecessary transaction-id’ed producers just to throw them away at the end. Sometimes, it might just be simpler to just generate a new tx-id producer and throw it away, because building the pool is complex and something that if put into production needs tests… so one-use tx-id producers could have utility in some cases. But if it’s going to stick around long-lived, you can get some value out of reusing tx-id producers at times. |
Beta Was this translation helpful? Give feedback.
-
Thank you for the help. |
Beta Was this translation helpful? Give feedback.
-
I've just noticed that using recycled producers throws Also, sometimes I see this: ==================
WARNING: DATA RACE
Write at 0x00c00010f310 by goroutine 251:
github.com/IBM/sarama.(*transactionManager).transitionTo()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/transaction_manager.go:220 +0x330
github.com/IBM/sarama.(*asyncProducer).maybeTransitionToErrorState()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1276 +0xec
github.com/IBM/sarama.(*asyncProducer).returnError()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1286 +0xad
github.com/IBM/sarama.(*asyncProducer).retryMessage()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1323 +0x109
github.com/IBM/sarama.(*asyncProducer).retryMessages()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1332 +0x8c
github.com/IBM/sarama.(*brokerProducer).handleError.func2()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1197 +0x8f4
github.com/IBM/sarama.(*produceSet).eachPartition()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/produce_set.go:223 +0x89b
github.com/IBM/sarama.(*brokerProducer).handleError()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1196 +0x3cd
github.com/IBM/sarama.(*brokerProducer).handleResponse()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1060 +0x84
github.com/IBM/sarama.(*brokerProducer).run()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:993 +0x1a93
github.com/IBM/sarama.(*brokerProducer).run-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:791 +0x33
Previous read at 0x00c00010f310 by goroutine 252:
github.com/IBM/sarama.(*transactionManager).publishTxnPartitions()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/transaction_manager.go:766 +0x40e
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:818 +0x23c
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.gowrap2()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:794 +0x33
Goroutine 251 (running) created at:
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:791 +0x4f3
github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1343 +0x133
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:765 +0x244
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
github.com/IBM/sarama.(*partitionProducer).updateLeader()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:756 +0xca
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:619 +0xcb
github.com/IBM/sarama.(*partitionProducer).dispatch()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:689 +0xd08
github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:600 +0x33
Goroutine 252 (running) created at:
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:794 +0x690
github.com/IBM/sarama.(*asyncProducer).getBrokerProducer()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:1343 +0x133
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil.(*partitionProducer).updateLeader.func1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:765 +0x244
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:94 +0x83
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:95 +0x3e
github.com/eapache/go-resiliency/breaker.(*Breaker).Run()
/Users/inan/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:58 +0x88
github.com/IBM/sarama.(*partitionProducer).updateLeader()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:756 +0xca
github.com/IBM/sarama.(*partitionProducer).updateLeaderIfBrokerProducerIsNil()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:619 +0xcb
github.com/IBM/sarama.(*partitionProducer).dispatch()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:689 +0xd08
github.com/IBM/sarama.(*partitionProducer).dispatch-fm()
<autogenerated>:1 +0x33
github.com/IBM/sarama.withRecover()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x41
github.com/IBM/sarama.(*asyncProducer).newPartitionProducer.gowrap1()
/Users/inan/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:600 +0x33
================== |
Beta Was this translation helpful? Give feedback.
-
Hm, the error being reported seems to indicate that there is concurrent use of the transaction producer, which shouldn’t be done. Like I said, recycling is a lot more complex, and takes a lot of attention to detail with an assurance that use does not continue after it has been returned to a pool. |
Beta Was this translation helpful? Give feedback.
-
I think the txn and exactly once examples here will cause issues to people because both of them are a bit messy and misleading. To me they are the root cause of future buggy implementations. It would be better to just produce rather than produce and consume in same example. |
Beta Was this translation helpful? Give feedback.
-
Another problem is that I use |
Beta Was this translation helpful? Give feedback.
-
Hm… I cannot really say. I think we’re starting to wander into the areas beyond my knowledge of the package. 😂 |
Beta Was this translation helpful? Give feedback.
-
It sounds like, in concurrent environments we should not reuse producers and pools because I am not sure where to go with this whole thing! |
Beta Was this translation helpful? Give feedback.
-
I think the problem is because So technically it is impossible, even to think about, reusing producers in clients when async producers are being imposed in this library. |
Beta Was this translation helpful? Give feedback.
-
Hello, have you found a solution to this problem? |
Beta Was this translation helpful? Give feedback.
-
Hi,
I can see that discussions don't draw much attention but I am going to ask a very simple question with a hope that someone would answer!
Do we create a new producer in a loop then reuse it for transactions OR do we create a new for each iterations?
Thanks
Note:
kafka.NewTXNProducer()
creates a brand new sarama producer.Beta Was this translation helpful? Give feedback.
All reactions