Skip to content

Commit

Permalink
Merge pull request #536 from brianc/packet-reader
Browse files Browse the repository at this point in the history
This moves the packet reading and chunking into a separate module
  • Loading branch information
brianc committed Mar 14, 2014
2 parents 2716f95 + 79f0394 commit 8d44a39
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 64 deletions.
83 changes: 21 additions & 62 deletions lib/connection.js
Expand Up @@ -5,6 +5,7 @@ var util = require('util');

var utils = require(__dirname + '/utils');
var Writer = require('buffer-writer');
var Reader = require('packet-reader');

var TEXT_MODE = 0;
var BINARY_MODE = 1;
Expand All @@ -23,6 +24,10 @@ var Connection = function(config) {
this._ending = false;
this._mode = TEXT_MODE;
this._emitMessage = false;
this._reader = new Reader({
headerSize: 1,
lengthPadding: -4
});
var self = this;
this.on('newListener', function(eventName) {
if(eventName == 'message') {
Expand Down Expand Up @@ -87,17 +92,19 @@ Connection.prototype.connect = function(port, host) {
};

Connection.prototype.attachListeners = function(stream) {
var self = this;
stream.on('data', function(buff) {
this.setBuffer(buff);
var msg = this.parseMessage();
while(msg) {
if(this._emitMessage) {
this.emit('message', msg);
self._reader.addChunk(buff);
var packet = self._reader.read();
while(packet) {
var msg = self.parseMessage(packet);
if(self._emitMessage) {
self.emit('message', msg);
}
this.emit(msg.name, msg);
msg = this.parseMessage();
self.emit(msg.name, msg);
packet = self._reader.read();
}
}.bind(this));
});
};

Connection.prototype.requestSsl = function(config) {
Expand Down Expand Up @@ -306,63 +313,16 @@ Connection.prototype.sendCopyFail = function (msg) {
this._send(0x66);
};

//parsing methods
Connection.prototype.setBuffer = function(buffer) {
if(this.lastBuffer) { //we have unfinished biznaz
//need to combine last two buffers
var remaining = this.lastBuffer.length - this.lastOffset;
var combinedBuffer = new Buffer(buffer.length + remaining);
this.lastBuffer.copy(combinedBuffer, 0, this.lastOffset);
buffer.copy(combinedBuffer, remaining, 0);
buffer = combinedBuffer;
}
this.lastBuffer = false;
this.buffer = buffer;
this.offset = 0;
};

Connection.prototype.readSslResponse = function() {
var remaining = this.buffer.length - (this.offset);
if(remaining < 1) {
this.lastBuffer = this.buffer;
this.lastOffset = this.offset;
return false;
}
return {
name: 'sslresponse',
text: this.buffer[this.offset++]
};
};

var Message = function(name, length) {
this.name = name;
this.length = length;
};

Connection.prototype.parseMessage = function() {
var remaining = this.buffer.length - (this.offset);
if(remaining < 5) {
//cannot read id + length without at least 5 bytes
//just abort the read now
this.lastBuffer = this.buffer;
this.lastOffset = this.offset;
return false;
}

//read message id code
var id = this.buffer[this.offset++];
var buffer = this.buffer;
//read message length
var length = this.parseInt32(buffer);

if(remaining <= length) {
this.lastBuffer = this.buffer;
//rewind the last 5 bytes we read
this.lastOffset = this.offset-5;
return false;
}
Connection.prototype.parseMessage = function(buffer) {

switch(id)
this.offset = 0;
var length = buffer.length + 4;
switch(this._reader.header)
{

case 0x52: //R
Expand Down Expand Up @@ -422,7 +382,6 @@ Connection.prototype.parseMessage = function() {
case 0x64: //d
return this.parsed(buffer, length);
}
return false;
};

Connection.prototype.parseR = function(buffer, length) {
Expand All @@ -440,7 +399,7 @@ Connection.prototype.parseR = function(buffer, length) {
if(code === 5) { //md5 required
msg.name = 'authenticationMD5Password';
msg.salt = new Buffer(4);
this.buffer.copy(msg.salt, 0, this.offset, this.offset + 4);
buffer.copy(msg.salt, 0, this.offset, this.offset + 4);
this.offset += 4;
return msg;
}
Expand Down Expand Up @@ -610,7 +569,7 @@ Connection.prototype.parseH = function(buffer, length) {
};

Connection.prototype.parseGH = function (buffer, msg) {
var isBinary = this.buffer[this.offset] !== 0;
var isBinary = buffer[this.offset] !== 0;
this.offset++;
msg.binary = isBinary;
var columnCount = this.parseInt16(buffer);
Expand Down
3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -21,7 +21,8 @@
"generic-pool": "2.0.3",
"buffer-writer": "1.0.0",
"pgpass": "0.0.1",
"nan": "~0.6.0"
"nan": "~0.6.0",
"packet-reader": "0.2.0"
},
"devDependencies": {
"jshint": "1.1.0",
Expand Down
4 changes: 3 additions & 1 deletion test/unit/client/typed-query-results-tests.js
Expand Up @@ -121,7 +121,9 @@ test('typed results', function() {
dataTypeID: 1082,
actual: '2010-10-31',
expected: function(val) {
assert.UTCDate(val, 2010, 9, 31, 0, 0, 0, 0);
var now = new Date(2010, 9, 31)
assert.UTCDate(val, 2010, now.getUTCMonth(), now.getUTCDate(), now.getUTCHours(), 0, 0, 0);
assert.equal(val.getHours(), now.getHours())
}
},{
name: 'interval time',
Expand Down

0 comments on commit 8d44a39

Please sign in to comment.