---
title: Migrating from MySQL with Kafka Connect
sidebar_label: 'MySQL → Databend: Kafka Connect'
---

> **Capabilities**: CDC, Incremental, Full Load

This tutorial shows how to build a real-time data pipeline from MySQL to Databend using Kafka Connect.

## Overview

Kafka Connect is a tool for streaming data between Apache Kafka and other systems reliably and at scale. It simplifies building real-time data pipelines by standardizing data movement in and out of Kafka. For a MySQL to Databend migration, Kafka Connect provides a pipeline that enables:

- Real-time data synchronization from MySQL to Databend
- Automatic schema evolution and table creation
- Capture of both newly inserted rows and updates to existing rows

The migration pipeline consists of two main components:

- **MySQL JDBC Source Connector**: Reads data from MySQL and publishes it to Kafka topics
- **Databend Sink Connector**: Consumes data from Kafka topics and writes it to Databend

## Prerequisites

- MySQL database with data you want to migrate
- Apache Kafka installed ([Kafka quickstart guide](https://kafka.apache.org/quickstart))
- Databend instance running
- Basic knowledge of SQL and command line

## Step 1: Set Up Kafka Connect

Kafka Connect supports two execution modes: Standalone and Distributed. For this tutorial, we'll use Standalone mode, which is simpler for testing.

### Configure Kafka Connect

Create a basic worker configuration file `connect-standalone.properties` in your Kafka `config` directory:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```
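
Kafka Connect needs a running Kafka broker at the `bootstrap.servers` address before it starts. The commands below are a minimal sketch based on the Kafka quickstart, assuming a ZooKeeper-based installation and that you run them from the Kafka installation directory; adjust paths for your setup.

```shell
# Start ZooKeeper and the Kafka broker in the background (quickstart defaults)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# Confirm the broker is reachable on localhost:9092 by listing topics
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```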

## Step 2: Configure MySQL Source Connector

### Install Required Components

1. Download the [Kafka Connect JDBC](https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) plugin from Confluent Hub and copy the JAR files from its `lib` folder into your Kafka `libs` directory

2. Download the [MySQL JDBC Driver](https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/) and copy the JAR file to the same `libs` directory (see the example commands below)
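
As a rough sketch, assuming Kafka is installed at `/opt/kafka` and the Confluent Hub download is the standard ZIP archive (paths and archive names are examples to adapt):

```shell
export KAFKA_HOME=/opt/kafka   # example install location

# Kafka Connect JDBC plugin: unzip the Confluent Hub archive and copy its bundled JARs
unzip confluentinc-kafka-connect-jdbc-*.zip
cp confluentinc-kafka-connect-jdbc-*/lib/*.jar "$KAFKA_HOME/libs/"

# MySQL JDBC driver (8.0.32 is the version linked above)
curl -O https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar
cp mysql-connector-j-8.0.32.jar "$KAFKA_HOME/libs/"
```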

### Create MySQL Source Configuration

Create a file `mysql-source.properties` in your Kafka `config` directory with the following content:

```properties
name=mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# Connection settings
connection.url=jdbc:mysql://localhost:3306/your_database?useSSL=false
connection.user=your_username
connection.password=your_password

# Table selection and topic mapping
# Each whitelisted table is published to the topic <topic.prefix><table name>,
# so your_table ends up in the topic mysql_your_table
table.whitelist=your_table
topic.prefix=mysql_

# Sync mode configuration
mode=incrementing
incrementing.column.name=id

# Polling frequency
poll.interval.ms=5000
```

Replace the following values with your actual MySQL configuration:
- `your_database`: Your MySQL database name
- `your_username`: MySQL username
- `your_password`: MySQL password
- `your_table`: The table you want to migrate

With these settings, rows from `your_table` are published to the Kafka topic `mysql_your_table` (`topic.prefix` plus the table name), which is also the name of the table the sink connector will create in Databend. You can sanity-check the connection details with the command below.
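
A quick way to confirm the credentials and table name before starting the connector, sketched with the standard `mysql` client (substitute your own values):

```shell
# Should print the row count of the table you plan to migrate
mysql -h 127.0.0.1 -P 3306 -u your_username -p your_database \
  -e "SELECT COUNT(*) FROM your_table;"
```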

### Sync Modes

The MySQL Source Connector supports three synchronization modes:

1. **Incrementing Mode**: Best for tables with an auto-incrementing ID column
   ```properties
   mode=incrementing
   incrementing.column.name=id
   ```

2. **Timestamp Mode**: Best for capturing both inserts and updates
   ```properties
   mode=timestamp
   timestamp.column.name=updated_at
   ```

3. **Timestamp+Incrementing Mode**: Most reliable for capturing all inserts and updates (an example table layout follows this list)
   ```properties
   mode=timestamp+incrementing
   incrementing.column.name=id
   timestamp.column.name=updated_at
   ```
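
Timestamp-based modes need a column that MySQL refreshes on every change. The DDL below is a hypothetical table layout suited to timestamp+incrementing mode (names such as `some_column` are placeholders):

```shell
# Create a table with an auto-incrementing id and a self-updating timestamp column
mysql -u your_username -p your_database -e "
  CREATE TABLE your_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    some_column VARCHAR(255),
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  );"
```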

## Step 3: Configure Databend Sink Connector

### Install Required Components

1. Download the [Databend Kafka Connector](https://github.com/databendcloud/databend-kafka-connect/releases) and place it in your Kafka `libs` directory

2. Download the [Databend JDBC Driver](https://central.sonatype.com/artifact/com.databend/databend-jdbc/) and copy it to your Kafka `libs` directory

### Create Databend Sink Configuration

Create a file `databend-sink.properties` in your Kafka `config` directory:

```properties
name=databend-sink
connector.class=com.databend.kafka.connect.DatabendSinkConnector

# Connection settings
connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.database=default

# Topic to table mapping (must match the topic produced by the source connector)
topics=mysql_your_table
table.name.format=${topic}

# Table management
auto.create=true
auto.evolve=true

# Write behavior
insert.mode=upsert
pk.mode=record_value
pk.fields=id
batch.size=1000
```

Adjust the Databend connection settings as needed for your environment.
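
To check that Databend is reachable at that address before starting the pipeline, you can issue a test query against its HTTP handler. This is only a sketch, assuming a local deployment listening on port 8000 with the `databend`/`databend` credentials used above:

```shell
# A successful response returns JSON containing the query result
curl -s -u databend:databend \
  -X POST "http://localhost:8000/v1/query" \
  -H 'Content-Type: application/json' \
  -d '{"sql": "SELECT VERSION()"}'
```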

## Step 4: Start the Migration Pipeline

Start Kafka Connect with both connector configurations:

```shell
bin/connect-standalone.sh config/connect-standalone.properties \
  config/mysql-source.properties \
  config/databend-sink.properties
```
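
Kafka Connect also exposes a REST API (port 8083 by default) that is handy for confirming both connectors are up. A quick sketch:

```shell
# List the connectors loaded by this worker
curl -s http://localhost:8083/connectors

# Show task status for each connector
curl -s http://localhost:8083/connectors/mysql-source/status
curl -s http://localhost:8083/connectors/databend-sink/status
```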

## Step 5: Verify the Migration

### Check Data Synchronization

1. **Monitor Kafka Connect Logs**

   ```shell
   tail -f /path/to/kafka/logs/connect.log
   ```

2. **Verify Data in Databend**

   Connect to your Databend instance and run:

   ```sql
   SELECT * FROM mysql_your_table LIMIT 10;
   ```

### Test Schema Evolution

If you add a new column to your MySQL table, the schema change propagates to Databend automatically (via `auto.evolve`) once new rows containing that column are captured:

1. **Add a column in MySQL**

   ```sql
   ALTER TABLE your_table ADD COLUMN new_field VARCHAR(100);
   ```

2. **Verify schema update in Databend**

   ```sql
   DESC mysql_your_table;
   ```

### Test Update Operations

To test updates, ensure you're using timestamp or timestamp+incrementing mode:

1. **Update your MySQL connector configuration**

   Edit `mysql-source.properties` to use timestamp+incrementing mode if your table has a timestamp column.

2. **Update data in MySQL**

   ```sql
   UPDATE your_table SET some_column='new value' WHERE id=1;
   ```

3. **Verify the update in Databend**

   ```sql
   SELECT * FROM mysql_your_table WHERE id=1;
   ```

## Key Features of Databend Kafka Connect

1. **Automatic Table and Column Creation**: With `auto.create` and `auto.evolve` settings, tables and columns are created automatically based on Kafka topic data

2. **Schema Support**: Supports Avro, JSON Schema, and Protobuf input data formats (requires Schema Registry)

3. **Multiple Write Modes**: Supports both `insert` and `upsert` write modes

4. **Multi-task Support**: Can run multiple tasks to improve performance

5. **High Availability**: In distributed mode, the workload is automatically balanced across workers, with dynamic scaling and fault tolerance (see the sketch below)
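
If you outgrow standalone mode, the same connectors can run on a distributed Connect cluster. This is only a sketch: distributed workers read their worker settings from `config/connect-distributed.properties`, and connectors are then submitted through the REST API as JSON rather than `.properties` files (the JSON body below is an abbreviated, hypothetical translation of `databend-sink.properties`).

```shell
# Start a distributed worker (repeat on each node of the cluster)
bin/connect-distributed.sh config/connect-distributed.properties

# Submit the sink connector to the cluster via the REST API
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
        "name": "databend-sink",
        "config": {
          "connector.class": "com.databend.kafka.connect.DatabendSinkConnector",
          "connection.url": "jdbc:databend://localhost:8000",
          "topics": "mysql_your_table",
          "auto.create": "true"
        }
      }'
```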

## Troubleshooting

- **Connector Not Starting**: Check Kafka Connect logs for errors
- **No Data in Databend**: Verify that the topic exists and contains data using the Kafka console consumer (see the commands below)
- **Schema Issues**: Ensure `auto.create` and `auto.evolve` are set to `true`
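
For the second point, a minimal check with the standard Kafka CLI tools, assuming the broker address and topic name used in this tutorial:

```shell
# Confirm the topic created by the source connector exists
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Peek at a few records to confirm data is flowing into the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic mysql_your_table --from-beginning --max-messages 5
```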