Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: redis/rueidis
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.0.25
Choose a base ref
...
head repository: redis/rueidis
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.0.26
Choose a head ref
  • 13 commits
  • 11 files changed
  • 4 contributors

Commits on Dec 16, 2023

  1. Remove mutable globals

    ash2k committed Dec 16, 2023

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    ash2k Mikhail Mazurskiy
    Copy the full SHA
    3b962e6 View commit details

Commits on Dec 17, 2023

  1. Handle errors explicitly

    ash2k committed Dec 17, 2023

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    ash2k Mikhail Mazurskiy
    Copy the full SHA
    cfc8c74 View commit details
  2. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    ash2k Mikhail Mazurskiy
    Copy the full SHA
    a4d059b View commit details
  3. Mark WithClient as deprecated

    ash2k committed Dec 17, 2023

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    ash2k Mikhail Mazurskiy
    Copy the full SHA
    5479ac3 View commit details
  4. Merge pull request #428 from ash2k/otel-tweaks

    OTEL tweaks
    rueian authored Dec 17, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    65f705a View commit details

Commits on Dec 22, 2023

  1. Verified

    This commit was signed with the committer’s verified signature.
    rueian Rueian
    Copy the full SHA
    0249ec8 View commit details

Commits on Dec 23, 2023

  1. Verified

    This commit was signed with the committer’s verified signature.
    rueian Rueian
    Copy the full SHA
    a81d7e4 View commit details
  2. Merge pull request #433 from redis/fix-zpopmax-count

    fix: rueidiscompat's ZPopMax and ZPopMin with count 1
    rueian authored Dec 23, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    3ec15df View commit details

Commits on Dec 25, 2023

  1. fix: send READONLY to replicas when SendToReplicas option is set (#430)

    * fix: send readonly command
    
    * refactor: rollback
    
    * refactor: rollback
    
    * BREAKING: remove error
    
    * refactor: send readonly command in _refresh
    
    * refactor: use shallow copy
    
    * style: unify style
    
    * refactor: call conn function
    
    * fix: wrong code
    
    * refactor: default to read-only mode
    
    * refactor: change more simpler
    
    * refactor: rollback
    
    * fix: check send replicas
    proost authored Dec 25, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    b611f0c View commit details

Commits on Dec 26, 2023

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    a85dd76 View commit details

Commits on Dec 28, 2023

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    07e32e7 View commit details

Commits on Dec 29, 2023

  1. Verified

    This commit was signed with the committer’s verified signature.
    rueian Rueian
    Copy the full SHA
    06f2f2e View commit details
  2. feat: bump v1.0.26

    rueian committed Dec 29, 2023

    Verified

    This commit was signed with the committer’s verified signature.
    rueian Rueian
    Copy the full SHA
    50bfe8f View commit details
Showing with 124 additions and 55 deletions.
  1. +1 −1 README.md
  2. +28 −14 cluster.go
  3. +22 −4 cluster_test.go
  4. +2 −2 internal/cmds/cmds.go
  5. +11 −11 pipe.go
  6. +7 −0 rueidis.go
  7. +10 −0 rueidis_test.go
  8. +2 −6 rueidiscompat/adapter.go
  9. +8 −1 rueidisotel/README.md
  10. +27 −15 rueidisotel/metrics.go
  11. +6 −1 rueidisotel/trace.go
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -460,7 +460,7 @@ module mymodule
go 1.18
require github.com/redis/rueidis v1.0.25-go1.18
require github.com/redis/rueidis v1.0.26-go1.18
```

## Contributing
42 changes: 28 additions & 14 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -71,17 +71,18 @@ var retrycachep = util.NewPool(func(capacity int) *retrycache {
})

type clusterClient struct {
pslots [16384]conn
rslots []conn
opt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
pslots [16384]conn
rslots []conn
opt *ClientOption
replicaOpt *ClientOption
conns map[string]connrole
connFn connFn
sc call
mu sync.RWMutex
stop uint32
cmd Builder
retry bool
aws bool
}

// NOTE: connrole and conn must be initialized at the same time
@@ -93,8 +94,8 @@ type connrole struct {
func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient, err error) {
client = &clusterClient{
cmd: cmds.NewBuilder(cmds.InitSlot),
opt: opt,
connFn: connFn,
opt: opt,
conns: make(map[string]connrole),
retry: !opt.DisableRetry,
aws: len(opt.InitAddress) == 1 && strings.Contains(opt.InitAddress[0], "amazonaws.com"),
@@ -104,6 +105,12 @@ func newClusterClient(opt *ClientOption, connFn connFn) (client *clusterClient,
return nil, ErrReplicaOnlyConflict
}

if opt.SendToReplicas != nil {
replicaOpt := *opt
replicaOpt.ReplicaOnly = true
client.replicaOpt = &replicaOpt
}

client.connFn = func(dst string, opt *ClientOption) conn {
cc := connFn(dst, opt)
cc.SetOnCloseHook(func(err error) {
@@ -225,7 +232,13 @@ func (c *clusterClient) _refresh() (err error) {
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
var cc conn
if c.opt.SendToReplicas != nil {
cc = c.connFn(addr, c.replicaOpt)
} else {
cc = c.connFn(addr, c.opt)
}
conns[addr] = connrole{conn: cc, replica: true}
}
}
// make sure InitAddress always be present
@@ -241,7 +254,8 @@ func (c *clusterClient) _refresh() (err error) {

c.mu.RLock()
for addr, cc := range c.conns {
if fresh, ok := conns[addr]; ok {
fresh, ok := conns[addr]
if ok && (cc.replica == fresh.replica || c.opt.SendToReplicas == nil) {
conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
26 changes: 22 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
@@ -861,12 +861,21 @@ func TestClusterClientInit(t *testing.T) {
})

t.Run("Refresh cluster which has multi nodes per shard with SendToReplica option", func(t *testing.T) {
m := &mockConn{
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
return RedisResult{
err: errors.New("unexpected call"),
}
},
}
replicaNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return RedisResult{
err: errors.New("unexpected call"),
}
},
}

@@ -878,8 +887,17 @@ func TestClusterClientInit(t *testing.T) {
},
},
func(dst string, opt *ClientOption) conn {
copiedM := *m
return &copiedM
if dst == "127.0.0.1:0" || dst == "127.0.2.1:0" {
if opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in primary node")
}
return primaryNodeConn
} else {
if !opt.ReplicaOnly {
t.Fatalf("unexpected replicaOnly option in replica node")
}
return replicaNodeConn
}
},
)
if err != nil {
4 changes: 2 additions & 2 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
@@ -93,8 +93,8 @@ type Incomplete struct {
// Completed represents a completed Redis command, should be created by the Build() of command builder.
type Completed struct {
cs *CommandSlice
cf uint16
ks uint16
cf uint16 // cmd flag
ks uint16 // key slot
}

// Pin prevents a Completed to be recycled
22 changes: 11 additions & 11 deletions pipe.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ import (
)

const LibName = "rueidis"
const LibVer = "1.0.25"
const LibVer = "1.0.26"

var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")

@@ -83,19 +83,19 @@ var _ wire = (*pipe)(nil)
type pipe struct {
conn net.Conn
error atomic.Value
clhks atomic.Value
pshks atomic.Value
clhks atomic.Value // closed hook, invoked after the conn is closed
pshks atomic.Value // pubsub hook, registered by the SetPubSubHooks
queue queue
cache CacheStore
r *bufio.Reader
w *bufio.Writer
close chan struct{}
onInvalidations func([]RedisMessage)
r2psFn func() (p *pipe, err error)
r2pipe *pipe
ssubs *subs
nsubs *subs
psubs *subs
r2psFn func() (p *pipe, err error) // func to build pipe for resp2 pubsub
r2pipe *pipe // internal pipe for resp2 pubsub only
ssubs *subs // pubsub smessage subscriptions
nsubs *subs // pubsub message subscriptions
psubs *subs // pubsub pmessage subscriptions
info map[string]RedisMessage
timeout time.Duration
pinggap time.Duration
@@ -108,7 +108,7 @@ type pipe struct {
state int32
waits int32
recvs int32
r2ps bool
r2ps bool // identify this pipe is used for resp2 pubsub or not
}

func newPipe(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) {
@@ -224,7 +224,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
}
if err != nil {
if init[i][0] == "READONLY" {
// igore READONLY command error
// ignore READONLY command error
continue
}
if re, ok := err.(*RedisError); ok {
@@ -290,7 +290,7 @@ func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool)
defer resultsp.Put(resp)
for i, r := range resp.s[:len(resp.s)-2] { // skip error checking on the last CLIENT SETINFO
if init[i][0] == "READONLY" {
// igore READONLY command error
// ignore READONLY command error
continue
}
if err = r.Error(); err != nil {
7 changes: 7 additions & 0 deletions rueidis.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,8 @@ const (
DefaultReadBuffer = 1 << 19
// DefaultWriteBuffer is the default value of bufio.NewWriterSize for each connection, which is 0.5MiB
DefaultWriteBuffer = 1 << 19
// MaxPipelineMultiplex is the maximum meaningful value for ClientOption.PipelineMultiplex
MaxPipelineMultiplex = 8
)

var (
@@ -46,6 +48,8 @@ var (
// ErrReplicaOnlyNotSupported means ReplicaOnly flag is not supported by
// current client
ErrReplicaOnlyNotSupported = errors.New("ReplicaOnly is not supported for single client")
// ErrWrongPipelineMultiplex means wrong value for ClientOption.PipelineMultiplex
ErrWrongPipelineMultiplex = errors.New("ClientOption.PipelineMultiplex must not be bigger than MaxPipelineMultiplex")
)

// ClientOption should be passed to NewClient to construct a Client
@@ -314,6 +318,9 @@ func NewClient(option ClientOption) (client Client, err error) {
option.InitAddress[i], option.InitAddress[j] = option.InitAddress[j], option.InitAddress[i]
})
}
if option.PipelineMultiplex > MaxPipelineMultiplex {
return nil, ErrWrongPipelineMultiplex
}
if option.Sentinel.MasterSet != "" {
option.PipelineMultiplex = singleClientMultiplex(option.PipelineMultiplex)
return newSentinelClient(&option, makeConn)
10 changes: 10 additions & 0 deletions rueidis_test.go
Original file line number Diff line number Diff line change
@@ -315,6 +315,16 @@ func TestTLSClient(t *testing.T) {
<-done
}

func TestNewClientMaxMultiplex(t *testing.T) {
_, err := NewClient(ClientOption{
InitAddress: []string{"127.0.0.1:6379"},
PipelineMultiplex: MaxPipelineMultiplex + 1,
})
if err != ErrWrongPipelineMultiplex {
t.Fatalf("unexpected error %v", err)
}
}

func TestSingleClientMultiplex(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
option := ClientOption{}
8 changes: 2 additions & 6 deletions rueidiscompat/adapter.go
Original file line number Diff line number Diff line change
@@ -2028,9 +2028,7 @@ func (c *Compat) ZPopMax(ctx context.Context, key string, count ...int64) *ZSlic
resp = c.client.Do(ctx, c.client.B().Zpopmax().Key(key).Build())
case 1:
resp = c.client.Do(ctx, c.client.B().Zpopmax().Key(key).Count(count[0]).Build())
if count[0] > 1 {
return newZSliceCmd(resp)
}
return newZSliceCmd(resp)
default:
panic("too many arguments")
}
@@ -2044,9 +2042,7 @@ func (c *Compat) ZPopMin(ctx context.Context, key string, count ...int64) *ZSlic
resp = c.client.Do(ctx, c.client.B().Zpopmin().Key(key).Build())
case 1:
resp = c.client.Do(ctx, c.client.B().Zpopmin().Key(key).Count(count[0]).Build())
if count[0] > 1 {
return newZSliceCmd(resp)
}
return newZSliceCmd(resp)
default:
panic("too many arguments")
}
9 changes: 8 additions & 1 deletion rueidisotel/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
# OpenTelemetry Tracing
# OpenTelemetry Tracing & Connection Metrics

Use `rueidisotel.WithClient` to create a client with OpenTelemetry Tracing enabled.

Use `rueidisotel.NewClient` to create a client with OpenTelemetry Tracing and Connection Metrics enabled.
default metrics are:
- `rueidis_dial_attempt`: number of dial attempts
- `rueidis_dial_success`: number of successful dials
- `rueidis_dial_conns`: number of connections
- `rueidis_dial_latency`: dial latency in seconds

```golang
package main

42 changes: 27 additions & 15 deletions rueidisotel/metrics.go
Original file line number Diff line number Diff line change
@@ -14,15 +14,9 @@ import (
)

var (
DefaultHistogramBuckets = []float64{
defaultHistogramBuckets = []float64{
.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10,
}
DefaultDialFn = func(dst string, dialer *net.Dialer, cfg *tls.Config) (conn net.Conn, err error) {
if cfg != nil {
return tls.DialWithDialer(dialer, "tcp", dst, cfg)
}
return dialer.Dial("tcp", dst)
}
)

type HistogramOption struct {
@@ -44,10 +38,13 @@ func WithHistogramOption(histogramOption HistogramOption) Option {
// - rueidis_dial_conns: number of active connections
// - rueidis_dial_latency: dial latency in seconds
func NewClient(clientOption rueidis.ClientOption, opts ...Option) (rueidis.Client, error) {
oclient := newClient(opts...)
oclient, err := newClient(opts...)
if err != nil {
return nil, err
}

if clientOption.DialFn == nil {
clientOption.DialFn = DefaultDialFn
clientOption.DialFn = defaultDialFn
}

attempt, err := oclient.meter.Int64Counter("rueidis_dial_attempt")
@@ -90,13 +87,13 @@ func NewClient(clientOption rueidis.ClientOption, opts ...Option) (rueidis.Clien
return oclient, nil
}

func newClient(opts ...Option) *otelclient {
func newClient(opts ...Option) (*otelclient, error) {
cli := &otelclient{}
for _, opt := range opts {
opt(cli)
}
if cli.histogramOption.Buckets == nil {
cli.histogramOption.Buckets = DefaultHistogramBuckets
cli.histogramOption.Buckets = defaultHistogramBuckets
}
if cli.meterProvider == nil {
cli.meterProvider = otel.GetMeterProvider() // Default to global MeterProvider
@@ -109,9 +106,16 @@ func newClient(opts ...Option) *otelclient {
cli.meter = cli.meterProvider.Meter(name)
cli.tracer = cli.tracerProvider.Tracer(name)
// Now create the counters using the meter
cli.cscMiss, _ = cli.meter.Int64Counter("rueidis_do_cache_miss")
cli.cscHits, _ = cli.meter.Int64Counter("rueidis_do_cache_hits")
return cli
var err error
cli.cscMiss, err = cli.meter.Int64Counter("rueidis_do_cache_miss")
if err != nil {
return nil, err
}
cli.cscHits, err = cli.meter.Int64Counter("rueidis_do_cache_hits")
if err != nil {
return nil, err
}
return cli, nil
}

func trackDialing(
@@ -132,7 +136,8 @@ func trackDialing(
return nil, err
}

dialLatency.Record(ctx, time.Since(start).Seconds())
// Use floating point division for higher precision (instead of Seconds method).
dialLatency.Record(ctx, float64(time.Since(start))/float64(time.Second))
success.Add(ctx, 1)
conns.Add(ctx, 1)

@@ -157,3 +162,10 @@ func (t *connTracker) Close() error {

return t.Conn.Close()
}

func defaultDialFn(dst string, dialer *net.Dialer, cfg *tls.Config) (conn net.Conn, err error) {
if cfg != nil {
return tls.DialWithDialer(dialer, "tcp", dst, cfg)
}
return dialer.Dial("tcp", dst)
}
7 changes: 6 additions & 1 deletion rueidisotel/trace.go
Original file line number Diff line number Diff line change
@@ -22,8 +22,13 @@ var (
var _ rueidis.Client = (*otelclient)(nil)

// WithClient creates a new rueidis.Client with OpenTelemetry tracing enabled.
//
// Deprecated: use NewClient() instead.
func WithClient(client rueidis.Client, opts ...Option) rueidis.Client {
cli := newClient(opts...)
cli, err := newClient(opts...)
if err != nil {
panic(err)
}
cli.client = client
return cli
}