Skip to content

Commit

Permalink
Add additional stream message compression unit tests (#1879)
Browse files Browse the repository at this point in the history
Co-authored-by: George Barnett <gbarnett@apple.com>
  • Loading branch information
gjcairo and glbrntt committed May 14, 2024
1 parent fe71322 commit 07744c7
Showing 1 changed file with 187 additions and 28 deletions.
215 changes: 187 additions & 28 deletions Tests/GRPCHTTP2CoreTests/GRPCStreamStateMachineTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ extension HPACKHeaders {
GRPCHTTP2Keys.encoding.rawValue: "deflate",
GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
]
fileprivate static let serverInitialMetadataWithGZIPCompression: Self = [
GRPCHTTP2Keys.status.rawValue: "200",
GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
GRPCHTTP2Keys.encoding.rawValue: "gzip",
GRPCHTTP2Keys.acceptEncoding.rawValue: "deflate",
]
fileprivate static let serverTrailers: Self = [
GRPCHTTP2Keys.status.rawValue: "200",
GRPCHTTP2Keys.contentType.rawValue: ContentType.grpc.canonicalValue,
Expand Down Expand Up @@ -331,6 +337,93 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
}
}

func testReceiveInitialMetadataWhenServerIdle_ClientUnsupportedEncoding() throws {
// Create client with deflate compression enabled
var stateMachine = self.makeClientStateMachine(
targetState: .clientOpenServerIdle,
compressionEnabled: true
)

// Try opening server with gzip compression, which client does not support.
let action = try stateMachine.receive(
headers: .serverInitialMetadataWithGZIPCompression,
endStream: false
)

XCTAssertEqual(
action,
.receivedStatusAndMetadata(
status: Status(
code: .internalError,
message:
"The server picked a compression algorithm ('gzip') the client does not know about."
),
metadata: [
":status": "200",
"content-type": "application/grpc",
"grpc-encoding": "gzip",
"grpc-accept-encoding": "deflate",
]
)
)
}

func testReceiveMessage_ClientCompressionEnabled() throws {
// Enable deflate compression on client
var stateMachine = self.makeClientStateMachine(
targetState: .clientOpenServerOpen,
compressionEnabled: true
)

let originalMessage = [UInt8]([42, 42, 43, 43])

// Receiving uncompressed message should still work.
let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none)
XCTAssertNoThrow(try stateMachine.receive(buffer: receivedUncompressedBytes, endStream: false))
var receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .noMoreMessages, .awaitMoreMessages:
XCTFail("Should have received message")
case .receiveMessage(let receivedMessaged):
XCTAssertEqual(originalMessage, receivedMessaged)
}

// Receiving compressed message with deflate should work
let receivedDeflateCompressedBytes = try self.frameMessage(
originalMessage,
compression: .deflate
)
XCTAssertNoThrow(
try stateMachine.receive(buffer: receivedDeflateCompressedBytes, endStream: false)
)
receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .noMoreMessages, .awaitMoreMessages:
XCTFail("Should have received message")
case .receiveMessage(let receivedMessaged):
XCTAssertEqual(originalMessage, receivedMessaged)
}

// Receiving compressed message with gzip (unsupported) should throw error
let receivedGZIPCompressedBytes = try self.frameMessage(originalMessage, compression: .gzip)
XCTAssertThrowsError(
ofType: RPCError.self,
try stateMachine.receive(buffer: receivedGZIPCompressedBytes, endStream: false)
) { error in
XCTAssertEqual(error.code, .internalError)
XCTAssertEqual(error.message, "Decompression error")
}
receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .awaitMoreMessages:
()
case .noMoreMessages:
XCTFail("Should be awaiting for more messages")
case .receiveMessage:
XCTFail("Should not have received message")
}
}

func testReceiveInitialMetadataWhenServerIdle() throws {
for targetState in [
TargetStateMachineState.clientOpenServerIdle, .clientClosedServerIdle,
Expand Down Expand Up @@ -681,7 +774,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))

let request = try stateMachine.nextOutboundFrame()
let framedMessage = try self.frameMessage(originalMessage, compress: true)
let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
}

Expand All @@ -697,7 +790,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))

let request = try stateMachine.nextOutboundFrame()
let framedMessage = try self.frameMessage(originalMessage, compress: true)
let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
XCTAssertEqual(request, .sendFrame(frame: framedMessage, promise: nil))
}

Expand Down Expand Up @@ -806,7 +899,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
)

let originalMessage = [UInt8]([42, 42, 43, 43])
let receivedBytes = try self.frameMessage(originalMessage, compress: true)
let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate)
XCTAssertEqual(
try stateMachine.receive(buffer: receivedBytes, endStream: false),
.readInbound
Expand Down Expand Up @@ -920,7 +1013,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)

let message = [UInt8]([1, 2, 3, 4])
let framedMessage = try self.frameMessage(message, compress: false)
let framedMessage = try self.frameMessage(message, compression: .none)
try stateMachine.send(message: message, promise: nil)
XCTAssertEqual(
try stateMachine.nextOutboundFrame(),
Expand All @@ -932,9 +1025,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)

let firstResponseBytes = [UInt8]([5, 6, 7])
let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
let secondResponseBytes = [UInt8]([8, 9, 10])
let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
XCTAssertEqual(
try stateMachine.receive(buffer: firstResponse, endStream: false),
.readInbound
Expand Down Expand Up @@ -993,7 +1086,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)

let message = [UInt8]([1, 2, 3, 4])
let framedMessage = try self.frameMessage(message, compress: false)
let framedMessage = try self.frameMessage(message, compression: .none)
XCTAssertNoThrow(try stateMachine.send(message: message, promise: nil))
XCTAssertNoThrow(try stateMachine.closeOutbound())
XCTAssertEqual(
Expand All @@ -1020,9 +1113,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)

let firstResponseBytes = [UInt8]([5, 6, 7])
let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
let secondResponseBytes = [UInt8]([8, 9, 10])
let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
XCTAssertEqual(
try stateMachine.receive(buffer: firstResponse, endStream: false),
.readInbound
Expand Down Expand Up @@ -1078,7 +1171,7 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(try stateMachine.nextOutboundFrame(), .awaitMoreMessages)

let message = [UInt8]([1, 2, 3, 4])
let framedMessage = try self.frameMessage(message, compress: false)
let framedMessage = try self.frameMessage(message, compression: .none)
try stateMachine.send(message: message, promise: nil)
XCTAssertEqual(
try stateMachine.nextOutboundFrame(),
Expand Down Expand Up @@ -1107,9 +1200,9 @@ final class GRPCStreamClientStateMachineTests: XCTestCase {
XCTAssertEqual(stateMachine.nextInboundMessage(), .awaitMoreMessages)

let firstResponseBytes = [UInt8]([5, 6, 7])
let firstResponse = try self.frameMessage(firstResponseBytes, compress: false)
let firstResponse = try self.frameMessage(firstResponseBytes, compression: .none)
let secondResponseBytes = [UInt8]([8, 9, 10])
let secondResponse = try self.frameMessage(secondResponseBytes, compress: false)
let secondResponse = try self.frameMessage(secondResponseBytes, compression: .none)
XCTAssertEqual(
try stateMachine.receive(buffer: firstResponse, endStream: false),
.readInbound
Expand Down Expand Up @@ -1757,9 +1850,6 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
}
}

//TODO: add more encoding-related validation tests (for both client and server)
// and message encoding tests

func testReceiveMetadataWhenClientOpenAndServerIdle() throws {
var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerIdle)

Expand Down Expand Up @@ -1881,6 +1971,62 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
}
}

func testReceiveMessage_ServerCompressionEnabled() throws {
// Enable deflate compression on server
var stateMachine = self.makeServerStateMachine(
targetState: .clientOpenServerOpen,
compressionEnabled: true
)

let originalMessage = [UInt8]([42, 42, 43, 43])

// Receiving uncompressed message should still work.
let receivedUncompressedBytes = try self.frameMessage(originalMessage, compression: .none)
XCTAssertNoThrow(try stateMachine.receive(buffer: receivedUncompressedBytes, endStream: false))
var receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .noMoreMessages, .awaitMoreMessages:
XCTFail("Should have received message")
case .receiveMessage(let receivedMessaged):
XCTAssertEqual(originalMessage, receivedMessaged)
}

// Receiving compressed message with deflate should work
let receivedDeflateCompressedBytes = try self.frameMessage(
originalMessage,
compression: .deflate
)
XCTAssertNoThrow(
try stateMachine.receive(buffer: receivedDeflateCompressedBytes, endStream: false)
)
receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .noMoreMessages, .awaitMoreMessages:
XCTFail("Should have received message")
case .receiveMessage(let receivedMessaged):
XCTAssertEqual(originalMessage, receivedMessaged)
}

// Receiving compressed message with gzip (unsupported) should throw error
let receivedGZIPCompressedBytes = try self.frameMessage(originalMessage, compression: .gzip)
XCTAssertThrowsError(
ofType: RPCError.self,
try stateMachine.receive(buffer: receivedGZIPCompressedBytes, endStream: false)
) { error in
XCTAssertEqual(error.code, .internalError)
XCTAssertEqual(error.message, "Decompression error")
}
receivedAction = stateMachine.nextInboundMessage()
switch receivedAction {
case .awaitMoreMessages:
()
case .noMoreMessages:
XCTFail("Should be awaiting for more messages")
case .receiveMessage:
XCTFail("Should not have received message")
}
}

func testReceiveMessageWhenClientOpenAndServerClosed() {
var stateMachine = self.makeServerStateMachine(targetState: .clientOpenServerClosed)

Expand Down Expand Up @@ -1993,7 +2139,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
XCTAssertNoThrow(try stateMachine.send(message: originalMessage, promise: nil))

let response = try stateMachine.nextOutboundFrame()
let framedMessage = try self.frameMessage(originalMessage, compress: true)
let framedMessage = try self.frameMessage(originalMessage, compression: .deflate)
XCTAssertEqual(response, .sendFrame(frame: framedMessage, promise: nil))
}

Expand Down Expand Up @@ -2125,7 +2271,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
)

let originalMessage = [UInt8]([42, 42, 43, 43])
let receivedBytes = try self.frameMessage(originalMessage, compress: true)
let receivedBytes = try self.frameMessage(originalMessage, compression: .deflate)

XCTAssertEqual(
try stateMachine.receive(buffer: receivedBytes, endStream: false),
Expand Down Expand Up @@ -2247,7 +2393,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {

// Client sends messages
let deframedMessage = [UInt8]([1, 2, 3, 4])
let completeMessage = try self.frameMessage(deframedMessage, compress: false)
let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
// Split message into two parts to make sure the stitching together of the frames works well
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
Expand Down Expand Up @@ -2276,7 +2422,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
try stateMachine.send(message: secondResponse, promise: secondPromise)

// Make sure messages are outbound
let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
let framedMessages = try self.frameMessages(
[firstResponse, secondResponse],
compression: .none
)

guard
case .sendFrame(let nextOutboundByteBuffer, let nextOutboundPromise) =
Expand Down Expand Up @@ -2326,7 +2475,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {

// Client sends messages
let deframedMessage = [UInt8]([1, 2, 3, 4])
let completeMessage = try self.frameMessage(deframedMessage, compress: false)
let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
// Split message into two parts to make sure the stitching together of the frames works well
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
Expand Down Expand Up @@ -2368,7 +2517,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
try stateMachine.send(message: secondResponse, promise: nil)

// Make sure messages are outbound
let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
let framedMessages = try self.frameMessages(
[firstResponse, secondResponse],
compression: .none
)
XCTAssertEqual(
try stateMachine.nextOutboundFrame(),
.sendFrame(frame: framedMessages, promise: nil)
Expand Down Expand Up @@ -2400,7 +2552,7 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {

// Client sends messages
let deframedMessage = [UInt8]([1, 2, 3, 4])
let completeMessage = try self.frameMessage(deframedMessage, compress: false)
let completeMessage = try self.frameMessage(deframedMessage, compression: .none)
// Split message into two parts to make sure the stitching together of the frames works well
let firstMessage = completeMessage.getSlice(at: 0, length: 4)!
let secondMessage = completeMessage.getSlice(at: 4, length: completeMessage.readableBytes - 4)!
Expand Down Expand Up @@ -2442,7 +2594,10 @@ final class GRPCStreamServerStateMachineTests: XCTestCase {
try stateMachine.send(message: secondResponse, promise: nil)

// Make sure messages are outbound
let framedMessages = try self.frameMessages([firstResponse, secondResponse], compress: false)
let framedMessages = try self.frameMessages(
[firstResponse, secondResponse],
compression: .none
)
XCTAssertEqual(
try stateMachine.nextOutboundFrame(),
.sendFrame(frame: framedMessages, promise: nil)
Expand Down Expand Up @@ -2473,16 +2628,20 @@ extension XCTestCase {
try expression(trailers)
}

func frameMessage(_ message: [UInt8], compress: Bool) throws -> ByteBuffer {
try frameMessages([message], compress: compress)
func frameMessage(_ message: [UInt8], compression: CompressionAlgorithm) throws -> ByteBuffer {
try frameMessages([message], compression: compression)
}

func frameMessages(_ messages: [[UInt8]], compress: Bool) throws -> ByteBuffer {
func frameMessages(_ messages: [[UInt8]], compression: CompressionAlgorithm) throws -> ByteBuffer
{
var framer = GRPCMessageFramer()
let compressor: Zlib.Compressor? = {
if compress {
switch compression {
case .deflate:
return Zlib.Compressor(method: .deflate)
} else {
case .gzip:
return Zlib.Compressor(method: .gzip)
default:
return nil
}
}()
Expand Down

0 comments on commit 07744c7

Please sign in to comment.