diff --git a/pom.xml b/pom.xml
index 2c65437..95202fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.youzan
nsq-flume-sink
jar
- 1.0-SNAPSHOT
+ 0.1-SNAPSHOT
nsq-flume-sink
http://maven.apache.org
@@ -69,6 +69,11 @@
1.7.0
provided
+
+ com.youzan
+ filebackup
+ 0.1-SNAPSHOT
+
org.apache.flume
flume-ng-core
diff --git a/src/main/java/com/youzan/flume/sink/nsq/NSQLocalBackupQueue.java b/src/main/java/com/youzan/flume/sink/nsq/NSQLocalBackupQueue.java
new file mode 100644
index 0000000..2207eb8
--- /dev/null
+++ b/src/main/java/com/youzan/flume/sink/nsq/NSQLocalBackupQueue.java
@@ -0,0 +1,80 @@
+package com.youzan.flume.sink.nsq;
+
+import com.youzan.filebackup.context.BackupContext;
+import com.youzan.filebackup.context.BackupScope;
+import com.youzan.filebackup.context.BackupScopeBuilder;
+import com.youzan.filebackup.context.DefaultBackupContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Created by lin on 17/4/13.
+ */
+public class NSQLocalBackupQueue {
+ private final static Logger logger = LoggerFactory.getLogger(NSQLocalBackupQueue.class);
+ private final BackupScope backupScope;
+ private volatile boolean init = false;
+ private AtomicLong cnt = new AtomicLong(0);
+ private final Object SYNC = new Object();
+
+ public NSQLocalBackupQueue(String backupPath, String scopeId, String contextName) {
+ BackupContext context = new DefaultBackupContext(contextName);
+ backupScope = BackupScopeBuilder.create(backupPath, scopeId)
+ .setBackupContext(context)
+ .build();
+ backupScope.init();
+ try {
+ backupScope.openWrite();
+ } catch (IOException e) {
+ logger.error("Fail to open write, Path: {}, ScopeId: {}, ContextName: {}", backupPath, scopeId, contextName);
+ }
+
+ try {
+ backupScope.openRead();
+ } catch (IOException e) {
+ logger.error("Fail to open read, Path: {}, ScopeId: {}, ContextName: {}", backupPath, scopeId, contextName);
+ }
+ init = true;
+ logger.info("Local backup initialized.");
+ }
+
+ public boolean isInit() {
+ return this.init;
+ }
+
+ public int writeBackup(byte[] messageContent) throws IOException {
+ if(!isInit())
+ throw new IllegalStateException("BackupQueue is not initialized for write.");
+ int count = this.backupScope.tryWrite(messageContent);
+ if(count > 0) {
+ cnt.incrementAndGet();
+ synchronized (SYNC) {
+ logger.info("Try notifying one waiting read process.");
+ SYNC.notify();
+ }
+ }
+ return count;
+ }
+
+ public byte[] readBackup() throws IOException, InterruptedException {
+ if(!isInit())
+ throw new IllegalArgumentException("BackupQueue is not initialized for read.");
+ byte[] messaege = backupScope.tryRead();
+ if(null != messaege) {
+ return messaege;
+ } else {
+ logger.info("Read process wait for write.");
+ SYNC.wait();
+ return readBackup();
+ }
+ }
+
+ public void close() throws IOException {
+ backupScope.closeWrite();
+ //TODO: simply close read
+ backupScope.closeRead();
+ }
+}
diff --git a/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java
index 6626f93..8c7d268 100644
--- a/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java
+++ b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java
@@ -10,6 +10,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* Created by lin on 17/4/6.
*/
@@ -19,31 +23,69 @@ public class NSQSink extends AbstractSink implements Configurable {
private String lookupAddresses = null;
private String defaultTopic = null;
+ private String backupPath;
+ private String scopeId;
+ private String backupContext;
private NSQConfig config = null;
private Producer producer = null;
+ private NSQLocalBackupQueue backupQueue = null;
+ private ExecutorService exec = Executors.newSingleThreadExecutor();
+ private final Runnable READTASK = () -> {
+ while(true) {
+ byte[] message = null;
+ try {
+ message = backupQueue.readBackup();
+ if(null != message) {
+ logger.info("Read one backup message.");
+ producer.publish(message, defaultTopic);
+ logger.info("backup message published.");
+ }
+ } catch (IOException e) {
+ logger.error("Fail to read from backup file");
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for notify.");
+ } catch (NSQException e) {
+ logger.error("Fail to publish to NSQd, write back to backup.");
+ try {
+ backupQueue.writeBackup(message);
+ } catch (IOException e1) {
+ logger.error("Fail to write to backup queue after read out from backup. Message {}", new String(message));
+ }
+ }
+ }
+ };
@Override
public Status process() throws EventDeliveryException {
- Status status;
+ Status status = null;
String topic;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
+ Event event = ch.take();
+ byte[] message = event.getBody();
try{
- Event event = ch.take();
String topicText = event.getHeaders().get("topic");
if(null != topicText)
topic = topicText;
else
topic = defaultTopic;
//send
- producer.publish(event.getBody(), topic);
+ producer.publish(message, topic);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
logger.error("Fail to publish to NSQ.", t);
- txn.rollback();
- status = Status.BACKOFF;
+ try {
+ backupQueue.writeBackup(message);
+ logger.warn("Failed message write to backup file.");
+ txn.commit();
+ status = Status.READY;
+ } catch (IOException e) {
+ logger.error("Fail to write to backup queue. Message {}", new String(message));
+ txn.rollback();
+ status = Status.BACKOFF;
+ }
if (t instanceof Error) {
throw (Error)t;
}
@@ -61,11 +103,20 @@ public void start() {
} catch (NSQException e) {
logger.error("Fail to start nsq producer.", e);
}
+ backupQueue = new NSQLocalBackupQueue(backupPath, scopeId, backupContext);
+ final NSQLocalBackupQueue queue = this.backupQueue;
+ exec.submit(READTASK);
}
@Override
public void stop () {
producer.close();
+ exec.shutdown();
+ try {
+ backupQueue.close();
+ } catch (IOException e) {
+ logger.error("Fail to close backup queue");
+ }
}
@Override
@@ -81,5 +132,9 @@ public void configure(Context context) {
config = new NSQConfig();
config.setLookupAddresses(lookupAddresses);
logger.info("NSQConfig initialized with lookupAddresses:{}, topicName:{}", lookupAddresses, topicName);
+
+ backupPath = context.getString("backupPath");
+ scopeId = context.getString("scopeId");
+ backupContext = context.getString("backupContext");
}
}
diff --git a/src/test/resources/test.conf b/src/test/resources/test.conf
index d34c558..c63b093 100644
--- a/src/test/resources/test.conf
+++ b/src/test/resources/test.conf
@@ -12,6 +12,10 @@ a1.sources.sequence.type = seq
a1.sinks.nsq.type = com.youzan.flume.sink.nsq.NSQSink
a1.sinks.nsq.topic = flume_sink
a1.sinks.nsq.lookupdAddresses = sqs-qa.s.qima-inc.com:4161
+a1.sinks.nsq.backupPath = /tmp/backup
+a1.sinks.nsq.scopeId = nsq_msg_backup
+a1.sinks.nsq.backupContext = flume_sink_backup
+
# Use a channel which buffers events in memory
a1.channels.c1.type = memory