Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
DDAgentFeaturesDiscovery ret = featuresDiscovery;
if (ret == null) {
synchronized (this) {
if (featuresDiscovery == null) {
if ((ret = featuresDiscovery) == null) {
createRemaining(config);
ret =
new DDAgentFeaturesDiscovery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import java.io.IOException;
import java.io.OutputStream;
import javax.annotation.concurrent.NotThreadSafe;

/**
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
* that the latest n bytes matches the marker, a content is injected before.
* that the latest n bytes matches the marker, a content is injected before. In case of IOException
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
* this case the draining will be resumed.
*/
@NotThreadSafe
public class InjectingPipeOutputStream extends OutputStream {
private final byte[] lookbehind;
private int pos;
private boolean bufferFilled;
private int count;
private final byte[] marker;
private final byte[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private boolean filter;
private boolean wasDraining;
private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
Expand All @@ -34,32 +39,39 @@ public InjectingPipeOutputStream(
this.marker = marker;
this.lookbehind = new byte[marker.length];
this.pos = 0;
this.count = 0;
this.matchingPos = 0;
this.wasDraining = false;
// should filter the stream to potentially inject into it.
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int b) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// continue draining
drain();
}
downstream.write(b);
return;
}

if (bufferFilled) {
if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
} else {
count++;
}

lookbehind[pos] = (byte) b;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
found = true;
filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
Expand All @@ -73,18 +85,23 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] array, int off, int len) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// needs drain
drain();
}
downstream.write(array, off, len);
return;
}

if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
// Also we need to fill the buffer with the tail since we don't know about the next write.
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
Expand All @@ -99,7 +116,12 @@ public void write(byte[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
boolean wasFiltering = filter;

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = wasFiltering;
for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
Expand Down Expand Up @@ -133,16 +155,25 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
}

private void drain() throws IOException {
if (bufferFilled) {
for (int i = 0; i < lookbehind.length; i++) {
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
if (count > 0) {
boolean wasFiltering = filter;
filter = false;
wasDraining = true;
int start = (pos - count + lookbehind.length) % lookbehind.length;
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
count--;
}
} else {
downstream.write(this.lookbehind, 0, pos);
filter = wasFiltering;
wasDraining = false;
}
}

public void commit() throws IOException {
if (filter || wasDraining) {
drain();
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

@Override
Expand All @@ -152,9 +183,14 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
if (!found) {
drain();
try {
commit();
} finally {
downstream.close();
}
downstream.close();
}

public void setFilter(boolean filter) {
this.filter = filter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import java.io.IOException;
import java.io.Writer;
import javax.annotation.concurrent.NotThreadSafe;

/**
* A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that
* the latest n bytes matches the marker, a content is injected before.
* the latest n bytes matches the marker, a content is injected before. In case of IOException
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
* this case the draining will be resumed.
*/
@NotThreadSafe
public class InjectingPipeWriter extends Writer {
private final char[] lookbehind;
private int pos;
private boolean bufferFilled;
private int count;
private final char[] marker;
private final char[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private boolean filter;
private boolean wasDraining;
private int matchingPos;
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
Expand All @@ -34,32 +39,39 @@ public InjectingPipeWriter(
this.marker = marker;
this.lookbehind = new char[marker.length];
this.pos = 0;
this.count = 0;
this.matchingPos = 0;
this.wasDraining = false;
// should filter the stream to potentially inject into it.
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int c) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// continue draining
drain();
}
downstream.write(c);
return;
}

if (bufferFilled) {
if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
} else {
count++;
}

lookbehind[pos] = (char) c;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == c) {
if (matchingPos == marker.length) {
found = true;
filter = false;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
Expand All @@ -71,25 +83,25 @@ public void write(int c) throws IOException {
}
}

@Override
public void flush() throws IOException {
downstream.flush();
}

@Override
public void write(char[] array, int off, int len) throws IOException {
if (found) {
if (!filter) {
if (wasDraining) {
// needs drain
drain();
}
downstream.write(array, off, len);
return;
}

if (len > bulkWriteThreshold) {
// if the content is large enough, we can bulk write everything but the N trail and tail.
// This because the buffer can already contain some byte from a previous single write.
// Also we need to fill the buffer with the tail since we don't know about the next write.
int idx = arrayContains(array, off, len, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
filter = false;
drain();
downstream.write(array, off, idx);
downstream.write(contentToInject);
Expand All @@ -104,7 +116,13 @@ public void write(char[] array, int off, int len) throws IOException {
write(array[i]);
}
drain();
boolean wasFiltering = filter;

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = wasFiltering;

for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
}
Expand Down Expand Up @@ -138,23 +156,42 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
}

private void drain() throws IOException {
if (bufferFilled) {
for (int i = 0; i < lookbehind.length; i++) {
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
if (count > 0) {
boolean wasFiltering = filter;
filter = false;
wasDraining = true;
int start = (pos - count + lookbehind.length) % lookbehind.length;
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
count--;
}
} else {
downstream.write(this.lookbehind, 0, pos);
filter = wasFiltering;
wasDraining = false;
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

public void commit() throws IOException {
if (filter || wasDraining) {
drain();
}
}

@Override
public void flush() throws IOException {
downstream.flush();
}

@Override
public void close() throws IOException {
if (!found) {
drain();
try {
commit();
} finally {
downstream.close();
}
downstream.close();
}

public void setFilter(boolean filter) {
this.filter = filter;
}
}
Loading