CassandraDriver.scala

package cassandra

import org.apache.spark.sql._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import kafka.KafkaService
import radio.{SimpleSongAggregation, SimpleSongAggregationKafka}
import spark.SparkHelper
import foreachSink._
import log.LazyLogger

object CassandraDriver extends LazyLogger {
  private val spark = SparkHelper.getSparkSession()
  import spark.implicits._

  // Reuse the already-created session's configuration for the Cassandra connector
  val connector = CassandraConnector(spark.sparkContext.getConf)

  // Cassandra keyspace and table names used by the different sinks
  val namespace = "structuredstreaming"
  val foreachTableSink = "radio"
  val StreamProviderTableSink = "radioothersink"
  val kafkaMetadata = "kafkametadata"

  /**
   * Logs a quick sanity check of the Kafka metadata table (row count and first row).
   */
  def getTestInfo() = {
    val rdd = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)

    if (!rdd.isEmpty) {
      log.warn(rdd.count)
      log.warn(rdd.first)
    } else {
      log.warn(s"Cassandra table $namespace.$kafkaMetadata is empty")
    }
  }

  /**
   * Removes the Kafka metadata columns and only keeps the business structure.
   */
  def getDatasetForCassandra(df: DataFrame) = {
    df.select(KafkaService.radioStructureName + ".*") // flatten the nested struct column into its fields
      .as[SimpleSongAggregation]
  }

  // https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
  // https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
  def saveForeach(df: DataFrame) = {
    val ds = getDatasetForCassandra(df)

    ds
      .writeStream
      .queryName("KafkaToCassandraForeach")
      .outputMode("update")
      .foreach(new CassandraSinkForeach())
      .start()
  }
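
  // Usage sketch (not part of the original file): `start()` above returns a
  // StreamingQuery, so a caller could block on it, e.g.
  //   val query = CassandraDriver.saveForeach(kafkaDf) // kafkaDf: hypothetical streaming DataFrame read from Kafka
  //   query.awaitTermination()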

  def saveStreamSinkProvider(ds: Dataset[SimpleSongAggregationKafka]) = {
    ds
      .toDF() // @TODO check whether the Dataset object can be used directly
      .writeStream
      .format("cassandra.StreamSinkProvider.CassandraSinkProvider")
      .outputMode("update")
      .queryName("KafkaToCassandraStreamSinkProvider")
      .start()
  }
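
  // The "format" string above is the fully-qualified class name of a custom sink
  // provider defined elsewhere in this project. A minimal sketch of what such a
  // provider looks like (the real implementation lives in
  // cassandra.StreamSinkProvider.CassandraSinkProvider):
  //   class CassandraSinkProvider extends org.apache.spark.sql.sources.StreamSinkProvider {
  //     override def createSink(sqlContext: SQLContext,
  //                             parameters: Map[String, String],
  //                             partitionColumns: Seq[String],
  //                             outputMode: OutputMode): Sink = new CassandraSink() // CassandraSink: hypothetical Sink impl
  //   }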

  /**
   * @TODO handle more topic names; for this example we only use the topic "test"
   *
   * We can use collect() here because the Kafka metadata table is small.
   *
   * If no metadata is found, we fall back to the earliest offsets.
   *
   * @see https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-batch
   *      "assign" takes a JSON string such as {"topicA":[0,1],"topicB":[2,4]} listing the specific
   *      TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern"
   *      can be specified for a Kafka source.
   */
  def getKafkaMetadata() = {
    try {
      val kafkaMetadataRDD = spark.sparkContext.cassandraTable(namespace, kafkaMetadata)

      val output = if (kafkaMetadataRDD.isEmpty) {
        ("startingOffsets", "earliest")
      } else {
        ("startingOffsets", transformKafkaMetadataArrayToJson(kafkaMetadataRDD.collect()))
      }

      log.warn("getKafkaMetadata " + output.toString)
      output
    } catch {
      case e: Exception =>
        log.warn("getKafkaMetadata failed, falling back to the earliest offsets: " + e)
        ("startingOffsets", "earliest")
    }
  }
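
  // Usage sketch (an assumption, not in the original file): the (key, value) pair
  // returned above is meant to be passed as a Kafka source option when building the stream, e.g.
  //   val (optionKey, optionValue) = CassandraDriver.getKafkaMetadata()
  //   spark.readStream.format("kafka").option(optionKey, optionValue) // plus the usual bootstrap servers / subscribe options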

  /**
   * Builds the "startingOffsets" JSON from the Cassandra metadata rows.
   * Only the first row (a single partition of KafkaService.topicName) is handled for now.
   *
   * @param array rows from the kafkametadata table (partition and offset columns)
   * @return a JSON string in the Kafka source format, e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
   */
  def transformKafkaMetadataArrayToJson(array: Array[CassandraRow]): String = {
    s"""{"${KafkaService.topicName}":
         {
          "${array(0).getLong("partition")}": ${array(0).getLong("offset")}
         }
       }
    """.replaceAll("\n", "").replaceAll(" ", "")
  }
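
  // A hedged sketch (not in the original file) of how the JSON could cover every
  // metadata row instead of only the first partition:
  //   val offsets = array.map(r => s""""${r.getLong("partition")}": ${r.getLong("offset")}""").mkString(",")
  //   s"""{"${KafkaService.topicName}": {$offsets}}"""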

  /**
   * Logs the number of rows currently stored in the foreach sink table.
   */
  def debug() = {
    val output = spark.sparkContext.cassandraTable(namespace, foreachTableSink)

    log.warn(output.count)
  }
}