3 changes: 2 additions & 1 deletion README.md
@@ -81,4 +81,5 @@ You can build the project by running "maven package" and it will build amazon-ki
| metricsLevel | Controls the number of metrics that are uploaded to CloudWatch. Expected pattern: none/summary/detailed | none |
| metricsGranuality | Controls the granularity of metrics that are uploaded to CloudWatch. Greater granularity produces more metrics. Expected pattern: global/stream/shard. | global |
| metricsNameSpace | The namespace to upload metrics under. | KinesisProducer |
| aggregration | With aggregation, multiple user records are packed into a single KinesisRecord. If disabled, each user record is sent in its own KinesisRecord. | true |
| appendLineBreak | If enabled, a line break ("\n") is appended to each record from the Kafka message before it is put into Kinesis. This is useful when records are delivered to S3 via Kinesis Firehose and then processed further with Apache Hive. | false |
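
For illustration, a minimal sketch of what the option changes (the record value here is hypothetical): the connector simply suffixes each string value with "\n", so files that Firehose writes to S3 become newline-delimited and Hive's default text format can split them into rows.

```java
// Illustration only (hypothetical value): the effect of appendLineBreak=true.
boolean appendLineBreak = true;
String value = "{\"id\":1}";
String delivered = appendLineBreak ? value + "\n" : value; // "{\"id\":1}\n"
```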
2 changes: 2 additions & 0 deletions config/kinesis-streams-kafka-connector.properties
@@ -26,3 +26,5 @@ metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregration=true
# Append a line break ("\n") to every record
appendLineBreak=false
src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkConnector.java
@@ -33,19 +33,21 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
public static final String AGGREGRATION_ENABLED = "aggregration";

public static final String USE_PARTITION_AS_HASH_KEY = "usePartitionAsHashKey";

public static final String FLUSH_SYNC = "flushSync";

public static final String SINGLE_KINESIS_PRODUCER_PER_PARTITION = "singleKinesisProducerPerPartition";

public static final String PAUSE_CONSUMPTION = "pauseConsumption";

public static final String OUTSTANDING_RECORDS_THRESHOLD = "outstandingRecordsThreshold";

public static final String SLEEP_PERIOD = "sleepPeriod";

public static final String SLEEP_CYCLES = "sleepCycles";

public static final String APPEND_LINE_BREAK = "appendLineBreak";

private String region;

private String streamName;
@@ -67,19 +69,21 @@ public class AmazonKinesisSinkConnector extends SinkConnector {
private String aggregration;

private String usePartitionAsHashKey;

private String flushSync;

private String singleKinesisProducerPerPartition;

private String pauseConsumption;

private String outstandingRecordsThreshold;

private String sleepPeriod;

private String sleepCycles;

private String appendLineBreak;

@Override
public void start(Map<String, String> props) {
region = props.get(REGION);
@@ -99,6 +103,7 @@ public void start(Map<String, String> props) {
outstandingRecordsThreshold = props.get(OUTSTANDING_RECORDS_THRESHOLD);
sleepPeriod = props.get(SLEEP_PERIOD);
sleepCycles = props.get(SLEEP_CYCLES);
appendLineBreak = props.get(APPEND_LINE_BREAK);
}

@Override
@@ -168,37 +173,42 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
config.put(USE_PARTITION_AS_HASH_KEY, usePartitionAsHashKey);
else
config.put(USE_PARTITION_AS_HASH_KEY, "false");

if(flushSync != null)
config.put(FLUSH_SYNC, flushSync);
else
config.put(FLUSH_SYNC, "true");

if(singleKinesisProducerPerPartition != null)
config.put(SINGLE_KINESIS_PRODUCER_PER_PARTITION, singleKinesisProducerPerPartition);
else
config.put(SINGLE_KINESIS_PRODUCER_PER_PARTITION, "false");

if(pauseConsumption != null)
config.put(PAUSE_CONSUMPTION, pauseConsumption);
else
config.put(PAUSE_CONSUMPTION, "true");

if(outstandingRecordsThreshold != null)
config.put(OUTSTANDING_RECORDS_THRESHOLD, outstandingRecordsThreshold);
else
config.put(OUTSTANDING_RECORDS_THRESHOLD, "500000");

if(sleepPeriod != null)
config.put(SLEEP_PERIOD, sleepPeriod);
else
config.put(SLEEP_PERIOD, "1000");

if(sleepCycles != null)
config.put(SLEEP_CYCLES, sleepCycles);
else
config.put(SLEEP_CYCLES, "10");


if (appendLineBreak != null)
config.put(APPEND_LINE_BREAK, appendLineBreak);
else
config.put(APPEND_LINE_BREAK, "false");

configs.add(config);

}
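
Reviewer note: the null-check/default pattern above repeats for every key. A possible consolidation, sketched under the assumption that a private helper in this class is acceptable (`putOrDefault` is hypothetical, not part of this PR):

```java
// Hypothetical helper: copy a connector property into the task config,
// falling back to a default when the property was not set.
private static void putOrDefault(Map<String, String> config, String key,
                                 String value, String defaultValue) {
    config.put(key, value != null ? value : defaultValue);
}

// e.g. putOrDefault(config, APPEND_LINE_BREAK, appendLineBreak, "false");
```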
17 changes: 14 additions & 3 deletions src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java
@@ -58,6 +58,8 @@ public class AmazonKinesisSinkTask extends SinkTask {

private int sleepCycles;

private boolean appendLineBreak;

private SinkTaskContext sinkTaskContext;

private Map<String, KinesisProducer> producerMap = new HashMap<String, KinesisProducer>();
@@ -211,17 +213,20 @@ private boolean validateOutStandingRecords() {
}

private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

// Note: appendLineBreak only applies when the record value is a String
Object value = appendLineBreak ? sinkRecord.value().toString() + "\n" : sinkRecord.value();

// If configured use kafka partition key as explicit hash key
// This will be useful when sending data from same partition into
// same shard
if (usePartitionAsHashKey)
return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
DataUtility.parseValue(sinkRecord.valueSchema(), value));
else
return kp.addUserRecord(streamName, partitionKey,
DataUtility.parseValue(sinkRecord.valueSchema(), value));

}
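
Reviewer note: as the comment above says, `appendLineBreak` only makes sense for string values; with a Struct or Avro value, `toString()` would mangle the payload. A defensive sketch (assumes Kafka Connect's `org.apache.kafka.connect.data.Schema`; the helper name is hypothetical):

```java
// Sketch: append the line break only when the record value is a string;
// pass every other value through unchanged.
private Object maybeAppendLineBreak(SinkRecord sinkRecord) {
    if (appendLineBreak && sinkRecord.valueSchema() != null
            && sinkRecord.valueSchema().type() == Schema.Type.STRING) {
        return sinkRecord.value().toString() + "\n";
    }
    return sinkRecord.value();
}
```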

@@ -264,6 +269,8 @@ public void start(Map<String, String> props) {

sleepCycles = Integer.parseInt(props.get(AmazonKinesisSinkConnector.SLEEP_CYCLES));

appendLineBreak = Boolean.parseBoolean(props.get(AmazonKinesisSinkConnector.APPEND_LINE_BREAK));

if (!singleKinesisProducerPerPartition)
kinesisProducer = getKinesisProducer();

@@ -303,6 +310,10 @@ public void stop() {
private KinesisProducer getKinesisProducer() {
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion(regionName);
if (regionName.startsWith("cn-")) {
config.setCloudwatchEndpoint("monitoring." + regionName + ".amazonaws.com.cn");
config.setKinesisEndpoint("kinesis." + regionName + ".amazonaws.com.cn");
}
config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
config.setMaxConnections(maxConnections);
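
For reference, the China partition uses the `.amazonaws.com.cn` suffix, so the overrides above resolve to e.g. `kinesis.cn-north-1.amazonaws.com.cn`. A small sketch of the naming scheme (the `serviceEndpoint` helper is hypothetical, for illustration only):

```java
// Hypothetical helper illustrating the endpoint pattern used above.
static String serviceEndpoint(String service, String region) {
    String suffix = region.startsWith("cn-") ? ".amazonaws.com.cn" : ".amazonaws.com";
    return service + "." + region + suffix;
}
// serviceEndpoint("kinesis", "cn-north-1")    -> "kinesis.cn-north-1.amazonaws.com.cn"
// serviceEndpoint("monitoring", "cn-north-1") -> "monitoring.cn-north-1.amazonaws.com.cn"
```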

33 changes: 19 additions & 14 deletions src/main/java/com/amazon/kinesis/kafka/FirehoseSinkTask.java
@@ -5,6 +5,8 @@
import java.util.List;
import java.util.Map;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
@@ -34,9 +36,9 @@ public class FirehoseSinkTask extends SinkTask {
private AmazonKinesisFirehoseClient firehoseClient;

private boolean batch;

private int batchSize;

private int batchSizeInBytes;

@Override
@@ -62,17 +64,20 @@ public void put(Collection<SinkRecord> sinkRecords) {
public void start(Map<String, String> props) {

batch = Boolean.parseBoolean(props.get(FirehoseSinkConnector.BATCH));

batchSize = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE));

batchSizeInBytes = Integer.parseInt(props.get(FirehoseSinkConnector.BATCH_SIZE_IN_BYTES));

deliveryStreamName = props.get(FirehoseSinkConnector.DELIVERY_STREAM);

firehoseClient = new AmazonKinesisFirehoseClient(new DefaultAWSCredentialsProviderChain());

firehoseClient.setRegion(RegionUtils.getRegion(props.get(FirehoseSinkConnector.REGION)));

if (props.get(FirehoseSinkConnector.REGION).startsWith("cn-"))
firehoseClient.setEndpoint("firehose." + props.get(FirehoseSinkConnector.REGION) + ".amazonaws.com.cn");

// Validate delivery stream
validateDeliveryStream();
}
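
Reviewer note: `setRegion`/`setEndpoint` are deprecated in the AWS SDK for Java v1 in favor of the client builder, which resolves China-partition endpoints from its own region metadata. A possible alternative, sketched (not part of this PR):

```java
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;

// Sketch: the builder picks the correct endpoint for any region,
// including cn-north-1, without hand-built endpoint strings.
AmazonKinesisFirehose firehoseClient = AmazonKinesisFirehoseClientBuilder.standard()
        .withCredentials(new DefaultAWSCredentialsProviderChain())
        .withRegion(props.get(FirehoseSinkConnector.REGION))
        .build();
```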
@@ -114,15 +119,15 @@ private PutRecordBatchResult putRecordBatch(List<Record> recordList) {

// Send the records with PutRecordBatch. A single PutRecordBatch request
// accepts at most 500 records and a total payload of less than 4 MiB.
PutRecordBatchResult putRecordBatchResult = null;
try {
putRecordBatchResult = firehoseClient.putRecordBatch(putRecordBatchRequest);
}catch(AmazonKinesisFirehoseException akfe){
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
}catch(Exception e){
System.out.println("Connector Exception" + e.getLocalizedMessage());
System.out.println("Connector Exception" + e.getLocalizedMessage());
}
return putRecordBatchResult;
}
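
Reviewer note: `putRecordBatch` can partially fail even when the request itself succeeds, so the result's `FailedPutCount` deserves a check. A minimal sketch using the SDK's result accessors:

```java
// Sketch: surface partial failures so silently dropped records are visible.
if (putRecordBatchResult != null && putRecordBatchResult.getFailedPutCount() != null
        && putRecordBatchResult.getFailedPutCount() > 0) {
    System.out.println("PutRecordBatch: " + putRecordBatchResult.getFailedPutCount()
            + " of " + recordList.size() + " records failed; they may need to be resent");
}
```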

/**
@@ -138,7 +143,7 @@ private void putRecordsInBatch(Collection<SinkRecord> sinkRecords) {
recordList.add(record);
recordsInBatch++;
recordsSizeInBytes += record.getData().capacity();

if (recordsInBatch == batchSize || recordsSizeInBytes > batchSizeInBytes) {
putRecordBatch(recordList);
recordList.clear();
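
Reviewer note: the size check above runs after the record has been added, so a flushed batch can already exceed `batchSizeInBytes` (and Firehose's 4 MiB request cap). A sketch that flushes before adding, assuming the surrounding fields from this class:

```java
// Sketch: flush first when this record would overflow the batch, then add it.
int recordSize = record.getData().capacity();
if (recordsInBatch + 1 > batchSize || recordsSizeInBytes + recordSize > batchSizeInBytes) {
    putRecordBatch(recordList);
    recordList.clear();
    recordsInBatch = 0;
    recordsSizeInBytes = 0;
}
recordList.add(record);
recordsInBatch++;
recordsSizeInBytes += recordSize;
```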
@@ -162,14 +167,14 @@ private void putRecords(Collection<SinkRecord> sinkRecords) {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(deliveryStreamName);
putRecordRequest.setRecord(DataUtility.createRecord(sinkRecord));

try {
firehoseClient.putRecord(putRecordRequest);
}catch(AmazonKinesisFirehoseException akfe){
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
System.out.println("Amazon Kinesis Firehose Exception:" + akfe.getLocalizedMessage());
}catch(Exception e){
System.out.println("Connector Exception" + e.getLocalizedMessage());
System.out.println("Connector Exception" + e.getLocalizedMessage());
}
}
}