Skip to content

conduitio-labs/conduit-connector-rabbitmq

Repository files navigation

Conduit Connector for RabbitMQ

The RabbitMQ connector is one of Conduit standalone plugins. It provides both a source and a destination connector for RabbitMQ.

It uses the AMQP 0-9-1 Model to connect to RabbitMQ.

What data does the OpenCDC record consist of?

Field Description
record.Position json object with the delivery tag and the queue name from where the record was read from.
record.Operation currently fixed as "create".
record.Metadata a string to string map, with keys prefixed as rabbitmq.{DELIVERY_PROPERTY}.
record.Key the message id from the read message.
record.Payload.Before
record.Payload.After the message body

How to Build?

Run make build to compile the connector.

Testing

Execute make test to perform all non-tls tests. Execute make test-tls for the TLS tests. Both commands use docker files located at test/docker-compose.yml and test/docker-compose-tls.yml respectively. Tests require docker-compose v2.

Source Configuration Parameters

Name Description Required Default Value
url The RabbitMQ server's URL. Yes
tls.enabled Flag to enable or disable TLS. false false
tls.clientCert Path to the client certificate for TLS. No
tls.clientKey Path to the client's key for TLS. No
tls.caCert Path to the CA (Certificate Authority) certificate for TLS. No
queue.name The name of the RabbitMQ queue to consume messages from. Yes
queue.durable Specifies whether the queue is durable. No true
queue.autoDelete If the queue will auto-delete. No false
queue.exclusive If the queue is exclusive. No false
queue.noWait If the queue is declared without waiting for server reply. No false
consumer.name The name of the consumer. No
consumer.autoAck If the server should consider messages acknowledged once delivered. No false
consumer.exclusive If the consumer should be exclusive. No false
consumer.noLocal If the server should not deliver messages published by the same connection. No false
consumer.noWait If the consumer should be declared without waiting for server confirmation. No false

Destination Configuration Parameters

Name Description Required Default Value
url The RabbitMQ server's URL. Yes
tls.enabled Flag to enable or disable TLS. false false
tls.clientCert Path to the client certificate for TLS. No
tls.clientKey Path to the client's key for TLS. No
tls.caCert Path to the CA (Certificate Authority) certificate for TLS. No
queue.name The name of the RabbitMQ queue where messages will be published to. Yes
queue.durable Specifies whether the queue is durable. No true
queue.autoDelete If the queue will auto-delete. No false
queue.exclusive If the queue is exclusive. No false
queue.noWait If the queue is declared without waiting for server reply. No false
contentType The MIME content type of the messages written to RabbitMQ. No text/plain
delivery.contentEncoding The content encoding for the message. No
delivery.deliveryMode Delivery mode of the message. Non-persistent (1) or persistent (2). No 2
delivery.priority The priority of the message. No 0
delivery.correlationID The correlation id associated with the message. No
delivery.replyTo Address to reply to. No
delivery.messageTypeName The type name of the message. No
delivery.userID The user id associated with the message. No
delivery.appID The application id associated with the message. No
delivery.mandatory Indicates if this message is mandatory. No false
delivery.immediate Indicates if this message should be treated as immediate. No false
delivery.expiration Indicates the message expiration time, if any. No
exchange.name The name of the exchange to publish to. No
exchange.type The type of the exchange to publish to. No direct
exchange.durable Specifies whether the exchange is durable. No true
exchange.autoDelete If the exchange will auto-delete. No false
exchange.internal If the exchange is internal. No false
exchange.noWait If the exchange is declared without waiting for server reply. No false
routingKey The routing key to use when publishing to an exchange. No

Example pipeline.yml file

Here's an example of a pipeline.yml file using file to RabbitMQ and RabbitMQ to file pipelines:

version: 2.0
pipelines:
  - id: file-to-rabbitmq
    status: running
    connectors:
      - id: file.in
        type: source
        plugin: builtin:file
        name: file-destination
        settings:
          path: ./file.in
      - id: rabbitmq.out
        type: destination
        plugin: standalone:rabbitmq
        name: rabbitmq-source
        settings:
          url: amqp://guest:guest@localhost:5672/
          queue.name: demo-queue
          sdk.record.format: template
          sdk.record.format.options: '{{ printf "%s" .Payload.After }}'

  - id: rabbitmq-to-file
    status: running
    connectors:
      - id: rabbitmq.in
        type: source
        plugin: standalone:rabbitmq
        name: rabbitmq-source
        settings:
          url: amqp://guest:guest@localhost:5672/
          queue.name: demo-queue

      - id: file.out
        type: destination
        plugin: builtin:file
        name: file-destination
        settings:
          path: ./file.out
          sdk.record.format: template
          sdk.record.format.options: '{{ printf "%s" .Payload.After }}'