Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for BITPOS (bitmap command) #112

Merged
merged 1 commit into from
Dec 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fakeredis/_msgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
JSON_PATH_NOT_FOUND_OR_NOT_STRING = "ERR Path '{}' does not exist or not a string"
JSON_PATH_DOES_NOT_EXIST = "ERR Path '{}' does not exist"
LCS_CANT_HAVE_BOTH_LEN_AND_IDX = "ERR If you want both the length and indexes, please just use IDX."
BIT_ARG_MUST_BE_ZERO_OR_ONE = "ERR The bit argument must be 1 or 0."

FLAG_NO_SCRIPT = 's' # Command not allowed in scripts
FLAG_LEAVE_EMPTY_VAL = 'v'
FLAG_TRANSACTION = 't'
48 changes: 39 additions & 9 deletions fakeredis/commands_mixins/bitmap_mixin.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,46 @@
from fakeredis import _msgs as msgs
from fakeredis._commands import (command, Key, Int, BitOffset, BitValue, fix_range_string)
from fakeredis._commands import (command, Key, Int, BitOffset, BitValue, fix_range_string, fix_range)
from fakeredis._helpers import SimpleError, casematch


class BitmapCommandsMixin:
# BITMAP commands
# TODO: bitfield, bitfield_ro, bitpos
@staticmethod
def _bytes_as_bin_string(value):
return ''.join([bin(i).lstrip('0b').rjust(8, '0') for i in value])

@command((Key(bytes), Int), (bytes,))
def bitpos(self, key, bit, *args):
if bit != 0 and bit != 1:
raise SimpleError(msgs.BIT_ARG_MUST_BE_ZERO_OR_ONE)
if len(args) > 3:
raise SimpleError(msgs.SYNTAX_ERROR_MSG)
if len(args) == 3 and self.version < 7:
raise SimpleError(msgs.SYNTAX_ERROR_MSG)
bit_mode = False
if len(args) == 3 and self.version >= 7:
bit_mode = casematch(args[2], b'bit')
if not bit_mode and not casematch(args[2], b'byte'):
raise SimpleError(msgs.SYNTAX_ERROR_MSG)
start = 0 if len(args) == 0 else Int.decode(args[0])
bit_chr = str(bit)
key_value = key.value if key.value else b''

if bit_mode:
value = self._bytes_as_bin_string(key_value)
end = len(value) if len(args) <= 1 else Int.decode(args[1])
start, end = fix_range(start, end, len(value))
value = value[start:end]
else:
end = len(key_value) if len(args) <= 1 else Int.decode(args[1])
start, end = fix_range(start, end, len(key_value))
value = self._bytes_as_bin_string(key_value[start:end])

result = value.find(bit_chr)
if result != -1:
result += start if bit_mode else (start * 8)
return result

@command((Key(bytes, 0),), (bytes,))
def bitcount(self, key, *args):
Expand All @@ -28,10 +63,9 @@ def bitcount(self, key, *args):
raise SimpleError(msgs.SYNTAX_ERROR_MSG)

if bit_mode:
value = key.value.decode() if key.value else ''
value = list(map(int, ''.join([bin(ord(i)).lstrip('0b').rjust(8, '0') for i in value])))
value = self._bytes_as_bin_string(key.value if key.value else b'')
start, end = fix_range_string(start, end, len(value))
return value[start:end].count(1)
return value[start:end].count('1')
start, end = fix_range_string(start, end, len(key.value))
value = key.value[start:end]

Expand Down Expand Up @@ -74,19 +108,15 @@ def setbit(self, key, offset, value):
@staticmethod
def _bitop(op, *keys):
value = keys[0].value
if not isinstance(value, bytes):
raise SimpleError(msgs.WRONGTYPE_MSG)
ans = keys[0].value
i = 1
while i < len(keys):
value = keys[i].value if keys[i].value is not None else b''
if not isinstance(value, bytes):
raise SimpleError(msgs.WRONGTYPE_MSG)
ans = bytes(op(a, b) for a, b in zip(ans, value))
i += 1
return ans

@command((bytes, Key(), Key(bytes)), (Key(bytes),))
@command((bytes, Key()), (Key(bytes),))
def bitop(self, op_name, dst, *keys):
if len(keys) == 0:
raise SimpleError(msgs.WRONG_ARGS_MSG6.format('bitop'))
Expand Down
51 changes: 50 additions & 1 deletion test/test_mixins/test_bitmap_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,19 @@ def test_bitcount_mode_redis6(r):
r.bitcount('key', start=1, end=1, mode='byte')
with pytest.raises(redis.ResponseError):
r.bitcount('key', start=1, end=1, mode='bit')
with pytest.raises(redis.ResponseError):
raw_command(r, 'bitcount', 'key', '1', '2', 'dsd', 'cd')


@pytest.mark.min_server('7')
def test_bitcount_mode(r):
def test_bitcount_mode_redis7(r: redis.Redis):
r.set('key', 'foobar')
assert r.bitcount('key', start=1, end=1, mode='byte') == 6
assert r.bitcount('key', start=5, end=30, mode='bit') == 17
with pytest.raises(redis.ResponseError):
r.bitcount('key', start=5, end=30, mode='dscd')
with pytest.raises(redis.ResponseError):
raw_command(r, 'bitcount', 'key', '1', '2', 'dsd', 'cd')


def test_bitcount_wrong_type(r):
Expand Down Expand Up @@ -159,3 +165,46 @@ def test_bitop_errors(r):
r.bitop('and', 'dest', 'key1', 'key-set')
with pytest.raises(redis.ResponseError):
r.bitop('and', 'dest')


def test_bitpos(r: redis.Redis):
key = "key:bitpos"
r.set(key, b"\xff\xf0\x00")
assert r.bitpos(key, 0) == 12
assert r.bitpos(key, 0, 2, -1) == 16
assert r.bitpos(key, 0, -2, -1) == 12
r.set(key, b"\x00\xff\xf0")
assert r.bitpos(key, 1, 0) == 8
assert r.bitpos(key, 1, 1) == 8
r.set(key, b"\x00\x00\x00")
assert r.bitpos(key, 1) == -1
r.set(key, b"\xff\xf0\x00")


@pytest.mark.min_server('7')
def test_bitops_mode_redis7(r: redis.Redis):
key = "key:bitpos"
r.set(key, b"\xff\xf0\x00")
assert r.bitpos(key, 0, 8, -1, 'bit') == 12
assert r.bitpos(key, 1, 8, -1, 'bit') == 8
with pytest.raises(redis.ResponseError):
assert r.bitpos(key, 0, 8, -1, 'bad_mode') == 12


@pytest.mark.max_server('6')
def test_bitops_mode_redis6(r: redis.Redis):
key = "key:bitpos"
r.set(key, b"\xff\xf0\x00")
with pytest.raises(redis.ResponseError):
assert r.bitpos(key, 0, 8, -1, 'bit') == 12


def test_bitpos_wrong_arguments(r: redis.Redis):
key = "key:bitpos:wrong:args"
r.set(key, b"\xff\xf0\x00")
with pytest.raises(redis.ResponseError):
raw_command(r, 'bitpos', key, '7')
with pytest.raises(redis.ResponseError):
raw_command(r, 'bitpos', key, 1, '6', '5', 'BYTE', '6')
with pytest.raises(redis.ResponseError):
raw_command(r, 'bitpos', key)