Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a /system route #347

Closed
4 tasks done
mStirner opened this issue Nov 10, 2023 · 5 comments
Closed
4 tasks done

Create a /system route #347

mStirner opened this issue Nov 10, 2023 · 5 comments

Comments

@mStirner
Copy link
Member

mStirner commented Nov 10, 2023

create /system route for other sub-routes, like:

  • /logs
  • /notification
  • /info / /about
  • /version
  • etc.

  • Notifications
  • System usage
  • Logfiles
  • Information (connector(s), versions (backend/connector/database/node/etc.), etc.)
@mStirner
Copy link
Member Author

router.system.notifications.js (draft)

const process = require("process");
const express = require("express");
const ws = require("ws");

module.exports = (app, system) => {

    let router = express.Router();
    system.use("/notifications", router);

    // websocket server
    let wss = new ws.Server({
        noServer: true
    });

    // detect broken connections
    let interval = setInterval(() => {
        wss.clients.forEach((ws) => {

            if (!ws.isAlive) {
                ws.terminate();
                return;
            }

            ws.isAlive = false;
            ws.ping();

        });
    }, Number(process.env.API_WEBSOCKET_TIMEOUT));


    // if the server closes
    // clear the interval
    wss.on("close", () => {
        clearInterval(interval);
    });


    // listen for new ws connections
    wss.on("connection", (ws) => {

        let interval = setInterval(() => {
            ws.send(JSON.stringify({
                title: "Hello World",
                message: "This is a notification message",
                timestamp: Date.now()
            }));
        }, 5000);

        ws.once("close", () => {
            clearInterval(interval);
        });

    });


    // TODO: Add websocket handling/endpoint
    router.get("/", (req, res) => {
        if ((req.headers["upgrade"] && req.headers["connection"])) {

            // handle websocket upgrade
            wss.handleUpgrade(req, req.socket, req.headers, (ws) => {

                ws.isAlive = true;

                ws.on("pong", () => {
                    ws.isAlive = true;
                });

                wss.emit("connection", ws, req);

            });

        } else {

            // 501 or 400 status?!
            res.status(400).end();

        }
    });

};

@mStirner
Copy link
Member Author

mStirner commented Feb 20, 2024

router.system.info.js (draft)

const process = require("process");
const mongodb = require("mongodb");
const express = require("express");



module.exports = (app, system) => {

    let router = express.Router();
    system.use("/info", router);

    // TODO: Add websocket handling/endpoint
    router.get("/", (req, res) => {
        mongodb.client.admin().serverStatus((err, info) => {
            if (err) {

                res.status(500).json({
                    error: err
                });

            } else {

                res.json({
                    versions: {
                        node: process.versions.node,
                        mongodb: info.version
                    },
                    //uptime: Date.now(),
                    environment: process.env
                });

            }
        });
    });

};

Caution

Revealing secrets set via environment variables is a very bad idea!

@mStirner
Copy link
Member Author

mStirner commented Feb 20, 2024

router.system.usage.js (draft)

//const ENVIRONMENT = require("./system/environments.js");
const os = require("os");
const v8 = require("v8");
const process = require("process");
const express = require("express");

module.exports = (app, system) => {

    let router = express.Router();
    system.use("/usage", router);

    // TODO: Add websocket handling/endpoint
    router.get("/", (req, res) => {

        res.json({
            heap: v8.getHeapStatistics(),
            cpu: {
                cores: os.cpus(),
                //usage: calcCPUPrecentage()
            },
            ram: {
                free: os.freemem(),
                total: os.totalmem()
            },
            usage: process.resourceUsage()
        });

    });

};

mStirner pushed a commit to mStirner/backend that referenced this issue Apr 25, 2024
mStirner added a commit to mStirner/backend that referenced this issue Apr 28, 2024
mStirner added a commit to mStirner/backend that referenced this issue Apr 28, 2024
@mStirner
Copy link
Member Author

router.system.logs.js

const path = require("path");
const fs = require("fs");
const { exec } = require("child_process");
const { WebSocket } = require("ws");
const { Readable } = require("stream");
const { createInterface } = require("readline");

// directory that holds the logfiles, taken from the environment
const LOG_PATH = process.env.LOG_PATH;

// absolute path of the combined logfile
const LOGFILE = path.resolve(LOG_PATH, "combined.log");

const logger = require("../system/logger");

// websocket server without its own http server ("noServer")
const wss = new WebSocket.Server({
    noServer: true
});

// heartbeat to detect broken connections:
// clients still flagged alive are un-flagged and pinged,
// clients that are not flagged alive are terminated
const interval = setInterval(() => {
    for (const client of wss.clients) {
        if (client.isAlive) {

            client.isAlive = false;
            client.ping();

        } else {

            client.terminate();

        }
    }
}, Number(process.env.API_WEBSOCKET_TIMEOUT));


// stop the heartbeat once the websocket server closes
wss.on("close", () => clearInterval(interval));


module.exports = (router) => {

    // NOTE: this is for compatibility reasons
    // in v4 the content from router.api.logs.js is moved here
    //require("./router.api.logs.js")(null, router);

    router.get("/", (req, res) => {
        if ((!req.headers["upgrade"] || !req.headers["connection"])) {

            let {
                limit = 0,
                offset = 0
            } = req.query;

            let output = [];
            let lines = 0;

            let readable = fs.createReadStream(LOGFILE);
            let rl = createInterface({
                input: readable
            });

            rl.on("line", (line) => {

                // count lines
                lines += 1;

                if (output.length >= limit && limit > 0) {
                    rl.close();
                    return;
                }

                if (offset < lines) {
                    output.push(JSON.parse(line));
                }

            });

            rl.on("close", () => {
                res.json(output);
            });


        } else {

            // listen for websockt clients
            // keep sending new log entrys to client
            wss.once("connection", (ws) => {

                let controller = new AbortController(); // used to stop watcher
                let { signal } = controller;

                let input = new Readable({
                    read() { }
                });

                let rl = createInterface({
                    input
                });

                rl.on("line", (line) => {
                    ws.send(line);
                });

                // cleanup when ws connection is closed
                // everything triggers ws.terminate();
                // whitch results in emitting close
                ws.once("close", () => {
                    controller.abort(); // stop fs.watch()
                    rl.close(); // stop readline
                    input.destroy(); // destory readable
                });

                // https://gist.github.com/fkowal/3447400
                fs.open(LOGFILE, "r", (err, fd) => {

                    if (err) {
                        ws.terminate();
                        return;
                    }

                    let position = 0;
                    let prev_stats = null;

                    fs.stat(LOGFILE, (err, stats) => {
                        if (err) {

                            console.error(err);
                            ws.terminate();

                        } else {

                            // set position to end of file
                            // receive only new messages
                            prev_stats = stats;
                            position = stats.size;

                        }
                    });

                    let watcher = fs.watch(LOGFILE, {
                        signal
                    });

                    watcher.once("error", () => {
                        ws.terminate();
                    });

                    watcher.on("change", () => {

                        // could be possible that changes  happens before stats are available
                        // this would break and flood everything from the log file
                        if (!prev_stats) {
                            return;
                        }

                        fs.stat(LOGFILE, (err, stats) => {
                            if (err) {

                                console.error(err);
                                ws.terminate();

                            } else {

                                if (prev_stats.size > stats.size) {
                                    position = 0;
                                }

                                prev_stats = stats;

                            }
                        });

                        fs.read(fd, {
                            position,
                            encoding: "utf8"
                        }, (err, bytesRead, buffer) => {
                            if (err) {

                                console.error(err);
                                ws.terminate();

                            } else {

                                position += bytesRead;
                                input.push(buffer.slice(0, bytesRead));

                            }
                        });

                    });

                    // close event loop
                    watcher.unref();

                });

            });

            // handle request as websocket
            // perform websocket handshake 
            wss.handleUpgrade(req, req.socket, req.headers, (ws) => {

                ws.isAlive = true;

                ws.on("pong", () => {
                    ws.isAlive = true;
                });

                wss.emit("connection", ws, req);

            });

        }
    });

    router.delete("/", (req, res) => {
        fs.readdir(LOG_PATH, {
            recursive: true
        }, (err, files) => {
            if (err) {

                res.status(500).json({
                    error: err
                });

            } else {
                try {

                    // build absolute paths
                    let logfiles = files.filter((name) => {
                        return name !== ".gitkeep";
                    }).map((name) => {
                        return path.join(LOG_PATH, name);
                    }).filter((file) => {
                        return !fs.statSync(file).isDirectory();
                    });

                    for (let file of logfiles) {
                        if (req.query.delete === "true") {
                            fs.rmSync(file);
                        } else {
                            fs.truncateSync(file);
                        }
                    }

                    // feedback
                    logger.warn(`Logfiles ${req.query.delete === "true" ? "deleted" : "truncated"}!`);

                    res.json(logfiles);

                } catch (err) {

                    res.status(500).json({
                        error: err
                    });

                }
            }
        });
    });

    router.post("/export", (req, res) => {
        try {

            // TODO: Ensure that only admins can download logs
            // a logfile may contain sensitive information!

            // TODO: use absolute path to tar
            // see: https://github.com/OpenHausIO/backend/issues/432
            let tar = exec(`tar -czv *`, {
                cwd: LOG_PATH,
                encoding: "buffer"
            });

            if (process.env.NODE_ENV === "development") {
                tar.stderr.pipe(process.stderr);
            }

            res.setHeader("content-type", "application/tar+gzip");

            tar.once("exit", (code) => {
                console.log("exit code", code);
                res.end();
            });

            tar.stdout.pipe(res);

        } catch (err) {

            console.log(err);

            res.status(500).json({
                error: err
            });

        }
    });

};

Important

The above code was implemented and runs on the dev system.
The websocket endpoint ws://<host>:<port>/api/system/logs is very buggy with fs.watch.
Instead, the logger was refactored to implement an "export" stream.
This stream is piped into the websocket clients, see the code below.

Logger export stream:

            // forward every line of the logger export stream to the connected client
            wss.once("connection", (ws) => {

                // per-client branch of the export stream
                const branch = new PassThrough();
                exporter.pipe(branch);

                // split the branch into single lines
                const reader = createInterface({
                    input: branch
                });

                reader.on("line", (entry) => {
                    ws.send(entry);
                });

                ws.once("close", () => {

                    // prevent memory/event emitter leak
                    // without unpipe, a memory leak warning is printed after 10 connections
                    exporter.unpipe(branch);

                    reader.close();

                });

            });

mStirner added a commit to mStirner/backend that referenced this issue May 4, 2024
@mStirner
Copy link
Member Author

mStirner commented May 22, 2024

  • Update postman collection with new http routes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant