chore: add sasl doc
oliveigah committed Oct 13, 2024
1 parent 84401f2 commit ec82a52
Showing 1 changed file with 41 additions and 18 deletions.
59 changes: 41 additions & 18 deletions README.md
Expand Up @@ -6,7 +6,7 @@ The implementation is specifically designed to handle only the wire protocol. Th

## Supported messages

One of the main features of this library is its ability to generate Elixir code using [Kafka message definitions](https://github.com/apache/kafka/tree/trunk/clients/src/main/resources/common/message). This ensures that all currently available and future messages (excluding the ones mentioned below) should be easily supported in all versions by the implementation.

This implementation of the Kafka protocol supports all currently available messages and versions, except for `Fetch` versions below 4. This is due to a change in record batch serialization in newer versions (>=4) of this message. As a result, the library is NOT compatible with Kafka versions prior to 0.11.

Expand Down Expand Up @@ -46,6 +46,7 @@ Inside the `response` variable you will get something like this:
headers: %{correlation_id: 123}
}
```

## Messages API

Each message has its own set of fields, as described in the official Kafka documentation. In this library, each message has its own module within the `KlifeProtocol.Messages` namespace. In the previous example we used `KlifeProtocol.Messages.ApiVersions`, which stands for the [API version message](https://kafka.apache.org/protocol.html#The_Messages_ApiVersions).
Expand Down Expand Up @@ -151,22 +152,39 @@ After, you can use the `deserialize_response/3` function of the messages API, pa
{:ok, %{content: content}} = Messages.ApiVersions.deserialize_response(rest_data, version, false)
```

## SASL

SASL is handled by the `KlifeProtocol.Socket` module, and client libraries can pass SASL options to its `connect/3` function.

For now, the only supported mechanism is PLAIN. You can use it like this:

```elixir
sasl_opts = [
  mechanism: "PLAIN",
  sasl_auth_vsn: 2,
  sasl_handshake_vsn: 1,
  mechanism_opts: [
    username: "klifeusr",
    password: "klifepwd"
  ]
]

{:ok, socket} = KlifeProtocol.Socket.connect("localhost", 9092, [backend: :ssl, sasl_opts: sasl_opts])
```
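For context, the PLAIN mechanism itself is defined by RFC 4616: the client sends an optional authorization identity, the username, and the password, separated by NUL bytes. The following is a minimal sketch of that payload only. The `SaslPlainSketch` module and its `initial_response/3` function are hypothetical names used for illustration and are not part of `KlifeProtocol`, which may build the payload differently internally.

```elixir
defmodule SaslPlainSketch do
  # Hypothetical helper for illustration only; not a KlifeProtocol API.
  # RFC 4616 message layout: [authzid] NUL authcid NUL passwd
  def initial_response(username, password, authzid \\ "") do
    <<authzid::binary, 0, username::binary, 0, password::binary>>
  end
end

# With an empty authorization identity the payload starts with a NUL byte.
payload = SaslPlainSketch.initial_response("klifeusr", "klifepwd")
IO.inspect(payload, binaries: :as_binaries)
```

Because the credentials travel unencrypted inside this payload, PLAIN is normally combined with a TLS connection, which is what `backend: :ssl` selects in the example above.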

## Compression and Record Batch Attributes

The library currently supports two compression methods: `snappy`, using the [snappyer library](https://github.com/zmstone/snappyer), and `gzip`, using the [Erlang zlib library](https://www.erlang.org/doc/man/zlib.html).


To configure the compression strategy, along with other important data such as `timestampType` and `isTransactional`, you can utilize the Kafka record batch's [attributes byte](https://kafka.apache.org/documentation/#recordbatch).


Klife protocol provides an interface to simplify the creation of these attributes using the `encode_attributes/1` and `decode_attributes/1` functions of the `RecordBatch` module. Here's an example:

```elixir
alias KlifeProtocol.RecordBatch

opts = [
  compression: :snappy,
  timestamp_type: :log_append_time,
  is_transactional: true,
  is_control_batch: true,
Expand All @@ -185,6 +203,7 @@ attr_val = RecordBatch.encode_attributes(opts)
```

With these functions, a client can easily generate and decode attributes for each record batch it handles.
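To make the bit layout concrete, here is a minimal standalone sketch of how such an attributes value can be packed and unpacked, following the layout in the Kafka record batch documentation (bits 0 to 2 for compression, bit 3 for the timestamp type, bit 4 for transactional, bit 5 for control batch). The `AttributesSketch` module, its function names, and the `:create_time` atom are assumptions made for illustration; this is not the library's implementation of `encode_attributes/1`.

```elixir
defmodule AttributesSketch do
  # Hypothetical module for illustration only; not a KlifeProtocol API.
  import Bitwise

  # Compression codes as listed in the Kafka record batch documentation.
  @compression %{none: 0, gzip: 1, snappy: 2, lz4: 3, zstd: 4}

  # Packs the options into a single integer.
  def encode(opts) do
    compression = Map.fetch!(@compression, Keyword.get(opts, :compression, :none))

    # :create_time is an assumed name for the default timestamp type.
    timestamp_bit =
      if Keyword.get(opts, :timestamp_type, :create_time) == :log_append_time, do: 1, else: 0

    compression
    |> bor(timestamp_bit <<< 3)
    |> bor(flag(opts, :is_transactional) <<< 4)
    |> bor(flag(opts, :is_control_batch) <<< 5)
  end

  # Unpacks the integer back into a keyword list.
  # Raises a MatchError for compression codes outside the map above.
  def decode(attrs) do
    {compression, _code} =
      Enum.find(@compression, fn {_name, code} -> code == band(attrs, 0b111) end)

    [
      compression: compression,
      timestamp_type: if(band(attrs, 1 <<< 3) != 0, do: :log_append_time, else: :create_time),
      is_transactional: band(attrs, 1 <<< 4) != 0,
      is_control_batch: band(attrs, 1 <<< 5) != 0
    ]
  end

  defp flag(opts, key), do: if(Keyword.get(opts, key, false), do: 1, else: 0)
end

opts = [compression: :snappy, timestamp_type: :log_append_time, is_transactional: true]
attrs = AttributesSketch.encode(opts)
IO.inspect(AttributesSketch.decode(attrs))
```

The sketch ignores bit 6 and the unused high bits, so it covers only the four options shown in the example above.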

## Performance

This section provides performance benchmarks for the main use cases of produce serialization and fetch deserialization, conducted using the [benchee tool](https://github.com/bencheeorg/benchee). The benchmarks measure only the serialization work, as the Kafka cluster is running solely to retrieve data samples for benchmarking purposes. Note that network latency is not included in the measurements, only serialization/deserialization performance.
Expand All @@ -200,28 +219,31 @@ Kernel: 6.1.0-13-amd64 (64-bit)
```

All benchmarks can be executed by running the benchmark mix task from the project's base folder:

```
bash run-kafka.sh
mix benchmark produce_serialization
mix benchmark fetch_deserialization
bash stop-kafka.sh
```

### Produce Serialization

| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg |
| ------- | -------- | ------- | ------ | ------ | ------ | ------ | ----- | -------- |
| 1 | 500 kb | 3.92 k | 3.92 K | 254 μs | 252 μs | 300 μs | ±4.8% | 3 kb |
| 10 | 50 kb | 37.6 k | 3.76 K | 265 μs | 263 μs | 316 μs | ±4.4% | 12 kb |
| 50 | 10 kb | 146.6 k | 2.93 K | 341 μs | 340 μs | 393 μs | ±4.6% | 52 kb |
| 100 | 5 kb | 246.0 k | 2.46 K | 406 μs | 399 μs | 520 μs | ±7.5% | 94 kb |

### Fetch Deserialization

| REC QTY | REC SIZE | REC/S | IPS | AVG | P50 | P99 | SD | Mem. Usg |
| ------- | -------- | ------ | ------ | ------ | ------ | ------ | ---- | -------- |
| 1 | 500 kb | 4.23 k | 4.23 k | 236 μs | 224 μs | 400 μs | ±14% | 22 kb |
| 10 | 50 kb | 36.2 k | 3.62 k | 276 μs | 257 μs | 421 μs | ±17% | 69 kb |
| 50 | 10 kb | 116 k | 2.32 k | 430 μs | 419 μs | 558 μs | ±13% | 281 kb |
| 100 | 5 kb | 170 k | 1.70 k | 587 μs | 580 μs | 772 μs | ±13% | 545 kb |

## Project Overview

Expand All @@ -238,6 +260,7 @@ The project is composed by 5 main components:
- Socket: Simple wrapper around the `connect/3` function of `:gen_tcp` and `:ssl` that sets the socket options needed to communicate properly with a Kafka broker (`packet: 4` and `binary`). It is intended to be used only for socket initialization; all other operations must be done using `:gen_tcp` or `:ssl` directly.

![](./assets/overview.png "Project overview")
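As background on the `packet: 4` option mentioned for the Socket component: with this option the Erlang runtime prepends a 4-byte big-endian length header to every outgoing packet and strips it from incoming ones, which matches the size prefix Kafka puts in front of each request and response. The sketch below reproduces that framing by hand purely to show the wire layout. `FramingSketch` and its functions are hypothetical names, and real clients do not need this code because the socket option handles it.

```elixir
defmodule FramingSketch do
  # Hypothetical module for illustration only; `packet: 4` does this automatically.

  # Sender side: prefix the payload with its size as a 32-bit big-endian integer.
  def frame(payload) when is_binary(payload) do
    <<byte_size(payload)::32, payload::binary>>
  end

  # Receiver side: read the size, then split off exactly that many bytes.
  def unframe(<<size::32, payload::binary-size(size), rest::binary>>) do
    {:ok, payload, rest}
  end

  # Not enough bytes buffered yet for a complete message.
  def unframe(_incomplete), do: :incomplete
end

framed = FramingSketch.frame("abc")
IO.inspect(framed, binaries: :as_binaries)
IO.inspect(FramingSketch.unframe(framed))
```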

## Running Tests

```
Expand All @@ -246,12 +269,12 @@ mix test
bash stop-kafka.sh
```

To prevent race conditions with Kafka initialization, it is recommended to wait a couple of seconds between `bash run-kafka.sh` and `mix test`.

If you want to run tests using an SSL connection, you can set the `CONN_MODE=SSL` env var like this:

```
bash run-kafka.sh
CONN_MODE=SSL mix test
bash stop-kafka.sh
```
