diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2bbdf1f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+# Created by .ignore support plugin (hsz.mobi)
+### Maven template
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+
+### For Intellij
+.idea/
+**/*.iml
+
diff --git a/pom.xml b/pom.xml
index bd42266..bfa769d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
org.apache.flume.sources-taildirectory
flume-taildirectory-source
- 1.0.0
+ 1.1.0
jar
flume-taildirectory-source
diff --git a/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSource.java b/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSource.java
index aaa6adc..74037c6 100644
--- a/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSource.java
+++ b/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSource.java
@@ -39,7 +39,7 @@ public class DirectoryTailSource extends AbstractSource implements
private static final String CONFIG_PATH = "path";
private static final String UNLOCK_TIME = "unlockFileTime";
- private static final Logger logger = LoggerFactory
+ private static final Logger LOGGER = LoggerFactory
.getLogger(DirectoryTailSource.class);
private String confDirs;
private Set dirs;
@@ -47,8 +47,9 @@ public class DirectoryTailSource extends AbstractSource implements
private long timeToUnlockFile;
private DirectoryTailSourceCounter counter;
+ @Override
public void configure(Context context) {
- logger.info("Source Configuring..");
+ LOGGER.info("Source Configuring..");
confDirs = context.getString(CONFIG_DIRS).trim();
Preconditions.checkState(confDirs != null,
@@ -67,7 +68,7 @@ public void configure(Context context) {
+ "." + CONFIG_PATH);
dirs.add(path);
if (path == null) {
- logger.warn("Configuration is empty : " + CONFIG_DIRS + "."
+ LOGGER.warn("Configuration is empty : " + CONFIG_DIRS + "."
+ confDirArr[i] + "." + CONFIG_PATH);
continue;
}
@@ -79,7 +80,7 @@ public void configure(Context context) {
@Override
public void start() {
- logger.info("Source Starting..");
+ LOGGER.info("Source Starting..");
watchDirs = new HashSet();
counter.start();
@@ -87,11 +88,10 @@ public void start() {
for (String path : dirs) {
WatchDir watchDir = new WatchDir(FileSystems.getDefault()
.getPath(path), this, timeToUnlockFile, counter);
- watchDir.proccesEvents();
watchDirs.add(watchDir);
}
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error(e.getMessage(),e);
}
super.start();
@@ -99,12 +99,12 @@ public void start() {
@Override
public void stop() {
- super.stop();
counter.stop();
- logger.info("DirectoryTailSource {} stopped. Metrics: {}", getName(),
+ LOGGER.info("DirectoryTailSource {} stopped. Metrics: {}", getName(),
counter);
for (WatchDir watchDir : watchDirs) {
watchDir.stop();
}
+ super.stop();
}
}
diff --git a/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSourceCounter.java b/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSourceCounter.java
index e846a65..a3106aa 100644
--- a/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSourceCounter.java
+++ b/src/main/java/org/apache/flume/source/taildirectory/DirectoryTailSourceCounter.java
@@ -19,7 +19,7 @@ public class DirectoryTailSourceCounter extends MonitoredCounterGroup implements
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);
- public static final String[] ATTRIBUTES = { COUNTER_MESSAGE_SENT,
+ private static final String[] ATTRIBUTES = { COUNTER_MESSAGE_SENT,
COUNTER_MESSAGE_SENT_ERROR, CURRENT_THROUGHPUT, AVERAGE_THROUGHPUT };
public DirectoryTailSourceCounter(String name) {
@@ -32,26 +32,32 @@ public DirectoryTailSourceCounter(String name) {
TimeUnit.SECONDS);
}
+ @Override
public void increaseCounterMessageSent() {
increment(COUNTER_MESSAGE_SENT);
}
+ @Override
public long getCounterMessageSent() {
return get(COUNTER_MESSAGE_SENT);
}
+ @Override
public void increaseCounterMessageSentError() {
increment(COUNTER_MESSAGE_SENT_ERROR);
}
+ @Override
public long getCounterMessageSentError() {
return get(COUNTER_MESSAGE_SENT_ERROR);
}
+ @Override
public long getAverageThroughput() {
return get(AVERAGE_THROUGHPUT);
}
+ @Override
public long getCurrentThroughput() {
return get(CURRENT_THROUGHPUT);
}
@@ -75,11 +81,11 @@ public void run() {
if (currentTime > startTime) {
averageThroughput = currentMessages
- / ((currentTime - startTime));
+ / (currentTime - startTime);
}
set(AVERAGE_THROUGHPUT, averageThroughput);
previousMessages = currentMessages;
- }
+ }
}
}
}
diff --git a/src/main/java/org/apache/flume/source/taildirectory/FileKeys.java b/src/main/java/org/apache/flume/source/taildirectory/FileKeys.java
new file mode 100644
index 0000000..0e2e51e
--- /dev/null
+++ b/src/main/java/org/apache/flume/source/taildirectory/FileKeys.java
@@ -0,0 +1,32 @@
+package org.apache.flume.source.taildirectory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileKeys {
+
+ private static final Logger LOGGER= LoggerFactory
+ .getLogger(FileKeys.class);
+
+ private FileKeys(){
+ }
+
+ public static String getFileKey(Path path){
+ if (System.getProperty("os.name").toLowerCase().indexOf("win") >= 0)
+ return path.toString();
+ else
+ try{
+ return Files.readAttributes(path, BasicFileAttributes.class)
+ .fileKey().toString();
+ }catch (IOException e){
+ LOGGER.warn(e.getMessage(),e);
+ LOGGER.warn("File {} not found, maby removed/moved",path);
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flume/source/taildirectory/FileSet.java b/src/main/java/org/apache/flume/source/taildirectory/FileSet.java
index cbf10a6..3030346 100644
--- a/src/main/java/org/apache/flume/source/taildirectory/FileSet.java
+++ b/src/main/java/org/apache/flume/source/taildirectory/FileSet.java
@@ -29,12 +29,11 @@
import java.util.Map;
import org.apache.flume.Transaction;
-//import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileSet {
- private static final Logger logger = LoggerFactory.getLogger(FileSet.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileSet.class);
private BufferedReader bufferedReader;
private RandomAccessFile rReader;
private Transaction transaction;
@@ -42,30 +41,33 @@ public class FileSet {
private Map headers;
private long lastAppendTime;
private Path filePath;
+ private boolean fileIsOpen;
+ private File file;
public FileSet(Path filePath, String startFrom) throws IOException {
this.bufferList = new ArrayList();
+ this.headers = new HashMap();
this.lastAppendTime = System.currentTimeMillis();
this.filePath = filePath;
- File f = new File(filePath.toString());
-
- rReader = new RandomAccessFile(f, "r");
+ file = new File(filePath.toString());
+
+ if ("end".equals(startFrom)) {
+ fileIsOpen = false;
+ } else {
+ rReader = new RandomAccessFile(file, "r");
+ fileIsOpen = true;
+ if ("begin".equals(startFrom)) {
+ rReader.seek(0);
+ } else if ("lastLine".equals(startFrom)) {
+ seekToLastLine(rReader);
+ }
- if (startFrom.equals("begin")) {
- rReader.seek(0);
- } else if (startFrom.equals("end")) {
- rReader.seek(f.length());
- } else if (startFrom.equals("lastLine")) {
- seekToLastLine(rReader);
+ LOGGER.debug("File length --> " + file.length());
+ LOGGER.debug("File pointer --> " + rReader.getFilePointer());
+ LOGGER.debug("FileSet has been created " + filePath);
}
-
- logger.debug("File length --> " + f.length());
- logger.debug("File pointer --> " + rReader.getFilePointer());
-
- headers = new HashMap();
- logger.debug("FileSet has been created " + filePath);
}
// This method is use to avoid lost last line log
@@ -84,11 +86,9 @@ private void seekToLastLine(RandomAccessFile rReader) throws IOException {
posReached = true;
rReader.seek(filePointer);
}
- } else if (readByte == 0xD) {
- if (filePointer != fileLength - 1) {
- posReached = true;
- rReader.seek(filePointer);
- }
+ } else if (readByte == 0xD && filePointer != fileLength - 1) {
+ posReached = true;
+ rReader.seek(filePointer);
}
filePointer--;
@@ -120,9 +120,9 @@ public int getLineSize() {
return bufferList.size();
}
- public StringBuffer getAllLines() {
+ public StringBuilder getAllLines() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
for (int i = 0; i < bufferList.size(); i++) {
sb.append(bufferList.get(i));
@@ -144,6 +144,10 @@ public void clear() {
headers.clear();
}
+ public boolean isFileIsOpen() {
+ return fileIsOpen;
+ }
+
public Map getHeaders() {
return headers;
}
@@ -174,9 +178,21 @@ public void setBufferedReader(BufferedReader bufferedReader) {
public void close() throws IOException {
rReader.close();
+ fileIsOpen = false;
+ }
+
+ public void open() throws IOException {
+ rReader = new RandomAccessFile(file, "r");
+ seekToLastLine(rReader);
+ fileIsOpen = true;
}
public Path getFilePath() {
return filePath;
}
+
+ public void setFilePath(Path path) {
+ filePath = path;
+ file = new File(path.toString());
+ }
}
diff --git a/src/main/java/org/apache/flume/source/taildirectory/FileSetMap.java b/src/main/java/org/apache/flume/source/taildirectory/FileSetMap.java
new file mode 100644
index 0000000..9e9c3d0
--- /dev/null
+++ b/src/main/java/org/apache/flume/source/taildirectory/FileSetMap.java
@@ -0,0 +1,58 @@
+package org.apache.flume.source.taildirectory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileSetMap extends HashMap {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(FileSetMap.class);
+
+ private Map filePathsAndKeys;
+
+ FileSetMap(Map filePathsAndKeys){
+ super();
+ this.filePathsAndKeys = filePathsAndKeys;
+ }
+
+ public FileSet getFileSet(Path path) throws IOException {
+
+ String fileKey = FileKeys.getFileKey(path);
+
+ if (this.containsKey(fileKey)) {
+ return this.get(fileKey);
+ } else {
+ return addFileSetToMap(path, "lastLine");
+ }
+ }
+
+ public FileSet addFileSetToMap(Path path, String startFrom)
+ throws IOException {
+
+ FileSet fileSet;
+ String fileKey = FileKeys.getFileKey(path);
+
+ if (!this.containsKey(fileKey)) {
+ LOGGER.info("Scanning file: " + path.toString() + " with key: "
+ + fileKey);
+ fileSet = new FileSet(path, startFrom);
+ filePathsAndKeys.put(path.toString(), fileKey);
+ this.put(fileKey, fileSet);
+ } else{
+ fileSet = this.get(fileKey);
+
+ if (!fileSet.getFilePath().toString().equals(path.toString())){
+ fileSet.setFilePath(path);
+ }
+ }
+ return fileSet;
+ }
+}
diff --git a/src/main/java/org/apache/flume/source/taildirectory/WatchDir.java b/src/main/java/org/apache/flume/source/taildirectory/WatchDir.java
index d2cc34f..784b63f 100644
--- a/src/main/java/org/apache/flume/source/taildirectory/WatchDir.java
+++ b/src/main/java/org/apache/flume/source/taildirectory/WatchDir.java
@@ -19,40 +19,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- */
-
public class WatchDir {
private final WatchService watcher;
private final Map keys;
private AbstractSource source;
- private HashMap fileSetMap;
- private HashMap filePathsAndKeys;
+ private FileSetMap fileSetMap;
+ private Map filePathsAndKeys;
private long timeToUnlockFile;
- private static final Logger logger = LoggerFactory
+ private static final Logger LOGGER= LoggerFactory
.getLogger(WatchDir.class);
- private String OS;
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(2);
private DirectoryTailSourceCounter counter;
- @SuppressWarnings("unchecked")
- static WatchEvent cast(WatchEvent> event) {
- return (WatchEvent) event;
- }
-
/**
* Creates a WatchService and registers the given directory
*/
WatchDir(Path dir, AbstractSource source, long timeToUnlockFile,
DirectoryTailSourceCounter counter) throws IOException {
- logger.trace("WatchDir: WatchDir");
+ LOGGER.trace("WatchDir: WatchDir");
this.timeToUnlockFile = timeToUnlockFile;
this.counter = counter;
@@ -63,13 +53,13 @@ static WatchEvent cast(WatchEvent> event) {
this.keys = new HashMap();
this.filePathsAndKeys = new HashMap();
- this.fileSetMap = new HashMap();
-
- this.OS = getOSType();
+ this.fileSetMap = new FileSetMap(filePathsAndKeys);
- logger.info("Scanning directory: " + dir);
+ LOGGER.info("Scanning directory: " + dir);
registerAll(dir);
- logger.info("Done.");
+
+ Thread t = new Thread(new WatchDirRunnable());
+ t.start();
final Runnable lastAppend = new CheckLastTimeModified();
scheduler.scheduleAtFixedRate(lastAppend, 0, 1, TimeUnit.MINUTES);
@@ -77,23 +67,29 @@ static WatchEvent cast(WatchEvent> event) {
final Runnable printThroughput = new PrintThroughput();
scheduler.scheduleAtFixedRate(printThroughput, 0, 5, TimeUnit.SECONDS);
}
+
+ @SuppressWarnings("unchecked")
+ static WatchEvent cast(WatchEvent> event) {
+ return (WatchEvent) event;
+ }
+
+ /**
+ * Register the given directory, and all its sub-directories, with the
+ * WatchService.
+ */
+ private void registerAll(final Path start) throws IOException {
+
+ LOGGER.trace("WatchDir: registerAll");
- private String getOSType() {
- String osName = System.getProperty("os.name").toLowerCase();
-
- if (osName.indexOf("win") >= 0)
- OS = "win";
- else if (osName.indexOf("nix") >= 0 || osName.indexOf("nux") >= 0
- || osName.indexOf("aix") > 0)
- OS = "unix";
- else if (OS.indexOf("mac") >= 0)
- OS = "mac";
- else if (OS.indexOf("sunos") >= 0)
- OS = "solaris";
- else
- OS = "unknown";
-
- return OS;
+ // register directory and sub-directories
+ Files.walkFileTree(start, new SimpleFileVisitor() {
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir,
+ BasicFileAttributes attrs) throws IOException {
+ register(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
}
/**
@@ -101,76 +97,40 @@ else if (OS.indexOf("sunos") >= 0)
*/
private void register(Path dir) throws IOException {
- logger.trace("WatchDir: register");
+ LOGGER.trace("WatchDir: register");
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE,
ENTRY_MODIFY);
Path prev = keys.get(key);
- // TODO: Change this log lines, are not descriptive
- logger.info("prev: " + prev);
+ LOGGER.info("Previous directory: " + prev);
if (prev == null) {
- logger.info("register: " + dir);
+ LOGGER.info("Registering directory: " + dir);
} else {
if (!dir.equals(prev)) {
- logger.info("update: " + "-> " + prev + " " + dir);
+ LOGGER.info("Updating previous directory: " + "-> " + prev + " to " + dir);
}
}
keys.put(key, dir);
+
File folder = dir.toFile();
for (final File fileEntry : folder.listFiles()) {
if (!fileEntry.isDirectory()) {
- addFileSetToMap(fileEntry.toPath(), "end");
+ fileSetMap.addFileSetToMap(fileEntry.toPath(), "end");
} else {
- logger.warn("FileEntry found as directory --> TODO: debug this case");
- }
- }
- }
-
- /**
- * Register the given directory, and all its sub-directories, with the
- * WatchService.
- */
- private void registerAll(final Path start) throws IOException {
-
- logger.trace("WatchDir: registerAll");
-
- // register directory and sub-directories
- Files.walkFileTree(start, new SimpleFileVisitor() {
- @Override
- public FileVisitResult preVisitDirectory(Path dir,
- BasicFileAttributes attrs) throws IOException {
- register(dir);
- return FileVisitResult.CONTINUE;
+ LOGGER.warn("FileEntry found as directory --> TODO: debug this case");
}
- });
- }
-
- private void fileCreated(Path path) throws IOException,
- InterruptedException {
-
- logger.trace("WatchDir: fileCreated");
-
- if (Files.isDirectory(path, NOFOLLOW_LINKS))
- registerAll(path);
- else {
- addFileSetToMap(path, "begin");
}
}
-
- private void fileModified(Path path) throws IOException {
-
- logger.trace("WatchDir: fileModified");
-
+
+ private void readLines(FileSet fileSet) throws IOException{
String buffer;
- FileSet fileSet = getFileSet(path);
-
while ((buffer = fileSet.readLine()) != null) {
if (buffer.length() == 0) {
- logger.debug("Readed empty line");
+ LOGGER.debug("Readed empty line");
continue;
} else {
fileSet.appendLine(buffer);
@@ -182,81 +142,47 @@ private void fileModified(Path path) throws IOException {
}
}
- private FileSet getFileSet(Path path) throws IOException {
-
- logger.trace("WatchDir: getFileSet");
+ private void fileCreated(Path path) throws IOException{
- FileSet fileSet;
- String fileKey;
+ LOGGER.trace("WatchDir: fileCreated");
- if (OS == "win")
- fileKey = path.toString();
- else
- fileKey = Files.readAttributes(path, BasicFileAttributes.class)
- .fileKey().toString();
-
- if (fileSetMap.containsKey(fileKey)) {
- fileSet = fileSetMap.get(fileKey);
- } else {
- fileSet = addFileSetToMap(path, "lastLine");
- }
-
- return fileSet;
+ if (Files.isDirectory(path, NOFOLLOW_LINKS))
+ registerAll(path);
+ else {
+ FileSet fileSet = fileSetMap.addFileSetToMap(path, "begin");
+ if (fileSet.isFileIsOpen())
+ readLines(fileSet);
+ }
}
- private FileSet addFileSetToMap(Path path, String startFrom)
- throws IOException {
-
- logger.trace("WatchDir: addFileSetToMap");
-
- String fileKey;
- FileSet fileSet;
-
- if (OS == "win")
- fileKey = path.toString();
- else
- fileKey = Files.readAttributes(path, BasicFileAttributes.class)
- .fileKey().toString();
+ private void fileModified(Path path) throws IOException {
- if (!fileSetMap.containsKey(fileKey)) {
- logger.info("Scanning file: " + path.toString() + " with key: "
- + fileKey);
+ LOGGER.trace("WatchDir: fileModified");
- synchronized (fileSetMap) {
- fileSet = new FileSet(path, startFrom);
- filePathsAndKeys.put(path.toString(), fileKey);
- fileSetMap.put(fileKey, fileSet);
- }
- } else
- fileSet = fileSetMap.get(fileKey);
+ FileSet fileSet = fileSetMap.getFileSet(path);
+
+ if (!fileSet.isFileIsOpen())
+ fileSet.open();
- return fileSet;
+ readLines(fileSet);
}
private void fileDeleted(Path path) throws IOException {
- logger.trace("WatchDir: fileDeleted");
-
- String fileKey = null;
+ LOGGER.trace("WatchDir: fileDeleted");
- if (OS == "win") {
- fileKey = path.toString();
- } else {
- if (filePathsAndKeys.containsKey(path.toString())) {
- fileKey = filePathsAndKeys.get(path.toString());
- } else
- logger.error("File key of file " + path.toString()
- + " not found in filePathsAndKeys hashMap");
+ String fileKey = FileKeys.getFileKey(path);
+
+ if (fileKey == null){
+ fileKey = filePathsAndKeys.get(path.toString());
}
if (fileKey != null) {
- logger.info("Removing file: " + path + " with key: " + fileKey);
- synchronized (fileSetMap) {
- if (fileSetMap.containsKey(fileKey)) {
- fileSetMap.get(fileKey).clear();
- fileSetMap.get(fileKey).close();
- fileSetMap.remove(fileKey);
- }
+ FileSet fileSet = fileSetMap.get(fileKey);
+ if (fileSet.isFileIsOpen()){
+ fileSet.clear();
+ fileSet.close();
}
+
if (filePathsAndKeys.containsKey(path.toString())) {
filePathsAndKeys.remove(path.toString());
}
@@ -265,12 +191,12 @@ private void fileDeleted(Path path) throws IOException {
private void sendEvent(FileSet fileSet) {
- logger.trace("WatchDir: sendEvent");
+ LOGGER.trace("WatchDir: sendEvent");
if (fileSet.getBufferList().isEmpty())
return;
- StringBuffer sb = fileSet.getAllLines();
+ StringBuilder sb = fileSet.getAllLines();
Event event = EventBuilder.withBody(String.valueOf(sb).getBytes(),
fileSet.getHeaders());
source.getChannelProcessor().processEvent(event);
@@ -279,73 +205,17 @@ private void sendEvent(FileSet fileSet) {
fileSet.clear();
}
- public void proccesEvents() {
- logger.trace("WatchDir: run");
-
- try {
- for (;;) {
-
- // wait for key to be signaled
- WatchKey key;
- key = watcher.take();
- Path dir = keys.get(key);
-
- if (dir == null) {
- logger.error("WatchKey not recognized!!");
- continue;
- }
-
- for (WatchEvent> event : key.pollEvents()) {
- Kind> kind = event.kind();
-
- // Context for directory entry event is the file name of
- // entry
- WatchEvent ev = cast(event);
- Path name = ev.context();
- Path path = dir.resolve(name);
-
- // print out event
- logger.trace(event.kind().name() + ": " + path);
-
- if (kind == ENTRY_MODIFY) {
- fileModified(path);
- } else if (kind == ENTRY_CREATE) {
- fileCreated(path);
- } else if (kind == ENTRY_DELETE) {
- fileDeleted(path);
- }
- }
-
- // reset key and remove from set if directory no longer
- // accessible
- boolean valid = key.reset();
- if (!valid) {
- keys.remove(key);
- // all directories are inaccessible
- if (keys.isEmpty()) {
- break;
- }
- }
- }
- } catch (IOException x) {
- x.printStackTrace();
- } catch (InterruptedException x) {
- x.printStackTrace();
- return;
- }
- }
-
public void stop() {
- logger.trace("WatchDir: stop");
+ LOGGER.trace("WatchDir: stop");
try {
for (FileSet fileSet : fileSetMap.values()) {
- logger.debug("Closing file: " + fileSet.getFilePath());
+ LOGGER.debug("Closing file: " + fileSet.getFilePath());
fileSet.clear();
fileSet.close();
}
} catch (IOException x) {
- x.printStackTrace();
+ LOGGER.error(x.getMessage(),x);
}
}
@@ -364,25 +234,31 @@ public void run() {
for (String fileKey : fileKeySet) {
fileSet = fileSetMap.get(fileKey);
-
- lastAppendTime = fileSet.getLastAppendTime();
- currentTime = System.currentTimeMillis();
- logger.debug("Checking file: " + fileSet.getFilePath());
-
- if (currentTime - lastAppendTime > TimeUnit.MINUTES
- .toMillis(timeToUnlockFile)) {
- logger.info("File: " + fileSet.getFilePath()
- + " not modified after " + timeToUnlockFile
- + " minutes" + " removing from monitoring list");
- fileSetMap.get(fileKey).clear();
- fileSetMap.get(fileKey).close();
- filePathsAndKeys.remove(fileSet.getFilePath()
- .toString());
- fileSetMap.remove(fileKey);
+ if (fileSet.isFileIsOpen()){
+ lastAppendTime = fileSet.getLastAppendTime();
+ currentTime = System.currentTimeMillis();
+
+ LOGGER.trace("FILE: {}",fileSet.getFilePath());
+
+ Date expiry = new Date(lastAppendTime);
+ LOGGER.trace("LAST APPEND TIME {}", expiry);
+ expiry = new Date(currentTime);
+ LOGGER.trace("CURRENT TIME {}", currentTime);
+
+ LOGGER.debug("Checking file: " + fileSet.getFilePath());
+
+ if (currentTime - lastAppendTime > TimeUnit.MINUTES
+ .toMillis(timeToUnlockFile)) {
+ LOGGER.info("File: " + fileSet.getFilePath()
+ + " not modified after " + timeToUnlockFile
+ + " minutes" + " closing file");
+ fileSetMap.get(fileKey).clear();
+ fileSetMap.get(fileKey).close();
+ }
}
}
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error(e.getMessage(),e);
}
}
}
@@ -391,9 +267,64 @@ private class PrintThroughput implements Runnable {
@Override
public void run() {
- logger.debug("Current throughput: "
+ LOGGER.debug("Current throughput: "
+ counter.getCurrentThroughput());
}
}
+
+ private class WatchDirRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ for (;;) {
+ // wait for key to be signaled
+ WatchKey key;
+ key = watcher.take();
+ Path dir = keys.get(key);
+
+ if (dir == null) {
+ LOGGER.error("WatchKey not recognized!!");
+ continue;
+ }
+
+ for (WatchEvent> event : key.pollEvents()) {
+ Kind> kind = event.kind();
+
+ // Context for directory entry event is the file name of
+ // entry
+ WatchEvent ev = cast(event);
+ Path name = ev.context();
+ Path path = dir.resolve(name);
+
+ // print out event
+ LOGGER.trace(event.kind().name() + ": " + path);
+
+ if (kind == ENTRY_MODIFY) {
+ fileModified(path);
+ } else if (kind == ENTRY_CREATE) {
+ fileCreated(path);
+ } else if (kind == ENTRY_DELETE) {
+ fileDeleted(path);
+ }
+ }
+
+ // reset key and remove from set if directory no longer
+ // accessible
+ boolean valid = key.reset();
+ if (!valid) {
+ keys.remove(key);
+ // all directories are inaccessible
+ if (keys.isEmpty()) {
+ break;
+ }
+ }
+ }
+ } catch (Exception x) {
+ LOGGER.error(x.getMessage(), x);
+ }
+ }
+
+ }
}
\ No newline at end of file