Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc_util: Throws error if message is larger than MaxRecvMsgSize #6999

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 35 additions & 5 deletions rpc_util.go
Expand Up @@ -815,17 +815,47 @@
//
// TODO: If we ensure that the buffer size is the same as the DecompressedSize,
// we can also utilize the recv buffer pool here.
buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
bufferSize := uint64(size) + bytes.MinRead
if bufferSize > math.MaxInt {
bufferSize = math.MaxInt
}

Check warning on line 821 in rpc_util.go

View check run for this annotation

Codecov / codecov/patch

rpc_util.go#L820-L821

Added lines #L820 - L821 were not covered by tests
buf := bytes.NewBuffer(make([]byte, 0, int(bufferSize)))

bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)))
if err != nil {
return nil, int(bytesRead), err
}

if err = checkReceiveMessageOverflow(bytesRead, int64(maxReceiveMessageSize), dcReader); err != nil {
return nil, size + 1, err
}

Check warning on line 831 in rpc_util.go

View check run for this annotation

Codecov / codecov/patch

rpc_util.go#L830-L831

Added lines #L830 - L831 were not covered by tests

return buf.Bytes(), int(bytesRead), err
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will also need fixes below here in case DecompressedSize is not implemented by the compressor.

As @arvindbr8 mentioned, we'll also want tests for overflowing the max message size with both kinds of compressors, too (some may already exist), and also covering the case where the max size is set to MaxInt. I guess we can't test overflowing with MaxInt very easily, though. Maybe 2GB (on a 32-bit system) isn't asking too much(?), but even that could be problematic.

I think a unit test of decompress would be ideal here, with an internal way to override MaxInt to a smaller value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @dfawley,
regarding the fixes for compressor not implementing DecompressedSize, could you please elaborate on what the intended behaviour should be?
I had the following changes in mind:

if sizer, ok := compressor.(interface {
	DecompressedSize(compressedBytes []byte) int
}); ok {
	...
} else {
	log.Println("warn: compressor does not implement method DecompressedSize()")
}

d, err = io.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)))

if len(d) == maxReceiveMessageSize {
	b := make([]byte, 1)
	if n, err := dcReader.Read(b); n > 0 || err != io.EOF {
		return nil, len(d) + n, fmt.Errorf("overflow: message larger than max size receivable by client (%v bytes)", maxReceiveMessageSize)
	}
}
return d, len(d), err

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, something like that LGTM. It would be great if you can refactor the code to remove the duplication, though, since both code paths are very similar.

// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
d, err = io.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
// Read from LimitReader & check for overflow
d, err = io.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)))
if err != nil {
return nil, len(d), err
}

Check warning on line 840 in rpc_util.go

View check run for this annotation

Codecov / codecov/patch

rpc_util.go#L839-L840

Added lines #L839 - L840 were not covered by tests

if err = checkReceiveMessageOverflow(int64(len(d)), int64(maxReceiveMessageSize), dcReader); err != nil {
return nil, len(d) + 1, err
}

Check warning on line 844 in rpc_util.go

View check run for this annotation

Codecov / codecov/patch

rpc_util.go#L843-L844

Added lines #L843 - L844 were not covered by tests

return d, len(d), err
}

func checkReceiveMessageOverflow(readBytes, maxReceiveMessageSize int64, dcReader io.Reader) error {
if readBytes == maxReceiveMessageSize {
b := make([]byte, 1)
if n, err := dcReader.Read(b); n > 0 || err != io.EOF {
return fmt.Errorf("overflow: message larger than max size receivable by client (%v bytes)", maxReceiveMessageSize)
}

Check warning on line 854 in rpc_util.go

View check run for this annotation

Codecov / codecov/patch

rpc_util.go#L851-L854

Added lines #L851 - L854 were not covered by tests
}
return nil
}

// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
Expand Down