Skip to content

Commit 32dc4e2

Browse files
author
Tanuj Khurana
committed
PHOENIX-7602 Replication Log Writer (Store and Forward mode)
This patch implements the replication mode transitions and log forwarding from the fallback cluster to the standby cluster.
1 parent fb204f7 commit 32dc4e2

31 files changed

+2850
-1511
lines changed

phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/FailoverPhoenixConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,7 @@ public int getNetworkTimeout() throws SQLException {
662662
* @return the currently wrapped connection.
663663
*/
664664
@VisibleForTesting
665-
PhoenixConnection getWrappedConnection() {
665+
public PhoenixConnection getWrappedConnection() {
666666
return connection;
667667
}
668668

phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ public static HAGroupStoreManager getInstanceForZkUrl(final Configuration conf,
185185
}
186186

187187
@VisibleForTesting
188-
HAGroupStoreManager(final Configuration conf) {
188+
protected HAGroupStoreManager(final Configuration conf) {
189189
this(conf, getLocalZkUrl(conf));
190190
}
191191

phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,6 +1459,7 @@ public static boolean areSeparatorBytesForVarBinaryEncoded(byte[] bytes, int off
14591459
* SYSTEM.TRANSFORM
14601460
* SYSTEM.CDC_STREAM_STATUS
14611461
* SYSTEM.CDC_STREAM
1462+
* SYSTEM.HA_GROUP
14621463
* For SYSTEM.CATALOG and SYSTEM.CHILD_LINK we only replicate rows with tenant information.
14631464
* Non tenant (Global) rows are assumed to be executed by an admin or an admin process in each
14641465
* cluster separately and thus not replicated.

phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java

Lines changed: 99 additions & 41 deletions
Large diffs are not rendered by default.

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java

Lines changed: 471 additions & 0 deletions
Large diffs are not rendered by default.

phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ public void start() throws IOException {
157157
/**
158158
* Stops the replication log discovery service by shutting down the scheduler gracefully.
159159
* Waits for the configured shutdown timeout before forcing shutdown if necessary.
160-
* @throws IOException if there's an error during shutdown
161160
*/
162-
public void stop() throws IOException {
161+
public void stop() {
163162
ScheduledExecutorService schedulerToShutdown;
164163

165164
synchronized (this) {
@@ -204,6 +203,7 @@ public void stop() throws IOException {
204203
*/
205204
public void replay() throws IOException {
206205
Optional<ReplicationRound> optionalNextRound = getNextRoundToProcess();
206+
LOG.info("replay round={}", optionalNextRound.isPresent());
207207
while (optionalNextRound.isPresent()) {
208208
ReplicationRound replicationRound = optionalNextRound.get();
209209
try {
@@ -216,8 +216,18 @@ public void replay() throws IOException {
216216
setLastRoundProcessed(replicationRound);
217217
optionalNextRound = getNextRoundToProcess();
218218
}
219+
if (!optionalNextRound.isPresent()) {
220+
// no more rounds to process
221+
processNoMoreRoundsLeft();
222+
}
219223
}
220224

225+
/**
226+
* Individual implementations can take specific actions when there are no
227+
* more rounds ready to process.
228+
*/
229+
protected void processNoMoreRoundsLeft() throws IOException {}
230+
221231
/**
222232
* Returns the next replication round to process based on lastRoundProcessed.
223233
* Ensures sufficient time (round duration + buffer) has elapsed before returning the next
@@ -228,6 +238,7 @@ public void replay() throws IOException {
228238
protected Optional<ReplicationRound> getNextRoundToProcess() {
229239
long lastRoundEndTimestamp = getLastRoundProcessed().getEndTime();
230240
long currentTime = EnvironmentEdgeManager.currentTime();
241+
LOG.info("last={} current={}", lastRoundEndTimestamp, currentTime);
231242
if (currentTime - lastRoundEndTimestamp < roundTimeMills + bufferMillis) {
232243
// nothing more to process
233244
return Optional.empty();
@@ -381,7 +392,7 @@ protected void initializeLastRoundProcessed() throws IOException {
381392
LOG.info("Initializing lastRoundProcessed from current time {}", currentTime);
382393
this.lastRoundProcessed = replicationLogTracker
383394
.getReplicationShardDirectoryManager()
384-
.getReplicationRoundFromEndTime(EnvironmentEdgeManager.currentTime());
395+
.getReplicationRoundFromEndTime(currentTime);
385396
}
386397
}
387398
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package org.apache.phoenix.replication;
2+
3+
import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
4+
import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
5+
import static org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD;
6+
7+
import java.io.IOException;
8+
9+
import org.apache.hadoop.fs.FileStatus;
10+
import org.apache.hadoop.fs.FileSystem;
11+
import org.apache.hadoop.fs.FileUtil;
12+
import org.apache.hadoop.fs.Path;
13+
import org.apache.phoenix.jdbc.ClusterType;
14+
import org.apache.phoenix.jdbc.HAGroupStateListener;
15+
import org.apache.phoenix.jdbc.HAGroupStoreManager;
16+
import org.apache.phoenix.jdbc.HAGroupStoreRecord;
17+
import org.apache.phoenix.replication.metrics.MetricsReplicationLogDiscovery;
18+
import org.apache.phoenix.replication.metrics.MetricsReplicationLogForwarderSourceFactory;
19+
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
20+
import org.apache.phoenix.util.EnvironmentEdgeManager;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* ReplicationLogDiscoveryForwarder manages the forwarding of the replication log
26+
* from the fallback cluster to the remote cluster.
27+
*/
28+
public class ReplicationLogDiscoveryForwarder extends ReplicationLogDiscovery {
29+
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogDiscoveryForwarder.class);
30+
31+
private final ReplicationLogGroup logGroup;
32+
33+
/**
34+
* Create a tracker for the replication logs in the fallback cluster.
35+
*
36+
* @param logGroup HAGroup
37+
* @return ReplicationLogTracker
38+
*/
39+
private static ReplicationLogTracker createLogTracker(ReplicationLogGroup logGroup) {
40+
ReplicationShardDirectoryManager localShardManager = logGroup.getFallbackShardManager();
41+
return new ReplicationLogTracker(
42+
logGroup.conf,
43+
logGroup.getHAGroupName(),
44+
localShardManager,
45+
MetricsReplicationLogForwarderSourceFactory.
46+
getInstanceForTracker(logGroup.getHAGroupName()));
47+
}
48+
49+
public ReplicationLogDiscoveryForwarder(ReplicationLogGroup logGroup) {
50+
super(createLogTracker(logGroup));
51+
this.logGroup = logGroup;
52+
}
53+
54+
@Override
55+
public String getExecutorThreadNameFormat() {
56+
return "ReplicationLogDiscoveryForwarder-" + logGroup.getHAGroupName() + "-%d";
57+
}
58+
59+
public void init() throws IOException {
60+
replicationLogTracker.init();
61+
// Initialize the discovery only. Forwarding begins only when we switch to the
62+
// STORE_AND_FORWARD mode or SYNC_AND_FORWARD mode.
63+
super.init();
64+
65+
// Set up a listener to the ACTIVE_NOT_IN_SYNC state. This is needed because whenever any
66+
// RS switches to STORE_AND_FORWARD mode, other RS's in the cluster must move out of SYNC
67+
// mode.
68+
HAGroupStateListener activeNotInSync = (groupName,
69+
fromState,
70+
toState,
71+
modifiedTime,
72+
clusterType,
73+
lastSyncStateTimeInMs) -> {
74+
if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC.equals(toState)) {
75+
LOG.info("Received ACTIVE_NOT_IN_SYNC event for {}", logGroup);
76+
// If the current mode is SYNC, then switch to SYNC_AND_FORWARD mode
77+
if (logGroup.checkAndSetMode(SYNC, SYNC_AND_FORWARD)) {
78+
// replication mode switched, notify the event handler
79+
try {
80+
logGroup.sync();
81+
} catch (IOException e) {
82+
LOG.info("Failed to send sync event to {}", logGroup);
83+
}
84+
}
85+
}
86+
};
87+
88+
// Set up a listener to the ACTIVE_IN_SYNC state. This is needed because when the RS
89+
// switches back to SYNC mode, the other RS's in the cluster must move out of
90+
// SYNC_AND_FORWARD mode to SYNC mode.
91+
HAGroupStateListener activeInSync = (groupName,
92+
fromState,
93+
toState,
94+
modifiedTime,
95+
clusterType,
96+
lastSyncStateTimeInMs) -> {
97+
if (clusterType == ClusterType.LOCAL && HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC.equals(toState)) {
98+
LOG.info("Received ACTIVE_IN_SYNC event for {}", logGroup);
99+
// Set the current mode to SYNC
100+
if (logGroup.checkAndSetMode(SYNC_AND_FORWARD, SYNC)) {
101+
// replication mode switched, notify the event handler
102+
try {
103+
logGroup.sync();
104+
} catch (IOException e) {
105+
LOG.info("Failed to send sync event to {}", logGroup);
106+
}
107+
}
108+
}
109+
};
110+
111+
HAGroupStoreManager haGroupStoreManager = logGroup.getHAGroupStoreManager();
112+
haGroupStoreManager.subscribeToTargetState(logGroup.getHAGroupName(),
113+
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, ClusterType.LOCAL, activeNotInSync);
114+
haGroupStoreManager.subscribeToTargetState(logGroup.getHAGroupName(),
115+
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, activeInSync);
116+
}
117+
118+
@Override
119+
protected void processFile(Path src) throws IOException {
120+
FileSystem srcFS = replicationLogTracker.getFileSystem();
121+
FileStatus srcStat = srcFS.getFileStatus(src);
122+
long ts = EnvironmentEdgeManager.currentTimeMillis();
123+
ReplicationShardDirectoryManager remoteShardManager = logGroup.getStandbyShardManager();
124+
Path dst = remoteShardManager.getWriterPath(ts, logGroup.getServerName().getServerName());
125+
long startTime = EnvironmentEdgeManager.currentTimeMillis();
126+
FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, false, false, conf);
127+
// successfully copied the file
128+
long endTime = EnvironmentEdgeManager.currentTimeMillis();
129+
long copyTime = endTime - startTime;
130+
LOG.info("Copying file src={} dst={} size={} took {}ms", src, dst, srcStat.getLen(), copyTime);
131+
if (logGroup.getMode() == STORE_AND_FORWARD &&
132+
isLogCopyThroughputAboveThreshold(srcStat.getLen(), copyTime)) {
133+
// start recovery by switching to SYNC_AND_FORWARD
134+
if (logGroup.checkAndSetMode(STORE_AND_FORWARD, SYNC_AND_FORWARD)) {
135+
// replication mode switched, notify the event handler
136+
try {
137+
logGroup.sync();
138+
} catch (IOException e) {
139+
LOG.info("Failed to send sync event to {}", logGroup);
140+
}
141+
}
142+
}
143+
}
144+
145+
@Override
146+
protected void processNoMoreRoundsLeft() throws IOException {
147+
// check if we are caught up so that we can transition to SYNC state
148+
// we are caught up when there are no files currently in the in-progress directory
149+
// and no new files exist for ongoing round
150+
if (replicationLogTracker.getInProgressFiles().isEmpty()
151+
&& replicationLogTracker.getNewFilesForRound(replicationLogTracker
152+
.getReplicationShardDirectoryManager()
153+
.getNextRound(getLastRoundProcessed())).isEmpty()) {
154+
LOG.info("Processed all the replication log files for {}", logGroup);
155+
// TODO ensure the mTime on the group store record is older than the wait sync timeout
156+
logGroup.setHAGroupStatusToSync();
157+
}
158+
}
159+
160+
/**
161+
* Determine if the throughput is above the configured threshold. If it is, then we can switch
162+
* to the SYNC_AND_FORWARD mode
163+
*
164+
* @param fileSize
165+
* @param copyTime
166+
* @return True if the throughput is good else false
167+
*/
168+
private boolean isLogCopyThroughputAboveThreshold(long fileSize, long copyTime) {
169+
// TODO: calculate throughput and check if it is above the configured threshold
170+
return true;
171+
}
172+
173+
@Override
174+
protected MetricsReplicationLogDiscovery createMetricsSource() {
175+
return MetricsReplicationLogForwarderSourceFactory.
176+
getInstanceForDiscovery(logGroup.getHAGroupName());
177+
}
178+
179+
@VisibleForTesting
180+
protected ReplicationLogTracker getReplicationLogTracker() {
181+
return replicationLogTracker;
182+
}
183+
}

0 commit comments

Comments
 (0)