XReadGroup sometimes returns an i/o timeout error when no message is read #2963

Open
danielnelson opened this issue Apr 5, 2024 · 0 comments


When using Redis streams with multiple consumers in one group, XReadGroup returns an i/o timeout error on consumers that don't read a message. This seems to occur when another consumer did read a message but, because traffic is low, this consumer reached its BLOCK timeout without receiving one. If none of the consumers reads a message, each of them gets a redis.Nil error.

Expected Behavior

When no item is read from the stream, the consumer should always get a redis.Nil error.

Current Behavior

In the script below I receive i/o timeout errors:

read tcp 127.0.0.1:47842->127.0.0.1:6379: i/o timeout 40.01311409s

Besides the error not being redis.Nil, the command takes about 10s longer to return (roughly 40s against the 30s BLOCK). I also suspect that the connection is no longer reused afterwards.

Possible Solution

Steps to Reproduce

The script below runs 5 stream consumers, each doing a 30s blocking read in a loop. One more goroutine repeatedly adds a single item after a random delay of up to 60s:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{})

	ctx := context.Background()
	stream := "timeout-stream"
	group := "timeout-group"

	_, err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Result()
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		fmt.Println(err)
		os.Exit(1)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			v := rand.Intn(60)
			time.Sleep(time.Duration(v) * time.Second)

			fmt.Println("XADD")
			// Declare a goroutine-local err rather than writing to main's err.
			_, err := rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: stream,
				ID:     "*",
				Values: []string{"mykey", "myvalue"},
			}).Result()

			if err != nil {
				fmt.Println(err)
				os.Exit(1)
			}
		}
	}()

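	for i := 0; i < 5; i++ {
		wg.Add(1)
		// Pass i as an argument so each goroutine gets its own copy
		// (before Go 1.22 the loop variable is shared across iterations).
		go func(i int) {
			defer wg.Done()

			consumer := fmt.Sprintf("consumer-%d", i)
			for {
				start := time.Now()
				item, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
					Group:    group,
					Consumer: consumer,
					Streams:  []string{stream, ">"},
					Count:    1,
					Block:    30 * time.Second,
				}).Result()
				if err != nil {
					// Expected: redis.Nil after ~30s. Observed: sometimes an i/o timeout after ~40s.
					fmt.Printf("%s %s\n", err, time.Since(start))
					continue
				}

				// Acknowledge the message and report a failed XACK instead of ignoring it.
				if err := rdb.XAck(ctx, stream, group, item[0].Messages[0].ID).Err(); err != nil {
					fmt.Println(err)
					continue
				}
				fmt.Println("XACK")
			}
		}(i)
	}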

	wg.Wait()
}

Example Output:

XADD
XACK
read tcp 127.0.0.1:46528->127.0.0.1:6379: i/o timeout 40.018844088s
read tcp 127.0.0.1:46540->127.0.0.1:6379: i/o timeout 40.019074019s
read tcp 127.0.0.1:46530->127.0.0.1:6379: i/o timeout 40.019142248s
read tcp 127.0.0.1:46536->127.0.0.1:6379: i/o timeout 40.01921703s
redis: nil 30.095551148s
redis: nil 30.005739055s
redis: nil 30.005510956s
redis: nil 30.005496188s
redis: nil 30.005690323s
XADD
XACK
XADD
XACK
XADD
XACK
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:46516->127.0.0.1:6379: i/o timeout 40.001276509s
read tcp 127.0.0.1:54476->127.0.0.1:6379: i/o timeout 40.001512971s
XADD
XACK
read tcp 127.0.0.1:35944->127.0.0.1:6379: i/o timeout 40.014963904s
XADD
XACK
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:54470->127.0.0.1:6379: i/o timeout 40.001857285s
XADD
XACK
XADD
XACK
read tcp 127.0.0.1:58234->127.0.0.1:6379: i/o timeout 40.008846243s
read tcp 127.0.0.1:45494->127.0.0.1:6379: i/o timeout 40.009956562s
read tcp 127.0.0.1:49356->127.0.0.1:6379: i/o timeout 40.001642322s
redis: nil 30.06908248s
redis: nil 32.071284132s

Context (Environment)

Detailed Description

Possible Implementation
