|
28 | 28 | import java.io.StringReader;
|
29 | 29 | import java.io.StringWriter;
|
30 | 30 | import java.io.Writer;
|
| 31 | +import java.lang.reflect.InvocationTargetException; |
31 | 32 | import java.net.DatagramPacket;
|
32 | 33 | import java.net.DatagramSocket;
|
33 | 34 | import java.net.InetAddress;
|
@@ -132,7 +133,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
|
132 | 133 | // of updates; see the implementation comment at setLastSeenQuorumVerifier().
|
133 | 134 | private AtomicReference<QuorumCnxManager> qcmRef = new AtomicReference<>();
|
134 | 135 |
|
| 136 | + /** Class name to instantiate for SSL QuorumAuthServer */ |
135 | 137 | QuorumAuthServer authServer;
|
| 138 | + /** Class name to instantiate for SSL QuorumAuthLearner */ |
136 | 139 | QuorumAuthLearner authLearner;
|
137 | 140 |
|
138 | 141 | /**
|
@@ -693,6 +696,10 @@ void setId(long id) {
|
693 | 696 | }
|
694 | 697 |
|
695 | 698 | private boolean sslQuorum;
|
| 699 | + |
| 700 | + private String sslAuthServerProvider; |
| 701 | + private String sslAuthLearnerProvider; |
| 702 | + |
696 | 703 | private boolean shouldUsePortUnification;
|
697 | 704 |
|
698 | 705 | public boolean isSslQuorum() {
|
@@ -1176,12 +1183,98 @@ public void initialize() throws SaslException {
|
1176 | 1183 | }
|
1177 | 1184 | authServer = new SaslQuorumAuthServer(isQuorumServerSaslAuthRequired(), quorumServerLoginContext, authzHosts);
|
1178 | 1185 | authLearner = new SaslQuorumAuthLearner(isQuorumLearnerSaslAuthRequired(), quorumServicePrincipal, quorumLearnerLoginContext);
|
| 1186 | + } else if (isSslQuorum()) { |
| 1187 | + try { |
| 1188 | + authServer = getSslQuorumAuthServer(); |
| 1189 | + authLearner = getSslQuorumAuthLearner(); |
| 1190 | + } catch (Exception e) { |
| 1191 | + LOG.error(e.getMessage(), e); |
| 1192 | + throw new SaslException(e.getMessage()); |
| 1193 | + } |
1179 | 1194 | } else {
|
1180 | 1195 | authServer = new NullQuorumAuthServer();
|
1181 | 1196 | authLearner = new NullQuorumAuthLearner();
|
1182 | 1197 | }
|
1183 | 1198 | }
|
1184 | 1199 |
|
| 1200 | + /** |
| 1201 | + * Instantiates and returns the configured SSL QuorumAuthServer implementation. |
| 1202 | + * <p> |
| 1203 | + * Reads the class name from the {@code sslAuthServerProvider} property. If |
| 1204 | + * no provider is configured, falls back to {@link NullQuorumAuthServer}. |
| 1205 | + * </p> |
| 1206 | + * |
| 1207 | + * @return an instance of {@link QuorumAuthServer}, or {@link NullQuorumAuthServer} |
| 1208 | + * if no provider is defined |
| 1209 | + * @throws SaslException if the configured class cannot be found, instantiated, |
| 1210 | + * or does not implement {@link QuorumAuthServer} |
| 1211 | + */ |
| 1212 | + private QuorumAuthServer getSslQuorumAuthServer() throws SaslException { |
| 1213 | + if (sslAuthServerProvider == null) { |
| 1214 | + LOG.info("sslAuthServerProvider not defined; using NullQuorumAuthServer"); |
| 1215 | + return new NullQuorumAuthServer(); |
| 1216 | + } |
| 1217 | + try { |
| 1218 | + Class<?> cls = Class.forName(sslAuthServerProvider); |
| 1219 | + Object inst = cls.getDeclaredConstructor().newInstance(); |
| 1220 | + if (!(inst instanceof QuorumAuthServer)) { |
| 1221 | + throw new SaslException( |
| 1222 | + sslAuthServerProvider + " does not implement QuorumAuthServer"); |
| 1223 | + } |
| 1224 | + return (QuorumAuthServer) inst; |
| 1225 | + |
| 1226 | + } catch (ClassNotFoundException e) { |
| 1227 | + throw new SaslException( |
| 1228 | + "SSL auth server provider class not found: " + sslAuthServerProvider, e); |
| 1229 | + } catch (NoSuchMethodException | InstantiationException |
| 1230 | + | IllegalAccessException | InvocationTargetException e) { |
| 1231 | + throw new SaslException( |
| 1232 | + "Failed to instantiate SSL auth server provider: " + sslAuthServerProvider, e); |
| 1233 | + } catch (ClassCastException e) { |
| 1234 | + throw new SaslException( |
| 1235 | + "Configured class is not a QuorumAuthServer: " + sslAuthServerProvider, e); |
| 1236 | + } |
| 1237 | + } |
| 1238 | + |
| 1239 | + /** |
| 1240 | + * Instantiates and returns the configured SSL QuorumAuthLearner implementation. |
| 1241 | + * <p> |
| 1242 | + * Reads the class name from the {@code sslAuthLearnerProvider} property. If |
| 1243 | + * no provider is configured, falls back to {@link NullQuorumAuthLearner}. |
| 1244 | + * </p> |
| 1245 | + * |
| 1246 | + * @return an instance of {@link QuorumAuthLearner}, or {@link NullQuorumAuthLearner} |
| 1247 | + * if no provider is defined |
| 1248 | + * @throws SaslException if the configured class cannot be found, instantiated, |
| 1249 | + * or does not implement {@link QuorumAuthLearner} |
| 1250 | + */ |
| 1251 | + private QuorumAuthLearner getSslQuorumAuthLearner() throws SaslException { |
| 1252 | + if (sslAuthLearnerProvider == null) { |
| 1253 | + LOG.info("sslAuthLearnerProvider not defined; using NullQuorumAuthLearner"); |
| 1254 | + return new NullQuorumAuthLearner(); |
| 1255 | + } |
| 1256 | + try { |
| 1257 | + Class<?> cls = Class.forName(sslAuthLearnerProvider); |
| 1258 | + Object inst = cls.getDeclaredConstructor().newInstance(); |
| 1259 | + if (!(inst instanceof QuorumAuthLearner)) { |
| 1260 | + throw new SaslException( |
| 1261 | + sslAuthLearnerProvider + " does not implement QuorumAuthLearner"); |
| 1262 | + } |
| 1263 | + return (QuorumAuthLearner) inst; |
| 1264 | + |
| 1265 | + } catch (ClassNotFoundException e) { |
| 1266 | + throw new SaslException( |
| 1267 | + "SSL auth learner provider class not found: " + sslAuthLearnerProvider, e); |
| 1268 | + } catch (NoSuchMethodException | InstantiationException |
| 1269 | + | IllegalAccessException | InvocationTargetException e) { |
| 1270 | + throw new SaslException( |
| 1271 | + "Failed to instantiate SSL auth learner provider: " + sslAuthLearnerProvider, e); |
| 1272 | + } catch (ClassCastException e) { |
| 1273 | + throw new SaslException( |
| 1274 | + "Configured class is not a QuorumAuthLearner: " + sslAuthLearnerProvider, e); |
| 1275 | + } |
| 1276 | + } |
| 1277 | + |
1185 | 1278 | QuorumStats quorumStats() {
|
1186 | 1279 | return quorumStats;
|
1187 | 1280 | }
|
@@ -2190,6 +2283,14 @@ public void setSslQuorum(boolean sslQuorum) {
|
2190 | 2283 | this.sslQuorum = sslQuorum;
|
2191 | 2284 | }
|
2192 | 2285 |
|
| 2286 | + public void setSslAuthServerProvider(String sslAuthServerProvider) { |
| 2287 | + this.sslAuthServerProvider = sslAuthServerProvider; |
| 2288 | + } |
| 2289 | + |
| 2290 | + public void setSslAuthLearnerProvider(String sslAuthLearnerProvider) { |
| 2291 | + this.sslAuthLearnerProvider = sslAuthLearnerProvider; |
| 2292 | + } |
| 2293 | + |
2193 | 2294 | public void setUsePortUnification(boolean shouldUsePortUnification) {
|
2194 | 2295 | LOG.info("Port unification {}", shouldUsePortUnification ? "enabled" : "disabled");
|
2195 | 2296 | this.shouldUsePortUnification = shouldUsePortUnification;
|
@@ -2740,6 +2841,14 @@ boolean isQuorumSaslAuthEnabled() {
|
2740 | 2841 | return quorumSaslEnableAuth;
|
2741 | 2842 | }
|
2742 | 2843 |
|
| 2844 | + public QuorumAuthServer getQuorumAuthServer() { |
| 2845 | + return authServer; |
| 2846 | + } |
| 2847 | + |
| 2848 | + public QuorumAuthLearner getQuorumAuthLearner() { |
| 2849 | + return authLearner; |
| 2850 | + } |
| 2851 | + |
2743 | 2852 | private boolean isQuorumServerSaslAuthRequired() {
|
2744 | 2853 | return quorumServerSaslAuthRequired;
|
2745 | 2854 | }
|
|
0 commit comments