fix: add service not ready check (#757)
### Motivation

We are using this client with AuthenticationOAuth2 to connect the Pulsar proxy.

If the OAuth2 token has expired, the existing connection keeps working. But after I restarted the broker service, the client failed to reconnect. The client logs show:

```
time="2022-03-25T22:34:59Z" level=info msg="[Reconnecting to broker in  1m4.607284318s]" producerID=853 producer_name=test-2-9-81-426 topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]" error=ServiceNotReady message="Disconnected from server at test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650" serviceURL="pulsar+ssl://test-2-9.cv-pulsar.sn3.dev:6651" topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=warning msg="[Failed to lookup topic]" error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276 topic="persistent://private/test/topic-52"
time="2022-03-25T22:35:03Z" level=error msg="[Failed to create producer at reconnect]" error=ServiceNotReady producerID=553 producer_name=test-2-9-81-276 topic="persistent://private/test/topic-52"
```


The proxy logs these errors and warnings:
```
2022-03-25T22:36:22,539+0000 [pulsar-proxy-io-2-2] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x5cf3d820, L:/10.24.10.175:44760 - R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650]] Connected to server
2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, L:/10.24.10.175:44760 - R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received error from server: Unable to authenticate
2022-03-25T22:36:22,540+0000 [pulsar-proxy-io-2-2] ERROR org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, L:/10.24.10.175:44760 - R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Failed to authenticate the client
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, L:/10.24.10.175:44760 - R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Received unknown request id from server: -1
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x5cf3d820, L:/10.24.10.175:44760 ! R:test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650] Disconnected
2022-03-25T22:36:22,541+0000 [pulsar-proxy-io-2-2] WARN  org.apache.pulsar.proxy.server.LookupProxyHandler - [persistent://private/test/topic-13] failed to get Partitioned metadata : Disconnected from server at test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
org.apache.pulsar.client.api.PulsarClientException$ConnectException: Disconnected from server at test-2-9-broker.cv-pulsar.svc.cluster.local/10.88.25.238:6650
	at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:266) [io.streamnative-pulsar-client-original-2.9.2.9.jar:2.9.2.9]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:392) [io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:357) [io.netty-netty-codec-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) [io.netty-netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) [io.netty-netty-transport-classes-epoll-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]

```

In this case, we need to close the connection so that the client can re-authenticate.

### Modifications

Check the `ServerError` in the command response. If it equals `ServerError_ServiceNotReady`, close the connection to the proxy.
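
The check described above can be sketched in isolation. This is a minimal, self-contained illustration and not the client's actual code: `ServerError`, its constant values, and the `conn` type are hypothetical stand-ins for the generated `pb.ServerError` enum and the internal `connection` type. The error field is optional in the response, so it arrives as a pointer and must be nil-checked before it is dereferenced.

```go
package main

import "fmt"

// ServerError is a stand-in for the generated pb.ServerError enum.
// The numeric values here are illustrative and need not match the real ones.
type ServerError int32

const (
	UnknownError ServerError = iota
	MetadataError
	ServiceNotReady
)

// conn is a hypothetical stand-in for the client's connection type.
type conn struct {
	closed bool
}

// Close marks the connection as closed. In this sketch it only flips a flag.
func (c *conn) Close() {
	c.closed = true
}

// checkServerError closes the connection when the response carries
// ServiceNotReady. A nil pointer means the response had no error set.
func (c *conn) checkServerError(err *ServerError) {
	if err == nil {
		return
	}

	if *err == ServiceNotReady {
		c.Close()
	}
}

func main() {
	c := &conn{}

	// No error in the response: the connection stays open.
	c.checkServerError(nil)
	fmt.Println("closed after nil error:", c.closed)

	// A different server error: the connection stays open.
	other := MetadataError
	c.checkServerError(&other)
	fmt.Println("closed after MetadataError:", c.closed)

	// ServiceNotReady: the connection is closed.
	notReady := ServiceNotReady
	c.checkServerError(&notReady)
	fmt.Println("closed after ServiceNotReady:", c.closed)
}
```

Only `ServiceNotReady` triggers the close in this sketch; other errors are left to the normal response handling.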
nodece authored Apr 12, 2022
1 parent 85d7661 commit 64bfd32
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pulsar/internal/connection.go
@@ -522,10 +522,12 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
		c.handleResponse(cmd.ProducerSuccess.GetRequestId(), cmd)

	case pb.BaseCommand_PARTITIONED_METADATA_RESPONSE:
		c.checkServerError(cmd.PartitionMetadataResponse.Error)
		c.handleResponse(cmd.PartitionMetadataResponse.GetRequestId(), cmd)

	case pb.BaseCommand_LOOKUP_RESPONSE:
		lookupResult := cmd.LookupTopicResponse
		c.checkServerError(lookupResult.Error)
		c.handleResponse(lookupResult.GetRequestId(), cmd)

	case pb.BaseCommand_CONSUMER_STATS_RESPONSE:
@@ -574,6 +576,16 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
	}
}

func (c *connection) checkServerError(err *pb.ServerError) {
	if err == nil {
		return
	}

	if *err == pb.ServerError_ServiceNotReady {
		c.Close()
	}
}

func (c *connection) Write(data Buffer) {
	c.writeRequestsCh <- data
}