Using nutsdb to cache state while 10,000 websockets process data #567

Open
xs23933 opened this issue Dec 24, 2023 · 0 comments

xs23933 commented Dec 24, 2023

Describe the bug
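This problem was discussed in the WeChat group.
The database is opened with the default configuration, DefaultOptions.

There are about 10,000 websocket clients. Every 5 seconds each client polls the task status; if the status matches, it performs the corresponding action and updates the data.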

To Reproduce
Steps to reproduce the behavior(Be specific!):

  1. Server: Tencent Cloud. Details:
    16-core, 64 GB cloud server
## Server configuration
  # ulimit -n
700001

## Process limits
#cat /proc/26006/limits
Limit                     Soft Limit           Hard Limit           Units
Max cpu time              unlimited            unlimited            seconds
Max file size             unlimited            unlimited            bytes
Max data size             unlimited            unlimited            bytes
Max stack size            8388608              unlimited            bytes
Max core file size        0                    unlimited            bytes
Max resident set          unlimited            unlimited            bytes
Max processes             700000               700000               processes
Max open files            700002               700002               files
Max locked memory         65536                65536                bytes
Max address space         unlimited            unlimited            bytes
Max file locks            unlimited            unlimited            locks
Max pending signals       254557               254557               signals
Max msgqueue size         819200               819200               bytes
Max nice priority         0                    0
Max realtime priority     0                    0
Max realtime timeout      unlimited            unlimited            us

## Memory usage
# free -h
              total        used        free      shared  buff/cache   available
Mem:            62G        2.0G         52G        740K        7.8G         59G
Swap:            0B          0B          0B

## Current number of open files and connections
# lsof -p 26006 |wc -l
22051
  1. Connect to the database using the default configuration
	CacheDB, err = nutsdb.Open(
		nutsdb.DefaultOptions,
		nutsdb.WithDir("./tasksdb"),
		nutsdb.WithSyncEnable(false), // do not sync to disk after every write
	)
	if err != nil {
		panic("init cacheDB failed: " + err.Error())
	}
  1. Each websocket polls every 5 seconds for task changes
 First, load the data saved in nutsdb:
	st := time.Now()
	defer func() {
		fmt.Printf("load took %s\n", time.Since(st)) // typically under 10ms
	}()
	err = CacheDB.View(func(tx *nutsdb.Tx) error {
		v, err := tx.Get(bucket, key)
		if err != nil {
			return err
		}
		// Time the decode step; tb must be declared before the deferred log uses it.
		tb := time.Now()
		defer func() { core.Info("Load bucket: %s, key: %s %s", bucket, key, time.Since(tb)) }()
		return sonic.Unmarshal(v.Value, m)
	})

After sending to some of the websockets, the new state is saved immediately:

      
func (m *LivesTasksView) SaveCache(bucket string) error {
	st := time.Now()
	key := []byte(utils.MD5(fmt.Sprintf("%s_%s_%s", m.LiveId, m.ID, m.TaskAccount.AccountId)))
	core.Info("1 Save bucket: %s, key: %s %s", bucket, key, time.Since(st))
	st = time.Now()
	err := CacheDB.Update(func(tx *nutsdb.Tx) error {
		value, err := sonic.Marshal(m)
		if err != nil {
			return err
		}
		return tx.Put(utils.MD5(bucket), key, value, 3600)
	})
	fmt.Printf("2 Save bucket: %s, key: %s %s\n", bucket, key, time.Since(st)) // grows over time: anywhere from 5s to 20s
	return err
}
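
One possible mitigation, sketched under an assumption this report does not confirm: that the slowdown comes from many small write transactions queuing behind one another. The sketch below buffers the latest value per key in memory and hands it to storage in batches. `Coalescer`, `pendingWrite`, and the `flush` callback are hypothetical names, not part of nutsdb; in this setup `flush` could wrap a single `CacheDB.Update` that calls `tx.Put` once per item.

```go
package main

import (
	"sync"
	"time"
)

// pendingWrite is one key/value pair waiting to be persisted.
type pendingWrite struct {
	Key   string
	Value []byte
}

// Coalescer buffers writes and passes them to flush in batches, so the
// storage layer sees one transaction per batch instead of one per write.
// flush is a hypothetical callback supplied by the caller.
type Coalescer struct {
	mu      sync.Mutex
	pending map[string][]byte // latest value per key; older values are dropped
	flush   func(batch []pendingWrite) error
}

func NewCoalescer(flush func([]pendingWrite) error) *Coalescer {
	return &Coalescer{pending: make(map[string][]byte), flush: flush}
}

// Put records the newest value for key and returns immediately.
func (c *Coalescer) Put(key string, value []byte) {
	c.mu.Lock()
	c.pending[key] = value
	c.mu.Unlock()
}

// Flush swaps out the pending map and passes its contents to flush.
// If flush fails, the batch is not retried; callers decide how to recover.
func (c *Coalescer) Flush() error {
	c.mu.Lock()
	batch := make([]pendingWrite, 0, len(c.pending))
	for k, v := range c.pending {
		batch = append(batch, pendingWrite{Key: k, Value: v})
	}
	c.pending = make(map[string][]byte)
	c.mu.Unlock()
	if len(batch) == 0 {
		return nil
	}
	return c.flush(batch)
}

// Run flushes every interval until stop is closed, then flushes once more.
func (c *Coalescer) Run(interval time.Duration, stop <-chan struct{}) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := c.Flush(); err != nil {
				return err
			}
		case <-stop:
			return c.Flush()
		}
	}
}
```

The trade-off is durability: anything still buffered when the process crashes is lost, which may be acceptable here since sync is already disabled when the database is opened.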
  1. I have now switched to a sync.Map to store the data above
Writes currently take 1.32µs to 4.4µs; the downside is that I have to handle persistence myself.
func (c *ConnManager) LoadTaskStats(liveId, taskId, uid core.UUID, st time.Time) (any, error) {
	key := fmt.Sprintf("%s_%s_%s", liveId, taskId, uid)
	if v, ok := c.taskStats.Load(key); ok {
		return v, nil
	}

	return nil, errors.New("not cache")
}

func (c *ConnManager) SaveTaskStats(liveId, taskId, uid core.UUID, stats any) error {
	t := time.Now()
	defer func() {
		core.Info("save task stats %s", time.Since(t))
	}()
	c.taskStats.Store(fmt.Sprintf("%s_%s_%s", liveId, taskId, uid), stats)
	return nil
}