Commit d61d94b

set authentication configuration in Pulsar client (#45) (#48)

Authored by Attila Tóth (atezs82)
Co-authored-by: Attila Tóth <[email protected]>
1 parent 87b1cb8 commit d61d94b

File tree

12 files changed: +294 −108 lines changed

README.md

Lines changed: 81 additions & 2 deletions
@@ -123,8 +123,8 @@ val df = spark
   .load()
 df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
-
 ```
+
 > #### Tip
 > For more information on how to use other language bindings for Spark Structured Streaming,
 > see [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).
@@ -330,6 +330,62 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
 
 </table>
 
+#### Authentication
+Should the Pulsar cluster require authentication, credentials can be set in the following way.
+
+The following examples are in Scala.
+```scala
+// Secure connection with authentication, using the same credentials on the
+// Pulsar client and admin interface (if not given explicitly, the client configuration
+// is used for admin as well).
+val df = spark
+  .readStream
+  .format("pulsar")
+  .option("service.url", "pulsar://localhost:6650")
+  .option("admin.url", "http://localhost:8080")
+  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
+  .option("pulsar.client.authParams", "token:<valid client JWT token>")
+  .option("topicsPattern", "sensitiveTopic")
+  .load()
+df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Secure connection with authentication, using different credentials for
+// the Pulsar client and admin interfaces.
+val df = spark
+  .readStream
+  .format("pulsar")
+  .option("service.url", "pulsar://localhost:6650")
+  .option("admin.url", "http://localhost:8080")
+  .option("pulsar.admin.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
+  .option("pulsar.admin.authParams", "token:<valid admin JWT token>")
+  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
+  .option("pulsar.client.authParams", "token:<valid client JWT token>")
+  .option("topicsPattern", "sensitiveTopic")
+  .load()
+df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Secure connection with client TLS enabled.
+// Note that the certificate file has to be present at the specified
+// path on every machine of the cluster!
+val df = spark
+  .readStream
+  .format("pulsar")
+  .option("service.url", "pulsar+ssl://localhost:6651")
+  .option("admin.url", "http://localhost:8080")
+  .option("pulsar.admin.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
+  .option("pulsar.admin.authParams", "token:<valid admin JWT token>")
+  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken")
+  .option("pulsar.client.authParams", "token:<valid client JWT token>")
+  .option("pulsar.client.tlsTrustCertsFilePath", "/path/to/tls/cert/cert.pem")
+  .option("pulsar.client.tlsAllowInsecureConnection", "false")
+  .option("pulsar.client.tlsHostnameVerificationEnable", "true")
+  .option("topicsPattern", "sensitiveTopic")
+  .load()
+df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+```
+
 #### Schema of Pulsar source
 * For topics without schema or with primitive schema in Pulsar, messages' payload
   is loaded to a `value` column with the corresponding type with Pulsar schema.
@@ -458,7 +514,15 @@ A possible solution to remove duplicates when reading the written data could be
 
 Client/producer/reader configurations of Pulsar can be set via `DataStreamReader.option`
 with `pulsar.client.`/`pulsar.producer.`/`pulsar.reader.` prefix, e.g.,
-`stream.option("pulsar.reader.receiverQueueSize", "1000000")`. For possible Pulsar parameters, check docs at
+`stream.option("pulsar.reader.receiverQueueSize", "1000000")`.
+Since the connector needs to access the Pulsar Admin interface as well, a separate
+configuration for the admin client can be set via the same method with the
+`pulsar.admin` prefix. For example: `stream.option("pulsar.admin.authParams", "token:<token>")`.
+This can be useful if a different authentication plugin or
+token needs to be used. If this is not given explicitly, the client
+parameters (with the `pulsar.client` prefix) will be used for accessing the admin
+interface as well.
+For possible Pulsar parameters, check docs at
 [Pulsar client libraries](https://pulsar.apache.org/docs/en/client-libraries/).
 
 ## Build Spark Pulsar Connector
@@ -488,11 +552,26 @@ $ cd pulsar-spark
 $ mvn clean install -DskipTests
 ```
 
+If you get the following error during compilation, try running Maven with Java 8:
+```
+[ERROR] [Error] : Source option 6 is no longer supported. Use 7 or later.
+[ERROR] [Error] : Target option 6 is no longer supported. Use 7 or later.
+```
+
 5. Run the tests.
 
 ```bash
 $ mvn clean install
 ```
+
+Note: by configuring `scalatest-maven-plugin` in the [usual ways](https://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin), individual tests can be executed if needed:
+
+```bash
+mvn -Dsuites=org.apache.spark.sql.pulsar.CachedPulsarClientSuite clean install
+```
+
+This might be handy if test execution is slow, or you get a `java.io.IOException: Too many open files` exception during a full suite run.
+
 Once the installation is finished, there is a fat jar generated under both local maven repo and `target` directory.

src/main/scala/org/apache/spark/sql/pulsar/CachedPulsarClient.scala

Lines changed: 25 additions & 4 deletions
@@ -22,8 +22,11 @@ import scala.util.control.NonFatal
 import com.google.common.cache._
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 
+import org.apache.pulsar.client.api.{ClientBuilder, PulsarClient}
+
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH}
 
 private[pulsar] object CachedPulsarClient extends Logging {
 
@@ -62,7 +65,9 @@ private[pulsar] object CachedPulsarClient extends Logging {
     .removalListener(removalListener)
     .build[Seq[(String, Object)], Client](cacheLoader)
 
-  private def createPulsarClient(pulsarConf: ju.Map[String, Object]): Client = {
+  def createPulsarClient(
+      pulsarConf: ju.Map[String, Object],
+      pulsarClientBuilder: ClientBuilder = PulsarClient.builder()): Client = {
     val pulsarServiceUrl =
       pulsarConf.get(PulsarOptions.SERVICE_URL_OPTION_KEY).asInstanceOf[String]
     val clientConf = new PulsarConfigUpdater(
@@ -72,11 +77,27 @@ private[pulsar] object CachedPulsarClient extends Logging {
     ).rebuild()
     logInfo(s"Client Conf = ${clientConf}")
     try {
-      val pulsarClient: Client = org.apache.pulsar.client.api.PulsarClient
-        .builder()
+      pulsarClientBuilder
         .serviceUrl(pulsarServiceUrl)
         .loadConf(clientConf)
-        .build();
+      // Set TLS and authentication parameters if they were given
+      if (clientConf.containsKey(AUTH_PLUGIN_CLASS_NAME)) {
+        pulsarClientBuilder.authentication(
+          clientConf.get(AUTH_PLUGIN_CLASS_NAME).toString, clientConf.get(AUTH_PARAMS).toString)
+      }
+      if (clientConf.containsKey(TLS_ALLOW_INSECURE_CONNECTION)) {
+        pulsarClientBuilder.allowTlsInsecureConnection(
+          clientConf.get(TLS_ALLOW_INSECURE_CONNECTION).toString.toBoolean)
+      }
+      if (clientConf.containsKey(TLS_HOSTNAME_VERIFICATION_ENABLE)) {
+        pulsarClientBuilder.enableTlsHostnameVerification(
+          clientConf.get(TLS_HOSTNAME_VERIFICATION_ENABLE).toString.toBoolean)
+      }
+      if (clientConf.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+        pulsarClientBuilder.tlsTrustCertsFilePath(
+          clientConf.get(TLS_TRUST_CERTS_FILE_PATH).toString)
+      }
+      val pulsarClient: Client = pulsarClientBuilder.build()
       logDebug(
         s"Created a new instance of PulsarClient for serviceUrl = $pulsarServiceUrl,"
           + s" clientConf = $clientConf.")
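Passing the `ClientBuilder` as a defaulted parameter is a small dependency-injection move: production code keeps calling `createPulsarClient(conf)`, while tests can hand in a stub builder and never touch a broker. A self-contained sketch of the same pattern with stand-in types (`StubBuilder` and `StubClient` are illustrative, not from the connector):

```scala
// Sketch of the injectable-builder pattern used by createPulsarClient.
// The real code defaults to PulsarClient.builder(); tests pass a stub.
final class StubClient(val serviceUrl: String)

class StubBuilder {
  private var url: String = ""
  // Fluent setter mirroring ClientBuilder.serviceUrl
  def serviceUrl(u: String): StubBuilder = { url = u; this }
  def build(): StubClient = new StubClient(url)
}

// Production callers omit the builder argument; a test supplies its own.
def createClient(conf: Map[String, String],
                 builder: StubBuilder = new StubBuilder): StubClient =
  builder.serviceUrl(conf("service.url")).build()

val client = createClient(Map("service.url" -> "pulsar://localhost:6650"))
println(client.serviceUrl) // prints pulsar://localhost:6650
```

The default argument keeps the public call sites unchanged, which is why the method could drop its `private` modifier without disturbing existing code.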

src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigUpdater.scala

Lines changed: 27 additions & 11 deletions
@@ -25,8 +25,9 @@ import org.apache.spark.internal.Logging
 private[pulsar] case class PulsarConfigUpdater(
     module: String,
     pulsarParams: Map[String, Object],
-    blacklistedKeys: Set[String] = Set())
-    extends Logging {
+    blacklistedKeys: Set[String] = Set(),
+    keysToHideInLog: Set[String] = Set(PulsarOptions.AUTH_PARAMS))
+  extends Logging {
 
   private val map = new ju.HashMap[String, Object](pulsarParams.asJava)
 
@@ -36,10 +37,12 @@
 
   def set(key: String, value: Object, map: ju.Map[String, Object]): this.type = {
     if (blacklistedKeys.contains(key)) {
-      logInfo(s"$module: Skip $key")
+      logInfo(s"$module: Skip '$key'")
     } else {
       map.put(key, value)
-      logInfo(s"$module: Set $key to $value, earlier value: ${pulsarParams.getOrElse(key, "")}")
+      logInfo(s"$module: Set '$key' to " +
+        s"'${printConfigValue(key, Option(value))}'," +
+        s" earlier value: '${printConfigValue(key, pulsarParams.get(key))}'")
     }
     this
   }
@@ -50,21 +53,17 @@
 
   def setIfUnset(key: String, value: Object, map: ju.Map[String, Object]): this.type = {
     if (blacklistedKeys.contains(key)) {
-      logInfo(s"$module: Skip $key")
+      logInfo(s"$module: Skip '$key'")
     } else {
       if (!map.containsKey(key)) {
         map.put(key, value)
-        logDebug(s"$module: Set $key to $value")
+        logInfo(s"$module: Set '$key' to " +
+          s"'${printConfigValue(key, pulsarParams.get(key))}'")
       }
     }
     this
   }
 
-  def setAuthenticationConfigIfNeeded(): this.type = {
-    // FIXME: not implemented yet
-    this
-  }
-
   def build(): ju.Map[String, Object] = map
 
   def rebuild(): ju.Map[String, Object] = {
@@ -76,4 +75,21 @@
     map
   }
 
+  private val HideCompletelyLimit = 6
+  private val ShowFractionOfHiddenValue = 1.0 / 3.0
+  private val CompletelyHiddenMessage = "...<completely hidden>..."
+  private def printConfigValue(key: String,
+      maybeVal: Option[Object]): String = {
+    val value = maybeVal.map(_.toString).getOrElse("")
+    if (keysToHideInLog.contains(key)) {
+      if (value.length < HideCompletelyLimit) {
+        return CompletelyHiddenMessage
+      } else {
+        return s"${value.take((value.length * ShowFractionOfHiddenValue).toInt)}..."
+      }
+    }
+
+    value
+  }
+
 }
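The masking rule in `printConfigValue` is simple to state on its own: for sensitive keys, values shorter than 6 characters are hidden entirely, while longer ones reveal only roughly their first third, enough to recognize a `token:` scheme prefix without leaking the secret. A standalone paraphrase of just that rule (not the class itself):

```scala
// Standalone paraphrase of the masking logic added to PulsarConfigUpdater.
val HideCompletelyLimit = 6
val ShowFraction = 1.0 / 3.0

def mask(value: String): String =
  if (value.length < HideCompletelyLimit) "...<completely hidden>..."
  else s"${value.take((value.length * ShowFraction).toInt)}..."

println(mask("abc"))              // short value: prints ...<completely hidden>...
println(mask("token:abcdefghij")) // 16 chars, first third kept: prints token...
```

Logging a recognizable prefix rather than nothing is a deliberate trade-off: operators can still tell which kind of credential was configured from the logs.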

src/main/scala/org/apache/spark/sql/pulsar/PulsarContinuousReader.scala

Lines changed: 8 additions & 3 deletions
@@ -62,6 +62,8 @@ class PulsarContinuousReader(
   val reportDataLoss = reportDataLossFunc(failOnDataLoss)
 
   private var offset: Offset = _
+
+  private var stopped = false
   override def setStartOffset(start: ju.Optional[Offset]): Unit = {
     offset = start.orElse {
       val actualOffsets = SpecificPulsarOffset(
@@ -129,9 +131,12 @@
     metadataReader.commitCursorToOffset(off)
   }
 
-  override def stop(): Unit = {
-    metadataReader.removeCursor()
-    metadataReader.close()
+  override def stop(): Unit = synchronized {
+    if (!stopped) {
+      metadataReader.removeCursor()
+      metadataReader.close()
+      stopped = true
+    }
   }
 }

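The guarded `stop()` introduced here (and mirrored below in `PulsarMicroBatchReader`) is the standard idempotent-close pattern: a `synchronized` block plus a flag make repeated or concurrent `stop()` calls release the underlying resource exactly once. A minimal self-contained sketch of the pattern (the `StoppableReader` type is illustrative, not from the connector):

```scala
// Minimal sketch of the idempotent stop pattern used by the readers:
// synchronized + flag guarantees the cleanup callback runs at most once.
final class StoppableReader(onClose: () => Unit) {
  private var stopped = false

  def stop(): Unit = synchronized {
    if (!stopped) {
      onClose()      // stands in for removeCursor() + close() in the real readers
      stopped = true
    }
  }
}

var closes = 0
val reader = new StoppableReader(() => closes += 1)
reader.stop()
reader.stop() // second call is a no-op
println(closes) // prints 1
```

Without the guard, a second `stop()` would attempt to remove an already-removed cursor and close an already-closed metadata reader, which is presumably the failure mode this change avoids.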
src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

Lines changed: 4 additions & 12 deletions
@@ -26,7 +26,7 @@ import org.apache.pulsar.common.naming.TopicName
 import org.apache.pulsar.common.schema.SchemaInfo
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.pulsar.PulsarOptions.TOPIC_OPTION_KEYS
+import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH, TOPIC_OPTION_KEYS}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -38,24 +38,22 @@ private[pulsar] case class PulsarMetadataReader(
     serviceUrl: String,
     adminUrl: String,
     clientConf: ju.Map[String, Object],
+    adminClientConf: ju.Map[String, Object],
     driverGroupIdPrefix: String,
     caseInsensitiveParameters: Map[String, String])
   extends Closeable
   with Logging {
 
   import scala.collection.JavaConverters._
 
-  protected val admin: PulsarAdmin = AdminUtils.buildAdmin(adminUrl, clientConf)
-  protected var client: PulsarClient = null
+  protected val admin: PulsarAdmin = AdminUtils.buildAdmin(adminUrl, adminClientConf)
+  protected var client: PulsarClient = CachedPulsarClient.getOrCreate(clientConf)
 
   private var topics: Seq[String] = _
   private var topicPartitions: Seq[String] = _
 
   override def close(): Unit = {
     admin.close()
-    if (client != null) {
-      client.close()
-    }
   }
 
   def setupCursor(startingPos: PerTopicOffset): Unit = {
@@ -391,9 +389,6 @@
         PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp)))
     } else {
       assert (time > 0, s"time less than 0: $time")
-      if (client == null) {
-        client = PulsarClient.builder().serviceUrl(serviceUrl).build()
-      }
       val reader = client
         .newReader()
         .topic(tp)
@@ -458,9 +453,6 @@
         UserProvidedMessageId(
           PulsarSourceUtils.seekableLatestMid(admin.topics().getLastMessageId(tp)))
       case _ =>
-        if (client == null) {
-          client = PulsarClient.builder().serviceUrl(serviceUrl).build()
-        }
         val reader = client
           .newReader()
           .startMessageId(off)

src/main/scala/org/apache/spark/sql/pulsar/PulsarMicroBatchReader.scala

Lines changed: 7 additions & 3 deletions
@@ -49,6 +49,7 @@ private[pulsar] class PulsarMicroBatchReader(
 
   private var startTopicOffsets: Map[String, MessageId] = _
   private var endTopicOffsets: Map[String, MessageId] = _
+  private var stopped = false
 
   val reportDataLoss = reportDataLossFunc(failOnDataLoss)
 
@@ -151,9 +152,12 @@
     }.asJava
   }
 
-  override def stop(): Unit = {
-    metadataReader.removeCursor()
-    metadataReader.close()
+  override def stop(): Unit = synchronized {
+    if (!stopped) {
+      metadataReader.removeCursor()
+      metadataReader.close()
+      stopped = true
+    }
   }
 }
 
src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 7 additions & 6 deletions
@@ -19,6 +19,7 @@ import org.apache.pulsar.common.naming.TopicName
 private[pulsar] object PulsarOptions {
 
   // option key prefix for different modules
+  val PULSAR_ADMIN_OPTION_KEY_PREFIX = "pulsar.admin."
   val PULSAR_CLIENT_OPTION_KEY_PREFIX = "pulsar.client."
   val PULSAR_PRODUCER_OPTION_KEY_PREFIX = "pulsar.producer."
   val PULSAR_CONSUMER_OPTION_KEY_PREFIX = "pulsar.consumer."
@@ -47,12 +48,12 @@
   val POLL_TIMEOUT_MS = "polltimeoutms"
   val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
 
-  val AUTH_PLUGIN_CLASS_NAME = "authpluginclassname"
-  val AUTH_PARAMS = "authparams"
-  val TLS_TRUST_CERTS_FILE_PATH = "tlstrustcertsfilepath"
-  val TLS_ALLOW_INSECURE_CONNECTION = "tlsallowinsecureconnection"
-  val USE_TLS = "usetls"
-  val TLS_HOSTNAME_VERIFICATION_ENABLE = "tlshostnameverificationenable"
+  val AUTH_PLUGIN_CLASS_NAME = "authPluginClassName"
+  val AUTH_PARAMS = "authParams"
+  val TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath"
+  val TLS_ALLOW_INSECURE_CONNECTION = "tlsAllowInsecureConnection"
+  val USE_TLS = "useTls"
+  val TLS_HOSTNAME_VERIFICATION_ENABLE = "tlsHostnameVerificationEnable"
 
 
   val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =