Skip to content

Robust Listeners

Vitaly Tomilov edited this page May 27, 2024 · 33 revisions

Below is a complete demo application that shows how to automatically re-connect a global direct connection with permanent event listeners in it, should the physical connection fail.

The demo makes use of option onLost of Database.connect method.

The application sets up one global connection with a notification listener in it, and keeps sending a notification into the channel every second, while the connection is available. When the connection breaks, it executes a simple strategy for restoring the connection, trying 10 times, after each 5 seconds, and if fails to restore the connection - it exits the process.

const cn = {
    // your connection details
};

const pgp = require('pg-promise')({
    // Initialization Options
});

const db = pgp(cn); // Database Object

const channel = 'my-channel'; // LISTEN - channel name
let connection; // global connection for permanent event listeners
let counter = 0; // payload counter, just for kicks

function onNotification(data) {
    console.log(`Received Payload ${++counter}: ${data.payload}`);
}

function setupListeners(client) {
    client.on('notification', onNotification);
    return connection.none('LISTEN ${channel:name}', {channel})
        .catch(error => {
            console.log(error); // unlikely to ever happen
        });
}

function removeListeners(client) {
    client.removeListener('notification', onNotification);
}

function onConnectionLost(err, e) {
    console.log('Connectivity Problem:', err);
    connection = null; // prevent use of the broken connection
    removeListeners(e.client);
    reconnect(5000, 10) // retry 10 times, with 5-second intervals
        .then(() => {
            console.log('Successfully Reconnected');
        })
        .catch(() => {
            // failed after 10 attempts
            console.log('Connection Lost Permanently');
            process.exit(); // exiting the process
        });
}

function reconnect(delay, maxAttempts) {
    delay = delay > 0 ? parseInt(delay) : 0;
    maxAttempts = maxAttempts > 0 ? parseInt(maxAttempts) : 1;
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            db.connect({direct: true, onLost: onConnectionLost})
                .then(obj => {
                    connection = obj; // global connection is now available
                    resolve(obj);
                    return setupListeners(obj.client);
                })
                .catch(error => {
                    console.log('Error Reconnecting:', error);
                    if (--maxAttempts) {
                        reconnect(delay, maxAttempts)
                            .then(resolve)
                            .catch(reject);
                    } else {
                        reject(error);
                    }
                });
        }, delay);
    });
}

function sendNotifications() {
    // send a notification to our listener every second:
    setInterval(() => {
        if (connection) {
            // We execute NOTIFY here just to emulate what's normally done by the server:
            const d = new Date();
            const payload = `my-payload (${d.getHours()}:${d.getMinutes()}:${d.getSeconds()})`;
            connection.none('NOTIFY ${channel:name}, ${payload}', {channel, payload})
                .catch(error => {
                    console.log('Failed to Notify:', error); // unlikely to ever happen
                })
        }
    }, 1000);
}

reconnect() // = same as reconnect(0, 1)
    .then(obj => {
        console.log('Successful Initial Connection');
        // obj.done(); - releases the connection
        sendNotifications();
    })
    .catch(error => {
        console.log('Failed Initial Connection:', error);
    });

In order to test this code properly, you will need to manually disrupt the communications.

Here are some ways of how this can be done while connected to your local database server:

  • To temporarily kill all connections to your test database, execute the following SQL:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='my-database-name';

You must provide the correct database name in the query, and make sure to execute it from a connection to a different database, or else you will be killing the current connection, which is likely to crash your pgAdmin UI.

  • To permanently disconnect, locate your PostgreSQL Server in the list of services, and stop it.

  • There are many TCP utilities that can help you disrupt communications on TCP level for your database port.