Skip to content

Commit ec07b37

Browse files
author
Michael Schiff
committed
Event carries binlog filename and offset. Simplifies logic for
EventListeners that care about checkpointing progress.
1 parent 6cdecfd commit ec07b37

File tree

6 files changed

+26
-14
lines changed

6 files changed

+26
-14
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,7 @@ private void listenForEventPackets() throws IOException {
889889
try {
890890
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
891891
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
892-
inputStream);
892+
inputStream, binlogFilename, binlogPosition);
893893
if (event == null) {
894894
throw new EOFException();
895895
}
@@ -1047,7 +1047,7 @@ public void unregisterEventListener(EventListener eventListener) {
10471047

10481048
private void notifyEventListeners(Event event) {
10491049
if (event.getData() instanceof EventDeserializer.EventDataWrapper) {
1050-
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal());
1050+
event = new Event(event.getHeader(), ((EventDeserializer.EventDataWrapper) event.getData()).getExternal(), binlogFilename, binlogPosition);
10511051
}
10521052
synchronized (eventListeners) {
10531053
for (EventListener eventListener : eventListeners) {

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogFileReader.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,34 @@ public class BinaryLogFileReader implements Closeable {
3838

3939
private final ByteArrayInputStream inputStream;
4040
private final EventDeserializer eventDeserializer;
41+
private final String filename;
42+
private long offset;
4143

4244
public BinaryLogFileReader(File file) throws IOException {
4345
this(file, new EventDeserializer());
4446
}
4547

4648
public BinaryLogFileReader(File file, EventDeserializer eventDeserializer) throws IOException {
47-
this(file != null ? new BufferedInputStream(new FileInputStream(file)) : null, eventDeserializer);
49+
this(file != null ? file.getName() : null,
50+
file != null ? new BufferedInputStream(new FileInputStream(file)) : null,
51+
eventDeserializer);
4852
}
4953

50-
public BinaryLogFileReader(InputStream inputStream) throws IOException {
51-
this(inputStream, new EventDeserializer());
54+
public BinaryLogFileReader(String filename, InputStream inputStream) throws IOException {
55+
this(filename, inputStream, new EventDeserializer());
5256
}
5357

54-
public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
58+
public BinaryLogFileReader(String filename, InputStream inputStream, EventDeserializer eventDeserializer) throws IOException {
59+
if (filename == null) {
60+
throw new IllegalArgumentException("File name cannot be NULL");
61+
}
5562
if (inputStream == null) {
5663
throw new IllegalArgumentException("Input stream cannot be NULL");
5764
}
5865
if (eventDeserializer == null) {
5966
throw new IllegalArgumentException("Event deserializer cannot be NULL");
6067
}
68+
this.filename = filename;
6169
this.inputStream = new ByteArrayInputStream(inputStream);
6270
try {
6371
byte[] magicHeader = this.inputStream.read(MAGIC_HEADER.length);
@@ -79,7 +87,7 @@ public BinaryLogFileReader(InputStream inputStream, EventDeserializer eventDeser
7987
* @return deserialized event or null in case of end-of-stream
8088
*/
8189
public Event readEvent() throws IOException {
82-
return eventDeserializer.nextEvent(inputStream);
90+
return eventDeserializer.nextEvent(inputStream, filename, offset++);
8391
}
8492

8593
@Override

src/main/java/com/github/shyiko/mysql/binlog/event/Event.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ public class Event implements Serializable {
2424

2525
private EventHeader header;
2626
private EventData data;
27+
private String binlogFilename;
28+
private long binlogPosition;
2729

28-
public Event(EventHeader header, EventData data) {
30+
public Event(EventHeader header, EventData data, String binlogFilename, long binlogPosition) {
2931
this.header = header;
3032
this.data = data;
33+
this.binlogFilename = binlogFilename;
34+
this.binlogPosition = binlogPosition;
3135
}
3236

3337
@SuppressWarnings("unchecked")

src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/EventDeserializer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
189189
/**
190190
* @return deserialized event or null in case of end-of-stream
191191
*/
192-
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
192+
public Event nextEvent(ByteArrayInputStream inputStream, String binlogFilename, long binlogOffset) throws IOException {
193193
if (inputStream.peek() == -1) {
194194
return null;
195195
}
@@ -212,7 +212,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
212212
}
213213
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
214214
}
215-
return new Event(eventHeader, eventData);
215+
return new Event(eventHeader, eventData, binlogFilename, binlogOffset);
216216
}
217217

218218
private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,

src/test/java/com/github/shyiko/mysql/binlog/BinaryLogFileReaderIntegrationTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public class BinaryLogFileReaderIntegrationTest {
4040

4141
@Test
4242
public void testNextEvent() throws Exception {
43-
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
44-
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
43+
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila",
44+
new GZIPInputStream(new FileInputStream("src/test/resources/mysql-bin.sakila.gz")));
4545
try {
4646
int numberOfEvents = 0;
4747
while ((reader.readEvent()) != null) {
@@ -68,7 +68,7 @@ public void testDeserializationSuppressionByEventType() throws Exception {
6868
EventDeserializer eventDeserializer = new EventDeserializer();
6969
eventDeserializer.setEventDataDeserializer(EventType.XID, new NullEventDataDeserializer());
7070
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new ByteArrayEventDataDeserializer());
71-
BinaryLogFileReader reader = new BinaryLogFileReader(new GZIPInputStream(
71+
BinaryLogFileReader reader = new BinaryLogFileReader("mysql-bin.sakila.gz", new GZIPInputStream(
7272
new FileInputStream("src/test/resources/mysql-bin.sakila.gz")), eventDeserializer);
7373
try {
7474
boolean n = true, b = true;

src/test/java/com/github/shyiko/mysql/binlog/jmx/BinaryLogClientStatisticsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,6 @@ private Event generateEvent(long timestamp, EventType type, long serverId, long
107107
header.setEventType(type);
108108
header.setServerId(serverId);
109109
header.setNextPosition(nextPosition);
110-
return new Event(header, null);
110+
return new Event(header, null, "filename", 4);
111111
}
112112
}

0 commit comments

Comments
 (0)