Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions src/main/java/info/fetter/logstashforwarder/FileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
*/

import info.fetter.logstashforwarder.util.AdapterException;
import info.fetter.logstashforwarder.util.LogFile;
import info.fetter.logstashforwarder.util.RandomAccessFile;
import info.fetter.logstashforwarder.util.NamedPipe;

import java.io.File;
import java.io.IOException;
//import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -75,11 +76,11 @@ private int readFile(FileState state, int spaceLeftInSpool) {
long pointer = state.getPointer();
int numberOfEvents = 0;
try {
if(state.isDeleted() || state.getRandomAccessFile() == null) { // Don't try to read this file
if(state.isDeleted() || state.getLogFile() == null) { // Don't try to read this file
if(logger.isTraceEnabled()) {
logger.trace("File : " + file + " has been deleted");
}
} else if(state.getRandomAccessFile().isEmpty()) {
} else if(state.getLogFile().isEmpty()) {
if(logger.isTraceEnabled()) {
logger.trace("File : " + file + " is empty");
}
Expand All @@ -104,8 +105,10 @@ private int readFile(FileState state, int spaceLeftInSpool) {
}

private boolean isCompressedFile(FileState state) {
RandomAccessFile reader = state.getRandomAccessFile();

LogFile logFile = state.getLogFile();
if (!(logFile instanceof RandomAccessFile)) return false;
RandomAccessFile reader = (RandomAccessFile) logFile;

try {
for(byte[] magic : MAGICS) {
byte[] fileBytes = new byte[magic.length];
Expand Down Expand Up @@ -143,14 +146,15 @@ private static void copyLineToBuffer(byte[] line, ByteBuffer byteBuffer) {
}

private long readLines(FileState state, int spaceLeftInSpool) {
RandomAccessFile reader = state.getRandomAccessFile();
LogFile reader = state.getLogFile();
long pos = state.getPointer();
Multiline multiline = state.getMultiline();
if (spaceLeftInSpool < 1) return pos;
try {
reader.seek(pos);
byte[] line = readLine(reader);
bufferedLines.clear();

if(multiline != null && multiline.isPrevious()) {
spaceLeftInSpool--;
}
Expand Down Expand Up @@ -201,7 +205,7 @@ private long readLines(FileState state, int spaceLeftInSpool) {
}
}
}
line = readLine(reader);
if (spaceLeftInSpool > 0) line = readLine(reader);
}
if(bufferedLines.position() > 0) {
addEvent(state, pos, extractBytes(bufferedLines)); // send any buffered lines left
Expand All @@ -213,7 +217,7 @@ private long readLines(FileState state, int spaceLeftInSpool) {
return pos;
}

private byte[] readLine(RandomAccessFile reader) throws IOException {
private byte[] readLine(LogFile reader) throws IOException {
byteBuffer.clear();
int ch;
boolean seenCR = false;
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/info/fetter/logstashforwarder/FileSigner.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package info.fetter.logstashforwarder;

import info.fetter.logstashforwarder.util.RandomAccessFile;

import java.io.IOException;
//import java.io.RandomAccessFile;
import java.util.zip.Adler32;

import info.fetter.logstashforwarder.util.LogFile;
import info.fetter.logstashforwarder.util.RandomAccessFile;


public class FileSigner {
private static final Adler32 adler32 = new Adler32();

public static long computeSignature(RandomAccessFile file, int signatureLength) throws IOException {

public static long computeSignature(LogFile logFile, int signatureLength) throws IOException {
if (!(logFile instanceof RandomAccessFile)) return 0;

RandomAccessFile reader = (RandomAccessFile) logFile;
adler32.reset();
byte[] input = new byte[signatureLength];
file.seek(0);
file.read(input);
reader.seek(0);
reader.read(input);
adler32.update(input);
return adler32.getValue();
}
Expand Down
18 changes: 12 additions & 6 deletions src/main/java/info/fetter/logstashforwarder/FileState.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*
*/

import info.fetter.logstashforwarder.util.LogFile;
import info.fetter.logstashforwarder.util.RandomAccessFile;
import info.fetter.logstashforwarder.util.NamedPipe;

import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -47,7 +49,7 @@ public class FileState {
@JsonIgnore
private boolean changed = false;
@JsonIgnore
private RandomAccessFile randomAccessFile;
private LogFile logFile;
private long pointer = 0;
@JsonIgnore
private FileState oldFileState;
Expand All @@ -67,15 +69,19 @@ public FileState(File file) throws IOException {
this.file = file;
directory = file.getCanonicalFile().getParent();
fileName = file.getName();
randomAccessFile = new RandomAccessFile(file.getPath(), "r");
if (file.isFile()) {
logFile = new RandomAccessFile(file.getPath(), "r");
} else {
logFile = new NamedPipe(file);
}
lastModified = file.lastModified();
size = file.length();
}

private void setFileFromDirectoryAndName() throws FileNotFoundException {
file = new File(directory + File.separator + fileName);
if(file.exists()) {
randomAccessFile = null;
logFile = null;
lastModified = file.lastModified();
size = file.length();
} else {
Expand Down Expand Up @@ -141,8 +147,8 @@ public void setSignature(long signature) {
this.signature = signature;
}

public RandomAccessFile getRandomAccessFile() {
return randomAccessFile;
public LogFile getLogFile() {
return logFile;
}

public long getPointer() {
Expand Down Expand Up @@ -172,7 +178,7 @@ public void setOldFileState(FileState oldFileState) {

public void deleteOldFileState() {
try {
oldFileState.getRandomAccessFile().close();
oldFileState.getLogFile().close();
oldFileState = null;
} catch(Exception e) {}
}
Expand Down
40 changes: 31 additions & 9 deletions src/main/java/info/fetter/logstashforwarder/FileWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

/*
* Copyright 2015 Didier Fetter
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -136,7 +137,7 @@ private void processModifications() throws IOException {
logger.trace("Same signature size and value : file is the same");
continue;
} else if(oldState.getSignatureLength() < state.getSignatureLength()){
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength());
long signature = FileSigner.computeSignature(state.getLogFile(), oldState.getSignatureLength());
if(signature == oldState.getSignature()) {
state.setOldFileState(oldState);
logger.trace("Same signature : file is the same");
Expand All @@ -163,7 +164,7 @@ private void processModifications() throws IOException {
logger.trace("Same signature size and value : file is the same");
break;
} else if(otherState.getSignatureLength() < state.getSignatureLength()){
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength());
long signature = FileSigner.computeSignature(state.getLogFile(), otherState.getSignatureLength());
if(signature == otherState.getSignature()) {
state.setOldFileState(otherState);
logger.trace("Same signature : file is the same");
Expand Down Expand Up @@ -194,7 +195,7 @@ private void processModifications() throws IOException {
logger.debug("File " + state.getFile() + " has been replaced and not renamed, removing from watchMap");
}
try {
oldState.getRandomAccessFile().close();
oldState.getLogFile().close();
} catch(Exception e) {}
oldWatchMap.remove(state.getFile());
}
Expand Down Expand Up @@ -223,12 +224,33 @@ private void processModifications() throws IOException {
removeMarkedFilesFromWatchMap();
}

// This filter will accept anything that is not a directory,
// including named pipes (FIFOs), sockets and device files.
// The standard org.apache.commons.io.filefilter.FileFileFilter excludes
// them even if their documentation says
// "This filter accepts Files that are files (not directories)."
protected class FileFileFilter implements IOFileFilter
{
@Override
public boolean accept(File file) {
return !file.isDirectory();
}

@Override
public boolean accept(File dir, String name) {
return accept(new File(dir, name));
}
}
protected IOFileFilter fileFileFilter() {
return new FileFileFilter();
}

private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception {
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
String directory = FilenameUtils.getFullPath(fileToWatch);
String fileName = FilenameUtils.getName(fileToWatch);
IOFileFilter fileFilter = FileFilterUtils.and(
FileFilterUtils.fileFileFilter(),
fileFileFilter(),
FileFilterUtils.nameFileFilter(fileName),
new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
Expand All @@ -240,7 +262,7 @@ private void addWildCardFiles(String filesToWatch, Event fields, long deadTime,
String wildcard = FilenameUtils.getName(filesToWatch);
logger.trace("Directory : " + new File(directory).getCanonicalPath() + ", wildcard : " + wildcard);
IOFileFilter fileFilter = FileFilterUtils.and(
FileFilterUtils.fileFileFilter(),
fileFileFilter(),
new WildcardFileFilter(wildcard),
new LastModifiedFileFilter(deadTime));
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
Expand Down Expand Up @@ -273,7 +295,7 @@ private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields,
state.setFields(fields);
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
state.setSignatureLength(signatureLength);
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
long signature = FileSigner.computeSignature(state.getLogFile(), signatureLength);
state.setSignature(signature);
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
state.setMultiline(multiline);
Expand Down Expand Up @@ -331,7 +353,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException {
List<File> markedList = null;
for(File file : oldWatchMap.keySet()) {
FileState state = oldWatchMap.get(file);
if(state.getRandomAccessFile() == null) {
if(state.getLogFile() == null) {
state.setDeleted();
}
if(state.isDeleted()) {
Expand All @@ -342,7 +364,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException {
markedList.add(file);
}
try {
state.getRandomAccessFile().close();
state.getLogFile().close();
} catch(Exception e) {}
}
}
Expand All @@ -358,7 +380,7 @@ public void close() throws IOException {
logger.debug("Closing all files");
for(File file : oldWatchMap.keySet()) {
FileState state = oldWatchMap.get(file);
state.getRandomAccessFile().close();
state.getLogFile().close();
}
}

Expand Down
39 changes: 39 additions & 0 deletions src/main/java/info/fetter/logstashforwarder/util/LogFile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package info.fetter.logstashforwarder.util;

/*
* Copyright 2018 Alberto González Palomo https://sentido-labs.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.log4j.Logger;

public interface LogFile {
/**
* Check whether the file is empty: normal files are empty
* when their size is zero, but other kinds of files like
* named pipes / FIFOs do not report a size.
*/
public abstract boolean isEmpty() throws IOException;
public abstract void seek(long pos) throws IOException;
public abstract long getFilePointer() throws IOException;
public abstract int read() throws IOException;
public abstract void close() throws IOException;
}
Loading