forked from krowinski/php-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathEvent.php
147 lines (129 loc) · 6.03 KB
/
Event.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
<?php
declare(strict_types=1);
namespace MySQLReplication\Event;
use MySQLReplication\BinaryDataReader\BinaryDataReader;
use MySQLReplication\BinaryDataReader\BinaryDataReaderException;
use MySQLReplication\BinLog\BinLogException;
use MySQLReplication\BinLog\BinLogSocketConnect;
use MySQLReplication\Config\Config;
use MySQLReplication\Definitions\ConstEventType;
use MySQLReplication\Event\DTO\EventDTO;
use MySQLReplication\Event\DTO\FormatDescriptionEventDTO;
use MySQLReplication\Event\DTO\HeartbeatDTO;
use MySQLReplication\Event\DTO\QueryDTO;
use MySQLReplication\Event\RowEvent\RowEventFactory;
use MySQLReplication\Exception\MySQLReplicationException;
use MySQLReplication\JsonBinaryDecoder\JsonBinaryDecoderException;
use MySQLReplication\Socket\SocketException;
use Psr\SimpleCache\CacheInterface;
use Psr\SimpleCache\InvalidArgumentException;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
class Event
{
private const MARIADB_DUMMY_QUERY = '# Dum';
private const EOF_HEADER_VALUE = 254;
private $binLogSocketConnect;
private $rowEventFactory;
private $eventDispatcher;
private $cache;
/**
* @var Config
*/
private $config;
public function __construct(
Config $config,
BinLogSocketConnect $binLogSocketConnect,
RowEventFactory $rowEventFactory,
EventDispatcherInterface $eventDispatcher,
CacheInterface $cache
) {
$this->config = $config;
$this->binLogSocketConnect = $binLogSocketConnect;
$this->rowEventFactory = $rowEventFactory;
$this->eventDispatcher = $eventDispatcher;
$this->cache = $cache;
}
public function connect(): void
{
$this->binLogSocketConnect->connect();
}
/**
* @throws BinaryDataReaderException
* @throws BinLogException
* @throws MySQLReplicationException
* @throws JsonBinaryDecoderException
* @throws InvalidArgumentException
* @throws SocketException
*/
public function consume(): void
{
$binLogServerInfo = $this->binLogSocketConnect->getBinLogServerInfo();
$binaryDataReader = new BinaryDataReader($this->binLogSocketConnect->getResponse());
// check EOF_Packet -> https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
if (self::EOF_HEADER_VALUE === $binaryDataReader->readUInt8()) {
return;
}
// decode all events data
$eventInfo = $this->createEventInfo($binaryDataReader);
$eventDTO = null;
// we always need this events to clean table maps and for BinLogCurrent class to keep track of binlog position
// always parse table map event but propagate when needed (we need this for creating table cache)
if (ConstEventType::TABLE_MAP_EVENT === $eventInfo->getType()) {
$eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeTableMapDTO();
} else if (ConstEventType::ROTATE_EVENT === $eventInfo->getType()) {
$this->cache->clear();
$eventDTO = (new RotateEvent($binLogServerInfo, $eventInfo, $binaryDataReader))->makeRotateEventDTO();
} else if (ConstEventType::GTID_LOG_EVENT === $eventInfo->getType()) {
$eventDTO = (new GtidEvent($eventInfo, $binaryDataReader))->makeGTIDLogDTO();
} else if (ConstEventType::HEARTBEAT_LOG_EVENT === $eventInfo->getType()) {
$eventDTO = new HeartbeatDTO($eventInfo);
} else if (ConstEventType::MARIA_GTID_EVENT === $eventInfo->getType()) {
$eventDTO = (new MariaDbGtidEvent($eventInfo, $binaryDataReader))->makeMariaDbGTIDLogDTO();
}
// check for ignore and permitted events
if (!$this->config->checkEvent($eventInfo->getType())) {
return;
}
if (in_array($eventInfo->getType(), [ConstEventType::UPDATE_ROWS_EVENT_V1, ConstEventType::UPDATE_ROWS_EVENT_V2], true)) {
$eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeUpdateRowsDTO();
} else if (in_array($eventInfo->getType(), [ConstEventType::WRITE_ROWS_EVENT_V1, ConstEventType::WRITE_ROWS_EVENT_V2], true)) {
$eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeWriteRowsDTO();
} else if (in_array($eventInfo->getType(), [ConstEventType::DELETE_ROWS_EVENT_V1, ConstEventType::DELETE_ROWS_EVENT_V2], true)) {
$eventDTO = $this->rowEventFactory->makeRowEvent($binaryDataReader, $eventInfo)->makeDeleteRowsDTO();
} else if (ConstEventType::XID_EVENT === $eventInfo->getType()) {
$eventDTO = (new XidEvent($eventInfo, $binaryDataReader))->makeXidDTO();
} else if (ConstEventType::QUERY_EVENT === $eventInfo->getType()) {
$eventDTO = $this->filterDummyMariaDbEvents((new QueryEvent($eventInfo, $binaryDataReader))->makeQueryDTO());
} else if (ConstEventType::FORMAT_DESCRIPTION_EVENT === $eventInfo->getType()) {
$eventDTO = new FormatDescriptionEventDTO($eventInfo);
}
$this->dispatch($eventDTO);
}
private function createEventInfo(BinaryDataReader $binaryDataReader): EventInfo
{
return new EventInfo(
$binaryDataReader->readInt32(),
$binaryDataReader->readUInt8(),
$binaryDataReader->readInt32(),
$binaryDataReader->readInt32(),
$binaryDataReader->readInt32(),
$binaryDataReader->readUInt16(),
$this->binLogSocketConnect->getCheckSum(),
$this->binLogSocketConnect->getBinLogCurrent()
);
}
private function filterDummyMariaDbEvents(QueryDTO $queryDTO): ?QueryDTO
{
if ($this->binLogSocketConnect->getBinLogServerInfo()->isMariaDb() &&
false !== strpos($queryDTO->getQuery(), self::MARIADB_DUMMY_QUERY)) {
return null;
}
return $queryDTO;
}
private function dispatch(EventDTO $eventDTO = null): void
{
if (null !== $eventDTO) {
$this->eventDispatcher->dispatch($eventDTO, $eventDTO->getType());
}
}
}