Skip to content

Commit

Permalink
Implement BLMOVE
Browse files Browse the repository at this point in the history
  • Loading branch information
sevein committed Apr 11, 2023
1 parent a80f140 commit 93ce01a
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Implemented commands:
- RPUSH
- RPUSHX
- LMOVE
- BLMOVE
- Pub/Sub (complete)
- PSUBSCRIBE
- PUBLISH
Expand Down
77 changes: 77 additions & 0 deletions cmd_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func commandsList(m *Miniredis) {
m.srv.Register("RPUSH", m.cmdRpush)
m.srv.Register("RPUSHX", m.cmdRpushx)
m.srv.Register("LMOVE", m.cmdLmove)
m.srv.Register("BLMOVE", m.cmdBlmove)
}

// BLPOP
Expand Down Expand Up @@ -975,3 +976,79 @@ func (m *Miniredis) cmdLmove(c *server.Peer, cmd string, args []string) {
c.WriteBulk(elem)
})
}

// BLMOVE
func (m *Miniredis) cmdBlmove(c *server.Peer, cmd string, args []string) {
if len(args) != 5 {
setDirty(c)
c.WriteError(errWrongNumber(cmd))
return
}
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c, cmd) {
return
}

opts := struct {
src string
dst string
srcDir string
dstDir string
timeout time.Duration
}{
src: args[0],
dst: args[1],
srcDir: strings.ToLower(args[2]),
dstDir: strings.ToLower(args[3]),
}
if ok := optDuration(c, args[len(args)-1], &opts.timeout); !ok {
return
}

blocking(
m,
c,
opts.timeout,
func(c *server.Peer, ctx *connCtx) bool {
db := m.db(ctx.selectedDB)

if !db.exists(opts.src) {
return false
}
if db.t(opts.src) != "list" || (db.exists(opts.dst) && db.t(opts.dst) != "list") {
c.WriteError(msgWrongType)
return true
}

var elem string
switch opts.srcDir {
case "left":
elem = db.listLpop(opts.src)
case "right":
elem = db.listPop(opts.src)
default:
c.WriteError(msgSyntaxError)
return true
}

switch opts.dstDir {
case "left":
db.listLpush(opts.dst, elem)
case "right":
db.listPush(opts.dst, elem)
default:
c.WriteError(msgSyntaxError)
return true
}

c.WriteBulk(elem)
return true
},
func(c *server.Peer) {
// timeout
c.WriteLen(-1)
},
)
}
166 changes: 165 additions & 1 deletion cmd_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ func TestLpushx(t *testing.T) {
proto.Error(msgWrongType),
)
}

}

func TestLpop(t *testing.T) {
Expand Down Expand Up @@ -1663,3 +1662,168 @@ func TestLmove(t *testing.T) {
)
})
}

func TestBlmove(t *testing.T) {
s, err := Run()
ok(t, err)
defer s.Close()
c, err := proto.Dial(s.Addr())
ok(t, err)
defer c.Close()

t.Run("Behaves just like LMOVE", func(t *testing.T) {
s.Push("src", "LR", "LL", "RR", "RL")
s.Push("dst", "m1", "m2", "m3")

// RIGHT LEFT
{
mustDo(t, c,
"BLMOVE", "src", "dst", "RIGHT", "LEFT", "0",
proto.String("RL"),
)
s.CheckList(t, "src", "LR", "LL", "RR")
s.CheckList(t, "dst", "RL", "m1", "m2", "m3")
}
// LEFT RIGHT
{
mustDo(t, c,
"BLMOVE", "src", "dst", "LEFT", "RIGHT", "0",
proto.String("LR"),
)
s.CheckList(t, "src", "LL", "RR")
s.CheckList(t, "dst", "RL", "m1", "m2", "m3", "LR")
}
// RIGHT RIGHT
{
mustDo(t, c,
"BLMOVE", "src", "dst", "RIGHT", "RIGHT", "0",
proto.String("RR"),
)
s.CheckList(t, "src", "LL")
s.CheckList(t, "dst", "RL", "m1", "m2", "m3", "LR", "RR")
}
// LEFT LEFT
{
mustDo(t, c,
"BLMOVE", "src", "dst", "LEFT", "LEFT", "0",
proto.String("LL"),
)
assert(t, !s.Exists("src"), "src exists")
s.CheckList(t, "dst", "LL", "RL", "m1", "m2", "m3", "LR", "RR")
}

// Non existing lists
{
s.Push("ll", "aap", "noot", "mies")

mustDo(t, c,
"BLMOVE", "ll", "nosuch", "RIGHT", "LEFT", "0",
proto.String("mies"),
)
assert(t, s.Exists("nosuch"), "nosuch exists")
s.CheckList(t, "ll", "aap", "noot")
s.CheckList(t, "nosuch", "mies")

mustNilList(t, c,
"BLMOVE", "nosuch2", "ll", "RIGHT", "LEFT", "0.001",
)
}

// Cycle
{
s.Push("cycle", "aap", "noot", "mies")

mustDo(t, c,
"BLMOVE", "cycle", "cycle", "RIGHT", "LEFT", "0",
proto.String("mies"),
)
s.CheckList(t, "cycle", "mies", "aap", "noot")

mustDo(t, c,
"BLMOVE", "cycle", "cycle", "LEFT", "RIGHT", "0",
proto.String("mies"),
)
s.CheckList(t, "cycle", "aap", "noot", "mies")
}
})

t.Run("Errors", func(t *testing.T) {
s.Push("src", "aap", "noot", "mies")
s.Push("dst", "aap", "noot", "mies")
mustDo(t, c,
"BLMOVE",
proto.Error(errWrongNumber("blmove")),
)
mustDo(t, c,
"BLMOVE", "l",
proto.Error(errWrongNumber("blmove")),
)
mustDo(t, c,
"BLMOVE", "l", "l",
proto.Error(errWrongNumber("blmove")),
)
mustDo(t, c,
"BLMOVE", "l", "l", "l",
proto.Error(errWrongNumber("blmove")),
)
mustDo(t, c,
"BLMOVE", "l", "l", "l", "l",
proto.Error(errWrongNumber("blmove")),
)
mustDo(t, c,
"BLMOVE", "too", "many", "many", "many", "many", "arguments",
proto.Error(errWrongNumber("blmove")),
)

s.Set("str", "string!")
mustDo(t, c,
"BLMOVE", "str", "src", "left", "right", "0",
proto.Error(msgWrongType),
)
mustDo(t, c,
"BLMOVE", "src", "str", "left", "right", "0",
proto.Error(msgWrongType),
)

mustDo(t, c,
"BLMOVE", "src", "dst", "no", "good", "0",
proto.Error("ERR syntax error"),
)
mustDo(t, c,
"BLMOVE", "src", "dst", "invalid", "right", "0",
proto.Error("ERR syntax error"),
)
mustDo(t, c,
"BLMOVE", "src", "dst", "left", "invalid", "0",
proto.Error("ERR syntax error"),
)
})

t.Run("Blocking: simple case", func(t *testing.T) {
c.Do("DEL", "src", "dst")

s.Push("src", "s1", "s2")
s.Push("dst", "d1")
got := goStrings(t, s, "BLMOVE", "src", "dst", "RIGHT", "LEFT", "0.1")
select {
case have := <-got:
equals(t, proto.String("s2"), have)
s.CheckList(t, "src", "s1")
s.CheckList(t, "dst", "s2", "d1")
case <-time.After(200 * time.Millisecond):
t.Error("BLMOVE took too long")
}
})

t.Run("Blocking: timeout", func(t *testing.T) {
c.Do("DEL", "src", "dst")

got := goStrings(t, s, "BLMOVE", "src", "dst", "RIGHT", "LEFT", "0.1")
select {
case have := <-got:
equals(t, proto.NilList, have)
case <-time.After(200 * time.Millisecond):
t.Error("BLMOVE took too long")
}
})
}
71 changes: 71 additions & 0 deletions integration/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,74 @@ func TestLmove(t *testing.T) {
c.Do("LRANGE", "chk", "0", "-1")
})
}

func TestBlmove(t *testing.T) {
skip(t)
testRaw(t, func(c *client) {
c.Do("RPUSH", "src", "LR", "LL", "RR", "RL")
c.Do("BLMOVE", "src", "dst", "LEFT", "RIGHT", "0")
c.Do("LRANGE", "src", "0", "-1")
c.Do("LRANGE", "dst", "0", "-1")
c.Do("BLMOVE", "src", "dst", "RIGHT", "LEFT", "0")
c.Do("BLMOVE", "src", "dst", "LEFT", "LEFT", "0")
c.Do("BLMOVE", "src", "dst", "RIGHT", "RIGHT", "0") // now empty
c.Do("EXISTS", "src")
c.Do("LRANGE", "dst", "0", "-1")

// Cycle left to right
c.Do("RPUSH", "round", "aap", "noot", "mies")
c.Do("BLMOVE", "round", "round", "LEFT", "RIGHT", "0")
c.Do("LRANGE", "round", "0", "-1")
c.Do("BLMOVE", "round", "round", "LEFT", "RIGHT", "0")
c.Do("BLMOVE", "round", "round", "LEFT", "RIGHT", "0")
c.Do("BLMOVE", "round", "round", "LEFT", "RIGHT", "0")
c.Do("BLMOVE", "round", "round", "LEFT", "RIGHT", "0")
c.Do("LRANGE", "round", "0", "-1")
// Cycle right to left
c.Do("BLMOVE", "round", "round", "RIGHT", "LEFT", "0")
c.Do("LRANGE", "round", "0", "-1")
c.Do("BLMOVE", "round", "round", "RIGHT", "LEFT", "0")
c.Do("BLMOVE", "round", "round", "RIGHT", "LEFT", "0")
c.Do("BLMOVE", "round", "round", "RIGHT", "LEFT", "0")
c.Do("BLMOVE", "round", "round", "RIGHT", "LEFT", "0")
c.Do("LRANGE", "round", "0", "-1")
// Cycle same side
c.Do("BLMOVE", "round", "round", "LEFT", "LEFT", "0")
c.Do("LRANGE", "round", "0", "-1")
c.Do("BLMOVE", "round", "round", "RIGHT", "RIGHT", "0")
c.Do("LRANGE", "round", "0", "-1")

// failure cases
c.Do("RPUSH", "chk", "aap", "noot", "mies")
c.Error("wrong number", "LMOVE")
c.Error("wrong number", "LMOVE", "chk")
c.Error("wrong number", "LMOVE", "chk", "dst")
c.Error("wrong number", "LMOVE", "chk", "dst", "chk")
c.Error("wrong number", "LMOVE", "chk", "dst", "chk", "too", "many")
c.Do("SET", "str", "I am a string")
c.Error("wrong kind", "BLMOVE", "chk", "str", "LEFT", "LEFT", "0")
c.Error("wrong kind", "BLMOVE", "str", "chk", "LEFT", "LEFT", "0")
c.Do("LRANGE", "chk", "0", "-1")
})

wg := &sync.WaitGroup{}
wg.Add(1)
testMulti(t,
func(c *client) {
c.Do("BLMOVE", "from", "to", "RIGHT", "LEFT", "1")
c.Do("BLMOVE", "from", "to", "RIGHT", "LEFT", "1")
c.Do("BLMOVE", "from", "to", "RIGHT", "LEFT", "1")
c.Do("BLMOVE", "from", "to", "RIGHT", "LEFT", "1")
c.Do("BLMOVE", "from", "to", "RIGHT", "LEFT", "1") // will timeout
wg.Done()
},
func(c *client) {
c.Do("LPUSH", "from", "aap", "noot", "mies")
time.Sleep(20 * time.Millisecond)
c.Do("LPUSH", "from", "toon")
wg.Wait()
c.Do("LRANGE", "from", "0", "-1")
c.Do("LRANGE", "to", "0", "-1")
},
)
}

0 comments on commit 93ce01a

Please sign in to comment.