Skip to content

Event carries binlog filename and offset. #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ private void listenForEventPackets() throws IOException {
try {
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
inputStream, binlogFilename, binlogPosition);
if (event == null) {
throw new EOFException();
}
Expand Down Expand Up @@ -1047,7 +1047,8 @@ public void unregisterEventListener(EventListener eventListener) {

private void notifyEventListeners(Event event) {
if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal(),
binlogFilename, binlogPosition);
}
synchronized (eventListeners) {
for (EventListener eventListener : eventListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,39 @@
*/
public class BinaryLogFileReader implements Closeable {

public static final byte[] MAGIC_HEADER = new byte[]{(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};
public static final byte[] MAGIC_HEADER = new byte[] {(byte) 0xfe, (byte) 0x62, (byte) 0x69, (byte) 0x6e};

private final ByteArrayInputStream inputStream;
private final EventDeserializer eventDeserializer;
private final String filename;
private long offset;

public BinaryLogFileReader(File file) throws IOException {
this(file, new EventDeserializer());
}

public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException {
this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer);
this(file != null ? file.getName() : null,
file != null ? new BufferedInputStream(new FileInputStream(file)) : null,
eventDeserializer);
}

public BinaryLogFileReader(InputStream inputStream) throws IOException {
this(inputStream, new EventDeserializer());
public BinaryLogFileReader(String filename, InputStream inputStream) throws IOException {
this(filename, inputStream, new EventDeserializer());
}

public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
public BinaryLogFileReader(String filename, InputStream inputStream, EventDeserializer eventDeserializer)
throws IOException {
if (filename == null) {
throw new IllegalArgumentException("File name cannot be NULL");
}
if (inputStream == null) {
throw new IllegalArgumentException("Input stream cannot be NULL");
}
if (eventDeserializer == null) {
throw new IllegalArgumentException("Event deserializer cannot be NULL");
}
this.filename = filename;
this.inputStream = new ByteArrayInputStream(inputStream);
try {
byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
Expand All @@ -79,7 +88,7 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
* @return deserialized event or null in case of end-of-stream
*/
public Event readEvent() throws IOException {
return eventDeserializer.nextEvent(inputStream);
return eventDeserializer.nextEvent(inputStream, filename, offset++);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ public class Event implements Serializable {

private EventHeader header;
private EventData data;
private String binlogFilename;
private long binlogPosition;

public Event(EventHeader header, EventData data) {
public Event(EventHeader header, EventData data, String binlogFilename, long binlogPosition) {
this.header = header;
this.data = data;
this.binlogFilename = binlogFilename;
this.binlogPosition = binlogPosition;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
/**
* @return deserialized event or null in case of end-of-stream
*/
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
public Event nextEvent(ByteArrayInputStream inputStream, String binlogFilename, long binlogOffset)
throws IOException {
if (inputStream.peek() == -1) {
return null;
}
Expand All @@ -212,7 +213,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
}
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
}
return new Event(eventHeader, eventData);
return new Event(eventHeader, eventData, binlogFilename, binlogOffset);
}

private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class BinaryLogFileReaderIntegrationTest {

@Test
public void testNextEvent() throws Exception {
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila",
new GZIPInputStream(new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
try {
int numberOfEvents = 0;
while ((reader.readEvent()) != null) {
Expand All @@ -68,7 +68,7 @@ public void testDeserializationSuppressionByEventType() throws Exception {
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setEventDataDeserializer(EventType.XID, new NullEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer());
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila.gz", new GZIPInputStream(
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer);
try {
boolean n = true, b = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ private Event generateEvent(long timestamp, EventType type, long serverId, long
header.setEventType(type);
header.setServerId(serverId);
header.setNextPosition(nextPosition);
return new Event(header, null);
return new Event(header, null, "filename", 4);
}
}