Skip to content

Commit 51b29c4

Browse files
authored
Merge branch 'Baeldung:master' into scala-logging
2 parents 0c015d1 + 49005b9 commit 51b29c4

File tree

10 files changed

+299
-1
lines changed

10 files changed

+299
-1
lines changed

build.sbt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ lazy val scala_libraries_os = (project in file("scala-libraries-os"))
315315
)
316316

317317
lazy val scala_libraries_4 = (project in file("scala-libraries-4"))
318+
.configs(IntegrationTest)
318319
.settings(
319320
name := "scala-libraries-4",
320321
libraryDependencies += "com.lihaoyi" %% "utest" % "0.8.1" % "test",
@@ -329,7 +330,14 @@ lazy val scala_libraries_4 = (project in file("scala-libraries-4"))
329330
logback,
330331
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5"
331332
),
332-
scalacOptions += "-Xasync"
333+
libraryDependencies ++= Seq(
334+
"com.clever-cloud.pulsar4s" %% "pulsar4s-core" % "2.9.0",
335+
"com.clever-cloud.pulsar4s" %% "pulsar4s-jackson" % "2.9.0",
336+
"org.testcontainers" % "pulsar" % "1.17.6" % IntegrationTest
337+
),
338+
scalacOptions += "-Xasync",
339+
Defaults.itSettings,
340+
IntegrationTest / fork := true
333341
)
334342

335343
lazy val scala_strings = (project in file("scala-strings"))

scala-libraries-4/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
- [Introduction to scala-async](https://www.baeldung.com/scala/scala-async)
55
- [Get the First N Rows of a Spark Dataframe](https://www.baeldung.com/scala/spark-dataframe-get-first-n-rows)
66
- [Introduction to Skunk – Scala Driver for PostgreSQL](https://www.baeldung.com/scala/skunk-postgresql-driver)
7+
- [Apache Pulsar Scala Client – pulsar4s](https://www.baeldung.com/scala/pulsar4s)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import org.scalatest.BeforeAndAfterAll
4+
import org.scalatest.matchers.must.Matchers
5+
import org.scalatest.wordspec.AsyncWordSpec
6+
import org.testcontainers.containers.PulsarContainer
7+
import org.testcontainers.utility.DockerImageName
8+
9+
import scala.util.Success
10+
11+
class PulsarJsonSchemaTest
12+
extends AsyncWordSpec
13+
with BeforeAndAfterAll
14+
with Matchers {
15+
16+
val pulsar: PulsarContainer = new PulsarContainer(
17+
DockerImageName.parse("apachepulsar/pulsar:2.10.2")
18+
)
19+
20+
override def beforeAll(): Unit = pulsar.start()
21+
22+
override def afterAll(): Unit = pulsar.stop()
23+
24+
"pulsar json producer" should {
25+
"successfully send messages" in {
26+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
27+
val producer = new JsonPulsarProducer(pulsarClient)
28+
val messageIdTry = producer.sendMessage(
29+
"my-key",
30+
PulsarMessage(1, "a test message", System.currentTimeMillis())
31+
)
32+
messageIdTry mustBe a[Success[_]]
33+
}
34+
}
35+
36+
"pulsar consumer" should {
37+
"successfully consume messages" in {
38+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
39+
val producer = new JsonPulsarProducer(pulsarClient)
40+
val consumer = new JsonPulsarConsumer(pulsarClient)
41+
val messageIdTry =
42+
producer.sendMessage(
43+
"key-to-consume",
44+
PulsarMessage(2, "a test message", System.currentTimeMillis())
45+
)
46+
messageIdTry mustBe a[Success[_]]
47+
48+
val messageTry = consumer.consume()
49+
messageTry mustBe a[Success[_]]
50+
messageTry.get.key must contain("key-to-consume")
51+
messageTry.get.value.id mustBe 2
52+
messageTry.get.value.message mustBe "a test message"
53+
}
54+
55+
"successfully consume async messages" in {
56+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
57+
val producer = new JsonPulsarProducer(pulsarClient)
58+
val consumer = new JsonPulsarConsumer(pulsarClient)
59+
val messageIdTry =
60+
producer.sendMessage(
61+
"key-to-consume",
62+
PulsarMessage(3, "a test message", System.currentTimeMillis())
63+
)
64+
messageIdTry mustBe a[Success[_]]
65+
66+
consumer
67+
.consumeAsync()
68+
.map(message => {
69+
message.key must contain("key-to-consume")
70+
message.value.id mustBe 3
71+
message.value.message mustBe "a test message"
72+
})
73+
}
74+
}
75+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import org.scalatest.BeforeAndAfterAll
4+
import org.scalatest.matchers.must.Matchers
5+
import org.scalatest.wordspec.AsyncWordSpec
6+
import org.testcontainers.containers.PulsarContainer
7+
import org.testcontainers.utility.DockerImageName
8+
9+
import scala.util.Success
10+
11+
class PulsarStringSchemaTest
12+
extends AsyncWordSpec
13+
with BeforeAndAfterAll
14+
with Matchers {
15+
16+
val pulsar: PulsarContainer = new PulsarContainer(
17+
DockerImageName.parse("apachepulsar/pulsar:2.10.2")
18+
)
19+
20+
override def beforeAll(): Unit = pulsar.start()
21+
22+
override def afterAll(): Unit = pulsar.stop()
23+
24+
"pulsar producer" should {
25+
"successfully send messages" in {
26+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
27+
val producer = new PulsarProducer(pulsarClient)
28+
val messageIdTry = producer.sendMessage("my-key", "a test message")
29+
messageIdTry mustBe a[Success[_]]
30+
}
31+
}
32+
33+
"pulsar consumer" should {
34+
"successfully consume messages" in {
35+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
36+
val producer = new PulsarProducer(pulsarClient)
37+
val consumer = new PulsarConsumer(pulsarClient)
38+
val messageIdTry =
39+
producer.sendMessage("key-to-consume", "a test message")
40+
messageIdTry mustBe a[Success[_]]
41+
42+
val messageTry = consumer.consume()
43+
messageTry mustBe a[Success[_]]
44+
messageTry.get.key must contain("key-to-consume")
45+
messageTry.get.value mustBe "a test message"
46+
}
47+
48+
"successfully consume async messages" in {
49+
val pulsarClient = new PulsarClient(pulsar.getPulsarBrokerUrl)
50+
val producer = new PulsarProducer(pulsarClient)
51+
val consumer = new PulsarConsumer(pulsarClient)
52+
val messageIdTry =
53+
producer.sendMessage("key-to-consume", "a test message")
54+
messageIdTry mustBe a[Success[_]]
55+
56+
consumer
57+
.consumeAsync()
58+
.map(message => {
59+
message.key must contain("key-to-consume")
60+
message.value mustBe "a test message"
61+
})
62+
}
63+
}
64+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import com.sksamuel.pulsar4s.{
4+
Consumer,
5+
ConsumerConfig,
6+
ConsumerMessage,
7+
Subscription,
8+
Topic
9+
}
10+
import com.sksamuel.pulsar4s.jackson._
11+
12+
import scala.concurrent.{ExecutionContext, Future}
13+
import scala.util.Try
14+
15+
class JsonPulsarConsumer(pulsarClient: PulsarClient)(implicit
16+
executionContext: ExecutionContext
17+
) {
18+
19+
val topic: Topic = Topic("pulsar4s-json-topic")
20+
val consumerConfig: ConsumerConfig =
21+
ConsumerConfig(Subscription.generate, Seq(topic))
22+
val consumer: Consumer[PulsarMessage] =
23+
pulsarClient.consumer[PulsarMessage](consumerConfig)
24+
25+
def consume(): Try[ConsumerMessage[PulsarMessage]] =
26+
consumer.receive.map(message => {
27+
consumer.acknowledge(message.messageId)
28+
message
29+
})
30+
31+
def consumeAsync(): Future[ConsumerMessage[PulsarMessage]] =
32+
consumer.receiveAsync.map(message => {
33+
consumer.acknowledge(message.messageId)
34+
message
35+
})
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import com.sksamuel.pulsar4s.{
4+
DefaultProducerMessage,
5+
MessageId,
6+
Producer,
7+
ProducerConfig,
8+
Topic
9+
}
10+
import com.sksamuel.pulsar4s.jackson._
11+
12+
import scala.util.Try
13+
14+
class JsonPulsarProducer(pulsarClient: PulsarClient) {
15+
16+
val topic: Topic = Topic("pulsar4s-json-topic")
17+
val producerConfig: ProducerConfig = ProducerConfig(topic)
18+
val producer: Producer[PulsarMessage] =
19+
pulsarClient.producer[PulsarMessage](producerConfig)
20+
21+
def sendMessage(key: String, message: PulsarMessage): Try[MessageId] =
22+
producer.send(DefaultProducerMessage[PulsarMessage](Some(key), message))
23+
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import com.sksamuel.pulsar4s.{
4+
Consumer,
5+
ConsumerConfig,
6+
Producer,
7+
ProducerConfig,
8+
PulsarAsyncClient,
9+
PulsarClient,
10+
PulsarClientConfig
11+
}
12+
import org.apache.pulsar.client.api.Schema
13+
14+
class PulsarClient(pulsarServiceUrl: String) {
15+
16+
private val config: PulsarClientConfig = PulsarClientConfig(pulsarServiceUrl)
17+
private val client: PulsarAsyncClient = PulsarClient(config)
18+
19+
def producer[T: Schema](producerConfig: ProducerConfig): Producer[T] =
20+
client.producer[T](producerConfig)
21+
22+
def consumer[T: Schema](consumerConfig: ConsumerConfig): Consumer[T] =
23+
client.consumer[T](consumerConfig)
24+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import com.sksamuel.pulsar4s.{
4+
Consumer,
5+
ConsumerConfig,
6+
ConsumerMessage,
7+
Subscription,
8+
Topic
9+
}
10+
import org.apache.pulsar.client.api.Schema
11+
12+
import scala.concurrent.{ExecutionContext, Future}
13+
import scala.util.Try
14+
15+
class PulsarConsumer(pulsarClient: PulsarClient)(implicit
16+
executionContext: ExecutionContext
17+
) {
18+
19+
implicit val schema: Schema[String] = Schema.STRING
20+
21+
val topic: Topic = Topic("pulsar4s-topic")
22+
val consumerConfig: ConsumerConfig =
23+
ConsumerConfig(Subscription.generate, Seq(topic))
24+
val consumer: Consumer[String] = pulsarClient.consumer[String](consumerConfig)
25+
26+
def consume(): Try[ConsumerMessage[String]] =
27+
consumer.receive.map(message => {
28+
consumer.acknowledge(message.messageId)
29+
message
30+
})
31+
32+
def consumeAsync(): Future[ConsumerMessage[String]] =
33+
consumer.receiveAsync.map(message => {
34+
consumer.acknowledge(message.messageId)
35+
message
36+
})
37+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
case class PulsarMessage(id: Long, message: String, createdAt: Long)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.baeldung.scala.pulsar4s
2+
3+
import com.sksamuel.pulsar4s.{
4+
DefaultProducerMessage,
5+
MessageId,
6+
Producer,
7+
ProducerConfig,
8+
Topic
9+
}
10+
import org.apache.pulsar.client.api.Schema
11+
12+
import scala.util.Try
13+
14+
class PulsarProducer(pulsarClient: PulsarClient) {
15+
16+
implicit val schema: Schema[String] = Schema.STRING
17+
18+
val topic: Topic = Topic("pulsar4s-topic")
19+
val producerConfig: ProducerConfig = ProducerConfig(topic)
20+
val producer: Producer[String] = pulsarClient.producer[String](producerConfig)
21+
22+
def sendMessage(key: String, message: String): Try[MessageId] =
23+
producer.send(
24+
DefaultProducerMessage[String](Some(key), message)
25+
)
26+
}

0 commit comments

Comments
 (0)