A lightweight Java message queue library built on your RDBMS.

hailuand/corgimq

Corgi Message Queue (CorgiMQ)

A lightweight message queue library built using Java's JDBC API. Similar in spirit to AWS SQS and Redis Simple Message Queue, but entirely on top of your DBMS.


Features

  • Lightweight: bring just your DBMS. 🚀
  • Batteries included: sensible out-of-the-box defaults with a few optional knobs to get you dangerous fast. 🔋
  • Transactional: shared access to the JDBC Connection enables transactional message handling. 🤝
  • Auditable: audit information in the queue captures if, when, and by whom a message has been read. 🕵️
  • Guaranteed exactly-once delivery of a message to a reader - if someone's currently reading it, no one else receives it.
  • Messages remain in queue until removed.


Compatibility

Java

CorgiMQ Version | JDK
----------------+-----
0.1+            | 21

RDBMS

The RDBMSs in this list have been tested for compatibility with the library and are included in the test suite.

DBMS        | Status
------------+--------
H2          | ✅
CockroachDB | ✅
MySQL       | ✅
Postgres    | ✅

Get started

Creating a queue

A message queue is managed by an instance of MessageQueue.

Connection conn = // ...
MessageQueue messageQueue = MessageQueue.of(MessageQueueConfig.of("poneglyphs"), conn); // Name of queue, table will have '_q' suffix
messageQueue.createTableWithSchemaIfNotExists();

Table after creation:

postgres=# \dt mq.*
           List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+---------
 mq     | poneglyphs_q | table | shanks
(1 row)

Pushing messages

Once a MessageQueue is created, data to enqueue is wrapped as a Message. A Message is enqueued with push(List):

Message message1 = Message.of("Whole Cake Island");
Message message2 = Message.of("Zou");
messageQueue.push(List.of(message1, message2));

Table after push:

postgres=# select * from mq.poneglyphs_q;
                  id                  |        data       |        message_time        | read_count | read_by | processing_time
--------------------------------------+-------------------+----------------------------+------------+---------+-----------------
 9245867e-f7f1-40e4-9142-bb1457aff9ec | Whole Cake Island | 2024-02-27 10:12:57.486346 |          0 |         |
 a174f9d1-d3a9-4583-9396-d3ed575a4ebf | Zou               | 2024-02-27 10:12:57.486346 |          0 |         |
(2 rows)

Reading messages

Unread Messages in the queue are read in ascending order of message_time by an instance of MessageHandler:

MessageHandler messageHandler = MessageHandler.of(messageQueue, MessageHandlerConfig.of(1)); // Read one message at a time
Supplier<Connection> connectionSupplier = // Code to acquire a connection to database
messageHandler.listen(connectionSupplier, messageHandlerBatch -> {
    for (Message message : messageHandlerBatch.messages()) {
        System.out.printf("Shanks - we found a road poneglyph at %s!%n", message.data());
    }
    return messageHandlerBatch.messages(); // List of Messages to be popped
});
// "Shanks - we found a road poneglyph at Whole Cake Island!"

Table after handler execution:

postgres=# select * from mq.poneglyphs_q;
                  id                  |        data       |        message_time        | read_count | read_by  |      processing_time
--------------------------------------+-------------------+----------------------------+------------+----------+----------------------------
 a174f9d1-d3a9-4583-9396-d3ed575a4ebf | Zou               | 2024-02-27 10:12:57.486346 |          0 |          |
 9245867e-f7f1-40e4-9142-bb1457aff9ec | Whole Cake Island | 2024-02-27 10:12:57.486346 |          1 | beckman  | 2024-02-27 10:14:29.911197
(2 rows)

After being read, Messages have their read_count, read_by, and processing_time updated. Subsequent calls to listen() no longer receive them.
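
The read bookkeeping described above can be pictured with a small in-memory model. This is only a sketch, not CorgiMQ code: the Row class, the readBatch method, and the rule that a row counts as "unread" while processing_time is empty are all assumptions made for illustration.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** In-memory illustration of read ordering and audit columns. Not the library's implementation. */
final class ReadBookkeepingSketch {

    /** Hypothetical row mirroring the columns of the queue table. */
    static final class Row {
        final String data;
        final Instant messageTime;
        int readCount = 0;
        String readBy;          // null until the row has been read
        Instant processingTime; // null until the row has been read

        Row(String data, Instant messageTime) {
            this.data = data;
            this.messageTime = messageTime;
        }
    }

    /**
     * Picks up to batchSize unread rows in ascending message_time and stamps the audit columns.
     * Assumption: "unread" means processingTime has not been set yet.
     */
    static List<Row> readBatch(List<Row> table, int batchSize, String reader, Instant now) {
        List<Row> batch = table.stream()
                .filter(row -> row.processingTime == null)
                .sorted(Comparator.comparing((Row row) -> row.messageTime))
                .limit(batchSize)
                .collect(Collectors.toList());
        for (Row row : batch) {
            row.readCount++;
            row.readBy = reader;
            row.processingTime = now;
        }
        return batch;
    }
}
```

Calling readBatch repeatedly with a batch size of 1 hands out the oldest unread row each time and returns an empty list once every row has been stamped.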


βš™οΈ Configuration

Message queue

🔤 queueName

Name of message queue. Each queue has its own table within the mq schema, suffixed with _q.

Message handler

🔢 messageBatchSize

Maximum number of Messages to serve to a MessageHandler in a single batch.

Default: 10

✏️ Notes

Locking

A Message currently being read by a MessageHandler has its row locked until the handler function completes. If multiple MessageHandlers operate on the same queue, locked rows are skipped so that the same message is never received by different handlers. This is achieved with the RDBMS's SELECT ... FOR UPDATE statement.
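
The "lock the row, skip rows locked by others" behaviour can be illustrated without a database. The sketch below is an in-memory analogy only: the library relies on row locks in the RDBMS, whereas this example uses a hypothetical Row class with an AtomicBoolean flag, and the claim and release methods are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** In-memory analogy of skipping locked rows. Not CorgiMQ code and not how the RDBMS implements it. */
final class SkipLockedSketch {

    /** Hypothetical stand-in for a queue row: a payload plus a per-row lock flag. */
    static final class Row {
        final String data;
        final AtomicBoolean locked = new AtomicBoolean(false);

        Row(String data) {
            this.data = data;
        }
    }

    /** Claims up to batchSize rows, skipping (not waiting on) any row that is already locked. */
    static List<Row> claim(List<Row> rows, int batchSize) {
        List<Row> claimed = new ArrayList<>();
        for (Row row : rows) {
            if (claimed.size() == batchSize) {
                break;
            }
            // compareAndSet succeeds for exactly one claimer per row.
            if (row.locked.compareAndSet(false, true)) {
                claimed.add(row);
            }
        }
        return claimed;
    }

    /** Releases the locks, analogous to the transaction ending when the handler completes. */
    static void release(List<Row> claimed) {
        for (Row row : claimed) {
            row.locked.set(false);
        }
    }
}
```

Two handlers that each claim one row from the same list receive different rows, because the second handler passes over the row the first one still holds.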

Transactional message handling

The transaction's Connection can be accessed through the MessageHandlerBatch passed to the handler function:

MessageHandler messageHandler = MessageHandler.of(messageQueue, MessageHandlerConfig.of(1));
Supplier<Connection> connectionSupplier = // ...
messageHandler.listen(connectionSupplier, messageHandlerBatch -> {
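    // Create the Statement from the transaction's Connection; do not close the Connection itself
    try (Statement statement = messageHandlerBatch.transactionConnection().createStatement()) {
        // Do something with Statement to participate in transaction
    } catch (SQLException e) {
        throw new RuntimeException(e);
    }

    return messageHandlerBatch.messages();
});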
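
For readers less familiar with JDBC transactions, the general commit-or-rollback pattern that makes shared-Connection handling useful looks roughly like the sketch below. It uses only standard java.sql calls. The TransactionSketch class, the SqlWork interface, and the inTransaction method are hypothetical names, and nothing here is claimed to match how CorgiMQ manages its transaction internally.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Generic JDBC commit-or-rollback wrapper. Illustrative only, not CorgiMQ's internals. */
final class TransactionSketch {

    /** Hypothetical callback type: work that runs against the shared Connection. */
    @FunctionalInterface
    interface SqlWork<T> {
        T apply(Connection connection) throws SQLException;
    }

    /** Runs the work in one transaction: commit on success, roll back on failure. */
    static <T> T inTransaction(Connection connection, SqlWork<T> work) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false);
        try {
            T result = work.apply(connection);
            connection.commit();
            return result;
        } catch (SQLException | RuntimeException e) {
            // Undo everything done on this Connection since the transaction began.
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Because every statement issued on the same Connection belongs to the same transaction, application writes made inside the handler either commit together with the queue update or are rolled back together with it.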
