Skip to content

Latest commit

History

History

influxdb-and-kafka

Folders and files

NameName
Last commit message
Last commit date

parent directory

..

Getting data into InfluxDB from Kafka with Kafka Connect

JSON, using embedded Kafka Connect schema and a map field type for tags.

Load test data:

docker exec -i kafkacat kafkacat -b kafka-1:39092 -P -t json_01 <<EOF
{ "schema": { "type": "struct", "fields": [ { "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false}, { "field": "stock", "type": "double", "optional": true } ], "optional": false, "version": 1 }, "payload": { "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 } }
EOF

Check data is there:

docker exec kafkacat kafkacat -b kafka-1:39092 -C -u -t json_01

Create the connector:

curl -i -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_json_01/config \
        -d '{
            "connector.class"               : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                        : "json_01",
            "influxdb.url"                  : "http://influxdb:8086",
            "influxdb.db"                   : "my_db",
            "measurement.name.format"       : "${topic}"
        }'

Make sure that the connector鈥檚 running

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
sink  |  sink_influx_json_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector

Check that the data鈥檚 made it to InfluxDB:

$ docker exec -it influxdb influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01

$ docker exec -it influxdb influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product

$ docker exec -it influxdb influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579779810974000000 500
1579779820028000000 500
1579779833143000000 500

Avro

Load test data:

docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --broker-list kafka-1:39092 --topic avro_01 --property value.schema='{ "type": "record", "name": "myrecord", "fields": [ { "name": "tags", "type": { "type": "map", "values": "string" } }, { "name": "stock", "type": "double" } ] }' <<EOF
{ "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 }
EOF

Check the data鈥檚 there (I鈥檓 using kafkacat just to be contrary; you use use kafka-avro-console-consumer too):

$ docker exec -i kafkacat kafkacat -b kafka-1:39092 -C -t avro_01 -r http://schema-registry:8081 -s avro
{"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}

Create the connector:

curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_avro_01/config \
        -d '{
            "connector.class"                    : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                             : "avro_01",
            "influxdb.url"                       : "http://influxdb:8086",
            "influxdb.db"                        : "my_db",
            "measurement.name.format"            : "${topic}"
        }'

Make sure that the connector鈥檚 running

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
sink  |  sink_influx_avro_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector

Check that the data鈥檚 made it to InfluxDB

$ docker exec -it influxdb influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579781680622000000 500