package com.youzan.flume.sink.nsq;

import com.youzan.filebackup.context.BackupContext;
import com.youzan.filebackup.context.BackupScope;
import com.youzan.filebackup.context.BackupScopeBuilder;
import com.youzan.filebackup.context.DefaultBackupContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Local on-disk backup queue for NSQ messages that failed to publish.
 *
 * <p>Wraps a {@link BackupScope}: {@link #writeBackup(byte[])} appends a failed
 * message to the backup file and wakes one blocked reader; {@link #readBackup()}
 * blocks until a message is available and returns it. Producer/consumer
 * signalling uses a single internal monitor ({@code SYNC}).
 *
 * <p>NOTE(review): {@code BackupScope}'s own thread-safety is not visible from
 * here — this class assumes {@code tryWrite}/{@code tryRead} may be called from
 * different threads; confirm against the filebackup library.
 *
 * Created by lin on 17/4/13.
 */
public class NSQLocalBackupQueue {
    private final static Logger logger = LoggerFactory.getLogger(NSQLocalBackupQueue.class);
    private final BackupScope backupScope;
    // Flipped to true at the end of the constructor; guards read/write entry points.
    private volatile boolean init = false;
    // Running count of messages successfully written to backup.
    private final AtomicLong cnt = new AtomicLong(0);
    // Monitor used to park readers when the backup file has no pending message.
    private final Object SYNC = new Object();

    /**
     * Opens the backup scope for both write and read.
     *
     * @param backupPath  directory where backup files live
     * @param scopeId     identifier of the backup scope
     * @param contextName name for the backup context
     */
    public NSQLocalBackupQueue(String backupPath, String scopeId, String contextName) {
        BackupContext context = new DefaultBackupContext(contextName);
        backupScope = BackupScopeBuilder.create(backupPath, scopeId)
                .setBackupContext(context)
                .build();
        backupScope.init();
        try {
            backupScope.openWrite();
        } catch (IOException e) {
            // Pass the exception as the last argument so SLF4J records the stack trace.
            logger.error("Fail to open write, Path: {}, ScopeId: {}, ContextName: {}",
                    backupPath, scopeId, contextName, e);
        }

        try {
            backupScope.openRead();
        } catch (IOException e) {
            logger.error("Fail to open read, Path: {}, ScopeId: {}, ContextName: {}",
                    backupPath, scopeId, contextName, e);
        }
        init = true;
        logger.info("Local backup initialized.");
    }

    /** @return {@code true} once the constructor has finished opening the scope. */
    public boolean isInit() {
        return this.init;
    }

    /**
     * Appends one message to the backup file and, on success, wakes one waiting reader.
     *
     * @param messageContent raw message bytes to persist
     * @return number of bytes/records written as reported by the scope (&gt; 0 on success)
     * @throws IOException           if the underlying scope write fails
     * @throws IllegalStateException if the queue was not initialized
     */
    public int writeBackup(byte[] messageContent) throws IOException {
        if (!isInit())
            throw new IllegalStateException("BackupQueue is not initialized for write.");
        int count = this.backupScope.tryWrite(messageContent);
        if (count > 0) {
            cnt.incrementAndGet();
            synchronized (SYNC) {
                logger.info("Try notifying one waiting read process.");
                SYNC.notify();
            }
        }
        return count;
    }

    /**
     * Blocks until a backed-up message is available, then returns it.
     *
     * <p>Fixes from the original: {@code SYNC.wait()} was invoked without owning
     * the monitor, which throws {@link IllegalMonitorStateException} the first
     * time the queue is empty; the retry used unbounded recursion; and the wait
     * was not guarded against spurious wakeups. The wait now sits inside a
     * {@code synchronized (SYNC)} block and a re-check loop.
     *
     * @return the next backed-up message, never {@code null}
     * @throws IOException           if the underlying scope read fails
     * @throws InterruptedException  if the thread is interrupted while waiting
     * @throws IllegalStateException if the queue was not initialized
     */
    public byte[] readBackup() throws IOException, InterruptedException {
        if (!isInit())
            throw new IllegalStateException("BackupQueue is not initialized for read.");
        synchronized (SYNC) {
            byte[] message = backupScope.tryRead();
            // Loop (not single wait): protects against spurious wakeups and
            // against another reader consuming the message we were notified for.
            while (null == message) {
                logger.info("Read process wait for write.");
                SYNC.wait();
                message = backupScope.tryRead();
            }
            return message;
        }
    }

    /**
     * Closes both the write and read side of the backup scope.
     *
     * @throws IOException if closing either side fails
     */
    public void close() throws IOException {
        backupScope.closeWrite();
        //TODO: simply close read
        backupScope.closeRead();
    }
}
a/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java index 6626f93..8c7d268 100644 --- a/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java +++ b/src/main/java/com/youzan/flume/sink/nsq/NSQSink.java @@ -10,6 +10,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Created by lin on 17/4/6. */ @@ -19,31 +23,69 @@ public class NSQSink extends AbstractSink implements Configurable { private String lookupAddresses = null; private String defaultTopic = null; + private String backupPath; + private String scopeId; + private String backupContext; private NSQConfig config = null; private Producer producer = null; + private NSQLocalBackupQueue backupQueue = null; + private ExecutorService exec = Executors.newSingleThreadExecutor(); + private final Runnable READTASK = () -> { + while(true) { + byte[] message = null; + try { + message = backupQueue.readBackup(); + if(null != message) { + logger.info("Read one backup message."); + producer.publish(message, defaultTopic); + logger.info("backup message published."); + } + } catch (IOException e) { + logger.error("Fail to read from backup file"); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for notify."); + } catch (NSQException e) { + logger.error("Fail to publish to NSQd, write back to backup."); + try { + backupQueue.writeBackup(message); + } catch (IOException e1) { + logger.error("Fail to write to backup queue after read out from backup. 
Message {}", new String(message)); + } + } + } + }; @Override public Status process() throws EventDeliveryException { - Status status; + Status status = null; String topic; Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); + Event event = ch.take(); + byte[] message = event.getBody(); try{ - Event event = ch.take(); String topicText = event.getHeaders().get("topic"); if(null != topicText) topic = topicText; else topic = defaultTopic; //send - producer.publish(event.getBody(), topic); + producer.publish(message, topic); txn.commit(); status = Status.READY; } catch (Throwable t) { logger.error("Fail to publish to NSQ.", t); - txn.rollback(); - status = Status.BACKOFF; + try { + backupQueue.writeBackup(message); + logger.warn("Failed message write to backup file."); + txn.commit(); + status = Status.READY; + } catch (IOException e) { + logger.error("Fail to write to backup queue. Message {}", new String(message)); + txn.rollback(); + status = Status.BACKOFF; + } if (t instanceof Error) { throw (Error)t; } @@ -61,11 +103,20 @@ public void start() { } catch (NSQException e) { logger.error("Fail to start nsq producer.", e); } + backupQueue = new NSQLocalBackupQueue(backupPath, scopeId, backupContext); + final NSQLocalBackupQueue queue = this.backupQueue; + exec.submit(READTASK); } @Override public void stop () { producer.close(); + exec.shutdown(); + try { + backupQueue.close(); + } catch (IOException e) { + logger.error("Fail to close backup queue"); + } } @Override @@ -81,5 +132,9 @@ public void configure(Context context) { config = new NSQConfig(); config.setLookupAddresses(lookupAddresses); logger.info("NSQConfig initialized with lookupAddresses:{}, topicName:{}", lookupAddresses, topicName); + + backupPath = context.getString("backupPath"); + scopeId = context.getString("scopeId"); + backupContext = context.getString("backupContext"); } } diff --git a/src/test/resources/test.conf b/src/test/resources/test.conf index d34c558..c63b093 
100644 --- a/src/test/resources/test.conf +++ b/src/test/resources/test.conf @@ -12,6 +12,10 @@ a1.sources.sequence.type = seq a1.sinks.nsq.type = com.youzan.flume.sink.nsq.NSQSink a1.sinks.nsq.topic = flume_sink a1.sinks.nsq.lookupdAddresses = sqs-qa.s.qima-inc.com:4161 +a1.sinks.nsq.backupPath = /tmp/backup +a1.sinks.nsq.scopeId = nsq_msg_backup +a1.sinks.nsq.backupContext = flume_sink_backup + # Use a channel which buffers events in memory a1.channels.c1.type = memory