Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #7 from keedio/development
Browse files Browse the repository at this point in the history
Merging development branch with master
  • Loading branch information
mvalleavila committed Mar 26, 2015
2 parents 86b3195 + 824ba25 commit d7e2284
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 255 deletions.
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Created by .ignore support plugin (hsz.mobi)
### Maven template
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml

### For Intellij
.idea/
**/*.iml

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.apache.flume.sources-taildirectory</groupId>
<artifactId>flume-taildirectory-source</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>
<packaging>jar</packaging>

<name>flume-taildirectory-source</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ public class DirectoryTailSource extends AbstractSource implements
private static final String CONFIG_PATH = "path";
private static final String UNLOCK_TIME = "unlockFileTime";

private static final Logger logger = LoggerFactory
private static final Logger LOGGER = LoggerFactory
.getLogger(DirectoryTailSource.class);
private String confDirs;
private Set<String> dirs;
private Set<WatchDir> watchDirs;
private long timeToUnlockFile;
private DirectoryTailSourceCounter counter;

@Override
public void configure(Context context) {
logger.info("Source Configuring..");
LOGGER.info("Source Configuring..");

confDirs = context.getString(CONFIG_DIRS).trim();
Preconditions.checkState(confDirs != null,
Expand All @@ -67,7 +68,7 @@ public void configure(Context context) {
+ "." + CONFIG_PATH);
dirs.add(path);
if (path == null) {
logger.warn("Configuration is empty : " + CONFIG_DIRS + "."
LOGGER.warn("Configuration is empty : " + CONFIG_DIRS + "."
+ confDirArr[i] + "." + CONFIG_PATH);
continue;
}
Expand All @@ -79,32 +80,31 @@ public void configure(Context context) {

@Override
public void start() {
logger.info("Source Starting..");
LOGGER.info("Source Starting..");
watchDirs = new HashSet<WatchDir>();
counter.start();

try {
for (String path : dirs) {
WatchDir watchDir = new WatchDir(FileSystems.getDefault()
.getPath(path), this, timeToUnlockFile, counter);
watchDir.proccesEvents();
watchDirs.add(watchDir);
}
} catch (IOException e) {
e.printStackTrace();
LOGGER.error(e.getMessage(),e);
}

super.start();
}

@Override
public void stop() {
super.stop();
counter.stop();
logger.info("DirectoryTailSource {} stopped. Metrics: {}", getName(),
LOGGER.info("DirectoryTailSource {} stopped. Metrics: {}", getName(),
counter);
for (WatchDir watchDir : watchDirs) {
watchDir.stop();
}
super.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class DirectoryTailSourceCounter extends MonitoredCounterGroup implements
private final ScheduledExecutorService scheduler = Executors
.newScheduledThreadPool(1);

public static final String[] ATTRIBUTES = { COUNTER_MESSAGE_SENT,
private static final String[] ATTRIBUTES = { COUNTER_MESSAGE_SENT,
COUNTER_MESSAGE_SENT_ERROR, CURRENT_THROUGHPUT, AVERAGE_THROUGHPUT };

public DirectoryTailSourceCounter(String name) {
Expand All @@ -32,26 +32,32 @@ public DirectoryTailSourceCounter(String name) {
TimeUnit.SECONDS);
}

@Override
public void increaseCounterMessageSent() {
increment(COUNTER_MESSAGE_SENT);
}

@Override
public long getCounterMessageSent() {
return get(COUNTER_MESSAGE_SENT);
}

@Override
public void increaseCounterMessageSentError() {
increment(COUNTER_MESSAGE_SENT_ERROR);
}

@Override
public long getCounterMessageSentError() {
return get(COUNTER_MESSAGE_SENT_ERROR);
}

@Override
public long getAverageThroughput() {
return get(AVERAGE_THROUGHPUT);
}

@Override
public long getCurrentThroughput() {
return get(CURRENT_THROUGHPUT);
}
Expand All @@ -75,11 +81,11 @@ public void run() {

if (currentTime > startTime) {
averageThroughput = currentMessages
/ ((currentTime - startTime));
/ (currentTime - startTime);
}
set(AVERAGE_THROUGHPUT, averageThroughput);
previousMessages = currentMessages;
}
}
}
}
}
32 changes: 32 additions & 0 deletions src/main/java/org/apache/flume/source/taildirectory/FileKeys.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.flume.source.taildirectory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileKeys {

private static final Logger LOGGER= LoggerFactory
.getLogger(FileKeys.class);

private FileKeys(){
}

public static String getFileKey(Path path){
if (System.getProperty("os.name").toLowerCase().indexOf("win") >= 0)
return path.toString();
else
try{
return Files.readAttributes(path, BasicFileAttributes.class)
.fileKey().toString();
}catch (IOException e){
LOGGER.warn(e.getMessage(),e);
LOGGER.warn("File {} not found, maby removed/moved",path);
return null;
}
}
}
64 changes: 40 additions & 24 deletions src/main/java/org/apache/flume/source/taildirectory/FileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,43 +29,45 @@
import java.util.Map;

import org.apache.flume.Transaction;
//import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSet {
private static final Logger logger = LoggerFactory.getLogger(FileSet.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FileSet.class);
private BufferedReader bufferedReader;
private RandomAccessFile rReader;
private Transaction transaction;
private List<String> bufferList;
private Map<String, String> headers;
private long lastAppendTime;
private Path filePath;
private boolean fileIsOpen;
private File file;

public FileSet(Path filePath, String startFrom) throws IOException {

this.bufferList = new ArrayList<String>();
this.headers = new HashMap<String, String>();
this.lastAppendTime = System.currentTimeMillis();
this.filePath = filePath;

File f = new File(filePath.toString());

rReader = new RandomAccessFile(f, "r");
file = new File(filePath.toString());

if ("end".equals(startFrom)) {
fileIsOpen = false;
} else {
rReader = new RandomAccessFile(file, "r");
fileIsOpen = true;
if ("begin".equals(startFrom)) {
rReader.seek(0);
} else if ("lastLine".equals(startFrom)) {
seekToLastLine(rReader);
}

if (startFrom.equals("begin")) {
rReader.seek(0);
} else if (startFrom.equals("end")) {
rReader.seek(f.length());
} else if (startFrom.equals("lastLine")) {
seekToLastLine(rReader);
LOGGER.debug("File length --> " + file.length());
LOGGER.debug("File pointer --> " + rReader.getFilePointer());
LOGGER.debug("FileSet has been created " + filePath);
}

logger.debug("File length --> " + f.length());
logger.debug("File pointer --> " + rReader.getFilePointer());

headers = new HashMap<String, String>();
logger.debug("FileSet has been created " + filePath);
}

// This method is use to avoid lost last line log
Expand All @@ -84,11 +86,9 @@ private void seekToLastLine(RandomAccessFile rReader) throws IOException {
posReached = true;
rReader.seek(filePointer);
}
} else if (readByte == 0xD) {
if (filePointer != fileLength - 1) {
posReached = true;
rReader.seek(filePointer);
}
} else if (readByte == 0xD && filePointer != fileLength - 1) {
posReached = true;
rReader.seek(filePointer);
}

filePointer--;
Expand Down Expand Up @@ -120,9 +120,9 @@ public int getLineSize() {
return bufferList.size();
}

public StringBuffer getAllLines() {
public StringBuilder getAllLines() {

StringBuffer sb = new StringBuffer();
StringBuilder sb = new StringBuilder();

for (int i = 0; i < bufferList.size(); i++) {
sb.append(bufferList.get(i));
Expand All @@ -144,6 +144,10 @@ public void clear() {
headers.clear();
}

public boolean isFileIsOpen() {
return fileIsOpen;
}

public Map<String, String> getHeaders() {
return headers;
}
Expand Down Expand Up @@ -174,9 +178,21 @@ public void setBufferedReader(BufferedReader bufferedReader) {

public void close() throws IOException {
rReader.close();
fileIsOpen = false;
}

public void open() throws IOException {
rReader = new RandomAccessFile(file, "r");
seekToLastLine(rReader);
fileIsOpen = true;
}

public Path getFilePath() {
return filePath;
}

public void setFilePath(Path path) {
filePath = path;
file = new File(path.toString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.flume.source.taildirectory;

import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSetMap extends HashMap<String, FileSet> {

private static final long serialVersionUID = 1L;

private static final Logger LOGGER = LoggerFactory
.getLogger(FileSetMap.class);

private Map<String, String> filePathsAndKeys;

FileSetMap(Map<String, String> filePathsAndKeys){
super();
this.filePathsAndKeys = filePathsAndKeys;
}

public FileSet getFileSet(Path path) throws IOException {

String fileKey = FileKeys.getFileKey(path);

if (this.containsKey(fileKey)) {
return this.get(fileKey);
} else {
return addFileSetToMap(path, "lastLine");
}
}

public FileSet addFileSetToMap(Path path, String startFrom)
throws IOException {

FileSet fileSet;
String fileKey = FileKeys.getFileKey(path);

if (!this.containsKey(fileKey)) {
LOGGER.info("Scanning file: " + path.toString() + " with key: "
+ fileKey);
fileSet = new FileSet(path, startFrom);
filePathsAndKeys.put(path.toString(), fileKey);
this.put(fileKey, fileSet);
} else{
fileSet = this.get(fileKey);

if (!fileSet.getFilePath().toString().equals(path.toString())){
fileSet.setFilePath(path);
}
}
return fileSet;
}
}
Loading

0 comments on commit d7e2284

Please sign in to comment.