
Back up messages when publish fails
[email protected] committed Apr 14, 2017
1 parent 6680420 commit 44649e9
Showing 4 changed files with 150 additions and 6 deletions.
7 changes: 6 additions & 1 deletion pom.xml
@@ -4,7 +4,7 @@
<groupId>com.youzan</groupId>
<artifactId>nsq-flume-sink</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<version>0.1-SNAPSHOT</version>
<name>nsq-flume-sink</name>
<url>http://maven.apache.org</url>
<properties>
@@ -69,6 +69,11 @@
<version>1.7.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.youzan</groupId>
<artifactId>filebackup</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
80 changes: 80 additions & 0 deletions src/main/java/com/youzan/flume/sink/nsq/NSQLocalBackupQueue.java
@@ -0,0 +1,80 @@
package com.youzan.flume.sink.nsq;

import com.youzan.filebackup.context.BackupContext;
import com.youzan.filebackup.context.BackupScope;
import com.youzan.filebackup.context.BackupScopeBuilder;
import com.youzan.filebackup.context.DefaultBackupContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by lin on 17/4/13.
*/
public class NSQLocalBackupQueue {
private final static Logger logger = LoggerFactory.getLogger(NSQLocalBackupQueue.class);
private final BackupScope backupScope;
private volatile boolean init = false;
private AtomicLong cnt = new AtomicLong(0);
private final Object SYNC = new Object();

public NSQLocalBackupQueue(String backupPath, String scopeId, String contextName) {
BackupContext context = new DefaultBackupContext(contextName);
backupScope = BackupScopeBuilder.create(backupPath, scopeId)
.setBackupContext(context)
.build();
backupScope.init();
try {
backupScope.openWrite();
} catch (IOException e) {
logger.error("Fail to open write, Path: {}, ScopeId: {}, ContextName: {}", backupPath, scopeId, contextName);
}

try {
backupScope.openRead();
} catch (IOException e) {
logger.error("Fail to open read, Path: {}, ScopeId: {}, ContextName: {}", backupPath, scopeId, contextName);
}
init = true;
logger.info("Local backup initialized.");
}

public boolean isInit() {
return this.init;
}

public int writeBackup(byte[] messageContent) throws IOException {
if(!isInit())
throw new IllegalStateException("BackupQueue is not initialized for write.");
int count = this.backupScope.tryWrite(messageContent);
if(count > 0) {
cnt.incrementAndGet();
synchronized (SYNC) {
logger.info("Try notifying one waiting read process.");
SYNC.notify();
}
}
return count;
}

public byte[] readBackup() throws IOException, InterruptedException {
if(!isInit())
throw new IllegalStateException("BackupQueue is not initialized for read.");
byte[] message = backupScope.tryRead();
if(null != message) {
return message;
} else {
logger.info("Read process waits for write.");
//wait() must be called while holding the monitor, otherwise it throws IllegalMonitorStateException
synchronized (SYNC) {
SYNC.wait();
}
return readBackup();
}
}

public void close() throws IOException {
backupScope.closeWrite();
//TODO: simply close read
backupScope.closeRead();
}
}
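
For orientation, here is a rough stand-alone sketch of how the queue could be exercised outside Flume. The class name and the literal message are made up for illustration, and the path, scope id, and context name simply mirror src/test/resources/test.conf; none of this is part of the commit.

package com.youzan.flume.sink.nsq;

//Hypothetical smoke test, not included in this commit.
public class BackupQueueSmokeTest {
    public static void main(String[] args) throws Exception {
        //path, scope id and context name mirror src/test/resources/test.conf
        NSQLocalBackupQueue queue =
                new NSQLocalBackupQueue("/tmp/backup", "nsq_msg_backup", "flume_sink_backup");
        try {
            queue.writeBackup("hello nsq".getBytes("UTF-8"));
            //readBackup() blocks on SYNC until writeBackup() notifies, then returns the payload
            byte[] restored = queue.readBackup();
            System.out.println("restored: " + new String(restored, "UTF-8"));
        } finally {
            queue.close();
        }
    }
}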
65 changes: 60 additions & 5 deletions src/main/java/com/youzan/flume/sink/nsq/NSQSink.java
@@ -10,6 +10,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by lin on 17/4/6.
*/
@@ -19,31 +19,69 @@ public class NSQSink extends AbstractSink implements Configurable {

private String lookupAddresses = null;
private String defaultTopic = null;
private String backupPath;
private String scopeId;
private String backupContext;
private NSQConfig config = null;
private Producer producer = null;
private NSQLocalBackupQueue backupQueue = null;
private ExecutorService exec = Executors.newSingleThreadExecutor();
private final Runnable READTASK = () -> {
while(true) {
byte[] message = null;
try {
message = backupQueue.readBackup();
if(null != message) {
logger.info("Read one backup message.");
producer.publish(message, defaultTopic);
logger.info("backup message published.");
}
} catch (IOException e) {
logger.error("Fail to read from backup file");
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for notify.");
//restore the interrupt status and stop the read loop so the executor can shut down
Thread.currentThread().interrupt();
break;
} catch (NSQException e) {
logger.error("Fail to publish to NSQd, write back to backup.");
try {
backupQueue.writeBackup(message);
} catch (IOException e1) {
logger.error("Fail to write to backup queue after read out from backup. Message {}", new String(message));
}
}
}
};

@Override
public Status process() throws EventDeliveryException {
Status status;
Status status = null;
String topic;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
Event event = ch.take();
byte[] message = event.getBody();
try{
Event event = ch.take();
String topicText = event.getHeaders().get("topic");
if(null != topicText)
topic = topicText;
else
topic = defaultTopic;
//send
producer.publish(event.getBody(), topic);
producer.publish(message, topic);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
logger.error("Fail to publish to NSQ.", t);
txn.rollback();
status = Status.BACKOFF;
try {
backupQueue.writeBackup(message);
logger.warn("Failed message write to backup file.");
txn.commit();
status = Status.READY;
} catch (IOException e) {
logger.error("Fail to write to backup queue. Message {}", new String(message));
txn.rollback();
status = Status.BACKOFF;
}
if (t instanceof Error) {
throw (Error)t;
}
@@ -61,11 +103,20 @@ public void start() {
} catch (NSQException e) {
logger.error("Fail to start nsq producer.", e);
}
backupQueue = new NSQLocalBackupQueue(backupPath, scopeId, backupContext);
final NSQLocalBackupQueue queue = this.backupQueue;
exec.submit(READTASK);
}

@Override
public void stop () {
producer.close();
//interrupt the backup read task so it can exit its wait and terminate
exec.shutdownNow();
try {
backupQueue.close();
} catch (IOException e) {
logger.error("Fail to close backup queue");
}
}

@Override
@@ -81,5 +132,9 @@ public void configure(Context context) {
config = new NSQConfig();
config.setLookupAddresses(lookupAddresses);
logger.info("NSQConfig initialized with lookupAddresses:{}, topicName:{}", lookupAddresses, topicName);

backupPath = context.getString("backupPath");
scopeId = context.getString("scopeId");
backupContext = context.getString("backupContext");
}
}
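
For completeness, a minimal sketch of wiring the sink up programmatically, e.g. in a local test. The property keys mirror src/test/resources/test.conf, the class name is invented, and a reachable nsqlookupd plus a writable backupPath are assumed; this is not part of the commit.

package com.youzan.flume.sink.nsq;

import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;

//Hypothetical wiring sketch, not included in this commit.
public class NSQSinkWiringSketch {
    public static void main(String[] args) {
        //sink settings, same keys as in test.conf
        Context sinkCtx = new Context();
        sinkCtx.put("lookupdAddresses", "sqs-qa.s.qima-inc.com:4161");
        sinkCtx.put("topic", "flume_sink");
        sinkCtx.put("backupPath", "/tmp/backup");
        sinkCtx.put("scopeId", "nsq_msg_backup");
        sinkCtx.put("backupContext", "flume_sink_backup");

        NSQSink sink = new NSQSink();
        Configurables.configure(sink, sinkCtx);   //invokes NSQSink.configure(Context)

        //back the sink with an in-memory channel
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        sink.setChannel(channel);

        channel.start();
        sink.start();   //creates the producer, the backup queue and the backup read task
        //...put events on the channel and drive sink.process() here...
        sink.stop();
        channel.stop();
    }
}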
4 changes: 4 additions & 0 deletions src/test/resources/test.conf
@@ -12,6 +12,10 @@ a1.sources.sequence.type = seq
a1.sinks.nsq.type = com.youzan.flume.sink.nsq.NSQSink
a1.sinks.nsq.topic = flume_sink
a1.sinks.nsq.lookupdAddresses = sqs-qa.s.qima-inc.com:4161
a1.sinks.nsq.backupPath = /tmp/backup
a1.sinks.nsq.scopeId = nsq_msg_backup
a1.sinks.nsq.backupContext = flume_sink_backup


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
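
With those settings in place, the agent can be launched in the usual Flume fashion; the paths below are assumptions about the local layout, and the nsq-flume-sink jar together with its filebackup dependency has to be on the Flume classpath. This is conventional flume-ng usage rather than anything introduced by this commit.

bin/flume-ng agent --conf conf --conf-file src/test/resources/test.conf --name a1 -Dflume.root.logger=INFO,console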
