diff --git a/src/ChannelWrapper.js b/src/ChannelWrapper.js index 39e40cc..cb9be08 100644 --- a/src/ChannelWrapper.js +++ b/src/ChannelWrapper.js @@ -355,11 +355,12 @@ export default class ChannelWrapper extends EventEmitter { .then(() => { const encodedMessage = this._json ? new Buffer.from(JSON.stringify(message.content)) : message.content; + let result = true; const sendPromise = (() => { switch (message.type) { case 'publish': return new Promise(function(resolve, reject) { - const result = channel.publish(message.exchange, message.routingKey, encodedMessage, + result = channel.publish(message.exchange, message.routingKey, encodedMessage, message.options, err => { if(err) { @@ -371,7 +372,7 @@ export default class ChannelWrapper extends EventEmitter { }); case 'sendToQueue': return new Promise(function(resolve, reject) { - const result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => { + result = channel.sendToQueue(message.queue, encodedMessage, message.options, err => { if(err) { reject(err); } else { @@ -379,16 +380,17 @@ export default class ChannelWrapper extends EventEmitter { } }); }); - /* istanbul ignore next */ default: throw new Error(`Unhandled message type ${message.type}`); } })(); - // Send some more! - this._publishQueuedMessages(workerNumber); - + if(result) { + this._publishQueuedMessages(workerNumber); + } else { + this._channel.once('drain', ()=>this._publishQueuedMessages(workerNumber)) + } return sendPromise; }) .then( @@ -482,4 +484,4 @@ function removeUnconfirmedMessage(arr, message) { } const removed = arr.splice(toRemove, 1); return removed[0]; -} \ No newline at end of file +}