From b1e9226c69d720cb1159f1ffd8d28eb3d0d93f0e Mon Sep 17 00:00:00 2001 From: "doraalin@163.com" Date: Thu, 6 Apr 2017 17:46:52 +0800 Subject: [PATCH] first commit --- .gitignore | 6 + README.md | 0 pom.xml | 126 ++++++++++++++++++ .../com/youzan/flume/sink/nsq/NSQSink.java | 86 ++++++++++++ src/test/resources/test.conf | 23 ++++ 5 files changed, 241 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/com/youzan/flume/sink/nsq/NSQSink.java create mode 100644 src/test/resources/test.conf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..27df63d --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea/ +data/ +*.iml +nsq-sink/ +.DS_Store +target/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..a679eb3 --- /dev/null +++ b/pom.xml @@ -0,0 +1,126 @@ + + 4.0.0 + com.youzan + nsq-flume-sink + jar + 1.0-SNAPSHOT + nsq-flume-sink + http://maven.apache.org + + UTF-8 + UTF-8 + 1.7.21 + 1.1.7 + 2.3.20170406-RELEASE + ./nsq-sink + ${pluginDir}/libext + ${pluginDir}/lib + + + parent-pom + com.youzan + 1.0.2 + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + org.slf4j + jul-to-slf4j + ${slf4j.version} + + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + ch.qos.logback + logback-core + ${logback.version} + test + + + ch.qos.logback + logback-classic + ${logback.version} + test + + + org.testng + testng + 6.9.10 + test + + + com.youzan + NSQ-Client + ${nsqsdk.version} + + + org.apache.flume + flume-ng-sdk + 1.7.0 + provided + + + org.apache.flume + flume-ng-core + 1.7.0 + provided + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + runtime + compile + log4j,slf4j-log4j + ${dependenciesOutputDir} + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.0.2 + + ${libOutputDir} + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${dependenciesOutputDir} + + + + + + + diff --git a/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java new file mode 100644 index 0000000..8cf301a --- /dev/null +++ b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java @@ -0,0 +1,86 @@ +package com.youzan.flume.sink.nsq; + +import com.youzan.nsq.client.Producer; +import com.youzan.nsq.client.ProducerImplV2; +import com.youzan.nsq.client.entity.NSQConfig; +import com.youzan.nsq.client.entity.Topic; +import com.youzan.nsq.client.exception.NSQException; +import org.apache.flume.*; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.AbstractSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by lin on 17/4/6. + */ +public class NSQSink extends AbstractSink implements Configurable { + + private final static Logger logger = LoggerFactory.getLogger(NSQSink.class); + + private String lookupAddresses = null; + private Topic defaultTopic = null; + private NSQConfig config = null; + private Producer producer = null; + + @Override + public Status process() throws EventDeliveryException { + Status status; + Topic topic; + Channel ch = getChannel(); + Transaction txn = ch.getTransaction(); + txn.begin(); + try{ + Event event = ch.take(); + String topicText = event.getHeaders().get("topic"); + if(null != topicText) + topic = new Topic(topicText); + else + topic = defaultTopic; + //send + producer.publish(event.getBody(), topic); + txn.commit(); + status = Status.READY; + } catch (Throwable t) { + logger.error("Fail to publish to NSQ.", t); + txn.rollback(); + status = Status.BACKOFF; + if (t instanceof Error) { + throw (Error)t; + } + }finally { + txn.close(); + } + return status; + } + + @Override + public void start() { + producer = new ProducerImplV2(config); + try { + producer.start(); + } catch (NSQException e) { + logger.error("Fail to start nsq producer.", e); + } + } + + @Override + public void stop () { + producer.close(); + } + + @Override + public void configure(Context context) { + lookupAddresses = context.getString("lookupdAddresses"); + if(null == lookupAddresses || lookupAddresses.isEmpty()) + throw new IllegalArgumentException("Illegal lookupd addresses not accepted. " + lookupAddresses); + String topicName = context.getString("topic"); + if (null == topicName || topicName.isEmpty()) { + throw new IllegalArgumentException("Illegal default topic name is not accepted. " + topicName); + } + defaultTopic = new Topic(topicName); + config = new NSQConfig() + .setLookupAddresses(lookupAddresses); + logger.info("NSQConfig initialized with lookupAddresses:{}, topicName:{}", lookupAddresses, topicName); + } +} diff --git a/src/test/resources/test.conf b/src/test/resources/test.conf new file mode 100644 index 0000000..d34c558 --- /dev/null +++ b/src/test/resources/test.conf @@ -0,0 +1,23 @@ +# example.conf: A single-node Flume configuration + +# Name the components on this agent +a1.sources = sequence +a1.sinks = nsq +a1.channels = c1 + +# Describe/configure the source +a1.sources.sequence.type = seq + +# Describe the sink +a1.sinks.nsq.type = com.youzan.flume.sink.nsq.NSQSink +a1.sinks.nsq.topic = flume_sink +a1.sinks.nsq.lookupdAddresses = sqs-qa.s.qima-inc.com:4161 + +# Use a channel which buffers events in memory +a1.channels.c1.type = memory +a1.channels.c1.capacity = 1000 +a1.channels.c1.transactionCapacity = 100 + +# Bind the source and sink to the channel +a1.sources.sequence.channels = c1 +a1.sinks.nsq.channel = c1 \ No newline at end of file