
bug: follower fetch halts rather than switch to leader if replica is stopped/restarted #2090

Closed
lizthegrey opened this issue Dec 15, 2021 · 8 comments

Comments

@lizthegrey
Contributor

lizthegrey commented Dec 15, 2021

Versions
Sarama: v1.30.1
Kafka: v3.0.0 (Confluent v7.0.1)
Go: v1.17.1
Configuration
func newSaramaConfig(version sarama.KafkaVersion) *sarama.Config {
        config := sarama.NewConfig()
        config.Version = version
        if hostname, err := os.Hostname(); err == nil && hostname != "" {
                config.ClientID = hostname
        }
        isDev := os.Getenv("ENV") == "development"
        config.RackID = os.Getenv("AZ") // AZ-based rack ID opts this consumer into KIP-392 fetch-from-follower
        config.Metadata.AllowAutoTopicCreation = isDev
        config.Producer.Return.Successes = true
        config.Producer.Partitioner = sarama.NewManualPartitioner
        config.Producer.Compression = sarama.CompressionZSTD
        config.Producer.Flush.Messages = 2000
        config.Producer.Flush.Frequency = 500 * time.Millisecond
        config.MetricRegistry = metrics.DefaultRegistry
        config.Metadata.Retry.Max = 20
        config.Metadata.Retry.Backoff = 250 * time.Millisecond
        config.Producer.Retry.Max = 3
        config.Producer.Retry.Backoff = 150 * time.Millisecond
        config.ChannelBufferSize = 131072
        config.Consumer.MaxWaitTime = 500 * time.Millisecond // default 250ms
        config.Consumer.Fetch.Min = 1024 * 1024              // default 1 byte
        config.Consumer.Fetch.Default = 10 * 1024 * 1024     // bytes, default 1MB

        return config
}
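
For reference, a minimal usage sketch (not the reporter's production code) of how a config like this is used to consume a single partition; the broker address and Kafka version constant are placeholders, and it assumes the newSaramaConfig function above is in the same package. With RackID set, brokers configured with a rack-aware replica selector (KIP-392) can direct these fetches to the in-AZ follower replica.

func consumeOnePartition() error {
        cfg := newSaramaConfig(sarama.V2_8_0_0) // placeholder version constant

        consumer, err := sarama.NewConsumer([]string{"kafka-1:9092"}, cfg) // placeholder broker address
        if err != nil {
                return err
        }
        defer consumer.Close()

        // The same topic/partition that appears in the logs below.
        pc, err := consumer.ConsumePartition("honeycomb-prod.retriever-mutation", 45, sarama.OffsetNewest)
        if err != nil {
                return err
        }
        defer pc.Close()

        for msg := range pc.Messages() {
                log.Printf("consumed offset=%d (%d bytes)", msg.Offset, len(msg.Value))
        }
        return nil
}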
Logs

Broker 1289 is the leader; broker 1290 is our local replica in us-east-1b



[2021-12-15 23:21:09,592] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=1] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,593] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=1] Error sending fetch request (sessionId=1420514310, epoch=410982) to node 1290: (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Client was shutdown before response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:122)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:272)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:519)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-12-15 23:21:09,594] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=1] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,594] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,596] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,596] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,597] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,599] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=2] Shutting down (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,599] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=2] Stopped (kafka.server.ReplicaFetcherThread)
[2021-12-15 23:21:09,599] INFO [ReplicaFetcher replicaId=1289, leaderId=1290, fetcherId=2] Shutdown completed (kafka.server.ReplicaFetcherThread)

[2021-12-15 23:22:31,384] INFO [Partition honeycomb-prod.retriever-mutation-45 broker=1289] Expanding ISR from 1291,1289 to 1291,1289,1290. Adding Replicas: 1290, Adding Observers: . (kafka.cluster.Partition)
[2021-12-15 23:22:31,389] INFO [Partition honeycomb-prod.retriever-mutation-45 broker=1289] ISR updated to 1291,1289,1290 and version updated to [275] (kafka.cluster.Partition)
2021/12/15 22:54:23 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined
time="2021-12-15T22:54:23.819342721Z" level=info msg="Debug service listening on localhost:6060"
time="2021-12-15T22:54:23.865122484Z" level=info msg="init finished" az=us-east-1b azSource=env
time="2021-12-15T22:54:23.90819463Z" level=info msg="init finished" az=us-east-1b azSource=env
time="2021-12-15T22:54:23Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T22:54:23.938697636Z" level=info msg="Started consumer" offset=83721780247 partition=45 topic=honeycomb-prod.retriever-mutation
time="2021-12-15T22:54:23.938809265Z" level=info msg="writer HTTP server at :8090"
time="2021-12-15T22:54:23.990433201Z" level=info msg="Evicted from kennel? Will try to recover"
time="2021-12-15T22:54:23.991823014Z" level=warning msg="Error Announce()ing self to kennel, will try again later" error="Error 1062: Duplicate entry 'retriever-011c744d6127dded6' for key 'state.hostname'\n/mnt/ramdisk/hound/retriever/kennel/kennel.go:239                        (*DefaultMaster).Announce\n/mnt/ramdisk/go/pkg/mod/github.com/sirupsen/[email protected]/exported.go:56 WithError\n/usr/local/go/src/runtime/asm_arm64.s:1133                               goexit"
time="2021-12-15T22:54:26Z" level=info msg="consumer/broker/1289 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T22:54:27Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T22:55:24.129418648Z" level=info msg="Marked node as 'live'" checkpointOffset=83723422400 kafkaOffset=83723426319
time="2021-12-15T23:04:23Z" level=info msg="client/metadata got error from broker -1 while fetching metadata: EOF\n"
time="2021-12-15T23:17:31.180915735Z" level=info msg="Shutting down..."
2021/12/15 23:17:31 rpc.Serve: accept:accept unix /var/lib/retriever/ipc.sock: use of closed network connection
time="2021-12-15T23:17:31Z" level=info msg="consumer/broker/1289 closed dead subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:31.399286856Z" level=info msg="Consumer for honeycomb-prod.retriever-mutation:45 stopped"
time="2021-12-15T23:17:37Z" level=info msg="init finished" az=us-east-1b azSource=env
2021/12/15 23:17:37 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined
time="2021-12-15T23:17:37.795678138Z" level=info msg="Debug service listening on localhost:6060"
time="2021-12-15T23:17:37.83935365Z" level=info msg="init finished" az=us-east-1b azSource=env
time="2021-12-15T23:17:37.881483511Z" level=info msg="init finished" az=us-east-1b azSource=env
[LaunchDarkly] 2021/12/15 23:17:37 INFO: Starting LaunchDarkly client 5.0.2
[LaunchDarkly] 2021/12/15 23:17:37 INFO: Starting LaunchDarkly streaming connection
[LaunchDarkly] 2021/12/15 23:17:37 INFO: Waiting up to 30000 milliseconds for LaunchDarkly client to start...
[LaunchDarkly] 2021/12/15 23:17:37 INFO: Connecting to LaunchDarkly stream
[LaunchDarkly] 2021/12/15 23:17:37 INFO: LaunchDarkly streaming is active
[LaunchDarkly] 2021/12/15 23:17:37 INFO: Initialized LaunchDarkly client
time="2021-12-15T23:17:37.906031798Z" level=info msg="init finished" az=us-east-1b azSource=env
time="2021-12-15T23:17:37Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:37.913246534Z" level=info msg="Started consumer" offset=83743247313 partition=45 topic=honeycomb-prod.retriever-mutation
time="2021-12-15T23:17:37.91336211Z" level=info msg="writer HTTP server at :8090"
time="2021-12-15T23:17:38.416476647Z" level=info msg="Marked node as 'live'" checkpointOffset=83743247312 kafkaOffset=83743457646
time="2021-12-15T23:17:40Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:17:40Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:42Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:17:42Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:42Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:17:42Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:44Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:17:44Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:17:47Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:17:47Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:21:09Z" level=info msg="consumer/broker/1290 disconnecting due to error processing FetchRequest: EOF\n"
time="2021-12-15T23:21:09Z" level=info msg="kafka: error while consuming honeycomb-prod.retriever-mutation/45: EOF"
time="2021-12-15T23:21:11Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:21:11Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:21:11Z" level=info msg="Failed to connect to broker ip-10-0-185-142.ec2.internal:9092: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:11Z" level=info msg="Error while sending ApiVersionsRequest to broker ip-10-0-185-142.ec2.internal:9092: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:11Z" level=info msg="consumer/broker/1290 disconnecting due to error processing FetchRequest: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:11Z" level=info msg="kafka: error while consuming honeycomb-prod.retriever-mutation/45: dial tcp 10.0.185.142:9092: connect: connection refused"
time="2021-12-15T23:21:13Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:21:13Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:21:13Z" level=info msg="Failed to connect to broker ip-10-0-185-142.ec2.internal:9092: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:13Z" level=info msg="Error while sending ApiVersionsRequest to broker ip-10-0-185-142.ec2.internal:9092: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:13Z" level=info msg="consumer/broker/1290 disconnecting due to error processing FetchRequest: dial tcp 10.0.185.142:9092: connect: connection refused\n"
time="2021-12-15T23:21:13Z" level=info msg="kafka: error while consuming honeycomb-prod.retriever-mutation/45: dial tcp 10.0.185.142:9092: connect: connection refused"
time="2021-12-15T23:21:15Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:21:15Z" level=info msg="client/broker remove invalid broker #1290 with ip-10-0-185-142.ec2.internal:9092"
time="2021-12-15T23:21:15Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
-------
Broker 1290 resumes accepting connections around 23:22Z, but has fallen out of ISR and needs to catch up.
-------
time="2021-12-15T23:23:38.231301104Z" level=info msg="Marked node as 'consuming'" checkpointOffset=83747678208 kafkaOffset=83748646787
  (this indicates that we are now at least 100,000 offsets behind the latest kafka offset known to Sarama)
-------
Broker 1290 re-enters ISR for partition 45 around 23:25Z
-------
time="2021-12-15T23:27:40Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:27:40Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:44Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:27:46Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:48Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:27:48Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:50Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:27:50Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:52Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:27:53Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:54Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:27:54Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:54Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:27:54Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:56Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:27:56Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:27:58Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:27:58Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:03Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:28:04Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:06Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:06Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:08Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:08Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:12Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:28:14Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:16Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:16Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:19Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:19Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:23Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:28:24Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:26Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:26Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:28Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:28Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:32Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:28:33Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:35Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:35Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:38Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:38Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:45Z" level=info msg="consumer/broker/1290 abandoned subscription to honeycomb-prod.retriever-mutation/45 because consuming was taking too long\n"
time="2021-12-15T23:28:47Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:49Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:49Z" level=info msg="consumer/broker/1289 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:28:52Z" level=info msg="consumer/honeycomb-prod.retriever-mutation/45 finding new broker\n"
time="2021-12-15T23:28:52Z" level=info msg="consumer/broker/1290 added subscription to honeycomb-prod.retriever-mutation/45\n"
time="2021-12-15T23:29:37.942178463Z" level=info msg="Marked node as 'live'" checkpointOffset=83753962338 kafkaOffset=83753969084

Problem Description

When a broker that we are follower-fetching from is restarted, Sarama stays with the bad broker, waiting for it to come back, instead of falling back to the leader. Consumption remains hung and no messages are consumed until the restarted broker both resumes accepting connections AND has re-entered the ISR for the partition being read.
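
For context on the mechanism involved: with fetch-from-follower (KIP-392), the leader can answer a FetchRequest with no records and a preferred read replica ID instead, and the client is expected to redirect its fetches to that replica, falling back to the leader whenever the replica is not usable. A simplified sketch of that expected selection logic follows; the names are hypothetical, not Sarama internals.

const noPreferredReplica = int32(-1)

// chooseFetchBroker is illustrative only: a KIP-392 client should re-evaluate this
// choice on every fetch, following the preferred read replica advertised by the
// leader but falling back to the leader when that replica is unknown or unreachable.
func chooseFetchBroker(leaderID, preferredReplicaID int32, alive map[int32]bool) int32 {
        if preferredReplicaID != noPreferredReplica && alive[preferredReplicaID] {
                return preferredReplicaID // keep fetching from the in-rack follower
        }
        return leaderID // no usable preferred replica: fetch from the leader
}

// e.g. chooseFetchBroker(1289, 1290, map[int32]bool{1289: true, 1290: false}) returns
// 1289: the fallback to the leader that the consumer above never made.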

@lizthegrey
Contributor Author

Screenshot (2021-12-15 3:59:48 PM): https://share.getcloudapp.com/v1uLoRK5

Here are the unique stacks found by go tool trace during the locked-up window:

runtime.chanrecv2:444
github.com/Shopify/sarama.(*Broker).responseReceiver:940
github.com/Shopify/sarama.withRecover:43

sync.(*WaitGroup).Wait:130
github.com/Shopify/sarama.(*brokerConsumer).subscriptionConsumer:823
github.com/Shopify/sarama.withRecover:43

runtime.chanrecv2:444
github.com/Shopify/sarama.(*partitionConsumer).responseFeeder:464
github.com/Shopify/sarama.withRecover:43

runtime.selectgo:327
github.com/Shopify/sarama.(*Broker).sendAndReceive:868
github.com/Shopify/sarama.(*Broker).Fetch:379
github.com/Shopify/sarama.(*brokerConsumer).fetchNewMessages:955
github.com/Shopify/sarama.(*brokerConsumer).subscriptionConsumer:812
github.com/Shopify/sarama.withRecover:43

internal/poll.(*FD).Read:167
net.(*netFD).Read:56
net.(*conn).Read:183
bufio.(*Reader).Read:227
github.com/Shopify/sarama.(*bufConn).Read:107
io.ReadAtLeast:328
io.ReadFull:347
github.com/Shopify/sarama.(*Broker).readFull:796
github.com/Shopify/sarama.(*Broker).responseReceiver:952
github.com/Shopify/sarama.withRecover:43

syscall.read:687
syscall.Read:189
internal/poll.ignoringEINTRIO:582
internal/poll.(*FD).Read:163
net.(*netFD).Read:56
net.(*conn).Read:183
bufio.(*Reader).Read:227
github.com/Shopify/sarama.(*bufConn).Read:107
io.ReadAtLeast:328
io.ReadFull:347
github.com/Shopify/sarama.(*Broker).readFull:796
github.com/Shopify/sarama.(*Broker).responseReceiver:952
github.com/Shopify/sarama.withRecover:43

@lizthegrey
Contributor Author

lizthegrey commented Dec 16, 2021

actually, not convinced this isn't apache/kafka#10326 rearing its head again. sigh. it looks like what caused things to wake back up again was the broker re-entering ISR for the partition, rather than the broker coming back at all. so yes, this is an odd interaction between the Sarama library and the Kafka broker.

@lizthegrey
Contributor Author

lizthegrey commented Dec 16, 2021

based on what I'm reading, we got punted off 1290 and had an active subscription to 1289 the whole time we weren't processing any messages, but it was not feeding us any messages (or we were ignoring the results), until 1290 re-entered ISR, at which point it sent us a message that we understood to mean 1290 was the new preferred broker and we should re-dispatch. so why wasn't the leader giving us responses to Fetch, while our high watermark reader in a separate thread was indicating we were falling further and further behind?

@dnwe
Collaborator

dnwe commented Dec 17, 2021

Thanks for the detailed report, this sounds interesting and I look forward to diving into it, but unfortunately I’m on vacation now and so it’s unlikely I’ll get a chance to properly look into it until the New Year

@lizthegrey
Contributor Author

happy new year! let me know if/when you'd like to pair to look at this.

dnwe added a commit that referenced this issue Jan 12, 2022
After Sarama had been given a preferred replica to consume from, it was
mistakenly latching onto that value and not unsetting it in the case
that the preferred replica broker was shut down and left the cluster
metadata.

Fetches continued to work as long as that broker remained shut down,
because they were now being sent to the Leader, which would service them
itself as it had no better preferred replica to point the client at.

However, consumption would then hang after the broker came back online,
because the Leader would stop returning records in the FetchResponse and
would instead just return the preferred replicaID, expecting the
client to send its FetchRequests over there. But because the
partitionConsumer had latched the value of preferredReplica, it never
dispatched to (re-)connect to the preferred replica and instead just
continued to send FetchRequests to the leader and received no records
back.

Contributes-to: #2090

Signed-off-by: Dominic Evans <[email protected]>
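
For anyone following along, here is a rough sketch of the idea the commit message describes. This is not the actual PR #2108 diff; the struct and method are simplified stand-ins, and only Client.Broker and Client.Leader are real sarama.Client methods.

// Simplified stand-in for the partition consumer's broker resolution, not Sarama's code.
type partitionConsumerSketch struct {
        topic                string
        partition            int32
        preferredReadReplica int32 // -1 means "no preferred replica advertised"
}

func (pc *partitionConsumerSketch) resolveBroker(client sarama.Client) (*sarama.Broker, error) {
        if pc.preferredReadReplica >= 0 {
                if b, err := client.Broker(pc.preferredReadReplica); err == nil {
                        return b, nil
                }
                // The preferred replica has left the cluster metadata: unlatch it rather
                // than waiting for it forever, and fall back to the partition leader.
                pc.preferredReadReplica = -1
        }
        return client.Leader(pc.topic, pc.partition)
}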
@dnwe
Collaborator

dnwe commented Jan 12, 2022

@lizthegrey sorry for not being in touch before now, it has been a busy start to the year, but I finally managed to carve out some time to look into this problem. I put together a small functional test to exercise the behaviour and immediately spotted one bug in the current implementation that I've pushed a fix for under PR #2108

I tried to describe the issue I found in the description of that PR. I'm not 100% certain this was the issue that you hit in your testing, based on how you described when consumption stalled and when it successfully resumed, but I wonder if you might be able to re-test your scenario with go get github.com/Shopify/sarama@2441dcb for me? Thanks again for the detailed bug report!

@lizthegrey
Contributor Author

That indeed sounds like the bug we were seeing, thanks! We'll retry in the next few days and get back to you.

@lizthegrey
Contributor Author

Yup, the proposed fix indeed fixed it.

niamster pushed a commit to niamster/sarama that referenced this issue Apr 7, 2022
niamster pushed a commit to DataDog/sarama that referenced this issue Apr 20, 2022