forked from krowinski/php-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathresuming.php
125 lines (104 loc) · 3.45 KB
/
resuming.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?php
declare(strict_types=1);
namespace example;
// Report every class of PHP error while this example runs.
error_reporting(E_ALL);
// Use UTC as the default timezone for all date/time functions in this script.
date_default_timezone_set('UTC');
// Load the autoloader from the vendor directory one level above this file
// (presumably Composer's - the imported classes below rely on it).
include __DIR__ . '/../vendor/autoload.php';
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use MySQLReplication\BinLog\BinLogCurrent;
use MySQLReplication\Config\ConfigBuilder;
use MySQLReplication\Event\DTO\EventDTO;
use MySQLReplication\Event\EventSubscribers;
use MySQLReplication\MySQLReplicationFactory;
// Logger named "app" that writes to standard output.
$logger = new Logger('app', [new StreamHandler(STDOUT)]);

// NOTE(review): this removes any previously saved position before the configuration
// below is built, so BinLogBootstrap::startFromPosition() finds no file and returns
// the builder untouched on every run. Drop this call to really resume from the last
// saved position.
BinLogBootstrap::clear();

/**
 * Your db configuration @see ConfigBuilder for more options
 */
$config = BinLogBootstrap::startFromPosition(new ConfigBuilder())
    ->withUser('root')
    ->withHost('127.0.0.1')
    ->withPort(3306)
    ->withPassword('root')
    ->withRetry(3)
    ->build();

// The four nulls leave the constructor arguments between the config and the logger
// unset (presumably falling back to the library defaults - confirm against the factory).
$binLogStream = new MySQLReplicationFactory($config, null, null, null, null, $logger);
/**
 * Class MyEventSubscribers
 *
 * Example subscriber that prints each event it is given and stores the event's
 * binlog position so a later run can pick up from there.
 *
 * @package example
 */
class MyEventSubscribers extends EventSubscribers
{
    /**
     * Prints the event and the current memory usage, then saves the event's position.
     *
     * @param EventDTO $event (your own handler more in EventSubscribers class )
     */
    public function allEvents(EventDTO $event): void
    {
        // printing relies on the events' __toString() implementation
        echo $event;
        // events are JsonSerializable as well, so this is an alternative output:
        //echo json_encode($event, JSON_PRETTY_PRINT);

        // memory_get_usage() reports bytes; convert to megabytes for display
        $usedMegabytes = round(memory_get_usage() / (1024 * 1024), 2);
        echo 'Memory usage ' . $usedMegabytes . ' MB' . PHP_EOL;

        // remember the position of this event for resuming it later
        $position = $event->getEventInfo()->getBinLogCurrent();
        BinLogBootstrap::save($position);
    }
}
/**
 * Class BinLogBootstrap
 *
 * Stores the most recently processed binlog position in a local file and applies
 * it to a ConfigBuilder on startup so replication can continue from that point.
 *
 * @package example
 */
class BinLogBootstrap
{
    /**
     * Location of the position file; filled in on first use.
     *
     * @var string|null
     */
    private static $fileAndPath;

    /**
     * @return string path of the file that holds the serialized position
     */
    private static function getFileAndPath(): string
    {
        if (null === self::$fileAndPath) {
            self::$fileAndPath = '/tmp/bin-log-replicator-last-position';
        }

        return self::$fileAndPath;
    }

    /**
     * Deletes the saved position, if there is one.
     */
    public static function clear(): void
    {
        $path = self::getFileAndPath();
        // is_file() (same check as startFromPosition) avoids calling unlink() on a directory
        if (is_file($path)) {
            unlink($path);
        }
    }

    /**
     * Prints and persists the given position.
     *
     * @param BinLogCurrent $binLogCurrent
     */
    public static function save(BinLogCurrent $binLogCurrent): void
    {
        echo 'saving file:' . $binLogCurrent->getBinFileName() . ', position:' . $binLogCurrent->getBinLogPosition() . ' bin log position' . PHP_EOL;

        // can be redis/nosql/file - something fast!
        // to speed up you can save every xxx time
        // you can also use signal handler for ctrl + c exiting script to wait for last event
        $path = self::getFileAndPath();
        $temporaryPath = $path . '.tmp';

        // Write next to the target and rename over it: if the script is killed mid-write
        // the previous, complete position file is still in place instead of a truncated one.
        if (false !== file_put_contents($temporaryPath, serialize($binLogCurrent))) {
            rename($temporaryPath, $path);
        }
    }

    /**
     * Applies the saved position to the builder.
     *
     * The builder is returned unchanged when no position file exists or when the file
     * cannot be read back as a BinLogCurrent (e.g. truncated or written by something else).
     *
     * @param ConfigBuilder $builder
     * @return ConfigBuilder
     */
    public static function startFromPosition(ConfigBuilder $builder): ConfigBuilder
    {
        $path = self::getFileAndPath();
        if (!is_file($path)) {
            return $builder;
        }

        $serialized = file_get_contents($path);
        // Only BinLogCurrent may be instantiated from the file; anything else is rejected below.
        $binLogCurrent = false === $serialized
            ? false
            : unserialize($serialized, ['allowed_classes' => [BinLogCurrent::class]]);

        if (!$binLogCurrent instanceof BinLogCurrent) {
            echo 'ignoring unreadable saved position in ' . $path . PHP_EOL;

            return $builder;
        }

        echo 'starting from file:' . $binLogCurrent->getBinFileName() . ', position:' . $binLogCurrent->getBinLogPosition() . ' bin log position' . PHP_EOL;

        return $builder
            ->withBinLogFileName($binLogCurrent->getBinFileName())
            ->withBinLogPosition($binLogCurrent->getBinLogPosition());
    }
}
// register your events handler here
$subscriber = new MyEventSubscribers();
$binLogStream->registerSubscriber($subscriber);

// start consuming events
$binLogStream->run();