TiVo/wombat

Overview
--------
Wombat is a program that replicates data from MySQL to Apache Kafka. It replicates row-level data in near real-time.

It uses the MySQL replication protocol to speak to a MySQL server and act as a MySQL replication follower. Wombat receives a stream of row-level changes, and writes each change to Kafka. If a row is inserted/updated, Wombat sends the row's new data to Kafka. If a row is deleted, Wombat sends a delete marker to Kafka.
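
As a rough illustration of how a downstream consumer might use this stream, the sketch below folds a sequence of changes into an in-memory copy of a table. It is not part of Wombat. The function name `apply_change` is hypothetical, and so is the assumption that each message carries a key identifying its row (the key format is not described in this README).

```python
def apply_change(table, key, value):
    """Apply one change to `table`, a dict mapping row key to row data.

    A value of None stands for a delete marker; anything else is the
    row's new data after an insert or update.
    """
    if value is None:
        table.pop(key, None)  # delete; ignore keys we never saw
    else:
        table[key] = value    # insert or update: keep the latest row
    return table


# Replay an insert, an update, and a delete for one row, in commit order.
users = {}
apply_change(users, 1, {"userId": 1, "name": "James"})
apply_change(users, 1, {"userId": 1, "name": "James Cheng"})
after_update = dict(users)  # snapshot before the delete
apply_change(users, 1, None)
```

Because changes arrive in commit order, replaying them this way always leaves the dict holding the latest committed version of each row.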

Key features of Wombat:
* Designed to work with any MySQL/MariaDB-compatible server. Tested, though not extensively, against MariaDB 5.5 and MariaDB 10.0.
* Safe. It replicates from the master MySQL database with just read-only, replication-only permissions (this has not yet been verified).
* Low impact on performance. It replicates asynchronously.
* Transactionally consistent output. Writes only appear if they have been successfully committed to MySQL; aborted transactions do not emit messages. Writes appear in the same order as they were committed to MySQL.
* Fault-tolerant. If Wombat crashes, it will resume replication at the point where it failed. It will not lose any data.

Demo
----
[See demo_video.mov file]

Building
--------
To compile Wombat:
$ ./gradlew build fatJar

To build the docker quickstart images:
$ docker build -f replicator/Dockerfile.quickstart  -t wombat-quickstart replicator
$ docker build -t mariadb-quickstart mariadb-quickstart

Runtime requirements
--------------------
MySQL configuration requirements:
* row-based binlog replication enabled (https://dev.mysql.com/doc/refman/5.1/en/replication-howto-masterbaseconfig.html)
* --binlog-row-image=full (http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#sysvar_binlog_row_image)
* all tables must have primary keys
* Only file/offset-based replication is supported. Servers that replicate using GTIDs are not supported.
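
The binlog requirements above could be checked with something like the following sketch. It assumes the caller has already collected the server's variables (for example from `SHOW GLOBAL VARIABLES`) into a dict. The function name `check_binlog_settings` is hypothetical, and the sketch does not check for primary keys or GTID use.

```python
def check_binlog_settings(variables):
    """Return a list of problems found in a dict of server variables.

    `variables` maps variable names to values, as collected by the
    caller from the server.
    """
    problems = []
    if str(variables.get("log_bin", "")).upper() not in ("ON", "1"):
        problems.append("binary logging is not enabled (log_bin)")
    if str(variables.get("binlog_format", "")).upper() != "ROW":
        problems.append("binlog_format is not ROW")
    # Some older servers have no binlog_row_image variable at all, so
    # only flag it when it is present and set to something other than FULL.
    row_image = variables.get("binlog_row_image")
    if row_image is not None and str(row_image).upper() != "FULL":
        problems.append("binlog_row_image is not FULL")
    return problems


ok = check_binlog_settings(
    {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}
)
bad = check_binlog_settings({"log_bin": "ON", "binlog_format": "STATEMENT"})
```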

Quickstart
----------
The easiest way to start playing with Wombat is to use the pre-built and pre-configured Docker images. They have a hardcoded configuration and are not intended for production use.

  $ docker run -d --name zookeeper --hostname zookeeper confluent/zookeeper
  $ docker run -d --name kafka --hostname kafka --link zookeeper:zookeeper --env KAFKA_LOG_CLEANUP_POLICY=compact confluent/kafka
  $ docker run -d --name mariadb mariadb-quickstart

  # run wombat
  $ docker run --rm -it --link kafka:kafka --link zookeeper:zookeeper --link mariadb:mariadb --name wombat wombat-quickstart

  # interact. consume from kafka.
  $ docker run -it --rm --link zookeeper:zookeeper --link kafka:kafka confluent/tools /bin/bash -c '/usr/bin/kafka-console-consumer --zookeeper ${ZOOKEEPER_PORT_2181_TCP_ADDR}:${ZOOKEEPER_PORT_2181_TCP_PORT} --topic prefix.test.users --property print.key=true --from-beginning'

  # insert some data
  $ docker run --rm -it  --link mariadb:mariadb mariadb:5.5 sh -c 'mysql --user=root --password=${MARIADB_ENV_MYSQL_ROOT_PASSWORD} --host=${MARIADB_PORT_3306_TCP_ADDR} --port=${MARIADB_PORT_3306_TCP_PORT}'
    MariaDB [(none)]> create database test;
    MariaDB [(none)]> create table test.users (userId int auto_increment primary key, name char(128));
    MariaDB [(none)]> insert into test.users (name) values ("James");
    MariaDB [(none)]> update test.users set name = "James Cheng" where name = "James";
    MariaDB [(none)]> delete from test.users where name = "James Cheng";

  # clean up
  $ docker rm -f wombat  mariadb kafka zookeeper

Consuming
---------
Wombat encodes the row data using JSON, and then wraps it in an HTTP-style envelope: a first line, a set of headers, a blank line, and then the JSON body. The envelope headers are used for monitoring replication progress.

Each table is replicated to a different Kafka topic.

Given the following table structure:
CREATE TABLE users (userId INT AUTO_INCREMENT PRIMARY KEY, name CHAR(128));

Inserts:
INSERT INTO users (name) VALUES ("James");

BL/1 143 27
ServerId: mysqlhost.domainname
BinlogEventStartPosition: 8288
BinlogFilename: mariadb-bin.000008
BinlogTransactionStartPosition: 8173

{"name":"James","userId":1}

Updates:
UPDATE users SET name = "James Cheng" WHERE name="James";

BL/1 143 33
ServerId: mysqlhost.domainname
BinlogEventStartPosition: 8470
BinlogFilename: mariadb-bin.000008
BinlogTransactionStartPosition: 8355

{"name":"James Cheng","userId":1}

Deletes:
DELETE FROM users WHERE name = "James Cheng";

null

Deletes are represented as a message with a null payload. 
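
A consumer might split such a message into its parts along these lines. This is a minimal sketch based only on the examples above, not Wombat's own code: the function name `parse_message` is hypothetical, the meaning of the numbers on the first line is not documented here so that line is kept as an opaque string, and the line separator is assumed to be either LF or CRLF.

```python
import json


def parse_message(payload):
    """Return (first_line, headers, row) for one message value.

    A payload of None is treated as a delete marker and yields
    (None, {}, None).
    """
    if payload is None:
        return None, {}, None
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    text = payload.replace("\r\n", "\n")
    # The envelope and the JSON body are separated by one blank line.
    head, _, body = text.partition("\n\n")
    lines = head.split("\n")
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip()] = value.strip()
    return lines[0], headers, json.loads(body)


sample = (
    "BL/1 143 27\n"
    "ServerId: mysqlhost.domainname\n"
    "BinlogEventStartPosition: 8288\n"
    "BinlogFilename: mariadb-bin.000008\n"
    "BinlogTransactionStartPosition: 8173\n"
    "\n"
    '{"name":"James","userId":1}'
)
first_line, headers, row = parse_message(sample)
# The binlog file and event position together say how far replication got.
position = (headers["BinlogFilename"], int(headers["BinlogEventStartPosition"]))
```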

Status
------
Wombat is pre-alpha software. It has only been tested against a limited set of inputs.

A lot of thought has been put into its design, but not everything is implemented. I am looking for feedback to help validate the design and guide the implementation.

Caveats/issues
--------------
* MySQL schema changes are not handled.
* Not all MySQL data types are handled properly.
* Not tolerant to disk failure (it saves its MySQL replication position into a local file).
* Errors are not handled. Since it is still in development mode, it prints errors to stderr and System.exit()'s whenever it encounters an error.

Inspiration
-----------
Databus - https://github.com/linkedin/databus

License
-------
Copyright 2015 TiVo Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License in the enclosed file called LICENSE.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributing
------------
See CONTRIBUTING.md

Running manually
----------------
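The steps below run Wombat manually, without Docker.

Wombat takes its configuration from two files, whose paths are passed in at startup through the CONFIG_FILE and CHECKPOINT_FILE environment variables. The config file holds the MySQL and Kafka connection settings; the checkpoint file holds the binlog filename and position to replicate from.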

$ cat config.properties
MYSQL_SOURCE_IDENTIFIER=myServerId
MYSQL_SOURCE_HOST=192.168.1.1
MYSQL_SOURCE_PORT=3306
MYSQL_SOURCE_USER=replication-user
MYSQL_SOURCE_PASSWORD=replication-password
KAFKA_BROKER=192.168.1.1
KAFKA_PORT=9092
KAFKA_TOPIC_BASE=topic.prefix

$ cat checkpoint.txt
BINLOG_FILENAME=mariadb-bin.000003
BINLOG_POSITION=4

$ export CONFIG_FILE=config.properties
$ export CHECKPOINT_FILE=checkpoint.txt

$ java -jar replication/build/libs/replication.jar 
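
Before launching, it may help to confirm that the config file contains every key shown in the example above. The sketch below is a simplified KEY=VALUE reader, not a full Java properties parser. The names `read_properties` and `missing_keys` are hypothetical, and treating all eight keys as required is an assumption.

```python
REQUIRED_KEYS = [
    "MYSQL_SOURCE_IDENTIFIER",
    "MYSQL_SOURCE_HOST",
    "MYSQL_SOURCE_PORT",
    "MYSQL_SOURCE_USER",
    "MYSQL_SOURCE_PASSWORD",
    "KAFKA_BROKER",
    "KAFKA_PORT",
    "KAFKA_TOPIC_BASE",
]


def read_properties(text):
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


def missing_keys(settings, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not settings.get(key)]


example = """
MYSQL_SOURCE_IDENTIFIER=myServerId
MYSQL_SOURCE_HOST=192.168.1.1
MYSQL_SOURCE_PORT=3306
KAFKA_BROKER=192.168.1.1
KAFKA_PORT=9092
KAFKA_TOPIC_BASE=topic.prefix
"""
missing = missing_keys(read_properties(example))
```

Here `missing` lists the two credentials left out of the example text, which could be reported to the operator before the JVM is started.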

Configuration
-------------
MYSQL_SOURCE_IDENTIFIER
        An arbitrary string that will be added to each message that originates from this MySQL server
MYSQL_SOURCE_HOST
        Hostname or IP address of the MySQL server to replicate from.
MYSQL_SOURCE_PORT
        The port that MySQL listens on
MYSQL_SOURCE_USER
        The username used to connect to MySQL
MYSQL_SOURCE_PASSWORD
        The password used to connect to MySQL
SERVER_ID
        MySQL's server id setting

KAFKA_BROKER
        IP/Hostname of one of the Kafka brokers
KAFKA_PORT
        Port of the Kafka broker

KAFKA_TOPIC_BASE
        An arbitrary string. The MySQL database and table names are
        appended to this string to determine the topic to which that
        table's rows are sent. For example, with a prefix of
        "server1", the table db.users would be written to the topic
        "server1.db.users".
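
The naming rule can be written out as a one-line function. This is an illustrative sketch with a hypothetical name, `topic_for`. It assumes the three parts are joined with dots, as in the "server1.db.users" example.

```python
def topic_for(base, database, table):
    """Build the topic name for a table from the configured topic base."""
    return ".".join([base, database, table])


topic = topic_for("server1", "db", "users")
```

A consumer can use the same rule to work out which topic to subscribe to for a given table.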
