Skip to content

Commit

Permalink
Speed up bind functionality (#2286)
Browse files Browse the repository at this point in the history
Move from 3 loops (prepareValue, check for buffers, write param types, write param values) to a single loop. This speeds up the insert benchmark by around 100 queries per second. Performance improvement depends on number of parameters being bound.
  • Loading branch information
brianc committed Nov 4, 2020
1 parent 78a14a1 commit 07988f9
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 82 deletions.
4 changes: 2 additions & 2 deletions packages/pg-protocol/src/buffer-writer.ts
Expand Up @@ -5,7 +5,7 @@ export class Writer {
private offset: number = 5
private headerPosition: number = 0
constructor(private size = 256) {
this.buffer = Buffer.alloc(size)
this.buffer = Buffer.allocUnsafe(size)
}

private ensure(size: number): void {
Expand All @@ -15,7 +15,7 @@ export class Writer {
// exponential growth factor of around ~ 1.5
// https://stackoverflow.com/questions/2269063/buffer-growth-strategy
var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size
this.buffer = Buffer.alloc(newSize)
this.buffer = Buffer.allocUnsafe(newSize)
oldBuffer.copy(this.buffer)
}
}
Expand Down
29 changes: 29 additions & 0 deletions packages/pg-protocol/src/outbound-serializer.test.ts
Expand Up @@ -110,6 +110,10 @@ describe('serializer', () => {
var expectedBuffer = new BufferList()
.addCString('bang') // portal name
.addCString('woo') // statement name
.addInt16(4)
.addInt16(0)
.addInt16(0)
.addInt16(0)
.addInt16(0)
.addInt16(4)
.addInt32(1)
Expand All @@ -125,6 +129,31 @@ describe('serializer', () => {
})
})

it('with custom valueMapper', function () {
const actual = serialize.bind({
portal: 'bang',
statement: 'woo',
values: ['1', 'hi', null, 'zing'],
valueMapper: () => null,
})
var expectedBuffer = new BufferList()
.addCString('bang') // portal name
.addCString('woo') // statement name
.addInt16(4)
.addInt16(0)
.addInt16(0)
.addInt16(0)
.addInt16(0)
.addInt16(4)
.addInt32(-1)
.addInt32(-1)
.addInt32(-1)
.addInt32(-1)
.addInt16(0)
.join(true, 'B')
assert.deepEqual(actual, expectedBuffer)
})

it('with named statement, portal, and buffer value', function () {
const actual = serialize.bind({
portal: 'bang',
Expand Down
80 changes: 45 additions & 35 deletions packages/pg-protocol/src/serializer.ts
Expand Up @@ -101,56 +101,66 @@ const parse = (query: ParseOpts): Buffer => {
return writer.flush(code.parse)
}

type ValueMapper = (param: any, index: number) => any

type BindOpts = {
portal?: string
binary?: boolean
statement?: string
values?: any[]
// optional map from JS value to postgres value per parameter
valueMapper?: ValueMapper
}

const paramWriter = new Writer()

// make this a const enum so typescript will inline the value
const enum ParamType {
STRING = 0,
BINARY = 1,
}

const writeValues = function (values: any[], valueMapper?: ValueMapper): void {
for (let i = 0; i < values.length; i++) {
const mappedVal = valueMapper ? valueMapper(values[i], i) : values[i]
if (mappedVal == null) {
// add the param type (string) to the writer
writer.addInt16(ParamType.STRING)
// write -1 to the param writer to indicate null
paramWriter.addInt32(-1)
} else if (mappedVal instanceof Buffer) {
// add the param type (binary) to the writer
writer.addInt16(ParamType.BINARY)
// add the buffer to the param writer
paramWriter.addInt32(mappedVal.length)
paramWriter.add(mappedVal)
} else {
// add the param type (string) to the writer
writer.addInt16(ParamType.STRING)
paramWriter.addInt32(Buffer.byteLength(mappedVal))
paramWriter.addString(mappedVal)
}
}
}

const bind = (config: BindOpts = {}): Buffer => {
// normalize config
const portal = config.portal || ''
const statement = config.statement || ''
const binary = config.binary || false
var values = config.values || emptyArray
var len = values.length
const values = config.values || emptyArray
const len = values.length

var useBinary = false
// TODO(bmc): all the loops in here aren't nice, we can do better
for (var j = 0; j < len; j++) {
useBinary = useBinary || values[j] instanceof Buffer
}
writer.addCString(portal).addCString(statement)
writer.addInt16(len)

var buffer = writer.addCString(portal).addCString(statement)
if (!useBinary) {
buffer.addInt16(0)
} else {
buffer.addInt16(len)
for (j = 0; j < len; j++) {
buffer.addInt16(values[j] instanceof Buffer ? 1 : 0)
}
}
buffer.addInt16(len)
for (var i = 0; i < len; i++) {
var val = values[i]
if (val === null || typeof val === 'undefined') {
buffer.addInt32(-1)
} else if (val instanceof Buffer) {
buffer.addInt32(val.length)
buffer.add(val)
} else {
buffer.addInt32(Buffer.byteLength(val))
buffer.addString(val)
}
}
writeValues(values, config.valueMapper)

if (binary) {
buffer.addInt16(1) // format codes to use binary
buffer.addInt16(1)
} else {
buffer.addInt16(0) // format codes to use text
}
writer.addInt16(len)
writer.add(paramWriter.flush())

// format code
writer.addInt16(binary ? ParamType.BINARY : ParamType.STRING)
return writer.flush(code.bind)
}

Expand Down
59 changes: 31 additions & 28 deletions packages/pg/bench.js
Expand Up @@ -45,37 +45,40 @@ const run = async () => {
console.log('warmup done')
const seconds = 5

let queries = await bench(client, params, seconds * 1000)
console.log('')
console.log('little queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 733 qps')
for (let i = 0; i < 4; i++) {
let queries = await bench(client, params, seconds * 1000)
console.log('')
console.log('little queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 733 qps')

console.log('')
queries = await bench(client, seq, seconds * 1000)
console.log('sequence queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 1309 qps')
console.log('')
queries = await bench(client, seq, seconds * 1000)
console.log('sequence queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 1309 qps')

console.log('')
queries = await bench(client, insert, seconds * 1000)
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 6303 qps')
console.log('')
queries = await bench(client, insert, seconds * 1000)
console.log('insert queries:', queries)
console.log('qps', queries / seconds)
console.log('on my laptop best so far seen 6445 qps')

console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
console.log('')
console.log('Warming up bytea test')
await client.query({
text: 'INSERT INTO buf(name, data) VALUES ($1, $2)',
values: ['test', Buffer.allocUnsafe(104857600)],
})
console.log('bytea warmup done')
const start = Date.now()
const results = await client.query('SELECT * FROM buf')
const time = Date.now() - start
console.log('bytea time:', time, 'ms')
console.log('bytea length:', results.rows[0].data.byteLength, 'bytes')
console.log('on my laptop best so far seen 1107ms and 104857600 bytes')
await new Promise((resolve) => setTimeout(resolve, 250))
}

await client.end()
await client.end()
Expand Down
28 changes: 14 additions & 14 deletions packages/pg/lib/query.js
Expand Up @@ -197,22 +197,22 @@ class Query extends EventEmitter {
})
}

if (this.values) {
try {
this.values = this.values.map(utils.prepareValue)
} catch (err) {
this.handleError(err, connection)
return
}
// because we're mapping user supplied values to
// postgres wire protocol compatible values it could
// throw an exception, so try/catch this section
try {
connection.bind({
portal: this.portal,
statement: this.name,
values: this.values,
binary: this.binary,
valueMapper: utils.prepareValue,
})
} catch (err) {
this.handleError(err, connection)
return
}

connection.bind({
portal: this.portal,
statement: this.name,
values: this.values,
binary: this.binary,
})

connection.describe({
type: 'P',
name: this.portal || '',
Expand Down
7 changes: 4 additions & 3 deletions packages/pg/lib/utils.js
Expand Up @@ -38,6 +38,10 @@ function arrayString(val) {
// note: you can override this function to provide your own conversion mechanism
// for complex types, etc...
var prepareValue = function (val, seen) {
// null and undefined are both null for postgres
if (val == null) {
return null
}
if (val instanceof Buffer) {
return val
}
Expand All @@ -58,9 +62,6 @@ var prepareValue = function (val, seen) {
if (Array.isArray(val)) {
return arrayString(val)
}
if (val === null || typeof val === 'undefined') {
return null
}
if (typeof val === 'object') {
return prepareObject(val, seen)
}
Expand Down

0 comments on commit 07988f9

Please sign in to comment.