Skip to content

Commit

Permalink
[improve][broker] ServerCnx: go to Failed state when auth fails (apac…
Browse files Browse the repository at this point in the history
…he#19312)

PIP: apache#12105

When authentication fails in the `ServerCnx`, the state is left in `Start` if the primary `authData` fails authentication and in `Connecting` or `Connected` if the `originalAuthData` authentication fails. To prevent any kind of unexpected behavior, we should go to `Failed` state.

Note that the tests verify the current behavior where a failed `originalAuthData` results first in a `Connected` command from the broker and then an `Error` command. I documented that I think this is sub optimal here apache#19311.

* Update `ServerCnx` state to `Failed` when there is an authentication exception during `handleConnect` and during `handleAuthResponse`.
* Update `handleAuthResponse` reply to `"Unable to authenticate"` instead of the `AuthenticationState` exception.

A new test is added. The added test covers the change made in apache#19295 where we updated `ServerCnx` so that we call `AuthState#authenticate` instead of relying on the implementation detail that the initialization calls `authenticate`. That PR should have added a test.

This is not a breaking change.

- [x] `doc-not-needed`

PR in forked repository: #18

(cherry picked from commit 8049690)
(cherry picked from commit 3ef3bf1)
(cherry picked from commit 467cd32)
  • Loading branch information
michaeljmarshall committed Feb 22, 2023
1 parent 85f0a85 commit 94de805
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ protected void handleConnect(CommandConnect connect) {
doAuthentication(clientData, false, clientProtocolVersion, clientVersion);
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
logAuthException(remoteAddress, "connect", getPrincipal(), Optional.empty(), e);
String msg = "Unable to authenticate";
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg));
Expand All @@ -891,11 +892,13 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) {
authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY);
} catch (AuthenticationException e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage());
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage()));
close();
} catch (Exception e) {
service.getPulsarStats().recordConnectionCreateFail();
state = State.Failed;
String msg = "Unable to handleAuthResponse";
log.warn("[{}] {} ", remoteAddress, msg, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import java.net.SocketAddress;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

/**
* Class that provides the same authentication semantics as the {@link MockAuthenticationProvider} except
* that this one initializes the {@link MockMultiStageAuthenticationState} class to support testing
* multistage authentication.
*/
public class MockMultiStageAuthenticationProvider extends MockAuthenticationProvider {

@Override
public String getAuthMethodName() {
return "multi-stage";
}

@Override
public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) throws AuthenticationException {
return new MockMultiStageAuthenticationState(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

import javax.naming.AuthenticationException;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Performs multistage authentication by extending the paradigm created in {@link MockAuthenticationProvider}.
*/
public class MockMultiStageAuthenticationState implements AuthenticationState {

private final MockMultiStageAuthenticationProvider provider;
private String authRole = null;

MockMultiStageAuthenticationState(MockMultiStageAuthenticationProvider provider) {
this.provider = provider;
}

@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first");
}
return null;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
String data = new String(authData.getBytes(), UTF_8);
String[] parts = data.split("\\.");
if (parts.length == 2) {
if ("challenge".equals(parts[0])) {
return AuthData.of("challenged".getBytes());
} else {
AuthenticationDataCommand command = new AuthenticationDataCommand(data);
authRole = provider.authenticate(command);
// Auth successful, no more auth required
return null;
}
}
throw new AuthenticationException("Failed to authenticate");
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return null;
}

@Override
public boolean isComplete() {
return authRole != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand All @@ -94,6 +95,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnected;
Expand Down Expand Up @@ -448,11 +450,84 @@ public void testConnectCommandWithAuthenticationNegative() throws Exception {
ByteBuf clientCommand = Commands.newConnect("none", "", null);
channel.writeInbound(clientCommand);

assertEquals(serverCnx.getState(), State.Start);
assertEquals(serverCnx.getState(), State.Failed);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
}

@Test(timeOut = 30000)
public void testConnectCommandWithFailingOriginalAuthData() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
svcConfig.setProxyRoles(Collections.singleton("proxy"));

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

ByteBuf clientCommand = Commands.newConnect(authMethodName, "pass.proxy", 1,null,
null, "client", "fail", authMethodName);
channel.writeInbound(clientCommand);

// We currently expect two responses because the originalAuthData is verified after sending
// a successful response to the proxy. Because this is a synchronous operation, there is currently
// no risk. It would be better to fix this. See https://github.com/apache/pulsar/issues/19311.
Object response1 = getResponse();
assertTrue(response1 instanceof CommandConnected);
Object response2 = getResponse();
assertTrue(response2 instanceof CommandError);
assertEquals(((CommandError) response2).getMessage(), "Unable to authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testAuthResponseWithFailingAuthData() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMultiStageAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();

when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

// Trigger connect command to result in AuthChallenge
ByteBuf clientCommand = Commands.newConnect(authMethodName, "challenge.client", "1");
channel.writeInbound(clientCommand);

Object challenge1 = getResponse();
assertTrue(challenge1 instanceof CommandAuthChallenge);

// Trigger another AuthChallenge to verify that code path continues to challenge
ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, AuthData.of("challenge.client".getBytes()), 1, "1");
channel.writeInbound(authResponse1);

Object challenge2 = getResponse();
assertTrue(challenge2 instanceof CommandAuthChallenge);

// Trigger failure
ByteBuf authResponse2 = Commands.newAuthResponse(authMethodName, AuthData.of("fail.client".getBytes()), 1, "1");
channel.writeInbound(authResponse2);

Object response3 = getResponse();
assertTrue(response3 instanceof CommandError);
assertEquals(((CommandError) response3).getMessage(), "Unable to authenticate");
assertEquals(serverCnx.getState(), State.Failed);
assertFalse(serverCnx.isActive());
channel.finish();
}

@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.service.utils;

import java.util.Queue;

import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
Expand Down Expand Up @@ -76,6 +76,11 @@ protected void handleConnected(CommandConnected connected) {
queue.offer(new CommandConnected().copyFrom(connected));
}

@Override
protected void handleAuthChallenge(CommandAuthChallenge challenge) {
queue.offer(new CommandAuthChallenge().copyFrom(challenge));
}

@Override
protected void handleSubscribe(CommandSubscribe subscribe) {
queue.offer(new CommandSubscribe().copyFrom(subscribe));
Expand Down

0 comments on commit 94de805

Please sign in to comment.