Skip to content

Commit

Permalink
feat: add support for plain sasl
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Oct 13, 2024
1 parent db0ad5c commit 8979ca0
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 58 deletions.
3 changes: 3 additions & 0 deletions lib/klife_protocol/sasl_mechanism/behaviour.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule KlifeProtocol.SASLMechanism.Behaviour do
@callback execute_auth(sendrcv_fun :: function, opts :: list) :: :ok | {:error, reason :: term}
end
11 changes: 11 additions & 0 deletions lib/klife_protocol/sasl_mechanism/plain.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule KlifeProtocol.SASLMechanism.Plain do
@behaviour KlifeProtocol.SASLMechanism.Behaviour

def execute_auth(send_recv_fun, opts) do
usr = Keyword.fetch!(opts, :username)
pwd = Keyword.fetch!(opts, :password)
to_send = :erlang.iolist_to_binary([0, usr, 0, pwd])
"" = send_recv_fun.(to_send)
:ok
end
end
90 changes: 86 additions & 4 deletions lib/klife_protocol/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,99 @@ defmodule KlifeProtocol.Socket do
@moduledoc """
Initialize a socket using `connect/3` of `:gen_tcp` or `:ssl` backend injecting the options `:binary` and `packet: 4`.
Any other option will be forwarded to the proper backend module.
Options:
- backend: :gen_tcp or :ssl (default to :gen_tcp)
- backend: `:gen_tcp` or `:ssl` (defaults to `:gen_tcp`)
- sasl_opts: optional. sasl only. Is a keyword list containing:
- mechanism: sasl mechanism string (eg. "PLAIN"),
- sasl_auth_vsn: api version number to use for SaslAuthenticate messages,
- sasl_handshake_vsn: api version number to use for SaslHandshake messages,
- mechanism_opts: mechanism specific keywordlist
- any other option will be forwarded to the backend module `connect/3` function
"""

alias KlifeProtocol.Messages.SaslAuthenticate
alias KlifeProtocol.Messages.SaslHandshake

def connect(host, port, opts \\ []) do
{backend, opts} = Keyword.pop(opts, :backend, :gen_tcp)
{sasl_opts, opts} = Keyword.pop(opts, :sasl_opts, [])
must_have_opts = [:binary, packet: 4]
backend.connect(String.to_charlist(host), port, opts ++ must_have_opts)

case backend.connect(String.to_charlist(host), port, opts ++ must_have_opts) do
{:ok, socket} ->
:ok = maybe_handle_sasl(socket, backend, sasl_opts)
{:ok, socket}

err ->
err
end
end

defp maybe_handle_sasl(_socket, _backend, []), do: :ok

defp maybe_handle_sasl(socket, backend, sasl_opts) do
mechanism = Keyword.fetch!(sasl_opts, :mechanism)
sasl_auth_vsn = Keyword.fetch!(sasl_opts, :sasl_auth_vsn)
sasl_handshake_vsn = Keyword.fetch!(sasl_opts, :sasl_handshake_vsn)
mechanism_opts = Keyword.get(sasl_opts, :mechanism_opts, [])

send_recv_raw_fun = fn data ->
:ok = apply(backend, :send, [socket, data])
{:ok, rcv_bin} = apply(backend, :recv, [socket, 0, 5000])
rcv_bin
end

:ok = sasl_handshake(send_recv_raw_fun, mechanism, sasl_handshake_vsn)

send_recv_fun = fn data ->
to_send = %{content: %{auth_bytes: data}, headers: %{correlation_id: 123}}
to_send_raw = SaslAuthenticate.serialize_request(to_send, sasl_auth_vsn)
rcv_bin = send_recv_raw_fun.(to_send_raw)

{:ok, %{content: resp}} =
SaslAuthenticate.deserialize_response(rcv_bin, sasl_auth_vsn)

case resp do
%{error_code: 0, auth_bytes: auth_bytes} ->
auth_bytes

%{error_code: ec} ->
raise """
Unexpected error on SASL authentication message. ErrorCode: #{inspect(ec)}
"""
end
end

case mechanism do
"PLAIN" ->
KlifeProtocol.SASLMechanism.Plain.execute_auth(send_recv_fun, mechanism_opts)

other ->
raise "Unsupported SASL mechanism #{inspect(other)}"
end
end

defp sasl_handshake(send_rcv_raw_fun, mechanism, sasl_handshake_vsn) do
to_send = %{content: %{mechanism: mechanism}, headers: %{correlation_id: 123}}
to_send_raw = SaslHandshake.serialize_request(to_send, sasl_handshake_vsn)
recv_bin = send_rcv_raw_fun.(to_send_raw)

{:ok, %{content: resp}} =
SaslHandshake.deserialize_response(recv_bin, sasl_handshake_vsn)

case resp do
%{error_code: 0, mechanisms: server_enabled_mechanisms} ->
if mechanism in server_enabled_mechanisms do
:ok
else
raise "Server does not support SASL mechanism #{mechanism}. Supported mechanisms are: #{inspect(server_enabled_mechanisms)}"
end

%{error_code: ec} ->
raise """
Unexpected error on SASL handhsake message. ErrorCode: #{inspect(ec)}
"""
end
end
end
4 changes: 2 additions & 2 deletions lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ if Mix.env() in [:dev] do
end

def do_run_bench("produce_serialization") do
headers = Helpers.genereate_headers()
headers = Helpers.generate_headers()

input_1_500 = %{
headers: headers,
Expand Down Expand Up @@ -144,7 +144,7 @@ if Mix.env() in [:dev] do
offset: offset_4
}} = Helpers.produce_message(topic_name, produce_content.(50, 10_000))

headers = Helpers.genereate_headers()
headers = Helpers.generate_headers()

content_1 = %{
replica_id: -1,
Expand Down
2 changes: 2 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ defmodule KlifeProtocol.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# CRC
{:crc32cer, "~> 0.1.8"},
# Compression
{:snappyer, "~> 1.2.7"},
# Code generation
{:jason, "~> 1.4", only: :dev, runtime: false},
Expand Down
85 changes: 67 additions & 18 deletions test/compose_files/docker-compose-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ services:
- "2888:2888"
- "3888:3888"
environment:
- ZOOKEEPER_SERVER_ID=1
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_INIT_LIMIT=5
- ZOOKEEPER_SYNC_LIMIT=2
- ZOOKEEPER_SERVERS=zookeeper:2888:3888
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zookeeper:2888:3888
KAFKA_OPTS:
-Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf
-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
-Dzookeeper.allowSaslFailedClients=false
-Dzookeeper.requireClientAuthScheme=sasl
volumes:
- ./sasl/zookeeper.jaas.conf:/etc/kafka/zookeeper_server_jaas.conf:ro,z

kafka1:
image: confluentinc/cp-kafka:latest
Expand All @@ -22,21 +29,35 @@ services:
ports:
- "19092:9092"
- "19093:9093"
- "19094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ZOOKEEPER: zookeeper:2181
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9091,kafka3:9091
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9091,EXTERNAL_PLAIN://kafka1:9092,EXTERNAL://kafka1:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9091,EXTERNAL_PLAIN://kafka1:9092,EXTERNAL://kafka1:9093,SASL_SSL://kafka1:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL,SASL_SSL:SASL_SSL
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_SSL_KEYSTORE_LOCATION: /secrets/kafka.keystore.jks
KAFKA_SSL_TRUSTSTORE_LOCATION: /secrets/kafka.truststore.jks
KAFKA_SSL_KEY_PASSWORD: klifeprotocol
KAFKA_SSL_KEYSTORE_PASSWORD: klifeprotocol
KAFKA_SSL_TRUSTSTORE_PASSWORD: klifeprotocol
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: creds
KAFKA_SSL_KEY_CREDENTIALS: creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
KAFKA_SASL_KERBEROS_SERVICE_NAME: klife

volumes:
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks:ro,z
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks:ro,z
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks
- ./ssl/localhost.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks
- ./ssl/localhost.truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
- ./sasl/broker.jaas.conf:/etc/kafka/kafka_server_jaas.conf
- ./sasl/creds:/etc/kafka/secrets/creds

kafka2:
image: confluentinc/cp-kafka:latest
Expand All @@ -45,21 +66,35 @@ services:
ports:
- "29092:9092"
- "29093:9093"
- "29094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ZOOKEEPER: zookeeper:2181
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9091,kafka3:9091
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9091,EXTERNAL_PLAIN://kafka2:9092,EXTERNAL://kafka2:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9091,EXTERNAL_PLAIN://kafka2:9092,EXTERNAL://kafka2:9093,SASL_SSL://kafka2:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL,SASL_SSL:SASL_SSL
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_SSL_KEYSTORE_LOCATION: /secrets/kafka.keystore.jks
KAFKA_SSL_TRUSTSTORE_LOCATION: /secrets/kafka.truststore.jks
KAFKA_SSL_KEY_PASSWORD: klifeprotocol
KAFKA_SSL_KEYSTORE_PASSWORD: klifeprotocol
KAFKA_SSL_TRUSTSTORE_PASSWORD: klifeprotocol
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: creds
KAFKA_SSL_KEY_CREDENTIALS: creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
KAFKA_SASL_KERBEROS_SERVICE_NAME: klife
volumes:
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks:ro,z
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks:ro,z
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks
- ./ssl/localhost.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks
- ./ssl/localhost.truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
- ./sasl/broker.jaas.conf:/etc/kafka/kafka_server_jaas.conf
- ./sasl/creds:/etc/kafka/secrets/creds

kafka3:
image: confluentinc/cp-kafka:latest
Expand All @@ -68,18 +103,32 @@ services:
ports:
- "39092:9092"
- "39093:9093"
- "39094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ZOOKEEPER: zookeeper:2181
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9091,kafka3:9091
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9091,EXTERNAL_PLAIN://kafka3:9092,EXTERNAL://kafka3:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9091,EXTERNAL_PLAIN://kafka3:9092,EXTERNAL://kafka3:9093,SASL_SSL://kafka3:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_PLAIN:PLAINTEXT,EXTERNAL:SSL,SASL_SSL:SASL_SSL
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_SSL_KEYSTORE_LOCATION: /secrets/kafka.keystore.jks
KAFKA_SSL_TRUSTSTORE_LOCATION: /secrets/kafka.truststore.jks
KAFKA_SSL_KEY_PASSWORD: klifeprotocol
KAFKA_SSL_KEYSTORE_PASSWORD: klifeprotocol
KAFKA_SSL_TRUSTSTORE_PASSWORD: klifeprotocol
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: creds
KAFKA_SSL_KEY_CREDENTIALS: creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
KAFKA_SASL_KERBEROS_SERVICE_NAME: klife
volumes:
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks:ro,z
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks:ro,z
- ./ssl/localhost.keystore.jks:/secrets/kafka.keystore.jks
- ./ssl/localhost.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
- ./ssl/localhost.truststore.jks:/secrets/kafka.truststore.jks
- ./ssl/localhost.truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
- ./sasl/broker.jaas.conf:/etc/kafka/kafka_server_jaas.conf
- ./sasl/creds:/etc/kafka/secrets/creds
11 changes: 11 additions & 0 deletions test/compose_files/sasl/broker.jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="klifeusr"
password="klifepwd";
};
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="klifeusr"
password="klifepwd"
user_klifeusr="klifepwd";
};
1 change: 1 addition & 0 deletions test/compose_files/sasl/creds
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
klifeprotocol
4 changes: 4 additions & 0 deletions test/compose_files/sasl/zookeeper.jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_klifeusr="klifepwd";
};
8 changes: 4 additions & 4 deletions test/messages/api_versions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Messages.ApiVersionsTest do

test "request and response v0" do
version = 0
headers = %{correlation_id: correlation_id} = Helpers.genereate_headers()
headers = %{correlation_id: correlation_id} = Helpers.generate_headers()

{:ok, result} =
%{headers: headers, content: %{}}
Expand Down Expand Up @@ -35,7 +35,7 @@ defmodule Messages.ApiVersionsTest do

test "request and response v1" do
version = 1
headers = %{correlation_id: correlation_id} = Helpers.genereate_headers()
headers = %{correlation_id: correlation_id} = Helpers.generate_headers()

{:ok, result} =
%{headers: headers, content: %{}}
Expand Down Expand Up @@ -65,7 +65,7 @@ defmodule Messages.ApiVersionsTest do

test "request and response v2" do
version = 2
headers = %{correlation_id: correlation_id} = Helpers.genereate_headers()
headers = %{correlation_id: correlation_id} = Helpers.generate_headers()

{:ok, result} =
%{headers: headers, content: %{}}
Expand Down Expand Up @@ -96,7 +96,7 @@ defmodule Messages.ApiVersionsTest do
test "request and response v3" do
version = 3

headers = %{correlation_id: correlation_id} = Helpers.genereate_headers()
headers = %{correlation_id: correlation_id} = Helpers.generate_headers()

content = %{
client_software_name: "klife",
Expand Down
Loading

0 comments on commit 8979ca0

Please sign in to comment.