Skip to content

Commit

Permalink
Implements ExchangeCommandVersion command (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia authored Nov 24, 2023
1 parent 6dda1a8 commit 3257a7d
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 0 deletions.
23 changes: 23 additions & 0 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,29 @@ async def route(self, routing_key: str, super_stream: str) -> list[str]:
)
return resp.streams

async def exchange_command_version(
self, command_info: schema.FrameHandlerInfo
) -> schema.FrameHandlerInfo:

command_versions_input = []
command_versions_input.append(command_info)
resp = await self.sync_request(
schema.ExchangeCommandVersionRequest(
self._corr_id_seq.next(),
command_versions=command_versions_input,
),
resp_schema=schema.ExchangeCommandVersionResponse,
raise_exception=False,
)

command_version = schema.FrameHandlerInfo(
resp.command_versions[command_info.key_command - 1].key_command,
resp.command_versions[command_info.key_command - 1].min_version,
resp.command_versions[command_info.key_command - 1].max_version,
)

return command_version


class ClientPool:
def __init__(
Expand Down
1 change: 1 addition & 0 deletions rstream/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Key(enum.Enum):
Route = 24
Partitions = 25
ConsumerUpdate = 26
CommandExchangeCommandVersion = 27
ConsumerUpdateRequest = 32794


Expand Down
22 changes: 22 additions & 0 deletions rstream/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,28 @@ class SuperStreamRouteResponse(Frame, is_response=True):
streams: list[str] = field(metadata={"type": [T.string]})


@dataclass
class FrameHandlerInfo(Struct):
key_command: int = field(metadata={"type": T.uint16})
min_version: int = field(metadata={"type": T.uint16})
max_version: int = field(metadata={"type": T.uint16})


@dataclass
class ExchangeCommandVersionRequest(Frame):
key = Key.CommandExchangeCommandVersion
correlation_id: int = field(metadata={"type": T.uint32})
command_versions: list[FrameHandlerInfo]


@dataclass
class ExchangeCommandVersionResponse(Frame, is_response=True):
key = Key.CommandExchangeCommandVersion
correlation_id: int = field(metadata={"type": T.uint32})
response_code: int = field(metadata={"type": T.uint16})
command_versions: list[FrameHandlerInfo]


@dataclass
class SuperStreamPartitions(Frame):
key = Key.Partitions
Expand Down
15 changes: 15 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from rstream import schema
from rstream.client import Client
from rstream.constants import Key

from .http_requests import (
create_binding,
Expand Down Expand Up @@ -121,3 +122,17 @@ async def test_routes(client: Client, stream: str) -> None:

assert len(partitions) == 1
assert partitions[0] == "test-stream-0"


async def exchange_command_versions(client: Client) -> None:

expected_min_version = 1
expected_max_version = 1
command_version_input = schema.FrameHandlerInfo(
Key.Publish.value, min_version=expected_min_version, max_version=expected_min_version
)
command_version_server = await client.exchange_command_version(command_version_input)

assert command_version_server.key_command == Key.Publish.value
assert command_version_server.min_version == expected_min_version
assert command_version_server.max_version == expected_max_version

0 comments on commit 3257a7d

Please sign in to comment.