-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc_util: Reduce extra byte from limit reader #4957
Conversation
Currently removing the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done to distinguish whether a message is exactly the same size as the limit or over the limit. Is there a test that covers a message of exactly the limit? I kind of doubt it, since I would have expected it to fail with this change. If not, please add one, thank you!
There is no such test I believe, will add one. |
This would be best as a test in |
Got it, I will try and add it by this weekend, a little caught up in official tasks. |
@dfawley I am able to implement the Should I push the changes I have done so far? |
@uds5501 I would just have the compressor implement a func (f *fakeCompressor) Decompress(r io.Reader) (io.Reader, error) {
// Not sure if this needs to read from r at all.. if so, ioutil.ReadAll(r) and ignore the result.
return bytes.NewBuffer(make([]byte, f.size)), nil
} |
798f00c
to
c648182
Compare
@dfawley I have created a small sample test to see if the implementation is as expected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for writing this test. Comments inline.
@@ -6730,6 +6730,73 @@ func testRPCTimeout(t *testing.T, e env) { | |||
} | |||
} | |||
|
|||
type FakeCompressor struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to export. It would be nice to parameterize the size (or add a payload buffer directly here?) for reusability.
} | ||
|
||
func (c *FakeCompressor) Compress(w io.Writer) (io.WriteCloser, error) { | ||
customCloser := &customWriterCloser{w} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: use field names: Writer: w
. Also, omit local variable and return directly.
@@ -6730,6 +6730,73 @@ func testRPCTimeout(t *testing.T, e env) { | |||
} | |||
} | |||
|
|||
type FakeCompressor struct { | |||
name string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field is unnecessary, or should be returned from Name()
.
|
||
func (s) TestDecompressionWithMaximumBytes(t *testing.T) { | ||
encoding.RegisterCompressor(&FakeCompressor{name: "fakeCompressor"}) | ||
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10098) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This payload is actually unnecessary.
} | ||
|
||
func (c *FakeCompressor) Decompress(r io.Reader) (io.Reader, error) { | ||
return bytes.NewBuffer(make([]byte, 10098)), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will decompress zeroes -- will it work with protobuf to deserialize this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure actually, i couldn't study the test server well enough. I went through a few examples used in tests so copied one of them, I'll admit that I don't see how decompression will work here. Will research more on this.
s := grpc.NewServer() | ||
testpb.RegisterTestServiceServer(s, ss) | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
if err != nil { | ||
t.Fatalf("Failed to create listener: %v", err) | ||
} | ||
|
||
go func() { | ||
s.Serve(lis) | ||
}() | ||
defer s.Stop() | ||
|
||
dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer dcancel() | ||
cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.UseCompressor("fakeCompressor"), grpc.MaxCallRecvMsgSize(10098))) | ||
if err != nil { | ||
t.Fatalf("Failed to dial server") | ||
} | ||
defer cc.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you are using protobuf..assuming it works..why not use ss.Start(nil, <dial options>)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, not sure.
Will need some guidance to understand where is our decompression actually coming into play.
|
||
dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer dcancel() | ||
cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.UseCompressor("fakeCompressor"), grpc.MaxCallRecvMsgSize(10098))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to test with a max recv msg size of N-1 and ensure it fails. This is where I believe your PR is incorrect.
@uds5501 : Looks like the tests are broken. |
This PR is labeled as requiring an update from the reporter, and no update has been received after 6 days. If no update is provided in the next 7 days, this issue will be automatically closed. |
Fixes #4552
RELEASE NOTES: None