Skip to content

Commit

Permalink
Merge branch 'dev' of github.com:nats-io/nats-server into lev-reload-…
Browse files Browse the repository at this point in the history
…by-message
  • Loading branch information
levb committed Jul 17, 2023
2 parents dfbfe99 + 40c5770 commit 98612b7
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 39 deletions.
15 changes: 7 additions & 8 deletions server/filestore.go
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
Expand All @@ -36,7 +35,7 @@ import (
"sync/atomic"
"time"

mrand "math/rand"
crand "crypto/rand"

"github.com/klauspost/compress/s2"
"github.com/minio/highwayhash"
Expand Down Expand Up @@ -588,7 +587,7 @@ func (fs *fileStore) genEncryptionKeys(context string) (aek cipher.AEAD, bek cip

const seedSize = 32
seed = make([]byte, seedSize)
if n, err := rand.Read(seed); err != nil || n != seedSize {
if n, err := crand.Read(seed); err != nil || n != seedSize {
return nil, nil, nil, nil, err
}

Expand All @@ -599,7 +598,7 @@ func (fs *fileStore) genEncryptionKeys(context string) (aek cipher.AEAD, bek cip

// Generate our nonce. Use same buffer to hold encrypted seed.
nonce := make([]byte, kek.NonceSize(), kek.NonceSize()+len(seed)+kek.Overhead())
mrand.Read(nonce)
crand.Read(nonce)

bek, err = genBlockEncryptionKey(sc, seed[:], nonce)
if err != nil {
Expand Down Expand Up @@ -653,7 +652,7 @@ func (fs *fileStore) writeStreamMeta() error {
// Encrypt if needed.
if fs.aek != nil {
nonce := make([]byte, fs.aek.NonceSize(), fs.aek.NonceSize()+len(b)+fs.aek.Overhead())
mrand.Read(nonce)
crand.Read(nonce)
b = fs.aek.Seal(nonce, nonce, b, nil)
}

Expand Down Expand Up @@ -3260,7 +3259,7 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error {

// Randomize record
data := make([]byte, rl-emptyRecordLen)
mrand.Read(data)
crand.Read(data)

// Now write to underlying buffer.
var b bytes.Buffer
Expand Down Expand Up @@ -7467,7 +7466,7 @@ func (o *consumerFileStore) encryptState(buf []byte) []byte {
}
// TODO(dlc) - Optimize on space usage a bit?
nonce := make([]byte, o.aek.NonceSize(), o.aek.NonceSize()+len(buf)+o.aek.Overhead())
mrand.Read(nonce)
crand.Read(nonce)
return o.aek.Seal(nonce, nonce, buf, nil)
}

Expand Down Expand Up @@ -7559,7 +7558,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
// Encrypt if needed.
if cfs.aek != nil {
nonce := make([]byte, cfs.aek.NonceSize(), cfs.aek.NonceSize()+len(b)+cfs.aek.Overhead())
mrand.Read(nonce)
crand.Read(nonce)
b = cfs.aek.Seal(nonce, nonce, b, nil)
}

Expand Down
5 changes: 3 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -15,6 +15,7 @@ package server

import (
"bytes"
crand "crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
Expand Down Expand Up @@ -1206,7 +1207,7 @@ func (js *jetStream) monitorCluster() {

// Highwayhash key for generating hashes.
key := make([]byte, 32)
rand.Read(key)
crand.Read(key)

// Set to true to start.
js.setMetaRecovering()
Expand Down Expand Up @@ -4435,7 +4436,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {

// Highwayhash key for generating hashes.
key := make([]byte, 32)
rand.Read(key)
crand.Read(key)

// Hash of the last snapshot (fixed size in memory).
var lastSnap []byte
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_cluster_1_test.go
Expand Up @@ -19,6 +19,7 @@ package server
import (
"bytes"
"context"
crand "crypto/rand"
"encoding/json"
"fmt"
"math/rand"
Expand Down Expand Up @@ -5076,7 +5077,7 @@ func TestJetStreamClusterConsumerPerf(t *testing.T) {
}
toSend := 500000
msg := make([]byte, 64)
rand.Read(msg)
crand.Read(msg)

for i := 0; i < toSend; i++ {
nc.Publish("TEST", msg)
Expand Down
9 changes: 5 additions & 4 deletions server/jetstream_cluster_2_test.go
Expand Up @@ -19,6 +19,7 @@ package server
import (
"bytes"
"context"
crand "crypto/rand"
"encoding/binary"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -151,7 +152,7 @@ func TestJetStreamClusterMultiRestartBug(t *testing.T) {

// Send in 10000 messages.
msg, toSend := make([]byte, 4*1024), 10000
rand.Read(msg)
crand.Read(msg)

for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestJetStreamClusterServerLimits(t *testing.T) {
defer nc.Close()

msg, toSend := make([]byte, 4*1024), 5000
rand.Read(msg)
crand.Read(msg)

// Memory first.
max_mem := uint64(2*1024*1024) + uint64(len(msg))
Expand Down Expand Up @@ -331,7 +332,7 @@ func TestJetStreamClusterAckPendingWithExpired(t *testing.T) {

// Send in 100 messages.
msg, toSend := make([]byte, 256), 100
rand.Read(msg)
crand.Read(msg)

for i := 0; i < toSend; i++ {
if _, err = js.Publish("foo", msg); err != nil {
Expand Down Expand Up @@ -2701,7 +2702,7 @@ func TestJetStreamClusterLargeHeaders(t *testing.T) {

// We use u16 to encode msg header len. Make sure we do the right thing when > 65k.
data := make([]byte, 8*1024)
rand.Read(data)
crand.Read(data)
val := hex.EncodeToString(data)[:8*1024]
m := nats.NewMsg("foo")
for i := 1; i <= 10; i++ {
Expand Down
11 changes: 6 additions & 5 deletions server/jetstream_test.go
Expand Up @@ -19,6 +19,7 @@ package server
import (
"bytes"
"context"
crand "crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
Expand Down Expand Up @@ -381,7 +382,7 @@ func TestJetStreamConsumerAndStreamDescriptions(t *testing.T) {

// Test max.
data := make([]byte, JSMaxDescriptionLen+1)
rand.Read(data)
crand.Read(data)
bigDescr := base64.StdEncoding.EncodeToString(data)

_, err = acc.addStream(&StreamConfig{Name: "bar", Description: bigDescr})
Expand Down Expand Up @@ -3452,7 +3453,7 @@ func TestJetStreamConsumerRateLimit(t *testing.T) {

msgSize := 128 * 1024
msg := make([]byte, msgSize)
rand.Read(msg)
crand.Read(msg)

// 10MB
totalSize := 10 * 1024 * 1024
Expand Down Expand Up @@ -4446,7 +4447,7 @@ func TestJetStreamSnapshotsAPIPerf(t *testing.T) {

msg := make([]byte, 128*1024)
// If you don't give gzip some data will spend too much time compressing everything to zero.
rand.Read(msg)
crand.Read(msg)

for i := 0; i < 10000; i++ {
nc.Publish("snap-perf", msg)
Expand Down Expand Up @@ -7216,7 +7217,7 @@ func TestJetStreamPushConsumerFlowControl(t *testing.T) {

msgSize := 1024
msg := make([]byte, msgSize)
rand.Read(msg)
crand.Read(msg)

sendBatch := func(n int) {
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -20189,7 +20190,7 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) {

msgSize := 1024 * 1024 // 1MB
msg := make([]byte, msgSize)
rand.Read(msg)
crand.Read(msg)

for i := 0; i < 20; i++ {
_, err := js.Publish("foo", msg)
Expand Down
6 changes: 3 additions & 3 deletions server/memstore.go
Expand Up @@ -14,9 +14,9 @@
package server

import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -1060,11 +1060,11 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
if secure {
if len(sm.hdr) > 0 {
sm.hdr = make([]byte, len(sm.hdr))
rand.Read(sm.hdr)
crand.Read(sm.hdr)
}
if len(sm.msg) > 0 {
sm.msg = make([]byte, len(sm.msg))
rand.Read(sm.msg)
crand.Read(sm.msg)
}
sm.seq, sm.ts = 0, 0
}
Expand Down
4 changes: 2 additions & 2 deletions server/nkey.go
Expand Up @@ -14,7 +14,7 @@
package server

import (
"crypto/rand"
crand "crypto/rand"
"encoding/base64"
)

Expand Down Expand Up @@ -42,6 +42,6 @@ func (s *Server) nonceRequired() bool {
func (s *Server) generateNonce(n []byte) {
var raw [nonceRawLen]byte
data := raw[:]
rand.Read(data)
crand.Read(data)
base64.RawURLEncoding.Encode(n, data)
}
18 changes: 9 additions & 9 deletions server/norace_test.go
Expand Up @@ -77,7 +77,7 @@ func TestNoRaceAvoidSlowConsumerBigMessages(t *testing.T) {
defer nc2.Close()

data := make([]byte, 1024*1024) // 1MB payload
rand.Read(data)
crand.Read(data)

expected := int32(500)
received := int32(0)
Expand Down Expand Up @@ -1913,7 +1913,7 @@ func TestNoRaceJetStreamClusterSourcesMuxd(t *testing.T) {

// Send in 10000 messages.
msg, toSend := make([]byte, 1024), 10000
rand.Read(msg)
crand.Read(msg)

var sources []*nats.StreamSource
// Create 10 origin streams.
Expand Down Expand Up @@ -2355,7 +2355,7 @@ func TestNoRaceJetStreamSuperClusterRIPStress(t *testing.T) {
}

msg := make([]byte, 1024)
rand.Read(msg)
crand.Read(msg)

// 10 minutes
expires := time.Now().Add(480 * time.Second)
Expand Down Expand Up @@ -2540,7 +2540,7 @@ func TestNoRaceJetStreamFileStoreBufferReuse(t *testing.T) {

m := nats.NewMsg("foo")
m.Data = make([]byte, 8*1024)
rand.Read(m.Data)
crand.Read(m.Data)

start := time.Now()
for i := 0; i < toSend; i++ {
Expand Down Expand Up @@ -3108,7 +3108,7 @@ func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) {

toSend := 10_000
data := make([]byte, 4*1024)
rand.Read(data)
crand.Read(data)

// First one.
js.PublishAsync("KV.FM", data)
Expand Down Expand Up @@ -3168,7 +3168,7 @@ func TestNoRaceJetStreamEncryptionEnabledOnRestartWithExpire(t *testing.T) {
}

data := make([]byte, 4*1024) // 4K payload
rand.Read(data)
crand.Read(data)

for i := 0; i < toSend; i++ {
js.PublishAsync("foo", data)
Expand Down Expand Up @@ -6823,7 +6823,7 @@ func TestNoRaceJetStreamClusterF3Setup(t *testing.T) {
eventTypes := []string{"PAYMENT", "SUBMISSION", "CANCEL"}

msg := make([]byte, 2*1024) // 2k payload
rand.Read(msg)
crand.Read(msg)

// For tracking pub times.
var pubs int
Expand Down Expand Up @@ -7066,7 +7066,7 @@ func TestNoRaceJetStreamClusterDifferentRTTInterestBasedStreamSetup(t *testing.T
wg := sync.WaitGroup{}

msg := make([]byte, 2*1024) // 2k payload
rand.Read(msg)
crand.Read(msg)

// Publishers.
for i := 0; i < numPublishers; i++ {
Expand Down Expand Up @@ -7284,7 +7284,7 @@ func TestNoRaceJetStreamClusterInterestStreamConsistencyAfterRollingRestart(t *t
wg.Wait()

msg := make([]byte, 2*1024) // 2k payload
rand.Read(msg)
crand.Read(msg)

// Controls if publishing is on or off.
var pubActive atomic.Bool
Expand Down
6 changes: 3 additions & 3 deletions server/websocket.go
Expand Up @@ -15,7 +15,7 @@ package server

import (
"bytes"
"crypto/rand"
crand "crypto/rand"
"crypto/sha1"
"crypto/tls"
"encoding/base64"
Expand Down Expand Up @@ -545,7 +545,7 @@ func wsFillFrameHeader(fh []byte, useMasking, first, final, compressed bool, fra
var key []byte
if useMasking {
var keyBuf [4]byte
if _, err := io.ReadFull(rand.Reader, keyBuf[:4]); err != nil {
if _, err := io.ReadFull(crand.Reader, keyBuf[:4]); err != nil {
kv := mrand.Int31()
binary.LittleEndian.PutUint32(keyBuf[:4], uint32(kv))
}
Expand Down Expand Up @@ -958,7 +958,7 @@ func wsAcceptKey(key string) string {

func wsMakeChallengeKey() (string, error) {
p := make([]byte, 16)
if _, err := io.ReadFull(rand.Reader, p); err != nil {
if _, err := io.ReadFull(crand.Reader, p); err != nil {
return _EMPTY_, err
}
return base64.StdEncoding.EncodeToString(p), nil
Expand Down
4 changes: 2 additions & 2 deletions test/norace_test.go
Expand Up @@ -18,9 +18,9 @@ package test

import (
"context"
crand "crypto/rand"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -687,7 +687,7 @@ func TestNoRaceSlowProxy(t *testing.T) {
// Now test send BW.
const payloadSize = 64 * 1024
var payload [payloadSize]byte
rand.Read(payload[:])
crand.Read(payload[:])

// 5MB total.
bytesSent := (5 * 1024 * 1024)
Expand Down

0 comments on commit 98612b7

Please sign in to comment.