Skip to content

Latest commit

 

History

History
250 lines (205 loc) · 7.55 KB

README.md

File metadata and controls

250 lines (205 loc) · 7.55 KB

通用 grpc 流式消息内容读写器 gstream

通用 grpc 流式消息内容读写器。

  • authored by sliveryou

背景

在一份 grpc message 消息的定义中,往往会有较大体量的元数据:

// UploadFileReq 上传文件请求
message UploadFileReq {
    int64 user_id = 1;
    string file_name = 2;
    string file_type = 3;
    string file_hash = 4;
    int64 file_size = 5;
    bytes file_data = 6;
}

// 其中 file_data 就是较大的文件二进制数据

google.golang.org/grpc@v1.29.1/server.go 中,服务端接收的最大消息字节数的被设置为了 4MB,且其他语言的 grpc 消息接收限制大抵也是如此:

const (
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
)

如果想传输较大体量的消息,一般有两种策略:

  1. 修改消息的阈值:
    通过 grpc.MaxCallRecvMsgSize(bytes int)grpc.MaxCallSendMsgSize(bytes int) 设置,最大不能超过 2GB
  2. 流式消息传输:
    在对应 rpc 定义,将较大体量的消息前添加 stream 关键字,如 rpc UploadFile (stream UploadFileReq) returns (UploadFileResp); // UploadFile 上传文件

grpc 官方建议一次消息传输的最大字节不超过 4MB,所以传输较大消息时,最好是选择流式传输。

设计思路

一般的客户端消息流式发送:

// 伪代码

func cliDemo() {
	f, err := os.Open("test.pdf")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	cli, err := flieclient.UploadFile(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.CloseSend()

	// 首次发送,不传输文件内容,只传输文件相关信息
	err = cli.Send(&fileclient.UploadFileReq{
		FileName: f.Filename,
		FileType: f.FileType,
		FileHash: f.FileHash,
		FileSize: f.Size,
	})
	if err != nil {
		panic(err)
	}

	// 定义一个 buf,从源文件不断读取数据到 buf,而后发送消息
	chunkSize := (3 << 20) + (1 << 19) // 3.5MB
	buf := make([]byte, chunkSize)
	for {
		n, err := f.Read(buf)
		if err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}

		// 后续发送,不传输文件相关信息,只传输文件内容
		err = cli.Send(&fileclient.UploadFileReq{FileData: buf[:n]})
		if err != nil {
			panic(err)
		}
	}

	resp, err := cli.CloseAndRecv()
	if err != nil {
		panic(err)
	}
}

一般的服务端消息流式接收:

// 伪代码

func svrDemo(svr file.File_UploadFileServer) {
	// 首次接收,获取文件相关信息
	fi, err := svr.Recv()
	if err != nil {
		panic(err)
	}

	// 定义一个 buf,不断将消息中的元数据写入其中
	var buf bytes.Buffer
	for {
		req, err := svr.Recv()
		if err != nil {
			if err == io.EOF {
				break
			}
			panic(err)
		}
		_, err := io.Copy(buf, bytes.NewBuffer(req.FileData))
		if err != nil {
			panic(err)
		}
	}
	
	// 后续操作
	key := fmt.Sprintf("common/%s.%s", fi.FileHash, fi.FileType)
	oss.PutObject(key, buf)
}

主要构建逻辑:

  1. 客户端不断从源数据读取一定的数据分块,再构建成消息体,Send 消息到服务端,直到数据读完了,则发送 CloseAndRecv 信号,等待服务端的回复;
  2. 服务端不断 Recv 消息,抽取出其中的分块元数据,可以将其整合到 buf 中或者临时文件里,等待下一步处理。

这种模式的主要缺点:

  1. 会发现流式传输好像都是一样代码逻辑,但是却具有业务的特征(特定消息结构体,业务相关),无法单独抽象出来
  2. 服务端每一次在循环中接收都是完整的消息结构,然后抽取其中的元数据将其转化成 io.Reader,给相关 io.Writer 调用,
    要知道,一般需要流式传输的数据往往是较大的文件二进制数据,如很大的视频或者图片等,为了在 grpc 中传输,被客户端切割,
    然后被服务端接收所拼凑还原,在拼凑还原的过程中,存在一个中间态,是把前部分的数据放在内存里呢? 还是生成一个临时文件,将数据存放在其中呢?

在对接文件对象存储的业务中,我设计了一个 OSS 通用客户端接口来对接阿里云、华为云和腾讯云的对象存储服务:

// OSS 客户端接口
type OSS interface {
	// Cloud 获取云服务商名称
	Cloud() string
	// GetURL 获取对象在 OSS 上的完整访问 URL
	GetURL(key string) string
	// GetObject 获取对象在 OSS 的存储数据
	GetObject(key string) (io.ReadCloser, error)
	// PutObject 上传对象至 OSS
	PutObject(key string, reader io.Reader) (string, error)
	// DeleteObjects 批量删除 OSS 上的对象
	DeleteObjects(keys ...string) error
	// UploadFile 上传文件至 OSS,filePath:文件路径,partSize:分块大小(字节),routines:并发数
	UploadFile(key, filePath string, partSize int64, routines int) (string, error)
	// AuthorizedUpload 授权上传至 OSS,expires:过期时间(秒)
	AuthorizedUpload(key string, expires int) (string, error)
	// GetThumbnailSuffix 获取缩略图后缀,如果只传一个值则进行等比缩放,两个值都传时会强制缩放,可能会导致图片变形
	GetThumbnailSuffix(width, height int, size int64) string
}

将上传对象至 OSS 设计成 PutObject(key string, reader io.Reader) (string, error)
而不是 PutObject(key string, buf []bytes) (string, error) 的原因显而易见:边收边传尽量减少中间态才是更好的传输方案。

使用说明

客户端消息流式发送:

// 伪代码

func cliDemo() {
	f, err := os.Open("test.pdf")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	
	cli, err := flieclient.UploadFile(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.CloseSend()
	
	 // 首次发送,不传输文件内容,只传输文件相关信息
	err = cli.Send(&fileclient.UploadFileReq{
		FileName: f.Filename,
		FileType: f.FileType,
		FileHash: f.FileHash,
		FileSize: f.Size,
	})
	if err != nil {
		panic(err)
	}
	
	chunkSize := (3 << 20) + (1 << 19) // 3.5MB
	// 新建 grpc 流式消息内容写入器,传入客户端对象、消息请求体对象、指定传输消息字段和传输消息块大小
	writer := gstream.MustNewStreamWriter(cli, &fileclient.UploadFileReq{}, "FileData", chunkSize)
	_, err = io.Copy(writer, f)
	if err != nil {
		panic(err)
	}
	err = writer.Close()
	if err != nil {
		panic(err)
	}
	
	resp, err := cli.CloseAndRecv()
	if err != nil {
		panic(err)
	}
}

服务端消息流式接收:

// 伪代码

func svrDemo(svr file.File_UploadFileServer) {
	// 首次接收,获取文件相关信息
	fi, err := svr.Recv()
	if err != nil {
		panic(err)
	}
	
	key := fmt.Sprintf("common/%s.%s", fi.FileHash, fi.FileType)
	// 新建 grpc 流式消息内容读取器,传入服务端对象、消息请求体对象、指定接收消息字段和总计消息块大小
	reader := gstream.MustNewStreamReader(svr, &file.UploadFileReq{}, "FileData", fi.FileSize)
	oss.PutObject(key, reader)
}

实现原理

  • StreamWriter

    • 内部传入 grpc 客户端流对象,利用反射动态创建消息对象,并对指定 []byte 字段赋值
    • 内部申请 chunkSize 大小的缓存区,当缓存区写满时再调用客户端流对象进行消息 Send
    • Close 时将不足 chunkSize 大小缓存区数据全部写入消息体,进行最后一次发送
  • StreamReader

    • 内部传入 grpc 服务端流对象,利用反射动态创建消息对象,并对指定 []byte 字段取值
    • 内部将每次读取的消息体进行缓存,直到外界将本次消息体的内容读完时,再进行消息 Recv
    • 消息全部读完时,返回 io.EOF,让外部调用知晓数据已读取完毕