Skip to content

Commit

Permalink
Switch to sync zlib with 512k chunks, adjustable compression level (#174
Browse files Browse the repository at this point in the history
)

* Switch to sync zlib with 512k chunks, adjustable compression level

* update serverPlayer
  • Loading branch information
extremeheat committed Feb 5, 2022
1 parent 2aa3b98 commit 14af5fe
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 54 deletions.
6 changes: 4 additions & 2 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ Returns a `Client` instance and connects to the server.
| skipPing | *optional* | Whether pinging the server to check its version should be skipped. |
| conLog | *optional* | Where to log connection information (server join, kick messages to). Defaults to console.log, set to `null` to not log anywhere. |
| useNativeRaknet | *optional* | Whether to use the C++ version of RakNet. Set to false to use JS. |
| authTitle | *optional* | The client ID to sign in as, defaults to Minecraft for Nintendo Switch. Set false to sign in through Azure. See prismarine-auth |
| deviceType | *optional* | The device type to sign in as, defaults to "Nintendo". See prismarine-auth |
| compressionLevel | *optional* | What zlib compression level to use, default to **7** |
| batchingInterval | *optional* | How frequently, in milliseconds to flush and write the packet queue (default: 20ms) |

The following events are emitted by the client:
* 'status' - When the client's login sequence status has changed
Expand All @@ -30,6 +30,8 @@ The following events are emitted by the client:
* 'kick' - The server has kicked the client
* 'close' - The server has closed the connection
* 'error' - An recoverable exception has happened. Not catching will throw an exception
* 'connect_allowed' - Emitted after the client has pinged the server and gets version information.
* 'heartbeat' - Emitted after two successful tick_sync (keepalive) packets have been sent bidirectionally

## be.createServer(options) : Server

Expand Down
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ declare module "bedrock-protocol" {
useNativeRaknet?: boolean,
// If using JS implementation of RakNet, should we use workers? (This only affects the client)
useRaknetWorker?: boolean
// Compression level for zlib, default to 7
compressionLevel?: number
// How frequently the packet queue should be flushed in milliseconds, defaults to 20ms
batchingInterval?: number
}

export interface ClientOptions extends Options {
Expand Down
8 changes: 7 additions & 1 deletion src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Client extends Connection {
if (this.options.protocolVersion < Options.MIN_VERSION) {
throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`)
}
this.compressionLevel = this.options.compressionLevel || 7
}

get entityId () {
Expand Down Expand Up @@ -140,7 +141,12 @@ class Client extends Connection {
if (this.status === ClientStatus.Initializing && this.options.autoInitPlayer === true) {
if (statusPacket.status === 'player_spawn') {
this.status = ClientStatus.Initialized
this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId })
if (!this.entityId) {
// We need to wait for start_game in the rare event we get a player_spawn before start_game race condition
this.on('start_game', () => this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId }))
} else {
this.write('set_local_player_as_initialized', { runtime_entity_id: this.entityId })
}
this.emit('spawn')
}
}
Expand Down
45 changes: 24 additions & 21 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Connection extends EventEmitter {
write (name, params) {
this.outLog?.(name, params)
if (name === 'start_game') this.updateItemPalette(params.itemstates)
const batch = new Framer()
const batch = new Framer(this.compressionLevel)
const packet = this.serializer.createPacketBuffer({ name, params })
batch.addEncodedPacket(packet)

Expand All @@ -84,29 +84,33 @@ class Connection extends EventEmitter {
this.sendIds.push(name)
}

_tick () {
if (this.sendQ.length) {
const batch = new Framer(this.compressionLevel)
batch.addEncodedPackets(this.sendQ)
this.sendQ = []
this.sendIds = []
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
} else {
this.sendDecryptedBatch(batch)
}
}
}

onTick = this._tick.bind(this)

startQueue () {
this.sendQ = []
this.loop = setInterval(() => {
if (this.sendQ.length) {
const batch = new Framer()
batch.addEncodedPackets(this.sendQ)
this.sendQ = []
this.sendIds = []
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
} else {
this.sendDecryptedBatch(batch)
}
}
}, 20)
this.loop = setInterval(this.onTick, this.options.batchingInterval || 20)
}

/**
* Sends a MCPE packet buffer
*/
sendBuffer (buffer, immediate = false) {
if (immediate) {
const batch = new Framer()
const batch = new Framer(this.compressionLevel)
batch.addEncodedPacket(buffer)
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
Expand All @@ -121,7 +125,7 @@ class Connection extends EventEmitter {

sendDecryptedBatch (batch) {
// send to raknet
batch.encode(buf => this.sendMCPE(buf, true))
this.sendMCPE(batch.encode(), true)
}

sendEncryptedBatch (batch) {
Expand Down Expand Up @@ -158,11 +162,10 @@ class Connection extends EventEmitter {
if (this.encryptionEnabled) {
this.decrypt(buffer.slice(1))
} else {
Framer.decode(buffer, packets => {
for (const packet of packets) {
this.readPacket(packet)
}
})
const packets = Framer.decode(buffer)
for (const packet of packets) {
this.readPacket(packet)
}
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/createClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ function createClient (options) {
const ad = advertisement.fromServerName(data)
client.options.version = options.version ?? (Versions[ad.version] ? ad.version : CURRENT_VERSION)
if (client.conLog) client.conLog(`Connecting to server ${ad.motd} (${ad.name}), version ${ad.version}`, client.options.version !== ad.version ? ` (as ${client.options.version})` : '')
client.emit('connect_allowed')
connect(client)
}, client)
}
Expand Down Expand Up @@ -46,15 +47,17 @@ function connect (client) {
sleep(500).then(() => client.queue('request_chunk_radius', { chunk_radius: client.viewDistance || 10 }))
})

const KEEPALIVE_INTERVAL = 10 // Send tick sync packets every 10 ticks
// Send tick sync packets every 10 ticks
const keepAliveInterval = 10
const keepAliveIntervalBig = BigInt(keepAliveInterval)
let keepalive
client.tick = 0n
client.once('spawn', () => {
keepalive = setInterval(() => {
// Client fills out the request_time and the server does response_time in its reply.
client.queue('tick_sync', { request_time: client.tick, response_time: 0n })
client.tick += BigInt(KEEPALIVE_INTERVAL)
}, 50 * KEEPALIVE_INTERVAL)
client.tick += keepAliveIntervalBig
}, 50 * keepAliveInterval)

client.on('tick_sync', async packet => {
client.emit('heartbeat', packet.response_time)
Expand Down
1 change: 1 addition & 0 deletions src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Server extends EventEmitter {
if (this.options.protocolVersion < Options.MIN_VERSION) {
throw new Error(`Protocol version < ${Options.MIN_VERSION} : ${this.options.protocolVersion}, too old`)
}
this.compressionLevel = this.options.compressionLevel || 7
}

onOpenConnection = (conn) => {
Expand Down
1 change: 1 addition & 0 deletions src/serverPlayer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Player extends Connection {
this.deserializer = server.deserializer
this.connection = connection
this.options = server.options
this.compressionLevel = server.compressionLevel

KeyExchange(this, server, server.options)
Login(this, server, server.options)
Expand Down
16 changes: 6 additions & 10 deletions src/transforms/encryption.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ function createEncryptor (client, iv) {
// The send counter is represented as a little-endian 64-bit long and incremented after each packet.

function process (chunk) {
Zlib.deflateRaw(chunk, { level: 7 }, (err, buffer) => {
if (err) throw err
const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)])
client.sendCounter++
client.cipher.write(packet)
})
const buffer = Zlib.deflateRawSync(chunk, { level: client.compressionLevel })
const packet = Buffer.concat([buffer, computeCheckSum(buffer, client.sendCounter, client.secretKeyBytes)])
client.sendCounter++
client.cipher.write(packet)
}

client.cipher.on('data', client.onEncryptedPacket)
Expand Down Expand Up @@ -72,10 +70,8 @@ function createDecryptor (client, iv) {
return
}

Zlib.inflateRaw(chunk, { chunkSize: 1024 * 1024 * 2 }, (err, buffer) => {
if (err) throw err
client.onDecryptedPacket(buffer)
})
const buffer = Zlib.inflateRawSync(chunk, { chunkSize: 512000 })
client.onDecryptedPacket(buffer)
}

client.decipher.on('data', verify)
Expand Down
30 changes: 13 additions & 17 deletions src/transforms/framer.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,30 @@ const zlib = require('zlib')

// Concatenates packets into one batch packet, and adds length prefixs.
class Framer {
constructor () {
constructor (compressionLevel) {
// Encoding
this.packets = []
this.compressionLevel = 7
this.compressionLevel = compressionLevel
}

static decode (buf, cb) {
static decode (buf) {
// Read header
if (buf[0] !== 0xfe) throw Error('bad batch packet header ' + buf[0])
const buffer = buf.slice(1)

// Decode the payload
zlib.inflateRaw(buffer, { chunkSize: 1024 * 1024 * 2 }, (err, inflated) => {
if (err) { // Try to decode without compression
Framer.getPackets(buffer)
return
}
cb(Framer.getPackets(inflated))
})
// Decode the payload with 512kb buffer
try {
const inflated = zlib.inflateRawSync(buffer, { chunkSize: 512000 })
return Framer.getPackets(inflated)
} catch (e) { // Try to decode without compression
return Framer.getPackets(buffer)
}
}

encode (cb) {
encode () {
const buf = Buffer.concat(this.packets)
zlib.deflateRaw(buf, { level: this.compressionLevel }, (err, def) => {
if (err) throw err
const ret = Buffer.concat([Buffer.from([0xfe]), def])
cb(ret)
})
const def = zlib.deflateRawSync(buf, { level: this.compressionLevel })
return Buffer.concat([Buffer.from([0xfe]), def])
}

addEncodedPacket (chunk) {
Expand Down

0 comments on commit 14af5fe

Please sign in to comment.