Skip to content

Commit

Permalink
Small fixes to SASL documentation and logging; validate security_prot…
Browse files Browse the repository at this point in the history
…ocol (#1231)
  • Loading branch information
dpkp authored Oct 3, 2017
1 parent 7794ce8 commit fc9da05
Showing 1 changed file with 26 additions and 21 deletions.
47 changes: 26 additions & 21 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class BrokerConnection(object):
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
security_protocol (str): Protocol used to communicate with brokers.
Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
Expand Down Expand Up @@ -145,13 +146,15 @@ class BrokerConnection(object):
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.
metric_group_prefix (str): Prefix for metric names. Default: ''
sasl_mechanism (str): string picking sasl mechanism when security_protocol
is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
Default: None
sasl_mechanism (str): Authentication mechanism when security_protocol
is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
PLAIN, GSSAPI. Default: PLAIN
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -179,12 +182,10 @@ class BrokerConnection(object):
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name':'kafka'
'sasl_kerberos_service_name': 'kafka'
}
if gssapi is None:
SASL_MECHANISMS = ('PLAIN',)
else:
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')

def __init__(self, host, port, afi, **configs):
self.hostname = host
Expand Down Expand Up @@ -213,6 +214,9 @@ def __init__(self, host, port, afi, **configs):
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))

assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
'security_protcol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))

if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
assert ssl_available, "Python wasn't built with SSL support"

Expand All @@ -224,7 +228,7 @@ def __init__(self, host, port, afi, **configs):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'

self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -332,6 +336,7 @@ def connect(self):
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
else:
# security_protocol PLAINTEXT
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -367,7 +372,6 @@ def connect(self):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
Expand Down Expand Up @@ -500,21 +504,21 @@ def _try_authenticate_plain(self, future):
if data != b'\x00\x00\x00\x00':
return future.failure(Errors.AuthenticationFailedError())

log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
return future.success(True)

def _try_authenticate_gssapi(self, future):

data = b''
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
#Exchange tokens until authentication either suceeded or failed:
ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
# Exchange tokens until authentication either succeeds or fails:
received_token = None
try:
while not ctx_Context.complete:
#calculate the output token
# calculate the output token
try:
output_token = ctx_Context.step(received_token)
except GSSError as e:
Expand All @@ -533,10 +537,10 @@ def _try_authenticate_gssapi(self, future):
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)

# The server will send a token back. processing of this token either
# establishes a security context, or needs further token exchange
# the gssapi will be able to identify the needed next step
# The connection is closed on failure
# The server will send a token back. Processing of this token either
# establishes a security context, or it needs further token exchange.
# The gssapi will be able to identify the needed next step.
# The connection is closed on failure.
response = self._sock.recv(2000)
self._sock.setblocking(False)

Expand All @@ -546,7 +550,7 @@ def _try_authenticate_gssapi(self, future):
future.failure(error)
self.close(error=error)

#pass the received token back to gssapi, strip the first 4 bytes
# pass the received token back to gssapi, strip the first 4 bytes
received_token = response[4:]

except Exception as e:
Expand All @@ -555,6 +559,7 @@ def _try_authenticate_gssapi(self, future):
future.failure(error)
self.close(error=error)

log.info('%s: Authenticated as %s', self, gssname)
return future.success(True)

def blacked_out(self):
Expand Down

0 comments on commit fc9da05

Please sign in to comment.