Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] committed Apr 6, 2017
0 parents commit b1e9226
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.idea/
data/
*.iml
nsq-sink/
.DS_Store
target/
Empty file added README.md
Empty file.
126 changes: 126 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.youzan</groupId>
<artifactId>nsq-flume-sink</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>nsq-flume-sink</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<slf4j.version>1.7.21</slf4j.version>
<logback.version>1.1.7</logback.version>
<nsqsdk.version>2.3.20170406-RELEASE</nsqsdk.version>
<pluginDir>./nsq-sink</pluginDir>
<dependenciesOutputDir>${pluginDir}/libext</dependenciesOutputDir>
<libOutputDir>${pluginDir}/lib</libOutputDir>
</properties>
<parent>
<artifactId>parent-pom</artifactId>
<groupId>com.youzan</groupId>
<version>1.0.2</version>
</parent>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- java.util.logging , NOTE ON PERFORMANCE -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- commons-logging.jar -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.9.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.youzan</groupId>
<artifactId>NSQ-Client</artifactId>
<version>${nsqsdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.7.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<includeScope>runtime</includeScope>
<includeScope>compile</includeScope>
<excludeArtifactIds>log4j,slf4j-log4j</excludeArtifactIds>
<outputDirectory>${dependenciesOutputDir}</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<outputDirectory>${libOutputDir}</outputDirectory>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${dependenciesOutputDir}</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
86 changes: 86 additions & 0 deletions src/main/java/com/youzan/flume/sink/nsq/NSQSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.youzan.flume.sink.nsq;

import com.youzan.nsq.client.Producer;
import com.youzan.nsq.client.ProducerImplV2;
import com.youzan.nsq.client.entity.NSQConfig;
import com.youzan.nsq.client.entity.Topic;
import com.youzan.nsq.client.exception.NSQException;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by lin on 17/4/6.
*/
public class NSQSink extends AbstractSink implements Configurable {

private final static Logger logger = LoggerFactory.getLogger(NSQSink.class);

private String lookupAddresses = null;
private Topic defaultTopic = null;
private NSQConfig config = null;
private Producer producer = null;

@Override
public Status process() throws EventDeliveryException {
Status status;
Topic topic;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try{
Event event = ch.take();
String topicText = event.getHeaders().get("topic");
if(null != topicText)
topic = new Topic(topicText);
else
topic = defaultTopic;
//send
producer.publish(event.getBody(), topic);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
logger.error("Fail to publish to NSQ.", t);
txn.rollback();
status = Status.BACKOFF;
if (t instanceof Error) {
throw (Error)t;
}
}finally {
txn.close();
}
return status;
}

@Override
public void start() {
producer = new ProducerImplV2(config);
try {
producer.start();
} catch (NSQException e) {
logger.error("Fail to start nsq producer.", e);
}
}

@Override
public void stop () {
producer.close();
}

@Override
public void configure(Context context) {
lookupAddresses = context.getString("lookupdAddresses");
if(null == lookupAddresses || lookupAddresses.isEmpty())
throw new IllegalArgumentException("Illegal lookupd addresses not accepted. " + lookupAddresses);
String topicName = context.getString("topic");
if (null == topicName || topicName.isEmpty()) {
throw new IllegalArgumentException("Illegal default topic name is not accepted. " + topicName);
}
defaultTopic = new Topic(topicName);
config = new NSQConfig()
.setLookupAddresses(lookupAddresses);
logger.info("NSQConfig initialized with lookupAddresses:{}, topicName:{}", lookupAddresses, topicName);
}
}
23 changes: 23 additions & 0 deletions src/test/resources/test.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = sequence
a1.sinks = nsq
a1.channels = c1

# Describe/configure the source
a1.sources.sequence.type = seq

# Describe the sink
a1.sinks.nsq.type = com.youzan.flume.sink.nsq.NSQSink
a1.sinks.nsq.topic = flume_sink
a1.sinks.nsq.lookupdAddresses = sqs-qa.s.qima-inc.com:4161

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.sequence.channels = c1
a1.sinks.nsq.channel = c1

0 comments on commit b1e9226

Please sign in to comment.