Skip to content

Commit f8d108d

Browse files
Give RowsEvent a method to inspect the underlying event type. (#1016)
* Give `RowsEvent` methods to inspect the underlying event type. It's currently impossible to figure out what kind of operation led to a `RowsEvent`; multiple results indicate an `UPDATE`, but there's no way to differentiate an `INSERT` from a `DELETE`. * Rework individual methods for `INSERT`/`UPDATE`/`DELETE` into a single public RowsEvent type. * Nit: clarify difference between private an public `RowsEvent` types. * List event type information on dumps for `RowsEvent`. * Verify `RowsEvent` type handling within `TransactionPayloadEvent` decoded event tests.
1 parent 0cb1e12 commit f8d108d

File tree

3 files changed

+84
-0
lines changed

3 files changed

+84
-0
lines changed

replication/row_event.go

+38
Original file line numberDiff line numberDiff line change
@@ -905,6 +905,7 @@ type RowsEvent struct {
905905
// for mariadb *_COMPRESSED_EVENT_V1
906906
compressed bool
907907

908+
// raw event type associated with a RowsEvent
908909
eventType EventType
909910

910911
Table *TableMapEvent
@@ -950,6 +951,29 @@ type RowsEvent struct {
950951
ignoreJSONDecodeErr bool
951952
}
952953

954+
// EnumRowsEventType is an abridged type describing the operation which triggered the given RowsEvent.
955+
type EnumRowsEventType byte
956+
957+
const (
958+
EnumRowsEventTypeUnknown = EnumRowsEventType(iota)
959+
EnumRowsEventTypeInsert
960+
EnumRowsEventTypeUpdate
961+
EnumRowsEventTypeDelete
962+
)
963+
964+
func (t EnumRowsEventType) String() string {
965+
switch t {
966+
case EnumRowsEventTypeInsert:
967+
return "insert"
968+
case EnumRowsEventTypeUpdate:
969+
return "update"
970+
case EnumRowsEventTypeDelete:
971+
return "delete"
972+
default:
973+
return fmt.Sprintf("unknown (%d)", t)
974+
}
975+
}
976+
953977
// EnumRowImageType is allowed types for every row in mysql binlog.
954978
// See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39
955979
// enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI };
@@ -1120,6 +1144,19 @@ func (e *RowsEvent) Decode(data []byte) error {
11201144
return e.DecodeData(pos, data)
11211145
}
11221146

1147+
func (e *RowsEvent) Type() EnumRowsEventType {
1148+
switch e.eventType {
1149+
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
1150+
return EnumRowsEventTypeInsert
1151+
case UPDATE_ROWS_EVENTv0, UPDATE_ROWS_EVENTv1, UPDATE_ROWS_EVENTv2, MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
1152+
return EnumRowsEventTypeUpdate
1153+
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
1154+
return EnumRowsEventTypeDelete
1155+
default:
1156+
return EnumRowsEventTypeUnknown
1157+
}
1158+
}
1159+
11231160
func isBitSet(bitmap []byte, i int) bool {
11241161
return bitmap[i>>3]&(1<<(uint(i)&7)) > 0
11251162
}
@@ -1817,6 +1854,7 @@ func (e *RowsEvent) Dump(w io.Writer) {
18171854
fmt.Fprintf(w, "Flags: %d\n", e.Flags)
18181855
fmt.Fprintf(w, "Column count: %d\n", e.ColumnCount)
18191856
fmt.Fprintf(w, "NDB data: %s\n", e.NdbData)
1857+
fmt.Fprintf(w, "Event type: %s (%s)", e.Type(), e.eventType)
18201858

18211859
fmt.Fprintf(w, "Values:\n")
18221860
for _, rows := range e.Rows {

replication/row_event_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -1176,6 +1176,37 @@ func TestRowsDataExtraData(t *testing.T) {
11761176
}
11771177
}
11781178

1179+
func TestRowsEventType(t *testing.T) {
1180+
testcases := []struct {
1181+
eventType EventType
1182+
want EnumRowsEventType
1183+
}{
1184+
{WRITE_ROWS_EVENTv0, EnumRowsEventTypeInsert},
1185+
{WRITE_ROWS_EVENTv1, EnumRowsEventTypeInsert},
1186+
{WRITE_ROWS_EVENTv2, EnumRowsEventTypeInsert},
1187+
{MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeInsert},
1188+
{UPDATE_ROWS_EVENTv0, EnumRowsEventTypeUpdate},
1189+
{UPDATE_ROWS_EVENTv1, EnumRowsEventTypeUpdate},
1190+
{UPDATE_ROWS_EVENTv2, EnumRowsEventTypeUpdate},
1191+
{MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeUpdate},
1192+
{DELETE_ROWS_EVENTv0, EnumRowsEventTypeDelete},
1193+
{DELETE_ROWS_EVENTv1, EnumRowsEventTypeDelete},
1194+
{DELETE_ROWS_EVENTv2, EnumRowsEventTypeDelete},
1195+
{MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1, EnumRowsEventTypeDelete},
1196+
1197+
// Whoops, these are not rows events at all
1198+
{EXEC_LOAD_EVENT, EnumRowsEventTypeUnknown},
1199+
{HEARTBEAT_EVENT, EnumRowsEventTypeUnknown},
1200+
}
1201+
1202+
for _, tc := range testcases {
1203+
rev := new(RowsEvent)
1204+
rev.eventType = tc.eventType
1205+
1206+
require.Equal(t, tc.want, rev.Type())
1207+
}
1208+
}
1209+
11791210
func TestTableMapHelperMaps(t *testing.T) {
11801211
/*
11811212
CREATE TABLE `_types` (

replication/transaction_payload_event_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func TestTransactionPayloadEventDecode(t *testing.T) {
6969
}
7070
err := e.decodePayload()
7171
require.NoError(t, err)
72+
73+
// Check raw events
7274
require.Len(t, e.Events, 8)
7375
require.Equal(t, QUERY_EVENT, e.Events[0].Header.EventType)
7476
require.Equal(t, TABLE_MAP_EVENT, e.Events[1].Header.EventType)
@@ -78,4 +80,17 @@ func TestTransactionPayloadEventDecode(t *testing.T) {
7880
require.Equal(t, TABLE_MAP_EVENT, e.Events[5].Header.EventType)
7981
require.Equal(t, DELETE_ROWS_EVENTv2, e.Events[6].Header.EventType)
8082
require.Equal(t, XID_EVENT, e.Events[7].Header.EventType)
83+
84+
// Check insert/update/delete rows events casting
85+
ievent, ok := e.Events[2].Event.(*RowsEvent)
86+
require.True(t, ok)
87+
require.Equal(t, ievent.Type(), EnumRowsEventTypeInsert)
88+
89+
uevent, ok := e.Events[4].Event.(*RowsEvent)
90+
require.True(t, ok)
91+
require.Equal(t, uevent.Type(), EnumRowsEventTypeUpdate)
92+
93+
devent, ok := e.Events[6].Event.(*RowsEvent)
94+
require.True(t, ok)
95+
require.Equal(t, devent.Type(), EnumRowsEventTypeDelete)
8196
}

0 commit comments

Comments
 (0)