Skip to content

Commit a1d683a

Browse files
paderlolchenhao26-nineteenOliverwqcwrw
authored
0.4.9-pre (#325)
* Feat/sync support2.x#mutiple thread sync02 (#304) * update port * Multithreading sync * solve conflict * imple SyncService * adapter deregister * optimization some code * fix deregister instance equals logic Co-authored-by: Oliver <[email protected]> Co-authored-by: paderlol <[email protected]> * Optimize the code for assigning tasks. (#320) * Develop (#321) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix #305 (#322) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code (#323) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code. * Refactoring the Nacos Sync to Consul Logic (#324) * Optimize the code for assigning tasks. * Adds prefix to the input string if it doesn't already have it.#308 * Fix .#305 * Fix cyclic dependency code. * Refactoring the Nacos Sync to Consul Logic. --------- Co-authored-by: chenhao26 <[email protected]> Co-authored-by: Oliver <[email protected]>
1 parent 61d3476 commit a1d683a

36 files changed

+1337
-262
lines changed

nacossync-distribution/bin/nacosSync.sql

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ CREATE TABLE `cluster` (
1010
`connect_key_list` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
1111
`user_name` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
1212
`password` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
13+
`namespace` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
14+
`cluster_level` int default 0,
1315
PRIMARY KEY (`id`)
1416
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
1517
/******************************************/
@@ -39,5 +41,6 @@ CREATE TABLE `task` (
3941
`task_status` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
4042
`version` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
4143
`worker_ip` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
44+
`status` int default null ,
4245
PRIMARY KEY (`id`)
4346
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

nacossync-worker/pom.xml

+12
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@
7171
<groupId>io.springfox</groupId>
7272
<artifactId>springfox-swagger-ui</artifactId>
7373
</dependency>
74+
<dependency>
75+
<groupId>org.springframework.boot</groupId>
76+
<artifactId>spring-boot-starter-test</artifactId>
77+
</dependency>
7478
<!--nacos-->
7579
<dependency>
7680
<groupId>com.alibaba.nacos</groupId>
@@ -161,6 +165,14 @@
161165
</archive>
162166
</configuration>
163167
</plugin>
168+
<plugin>
169+
<groupId>org.apache.maven.plugins</groupId>
170+
<artifactId>maven-compiler-plugin</artifactId>
171+
<configuration>
172+
<source>9</source>
173+
<target>9</target>
174+
</configuration>
175+
</plugin>
164176
</plugins>
165177
</build>
166178
</project>

nacossync-worker/src/main/java/com/alibaba/nacossync/cache/SkyWalkerCacheServices.java

+15
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,25 @@ public FinishedTask getFinishedTask(TaskDO taskDO) {
9696

9797
return finishedTaskMap.get(operationId);
9898
}
99+
100+
public FinishedTask getFinishedTask(String operationId) {
101+
if (StringUtils.isEmpty(operationId)) {
102+
return null;
103+
}
104+
return finishedTaskMap.get(operationId);
105+
}
106+
107+
public FinishedTask removeFinishedTask(String operationId) {
108+
if (StringUtils.isEmpty(operationId)) {
109+
return null;
110+
}
111+
return finishedTaskMap.remove(operationId);
112+
}
99113

100114
public Map<String, FinishedTask> getFinishedTaskMap() {
101115

102116
return finishedTaskMap;
103117
}
118+
104119

105120
}

nacossync-worker/src/main/java/com/alibaba/nacossync/constant/SkyWalkerConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,7 @@ public class SkyWalkerConstants {
3535
public static final String GROUP_NAME_PARAM="groupNameParam";
3636
public static final String PAGE_NO="pageNo";
3737
public static final String PAGE_SIZE="pageSize";
38+
public static final String SYNC_INSTANCE_TAG="sync.instance.tag";
39+
public static final Integer MAX_THREAD_NUM = 200;
3840

3941
}

nacossync-worker/src/main/java/com/alibaba/nacossync/dao/ClusterAccessService.java

+8
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,12 @@ private List<Predicate> getPredicates(Root<ClusterDO> root, CriteriaBuilder crit
103103
predicates.add(criteriaBuilder.like(root.get("clusterName"), "%" + queryCondition.getServiceName() + "%"));
104104
return predicates;
105105
}
106+
107+
public int findClusterLevel(String sourceClusterId){
108+
ClusterDO clusterDO = clusterRepository.findByClusterId(sourceClusterId);
109+
if (clusterDO != null) {
110+
return clusterDO.getClusterLevel();
111+
}
112+
return -1;
113+
}
106114
}

nacossync-worker/src/main/java/com/alibaba/nacossync/dao/TaskAccessService.java

+4
Original file line numberDiff line numberDiff line change
@@ -114,5 +114,9 @@ private Page<TaskDO> getTaskDOS(QueryCondition queryCondition, Pageable pageable
114114

115115
}, pageable);
116116
}
117+
118+
public List<TaskDO> findServiceNameIsNull() {
119+
return taskRepository.findAllByServiceNameEquals("ALL");
120+
}
117121

118122
}

nacossync-worker/src/main/java/com/alibaba/nacossync/dao/repository/TaskRepository.java

+7
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,12 @@ public interface TaskRepository extends CrudRepository<TaskDO, Integer>, JpaRepo
4141
List<TaskDO> findAllByTaskIdIn(List<String> taskIds);
4242

4343
List<TaskDO> getAllByWorkerIp(String workerIp);
44+
45+
/**
46+
* query service is all,use ns leven sync data
47+
* @param serviceName
48+
* @return
49+
*/
50+
List<TaskDO> findAllByServiceNameEquals(String serviceName);
4451

4552
}

nacossync-worker/src/main/java/com/alibaba/nacossync/event/listener/EventListener.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void listenerSyncTaskEvent(SyncTaskEvent syncTaskEvent) {
6262

6363
try {
6464
long start = System.currentTimeMillis();
65-
if (syncManagerService.sync(syncTaskEvent.getTaskDO())) {
65+
if (syncManagerService.sync(syncTaskEvent.getTaskDO(), null)) {
6666
skyWalkerCacheServices.addFinishedTask(syncTaskEvent.getTaskDO());
6767
metricsManager.record(MetricsStatisticsType.SYNC_TASK_RT, System.currentTimeMillis() - start);
6868
} else {
@@ -88,7 +88,5 @@ public void listenerDeleteTaskEvent(DeleteTaskEvent deleteTaskEvent) {
8888
} catch (Exception e) {
8989
log.warn("listenerDeleteTaskEvent process error", e);
9090
}
91-
9291
}
93-
9492
}

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncManagerService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ public boolean delete(TaskDO taskDO) throws NacosException {
5252

5353
}
5454

55-
public boolean sync(TaskDO taskDO) {
55+
public boolean sync(TaskDO taskDO, Integer index) {
5656

57-
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO);
57+
return getSyncService(taskDO.getSourceClusterId(), taskDO.getDestClusterId()).sync(taskDO, index);
5858

5959
}
6060

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/SyncService.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,18 @@ public interface SyncService {
3535
* execute sync
3636
*
3737
* @param taskDO
38+
* @param index
3839
* @return
3940
*/
40-
boolean sync(TaskDO taskDO);
41+
boolean sync(TaskDO taskDO, Integer index);
4142

4243
/**
4344
* Determines that the current instance data is from another source cluster
4445
*/
4546
default boolean needSync(Map<String, String> sourceMetaData) {
46-
return StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
47+
boolean syncTag = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SYNC_INSTANCE_TAG));
48+
boolean blank = StringUtils.isBlank(sourceMetaData.get(SkyWalkerConstants.SOURCE_CLUSTERID_KEY));
49+
return syncTag && blank;
4750
}
4851

4952
/**

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/AbstractServerHolderImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
@Slf4j
2727
public abstract class AbstractServerHolderImpl<T> implements Holder {
2828

29-
private final Map<String, T> serviceMap = new ConcurrentHashMap<>();
29+
protected final Map<String, T> serviceMap = new ConcurrentHashMap<>();
30+
3031
@Autowired
3132
protected SkyWalkerCacheServices skyWalkerCacheServices;
3233

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/EurekaServerHolder.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
1111
* specific language governing permissions and limitations under the License.
1212
*/
13+
1314
package com.alibaba.nacossync.extension.holder;
1415

1516
import com.alibaba.nacossync.extension.eureka.EurekaNamingService;
@@ -30,12 +31,27 @@
3031
@Service
3132
@Slf4j
3233
public class EurekaServerHolder extends AbstractServerHolderImpl<EurekaNamingService> {
34+
35+
private static final String HTTP_PREFIX = "http://";
36+
37+
private static final String HTTPS_PREFIX = "https://";
38+
3339
@Override
3440
EurekaNamingService createServer(String clusterId, Supplier<String> serverAddressSupplier) throws Exception {
35-
RestTemplateTransportClientFactory restTemplateTransportClientFactory =
36-
new RestTemplateTransportClientFactory();
37-
EurekaEndpoint eurekaEndpoint = new DefaultEndpoint(serverAddressSupplier.get());
41+
RestTemplateTransportClientFactory restTemplateTransportClientFactory = new RestTemplateTransportClientFactory();
42+
EurekaEndpoint eurekaEndpoint = new DefaultEndpoint(addHttpPrefix(serverAddressSupplier.get()));
3843
EurekaHttpClient eurekaHttpClient = restTemplateTransportClientFactory.newClient(eurekaEndpoint);
3944
return new EurekaNamingService(eurekaHttpClient);
4045
}
46+
47+
public String addHttpPrefix(String input) {
48+
if (input == null || input.isEmpty()) {
49+
return input;
50+
}
51+
if (!input.startsWith(HTTP_PREFIX) && !input.startsWith(HTTPS_PREFIX)) {
52+
input = HTTP_PREFIX + input;
53+
}
54+
55+
return input;
56+
}
4157
}

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/holder/NacosServerHolder.java

+68-4
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,23 @@
1313
package com.alibaba.nacossync.extension.holder;
1414

1515
import com.alibaba.nacos.api.PropertyKeyConst;
16+
import com.alibaba.nacos.api.exception.NacosException;
1617
import com.alibaba.nacos.api.naming.NamingFactory;
1718
import com.alibaba.nacos.api.naming.NamingService;
19+
import com.alibaba.nacossync.constant.SkyWalkerConstants;
1820
import com.alibaba.nacossync.dao.ClusterAccessService;
21+
import com.alibaba.nacossync.dao.TaskAccessService;
1922
import com.alibaba.nacossync.pojo.model.ClusterDO;
23+
import com.alibaba.nacossync.pojo.model.TaskDO;
2024
import com.google.common.base.Joiner;
2125
import java.util.List;
2226
import java.util.Optional;
2327
import java.util.Properties;
28+
import java.util.concurrent.ConcurrentHashMap;
2429
import java.util.function.Supplier;
2530
import lombok.extern.slf4j.Slf4j;
2631
import org.apache.logging.log4j.util.Strings;
32+
import org.springframework.beans.factory.annotation.Autowired;
2733
import org.springframework.stereotype.Service;
2834

2935
/**
@@ -35,17 +41,30 @@
3541
public class NacosServerHolder extends AbstractServerHolderImpl<NamingService> {
3642

3743
private final ClusterAccessService clusterAccessService;
44+
45+
private final TaskAccessService taskAccessService;
46+
47+
private static ConcurrentHashMap<String,NamingService> globalNameService = new ConcurrentHashMap<>(16);
3848

39-
public NacosServerHolder(ClusterAccessService clusterAccessService) {
49+
public NacosServerHolder(ClusterAccessService clusterAccessService, TaskAccessService taskAccessService) {
4050
this.clusterAccessService = clusterAccessService;
51+
this.taskAccessService = taskAccessService;
4152
}
4253

4354
@Override
4455
NamingService createServer(String clusterId, Supplier<String> serverAddressSupplier)
4556
throws Exception {
57+
String newClusterId;
58+
if (clusterId.contains(":")) {
59+
String[] split = clusterId.split(":");
60+
newClusterId = split[1];
61+
} else {
62+
newClusterId = clusterId;
63+
}
64+
//代表此时为组合key,确定target集群中的nameService是不同的
4665
List<String> allClusterConnectKey = skyWalkerCacheServices
47-
.getAllClusterConnectKey(clusterId);
48-
ClusterDO clusterDO = clusterAccessService.findByClusterId(clusterId);
66+
.getAllClusterConnectKey(newClusterId);
67+
ClusterDO clusterDO = clusterAccessService.findByClusterId(newClusterId);
4968
String serverList = Joiner.on(",").join(allClusterConnectKey);
5069
Properties properties = new Properties();
5170
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
@@ -58,6 +77,51 @@ NamingService createServer(String clusterId, Supplier<String> serverAddressSuppl
5877
Optional.ofNullable(clusterDO.getPassword()).ifPresent(value ->
5978
properties.setProperty(PropertyKeyConst.PASSWORD, value)
6079
);
61-
return NamingFactory.createNamingService(properties);
80+
NamingService namingService = NamingFactory.createNamingService(properties);
81+
globalNameService.put(clusterId,namingService);
82+
return namingService;
83+
}
84+
85+
/**
86+
* Get NamingService for different clients
87+
* @param clusterId clusterId
88+
* @return Returns Naming Service objects for different clusters
89+
*/
90+
public NamingService getNameService(String clusterId){
91+
return globalNameService.get(clusterId);
92+
}
93+
94+
public NamingService getSourceNamingService(String taskId, String sourceClusterId) {
95+
String key = taskId + sourceClusterId;
96+
return serviceMap.computeIfAbsent(key, k->{
97+
try {
98+
log.info("Starting create source cluster server, key={}", key);
99+
//代表此时为组合key,确定target集群中的nameService是不同的
100+
List<String> allClusterConnectKey = skyWalkerCacheServices
101+
.getAllClusterConnectKey(sourceClusterId);
102+
ClusterDO clusterDO = clusterAccessService.findByClusterId(sourceClusterId);
103+
TaskDO task = taskAccessService.findByTaskId(taskId);
104+
String serverList = Joiner.on(",").join(allClusterConnectKey);
105+
Properties properties = new Properties();
106+
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
107+
properties.setProperty(PropertyKeyConst.NAMESPACE, Optional.ofNullable(clusterDO.getNamespace()).orElse(
108+
Strings.EMPTY));
109+
Optional.ofNullable(clusterDO.getUserName()).ifPresent(value ->
110+
properties.setProperty(PropertyKeyConst.USERNAME, value)
111+
);
112+
113+
Optional.ofNullable(clusterDO.getPassword()).ifPresent(value ->
114+
properties.setProperty(PropertyKeyConst.PASSWORD, value)
115+
);
116+
properties.setProperty(SkyWalkerConstants.SOURCE_CLUSTERID_KEY,task.getSourceClusterId());
117+
properties.setProperty(SkyWalkerConstants.DEST_CLUSTERID_KEY,task.getDestClusterId());
118+
return NamingFactory.createNamingService(properties);
119+
}catch (NacosException e) {
120+
log.error("start source server fail,taskId:{},sourceClusterId:{}"
121+
, taskId, sourceClusterId, e);
122+
return null;
123+
}
124+
});
125+
62126
}
63127
}

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/AbstractNacosSync.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public boolean delete(TaskDO taskDO) {
101101
}
102102

103103
@Override
104-
public boolean sync(TaskDO taskDO) {
104+
public boolean sync(TaskDO taskDO, Integer index) {
105105
String taskId = taskDO.getTaskId();
106106
try {
107107
NamingService sourceNamingService = nacosServerHolder.get(taskDO.getSourceClusterId());
@@ -128,7 +128,7 @@ public boolean sync(TaskDO taskDO) {
128128
return true;
129129
}
130130

131-
private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService) throws NacosException {
131+
private void doSync(String taskId, TaskDO taskDO, NamingService sourceNamingService) throws Exception {
132132
if (syncTaskTap.putIfAbsent(taskId, 1) != null) {
133133
log.info("任务Id:{}上一个同步任务尚未结束", taskId);
134134
return;
@@ -172,7 +172,7 @@ private void syncNewInstance(TaskDO taskDO, List<Instance> sourceInstances) thro
172172
}
173173

174174

175-
private void removeInvalidInstance(TaskDO taskDO, List<Instance> sourceInstances) throws NacosException {
175+
private void removeInvalidInstance(TaskDO taskDO, List<Instance> sourceInstances) throws Exception {
176176
String taskId = taskDO.getTaskId();
177177
if (this.sourceInstanceSnapshot.containsKey(taskId)) {
178178
Set<String> oldInstanceKeys = this.sourceInstanceSnapshot.get(taskId);
@@ -187,13 +187,23 @@ private void removeInvalidInstance(TaskDO taskDO, List<Instance> sourceInstances
187187
}
188188
}
189189

190+
@Override
191+
public boolean needDelete(Map<String, String> destMetaData, TaskDO taskDO) {
192+
return SyncService.super.needDelete(destMetaData, taskDO);
193+
}
194+
195+
@Override
196+
public boolean needSync(Map<String, String> sourceMetaData) {
197+
return SyncService.super.needSync(sourceMetaData);
198+
}
199+
190200
public abstract String composeInstanceKey(String ip, int port);
191201

192202
public abstract void register(TaskDO taskDO, Instance instance);
193203

194204
public abstract void deregisterInstance(TaskDO taskDO) throws Exception;
195205

196-
public abstract void removeInvalidInstance(TaskDO taskDO, Set<String> invalidInstanceKeys);
206+
public abstract void removeInvalidInstance(TaskDO taskDO, Set<String> invalidInstanceKeys) throws Exception;
197207

198208
public NacosServerHolder getNacosServerHolder() {
199209
return nacosServerHolder;

nacossync-worker/src/main/java/com/alibaba/nacossync/extension/impl/ConsulSyncToNacosServiceImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public boolean delete(TaskDO taskDO) {
9696
}
9797

9898
@Override
99-
public boolean sync(TaskDO taskDO) {
99+
public boolean sync(TaskDO taskDO, Integer index) {
100100
try {
101101
ConsulClient consulClient = consulServerHolder.get(taskDO.getSourceClusterId());
102102
NamingService destNamingService = nacosServerHolder.get(taskDO.getDestClusterId());
@@ -106,7 +106,7 @@ public boolean sync(TaskDO taskDO) {
106106
Set<String> instanceKeys = new HashSet<>();
107107
overrideAllInstance(taskDO, destNamingService, healthServiceList, instanceKeys);
108108
cleanAllOldInstance(taskDO, destNamingService, instanceKeys);
109-
specialSyncEventBus.subscribe(taskDO, this::sync);
109+
specialSyncEventBus.subscribe(taskDO, t->sync(t, index));
110110
} catch (Exception e) {
111111
log.error("Sync task from consul to nacos was failed, taskId:{}", taskDO.getTaskId(), e);
112112
metricsManager.recordError(MetricsStatisticsType.SYNC_ERROR);

0 commit comments

Comments
 (0)