Skip to content

Commit

Permalink
fix '$', XREAD BLOCKED, and XREADGROUP BLOCKED usage via Lua
Browse files Browse the repository at this point in the history
  • Loading branch information
alicebob committed Apr 29, 2023
1 parent abfa615 commit 33a1bb4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 9 deletions.
28 changes: 21 additions & 7 deletions cmd_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,12 @@ parsing:
c,
opts.blockTimeout,
func(c *server.Peer, ctx *connCtx) bool {
if ctx.nested {
setDirty(c)
c.WriteError("ERR XREADGROUP command is not allowed with BLOCK option from scripts")
return false
}

db := m.db(ctx.selectedDB)
res, err := xreadgroup(
db,
Expand Down Expand Up @@ -979,13 +985,15 @@ parsing:
c.WriteError(msgInvalidStreamID)
return
} else if id == "$" {
db := m.DB(getCtx(c).selectedDB)
stream, ok := db.streamKeys[opts.streams[i]]
if ok {
opts.ids[i] = stream.lastID()
} else {
opts.ids[i] = "0-0"
}
withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(getCtx(c).selectedDB)
stream, ok := db.streamKeys[opts.streams[i]]
if ok {
opts.ids[i] = stream.lastID()
} else {
opts.ids[i] = "0-0"
}
})
}
}
args = nil
Expand Down Expand Up @@ -1014,6 +1022,12 @@ parsing:
c,
opts.blockTimeout,
func(c *server.Peer, ctx *connCtx) bool {
if ctx.nested {
setDirty(c)
c.WriteError("ERR XREAD command is not allowed with BLOCK option from scripts")
return false
}

db := m.db(ctx.selectedDB)
res := xread(db, opts.streams, opts.ids, opts.count)
if len(res) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions integration/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ func TestScript(t *testing.T) {
c.Error("NOSCRIPT", "EVALSHA", sha1, "0")
})
})

t.Run("blocking", func(t *testing.T) {
testRaw(t, func(c *client) {
c.Do("XADD", "pl", "0-1", "name", "Mercury")
c.Do("EVAL", `redis.call("XINFO", "STREAM", "pl")`, "0")
c.Do("EVAL", `redis.call("XREAD", "STREAMS", "pl", "$")`, "0")
c.Error("not allowed with BLOCK option", "EVAL", `redis.call("XREAD", "BLOCK", "10", "STREAMS", "pl", "$")`, "0")
c.Error("not allowed with BLOCK option", "EVAL", `redis.call("XREADGROUP", "GROUP", "group", "consumer", "BLOCK", 1000, "STREAMS", "pl", ">")`, "0")
})
})
}

func TestLua(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ func blocking(
m.signal.Broadcast() // main loop might miss this signal
}()

m.Lock()
defer m.Unlock()
if !ctx.nested {
// this is a call via Lua's .call(). It's already locked.
m.Lock()
defer m.Unlock()
}
for {
if c.Closed() {
return
Expand Down

0 comments on commit 33a1bb4

Please sign in to comment.