Conduit's Kafka connector wrapper

The goal of Conduit's Kafka connector wrapper is to make it possible to use existing Kafka connectors with Conduit.

Pre-requisites

  • JDK 20
  • Currently, only Unix-like OSes are supported.

Logging

The connector exposes a gRPC streaming method, plugin.GRPCStdio/StreamStdio, through which logs are sent to Conduit.

Development

The complete server-side code for this plugin is not committed to the repo. Rather, it's generated from the proto files when the project is compiled.

In a few integration tests we use Aiven's JDBC connector. Since it's not available in public Maven repositories, it needs to be compiled and installed into a local Maven repository before it can be used. This repository includes a script that clones and builds the correct version of the connector. It can be run like this: ./scripts/get-jdbc-connector.sh /path/to/repositories/.

After that, run mvn clean compile (this also generates the needed code from the proto files). Some IDEs may not automatically use the generated sources. If that's the case, you need to change the project settings in your IDE to include the generated sources. In IntelliJ, for example, you do that by going to File > Project structure > Project Settings > Modules. Then, right-click on target/generated-source and select "Sources".

Code quality

Code analysis is done through SonarCloud, where we have a conduitio organization.

The sonarcloud-analysis GitHub workflow uses the Maven sonar:sonar goal, which requests an analysis on SonarCloud. Once the analysis is complete, SonarCloud updates the pull request's check, which can sometimes take a couple of minutes. In other words, the workflow check passing doesn't automatically mean that the code quality check passed too.

When developing, you can use the SonarLint plugin, which is available for major IDEs.

For the analysis to work, a Sonar token is needed. It's stored as a GitHub repository secret named SONAR_TOKEN. The token can be generated by going to the SonarCloud project, then Configure > Choose your Analysis Method > With GitHub Actions. Here's the direct link at the time of writing.

Note regarding code coverage: SonarCloud's built-in quality gate requires all new code to have at least 80% test coverage. However, in some cases that may not be required, e.g. when a variable was simply renamed. To handle those cases, we have a quality gate with 0% required coverage. Stating the obvious: that gate should only be used in rare cases, its use should be noted in the PR, and the quality gate should be set back to the default one after the PR is done.

Building and using the connector

At a high level, to use this wrapper, you need to:

  1. Build it
  2. Put any Kafka connector JARs you may need into the libs directory.
  3. Create a Conduit connector configuration, where the plugin path is the path to conduit-kafka-connect-wrapper.
  4. Add the Kafka connector configuration you'd normally use.

Let's go into more detail:

Run scripts/dist.sh to build an executable. scripts/dist.sh will create a directory called dist with the following contents:

  1. A script that starts a connector instance.
  2. The connector JAR itself.
  3. A libs directory. This is where you put the Kafka connector JARs and their dependencies (if any).

When creating a Conduit connector, the plugin path you need to use is the path to conduit-kafka-connect-wrapper. Here's a full working example of a Conduit connector configuration:

{
  "type": "TYPE_SOURCE",
  "plugin": "/home/conduit-dev/projects/conduitio/conduit-kafka-connect-wrapper/dist/conduit-kafka-connect-wrapper",
  "pipeline_id": "f24c4a70-8664-4f80-9c27-204825442943",
  "config": {
    "name": "my-pg-source",
    "settings": {
      "wrapper.connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:postgresql://localhost/conduit-test-db",
      "connection.user": "username",
      "connection.password": "password",
      "incrementing.column.name": "id",
      "mode": "incrementing",
      "tables": "customers",
      "topic.prefix": "my_topic_prefix"
    }
  }
}

Note that the wrapper.connector.class should be a class which is present on the classpath, i.e. in one of the JARs in the libs directory. For more information, check the Configuration section.

Download a connector and its dependencies

To download a connector and all of its dependencies from a Maven repository, you can use scripts/download-connector.sh. For example:

./scripts/download-connector.sh io.example jdbc-connector 2.1.3

For usage, run ./scripts/download-connector.sh --help.

Loading connectors

The plugin will load connectors and all the other dependencies from a libs directory, which is expected to be in the same directory as the plugin executable itself. For example, if the plugin executable is at /abc/def/conduit-kafka-connect-wrapper, then the dependencies are expected to be in /abc/def/libs.

The plugin will be able to find the dependencies as soon as they are put into libs. Please note that a JDBC connector will generally require a database-specific driver to work (for example, PostgreSQL's driver can be found here).
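
As a minimal sketch of that convention (an assumption for illustration, not necessarily how the wrapper resolves the path internally), the libs directory is simply a sibling of the plugin executable:

import java.nio.file.Path;

// Assumed sketch: resolve the dependency directory relative to the plugin executable.
final class LibsDirResolver {
    static Path libsDir(Path executable) {
        // e.g. /abc/def/conduit-kafka-connect-wrapper -> /abc/def/libs
        return executable.getParent().resolve("libs");
    }
}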

Configuration

This plugin's configuration consists of the configuration of the requested Kafka connector, plus:

  • wrapper.connector.class

    • Description: The class of the requested connector. It needs to be found on the classpath, i.e. in a JAR in the libs directory.
    • Required: yes
    • Default: none
    • Example: io.aiven.connect.jdbc.JdbcSourceConnector
  • wrapper.log.level

    • Description: Root logging level for the wrapper and the requested connector.

      Note: For log4j, the logging level is currently hard-coded to INFO.

    • Required: no
    • Default: "INFO"
    • Example: DEBUG
  • wrapper.debezium.schema.save

    • Description: Saves the schema from Debezium's source field to the Conduit record's metadata. The metadata key is kafkaconnect.value.schema.
    • Required: no
    • Default: "false"
    • Example: true
  • wrapper.schema

    • Description: The schema of the records which will be written to a destination connector.
    • Required: the plugin doesn't require it, but the underlying Kafka connector may
    • Default: none
    • Example: {"type":"struct","fields":[{"type":"int32","optional":true,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"boolean","optional":true,"field":"trial"}],"name":"customers"}
  • wrapper.schema.autogenerate.enabled

    • Description: Automatically generate schemas for records written to a destination connector. Cannot be set to true if wrapper.schema is set.
    • Required: no
    • Default: false
    • Example: true
  • wrapper.schema.autogenerate.name

    • Description: The name of the automatically generated schema.
    • Required: yes, if schema auto-generation is turned on
    • Default: none
    • Example: customers
  • wrapper.schema.autogenerate.overrides

    • Description: A (partial) schema which overrides types in the auto-generated schema.
    • Required: no
    • Default: none
    • Example: {"type":"struct","fields":[{"type":"boolean","optional":true,"field":"joined"}],"name":"customers"}

Here's a full example of the configuration for a new Conduit connector, backed by a JDBC Kafka source connector.

{
  "wrapper.connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
  "wrapper.log.level": "DEBUG",
  "connection.url": "jdbc:postgresql://localhost/test-db",
  "connection.user": "test-user",
  "connection.password": "Passw0rd",
  "incrementing.column.name": "id",
  "mode": "incrementing",
  "tables": "my_table",
  "topic.prefix": "my_topic_prefix"
}

All the configuration parameters prefixed with wrapper. belong to the Kafka Connect wrapper and are used to control its behavior. All other configuration parameters are forwarded to the underlying Kafka connector as-is. In this example, wrapper.connector.class tells the wrapper to instantiate a JDBC source connector, while connection.url and all the other parameters are specific to the JDBC source connector.
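
As an illustration of that split, here's a minimal Java sketch (hypothetical class and method names, not the wrapper's actual code) that partitions a configuration map by the wrapper. prefix:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keys starting with "wrapper." configure the wrapper itself;
// everything else is forwarded to the underlying Kafka connector unchanged.
final class ConfigSplitter {
    private static final String WRAPPER_PREFIX = "wrapper.";

    static Map<String, String> kafkaConnectorSettings(Map<String, String> settings) {
        Map<String, String> forwarded = new HashMap<>();
        settings.forEach((key, value) -> {
            if (!key.startsWith(WRAPPER_PREFIX)) {
                forwarded.put(key, value); // e.g. connection.url, mode, tables, ...
            }
        });
        return forwarded;
    }
}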

Schema auto-generation

If wrapper.schema.autogenerate.enabled is set to true, the plugin will try to automatically generate Kafka connector schemas for a destination. If schema auto-generation is enabled, a schema name must be provided (through the wrapper.schema.autogenerate.name parameter).

Optionally, it's possible to override the types of individual fields. This is useful when the type the plugin infers for a field is not suitable. To do so, specify a (partial) schema through wrapper.schema.autogenerate.overrides.

Here's an example:

{
    "type": "struct",
    "fields":
    [
        {
            "type": "boolean",
            "optional": false,
            "field": "joined"
        }
    ],
    "name": "customers"
}

In this example we specify a partial schema in which a single field, joined, is defined. The schema generator will skip its own type inference for this field and use the provided definition instead.
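
Here's a minimal Java sketch of that precedence rule (hypothetical names, not the wrapper's actual implementation), assuming the inferred field schemas and the parsed overrides schema are already available:

import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical sketch: fields defined in the overrides schema take precedence
// over the automatically inferred field schemas.
final class SchemaOverrides {
    static Schema merge(String name, Map<String, Schema> inferredFields, Schema overrides) {
        SchemaBuilder builder = SchemaBuilder.struct().name(name);
        inferredFields.forEach((fieldName, inferred) -> {
            Field override = overrides == null ? null : overrides.field(fieldName);
            builder.field(fieldName, override != null ? override.schema() : inferred);
        });
        return builder.build();
    }
}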

Schema auto-generation works differently for records with structured data and records with raw data.

  1. Records with structured data: A record with structured data contains a google.protobuf.Struct. The mappings are as follows (a code sketch of this mapping follows the list):

     google.protobuf.Struct (protobuf)    Kafka schema
     bool                                 OPTIONAL_BOOLEAN_SCHEMA
     number (only double is supported)    OPTIONAL_FLOAT64_SCHEMA
     string                               OPTIONAL_STRING_SCHEMA
     NullValue                            STRUCT
     ListValue                            ARRAY, where the element type corresponds to the element type of the protobuf ListValue

  2. Records with raw data, in JSON format: The mappings are as follows:

     JSON       Kafka schema
     OBJECT     STRUCT
     STRING     OPTIONAL_STRING_SCHEMA
     NUMBER     the narrowest integer or float schema for the number
     BOOLEAN    boolean

  3. Records with raw data and no schema at all: not supported (yet).
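
For records with structured data, a minimal Java sketch of the first mapping above (covering the common kinds only; hypothetical class name, not the wrapper's actual implementation) could look like this:

import com.google.protobuf.Value;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical sketch: map google.protobuf.Value kinds to Kafka Connect schemas,
// following the table above.
final class StructSchemaMapper {
    static Schema schemaFor(Value value) {
        switch (value.getKindCase()) {
            case BOOL_VALUE:
                return Schema.OPTIONAL_BOOLEAN_SCHEMA;
            case NUMBER_VALUE:
                return Schema.OPTIONAL_FLOAT64_SCHEMA; // Struct numbers are doubles
            case STRING_VALUE:
                return Schema.OPTIONAL_STRING_SCHEMA;
            case LIST_VALUE:
                // ARRAY schema; the element type is taken from the first list element.
                return SchemaBuilder
                        .array(schemaFor(value.getListValue().getValues(0)))
                        .optional()
                        .build();
            case STRUCT_VALUE: {
                // Nested Struct mapped to a STRUCT schema, field by field.
                SchemaBuilder struct = SchemaBuilder.struct().optional();
                value.getStructValue().getFieldsMap()
                        .forEach((field, v) -> struct.field(field, schemaFor(v)));
                return struct.build();
            }
            default:
                throw new IllegalArgumentException("Unsupported kind: " + value.getKindCase());
        }
    }
}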