From fc9da053c8656c2d846889b1c08d54cecd3907d0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 3 Oct 2017 11:31:55 -0700 Subject: [PATCH] Small fixes to SASL documentation and logging; validate security_protocol (#1231) --- kafka/conn.py | 47 ++++++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 16e923846..304045f51 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -112,7 +112,8 @@ class BrokerConnection(object): to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)] security_protocol (str): Protocol used to communicate with brokers. - Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. + Default: PLAINTEXT. ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None. @@ -145,13 +146,15 @@ class BrokerConnection(object): metrics (kafka.metrics.Metrics): Optionally provide a metrics instance for capturing network IO stats. Default: None. metric_group_prefix (str): Prefix for metric names. Default: '' - sasl_mechanism (str): string picking sasl mechanism when security_protocol - is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported. - Default: None + sasl_mechanism (str): Authentication mechanism when security_protocol + is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: + PLAIN, GSSAPI. Default: PLAIN sasl_plain_username (str): username for sasl PLAIN authentication. Default: None sasl_plain_password (str): password for sasl PLAIN authentication. Default: None + sasl_kerberos_service_name (str): Service name to include in GSSAPI + sasl mechanism handshake. Default: 'kafka' """ DEFAULT_CONFIG = { @@ -179,12 +182,10 @@ class BrokerConnection(object): 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': None, 'sasl_plain_password': None, - 'sasl_kerberos_service_name':'kafka' + 'sasl_kerberos_service_name': 'kafka' } - if gssapi is None: - SASL_MECHANISMS = ('PLAIN',) - else: - SASL_MECHANISMS = ('PLAIN', 'GSSAPI') + SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI') def __init__(self, host, port, afi, **configs): self.hostname = host @@ -213,6 +214,9 @@ def __init__(self, host, port, afi, **configs): (socket.SOL_SOCKET, socket.SO_SNDBUF, self.config['send_buffer_bytes'])) + assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, ( + 'security_protcol must be in ' + ', '.join(self.SECURITY_PROTOCOLS)) + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): assert ssl_available, "Python wasn't built with SSL support" @@ -224,7 +228,7 @@ def __init__(self, host, port, afi, **configs): assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' - assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl' + assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() @@ -332,6 +336,7 @@ def connect(self): log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING else: + # security_protocol PLAINTEXT log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() @@ -367,7 +372,6 @@ def connect(self): if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): - log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) log.debug('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() @@ -500,21 +504,21 @@ def _try_authenticate_plain(self, future): if data != b'\x00\x00\x00\x00': return future.failure(Errors.AuthenticationFailedError()) + log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) return future.success(True) def _try_authenticate_gssapi(self, future): - data = b'' gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname - ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service) + ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service) ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) - ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - #Exchange tokens until authentication either suceeded or failed: + ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') + # Exchange tokens until authentication either succeeds or fails: received_token = None try: while not ctx_Context.complete: - #calculate the output token + # calculate the output token try: output_token = ctx_Context.step(received_token) except GSSError as e: @@ -533,10 +537,10 @@ def _try_authenticate_gssapi(self, future): size = Int32.encode(len(msg)) self._sock.sendall(size + msg) - # The server will send a token back. processing of this token either - # establishes a security context, or needs further token exchange - # the gssapi will be able to identify the needed next step - # The connection is closed on failure + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. response = self._sock.recv(2000) self._sock.setblocking(False) @@ -546,7 +550,7 @@ def _try_authenticate_gssapi(self, future): future.failure(error) self.close(error=error) - #pass the received token back to gssapi, strip the first 4 bytes + # pass the received token back to gssapi, strip the first 4 bytes received_token = response[4:] except Exception as e: @@ -555,6 +559,7 @@ def _try_authenticate_gssapi(self, future): future.failure(error) self.close(error=error) + log.info('%s: Authenticated as %s', self, gssname) return future.success(True) def blacked_out(self):