11 changes: 8 additions & 3 deletions README.md
@@ -51,15 +51,20 @@ To use this feature you have to set ``scylladb.ttl`` config with time(in seconds
--------------------------------
Offset tracking Support in Kafka
--------------------------------
This connector support two types of offset tracking support.
This connector supports two types of offset tracking, but offsets are always stored at least in Kafka.
They will appear in the internal ``__consumer_offsets`` topic and can be tracked by checking the connector's consumer group
using the `kafka-consumer-groups` tool.
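
For example (a sketch: the broker address and connector name are assumptions; Kafka Connect names a sink connector's consumer group `connect-<connector name>`):

```
# Describe the connector's consumer group to see committed offsets and lag.
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-scylladb-sink-connector
```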

**Offset stored in ScyllaDB Table**

This is the default behaviour of the connector. Here, the offset is stored in the ScyllaDB table.
This is the default behaviour of the connector. The offsets will additionally be stored in the table defined by the `scylladb.offset.storage.table` property.
This is useful when all offsets need to be accessible in Scylla.

**Offset stored in Kafka**

If you want that offset should be managed in kafka then you must specify ``scylladb.offset.storage.table.enable=false``. By default, this property is true (in this case offset will be stored in the ScyllaDB table).
For offsets to be managed only in Kafka, you must specify `scylladb.offset.storage.table.enable=false`.
This results in fewer total writes and is the recommended option.
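
A minimal sketch of the relevant properties (names and defaults as described in [CONFIG.md](documentation/CONFIG.md)):

```
# Default behaviour: offsets are additionally written to a ScyllaDB table.
scylladb.offset.storage.table.enable=true
scylladb.offset.storage.table=kafka_connect_offsets

# Recommended: keep offsets only in Kafka (fewer total writes).
# scylladb.offset.storage.table.enable=false
```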


-------------------
Delivery guarantees
55 changes: 28 additions & 27 deletions documentation/CONFIG.md
@@ -1,4 +1,4 @@
#ScyllaDB Sink Connector
# ScyllaDB Sink Connector

Configuration Properties
------------------------
@@ -9,7 +9,7 @@ To use this connector, specify the name of the connector class in the ``connecto

Connector-specific configuration properties are described below.

###Connection
### Connection

``scylladb.contact.points``

@@ -87,7 +87,7 @@ Connector-specific configuration properties are described below.
* Default: false
* Importance: high

###SSL
### SSL

``scylladb.ssl.truststore.path``

@@ -114,7 +114,7 @@ Connector-specific configuration properties are described below.
* Valid Values: [JDK, OPENSSL, OPENSSL_REFCNT]
* Importance: low

###Keyspace
### Keyspace

**Note**: Both keyspace and table names consist of only alphanumeric characters,
cannot be empty and are limited in size to 48 characters (that limit exists
@@ -149,7 +149,7 @@ can be forced by using double-quotes ("myTable" is different from mytable).
* Valid Values: [1,...]
* Importance: high

###Table
### Table

``scylladb.table.manage.enabled``

@@ -176,15 +176,17 @@ can be forced by using double-quotes ("myTable" is different from mytable).
* Importance: Low
* Default: kafka_connect_offsets

###Topic to Table
### Topic to Table

These configurations can be specified for multiple Kafka topics from which records are being processed.
Also, these topic level configurations will be override the behavior of Connector level configurations such as
Also, these topic level configurations will override the behavior of Connector level configurations such as
``scylladb.consistency.level``, ``scylladb.deletes.enabled`` and ``scylladb.ttl``

``topic.my_topic.my_ks.my_table.mapping``
``topic.<my_topic>.<my_ks>.<my_table>.mapping``

For mapping topic and fields from Kafka record's key, value and headers to ScyllaDB table and its columns.
`my_topic` should refer to the topic name as seen in a SinkRecord passed to the Connector, i.e. after all Single Message Transforms have been applied.
For example, if you're using RegexRouter to change the topic name from `top1` to `top2`, you would use the `topic.top2.ks.table.mapping=...` property, as sketched below.
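
A sketch with hypothetical names (`top1`, `top2`, `my_ks`, `my_table`, and the column/field names), using the mapping syntax shown in [EXAMPLES.md](EXAMPLES.md):

```
transforms = ChangeTopic
transforms.ChangeTopic.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.ChangeTopic.regex = top1
transforms.ChangeTopic.replacement = top2
# The mapping references the post-SMT topic name (top2), not the original (top1):
topic.top2.my_ks.my_table.mapping = id_col=key.id,name_col=value.name
```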

**Note**: Ensure that the data types of the Kafka record's fields are compatible with the data types of the ScyllaDB columns.
In the Kafka topic mapping, you can optionally specify which column should be used as the ttl (time-to-live) and
@@ -209,7 +211,7 @@ Also, these topic level configurations will be override the behavior of Connecto
should be processed as a delete request.


###Write
### Write

``scylladb.consistency.level``

@@ -249,35 +251,38 @@

``scylladb.offset.storage.table.enable``

If true, Kafka consumer offsets will be stored in ScyllaDB table. If false, connector will skip writing offset
If true, Kafka consumer offsets will additionally be stored in a ScyllaDB table. If false, the connector will skip writing offset
information into ScyllaDB.

* Type: Boolean
* Importance: Medium
* Default Value: True

###ScyllaDB
### ScyllaDB

``behavior.on.error``

Error handling behavior setting. Must be configured to one of the following:
Error handling behavior setting. Must be configured to one of the following:

``fail``
The Connector throws ConnectException and stops processing records when an error occurs while processing or inserting records into ScyllaDB.
``fail``

``ignore``
Continues to process next set of records when error occurs while processing or inserting records into ScyllaDB.
The Connector throws ConnectException and stops processing records when an error occurs while processing or inserting records into ScyllaDB.

``ignore``

Continues to process the next set of records when an error occurs while processing or inserting records into ScyllaDB.

``log``
Logs the error via connect-reporter when an error occurs while processing or inserting records into ScyllaDB and continues to process next set of records, available in the kafka topics.
``log``

* Type: string
* Default: FAIL
* Valid Values: [FAIL, LOG, IGNORE]
* Importance: medium
Logs the error via connect-reporter when an error occurs while processing or inserting records into ScyllaDB and continues to process the next set of records available in the Kafka topics.

* Type: string
* Default: FAIL
* Valid Values: [FAIL, LOG, IGNORE]
* Importance: medium
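
For example, to log failed records and continue processing (a sketch; the value must be one of the valid values listed above):

```
# Log errors via connect-reporter and continue with the next records.
behavior.on.error=LOG
```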

###Confluent Platform Configurations.

### Confluent Platform Configurations

``tasks.max``

@@ -301,7 +306,3 @@ The name of the topics to consume data from and write to ScyllaDB.
* Type: list
* Default: localhost:9092
* Importance: high

------------------------


78 changes: 78 additions & 0 deletions documentation/EXAMPLES.md
@@ -0,0 +1,78 @@
# Example setups
Below are examples of non-trivial connector configurations.
For simpler step-by-step instructions with environment setup, check out the [quickstart](QUICKSTART.md) first.


## One topic to many tables

This can be achieved by running multiple instances of the connector.

#### Environment
A running Kafka cluster and a dockerized Scylla instance (contact point `172.17.0.2`).
Test data was generated using the [Datagen Source Connector](https://www.confluent.io/hub/confluentinc/kafka-connect-datagen)
with the following configuration:
```
name = DatagenConnectorExample_1
connector.class = io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic = usersTopic
quickstart = users
```

#### Connectors configuration
We will use two connectors for this example.

Connector1.properties:
```
name = ScyllaDbSinkConnectorExample_1
connector.class = io.connect.scylladb.ScyllaDbSinkConnector
transforms = createKey
topics = usersTopic
transforms.createKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields = userid
scylladb.contact.points = 172.17.0.2
scylladb.consistency.level = ONE
scylladb.keyspace = example_ks
scylladb.keyspace.replication.factor = 1
scylladb.offset.storage.table = kafka_connect_offsets
```
Connector2.properties:
```
name = ScyllaDbSinkConnectorExample_2
connector.class = io.connect.scylladb.ScyllaDbSinkConnector
transforms = createKey, ChangeTopic
topics = usersTopic
transforms.createKey.type = org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields = userid
transforms.ChangeTopic.type = org.apache.kafka.connect.transforms.RegexRouter
transforms.ChangeTopic.regex = usersTopic
transforms.ChangeTopic.replacement = ChangedTopic
scylladb.contact.points = 172.17.0.2
scylladb.consistency.level = ONE
scylladb.keyspace = example_ks
scylladb.keyspace.replication.factor = 1
scylladb.offset.storage.table = kafka_connect_offsets_2
topic.ChangedTopic.example_ks.ChangedTopic.mapping = mappedUserIdCol=key.userid,mappedGenderCol=value.gender
```
This setup results in the creation of four tables in the `example_ks` keyspace: two for keeping offsets and two for data.

Connector1 creates the `userstopic` table, which should look like the table below, and keeps its offsets in `kafka_connect_offsets`.
<pre> <font color="#C01C28"><b>userid</b></font> | <font color="#A347BA"><b>gender</b></font> | <font color="#A347BA"><b>regionid</b></font> | <font color="#A347BA"><b>registertime</b></font>
--------+--------+----------+---------------
<font color="#A2734C"><b>User_3</b></font> | <font color="#A2734C"><b>MALE</b></font> | <font color="#A2734C"><b>Region_3</b></font> | <font color="#26A269"><b>1497171901434</b></font>
<font color="#A2734C"><b>User_1</b></font> | <font color="#A2734C"><b>OTHER</b></font> | <font color="#A2734C"><b>Region_2</b></font> | <font color="#26A269"><b>1515602353163</b></font>
<font color="#A2734C"><b>User_6</b></font> | <font color="#A2734C"><b>OTHER</b></font> | <font color="#A2734C"><b>Region_3</b></font> | <font color="#26A269"><b>1512008940490</b></font>
<font color="#A2734C"><b>User_7</b></font> | <font color="#A2734C"><b>MALE</b></font> | <font color="#A2734C"><b>Region_7</b></font> | <font color="#26A269"><b>1507294138815</b></font>
<font color="#A2734C"><b>User_2</b></font> | <font color="#A2734C"><b>FEMALE</b></font> | <font color="#A2734C"><b>Region_2</b></font> | <font color="#26A269"><b>1493737097490</b></font>
</pre>
Connector2 uses the RegexRouter SMT to change the topic name to `changedtopic`, which results in the creation of the `changedtopic` table. Additionally, it makes use of the custom mapping property (`topic.ChangedTopic.example_ks...`) to define a different structure for this table.
This results in the following:
<pre>cqlsh&gt; SELECT * FROM example_ks.changedtopic
... ;

<font color="#C01C28"><b>mappedUserIdCol</b></font> | <font color="#A347BA"><b>mappedGenderCol</b></font>
-----------------+-----------------
<font color="#A2734C"><b>User_3</b></font> | <font color="#A2734C"><b>OTHER</b></font>
<font color="#A2734C"><b>User_1</b></font> | <font color="#A2734C"><b>OTHER</b></font>
<font color="#A2734C"><b>User_6</b></font> | <font color="#A2734C"><b>OTHER</b></font>
<font color="#A2734C"><b>User_7</b></font> | <font color="#A2734C"><b>MALE</b></font>
</pre>
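
To verify, you can list the tables in `cqlsh` (a sketch; the exact output layout may differ):

```
cqlsh> USE example_ks;
cqlsh:example_ks> DESCRIBE TABLES;

changedtopic  kafka_connect_offsets  kafka_connect_offsets_2  userstopic
```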
54 changes: 27 additions & 27 deletions documentation/QUICKSTART.md
@@ -14,8 +14,8 @@ Command to start ScyllaDB docker container:
$ docker run --name some-scylla --hostname some-scylla -d scylladb/scylla
```
Running `docker ps` will show you the exposed ports, which should look something like the following:
```

```
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
26cc6d47efe3 replace-with-image-name "/docker-entrypoint.…" 4 hours ago Up 23 seconds 0.0.0.0:32777->1883/tcp, 0.0.0.0:32776->9001/tcp anonymous_my_1
@@ -26,13 +26,13 @@ CONTAINER ID IMAGE COMMAND CREATED
If you are new to Confluent, follow this [link](https://www.confluent.io/download) to download the Confluent Platform.


1 - Click on DOWNLOAD FREE under Self managed software.
1. Click on DOWNLOAD FREE under Self managed software.

2 - Click on Zip archive then fill the Email address then Accept the T&C and lastly click on Download Version 5.X.X.
2. Click on Zip archive then fill the Email address then Accept the T&C and lastly click on Download Version 5.X.X.

3 - Extract the downloaded file and paste it to the desired location.
3. Extract the downloaded file and paste it to the desired location.

4 - Now follow this [link](https://docs.confluent.io/current/quickstart/ce-quickstart.html#ce-quickstart) to complete the installation.
4. Now follow this [link](https://docs.confluent.io/current/quickstart/ce-quickstart.html#ce-quickstart) to complete the installation.


### Manual Installation Of The Connector
@@ -51,10 +51,10 @@ Note: To run Integration Tests there is no need to run Confluent. Use docker-com
``docker-compose -f docker-compose.yml up``

After completion of the above steps, a folder by the name of ‘components’ will be created in the target folder of the source code folder.
The Connector jar files are present in ``{source-code-folder}/target/components/packages/[jar-files]``
The Connector's full package is present in ``{source-code-folder}/target/components/packages/ScyllaDB-kafka-connect-scylladb-<version>``

Create a folder by the name of ScyllaDB-Sink-Connector and copy the jar files in it.
Navigate to your Confluent Platform installation directory and place this folder in {confluent-directory}/share/java.
Navigate to your Confluent Platform installation directory and place this folder in `{confluent-directory}/share/java`.
In case of a different Kafka Connect installation, you can modify the `plugin.path` property so that it includes the Connector's package folder, as sketched below.
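
For instance, in the Connect worker properties file (the paths below are placeholders):

```
# connect-distributed.properties or connect-standalone.properties:
# include a directory that contains the connector's package folder.
plugin.path=/usr/share/java,/path/to/plugins
```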

## Sink Connector

Expand Down Expand Up @@ -87,11 +87,11 @@ Your output should resemble:
"io.connect.scylladb.ScyllaDbSinkConnector"
```

#####Connector Configuration
#### Connector Configuration

Save these configs in a file *kafka-connect-scylladb.json* and run the following command:

```
```json
{
"name" : "scylladb-sink-connector",
"config" : {
@@ -100,11 +100,11 @@ Save these configs in a file *kafka-connect-scylladb.json* and run the following
"topics" : "topic1,topic2,topic3",
"scylladb.contact.points" : "scylladb-hosts",
"scylladb.keyspace" : "test"
}
}
}
```

Use this command to load the connector :
Use this command to load the connector:

```
curl -s -X POST -H 'Content-Type: application/json' --data @kafka-connect-scylladb.json http://localhost:8083/connectors
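# (A sketch) Afterwards, check the connector's status via the Connect REST API,
# assuming the worker listens on localhost:8083 and the connector name above:
#   curl -s http://localhost:8083/connectors/scylladb-sink-connector/status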
@@ -121,21 +121,21 @@ Once the Connector is up and running, use the command ``kafka-avro-console-produ
Example:

```
kafka-avro-console-producer
--broker-list localhost:9092
--topic topic1
--property parse.key=true
--property key.schema='{"type":"record",name":"key_schema","fields":[{"name":"id","type":"int"}]}'
--property "key.separator=$"
--property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},
{"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'
kafka-avro-console-producer \
--broker-list localhost:9092 \
--topic topic1 \
--property parse.key=true \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property "key.separator=$" \
--property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'
{"id":1}${"id":1,"firstName":"first","lastName":"last"}
```

Output upon running the select query in ScyllaDB:
select * from test.topic1;

```
select * from test.topic1;

id | firstname | lastname

----+-----------+----------
@@ -144,9 +144,9 @@ select * from test.topic1;
```


##Modes in ScyllaDB
## Modes in ScyllaDB

###Standard
### Standard

Use this command to load the connector:

@@ -165,7 +165,7 @@ example.

**Distributed Mode JSON**

```
```json
{
"name" : "scylladb-sink-connector",
"config" : {
@@ -175,7 +175,7 @@ example.
"scylladb.contact.points" : "scylladb-hosts",
"scylladb.keyspace" : "test",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter"
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
"value.converter.schemas.enable" : "true",

@@ -250,7 +250,7 @@ example.

**Distributed Mode**

```
```json
{
"name" : "scylladbSinkConnector",
"config" : {
@@ -281,7 +281,7 @@ scylladb.username=example
scylladb.password=password
```

###Logging
### Logging

To check logs for the Confluent Platform use:
