Working on Kafka actor, #110
kudkudak committed Apr 20, 2014
1 parent 93d0c17 commit 8daee0e
Showing 2 changed files with 57 additions and 15 deletions.
2 changes: 1 addition & 1 deletion mantis_shrimp/scripts/import_dataset_kafka.py
@@ -87,6 +87,6 @@ def get_documents(root_dir=os.path.join(don_data_dir, "Reuters115"), encoding="i
for id, d in enumerate(get_documents(options.root_dir, options.file_encoding)):
    print "Pushing #",id," document"
    words = document_to_word_list(d)
-   news = {"uuid":id, "title":(u" ".join(words[0:10])).encode("utf-8"), "summary":d.encode("utf8"), "text":""}
+   news = {"uuid":str(id), "title":(u" ".join(words[0:10])).encode("utf-8"), "summary":d.encode("utf8"), "text":""}
    producer.send_messages("mantis_"+options.name, json.dumps(news).encode("utf-8"))
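
The one-line change above sends uuid as a JSON string instead of a number. This matches the Scala consumer below, which extracts the field with msg.uuid.as[String]. A minimal sketch of the failure mode, assuming rapture-json's as[String] rejects JSON numbers rather than coercing them (JsonBuffer.parse and the jsonParsers.scalaJson backend are the same calls the consumer file uses):

import rapture.core._
import rapture.json._
import jsonParsers.scalaJson._

implicit val enc = Encodings.`UTF-8`

// Message as now produced: uuid is a JSON string
val good = JsonBuffer.parse("""{"uuid": "7", "title": "t", "summary": "s", "text": ""}""")
val uuid = good.uuid.as[String]   // "7"

// Message as produced before this fix: uuid is a JSON number
val bad = JsonBuffer.parse("""{"uuid": 7}""")
// bad.uuid.as[String] fails extraction here, which is why the producer now emits str(id)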

70 changes: 56 additions & 14 deletions mantis_shrimp/src/main/scala/MantisKafkaConsumer.scala
@@ -89,11 +89,17 @@ import kafka.api.PartitionOffsetRequestInfo
import kafka.message.MessageAndOffset


import org.specs2.mutable._
+import rapture.fs._
+import rapture.io._
+import rapture.net._
+import rapture.core._
+import rapture.json._
+import jsonParsers.scalaJson._


import java.util.UUID
import kafka.consumer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, ActorSystem}

import rapture.core._
@@ -134,20 +140,25 @@ import java.nio.file.{Paths, Files}

import akka.actor.{Actor, Props, ActorSystem}

+import java.io.FileWriter
+import monifu.concurrent.atomic._
+import java.lang.{Runnable, Thread}

/**
 * Basic Kafka actor. Note that it will be sufficient in most cases because
 * we will create new topics for new big chunks of news. I do not want
 * to focus on implementing this one single actor.
 */
class MantisKafkaFetcherBasic extends Actor {
+  // Encoding for JSON parsing
+  implicit val enc = Encodings.`UTF-8`
  // Main source of news
  val topic = "mantis_mock_dataset_2"
  // Stop the fetching thread when maximumQueueSize is exceeded
  val maximumQueueSize = 1000
  // Queue to store messages
-  val Q = new mutable.SynchronizedQueue[String]()
+  val Q = new mutable.SynchronizedQueue[Map[String, AnyRef]]()

-  // Prepare Kafka High Level Consumer
+  // Prepare Kafka High Level Consumer. TODO: Create own wrapper around this
  val props = new Properties()
  props.put("group.id", "console-consumer-2222222")
  props.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
@@ -161,6 +172,7 @@ class MantisKafkaFetcherBasic extends Actor {
props.put("zookeeper.connect", "ocean-db.no-ip.biz:2181")
props.put("consumer.timeout.ms", (-1).toString)
props.put("refresh.leader.backoff.ms", (ConsumerConfig.RefreshMetadataBackoffMs).toString)

val consumerconfig = new ConsumerConfig(props)
val consumer = kafka.consumer.Consumer.createJavaConsgumerConnector(consumerconfig)
val topicMap = Map[String, Integer]("mantis_mock_dataset_2" -> 1)
@@ -170,18 +182,48 @@
  val consumerIter: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()

  // Fetch already tagged news (note this will be synchronized through offsets in
  // the more advanced version)
-  tagged = if(Files.exists("fetched"))
-
-  val fetcher_thread = new Thread(new Runnable {
+  val tagged_in_current_topic_list =
+    if (Files.exists(Paths.get("tagged.dat")))
+      scala.io.Source.fromFile("tagged.dat").mkString.split("\\r?\\n").toList
+    else
+      List()
+  val tagged_in_current_topic = scala.collection.mutable.Set[String]()
+  tagged_in_current_topic ++= tagged_in_current_topic_list
+
+  /**
+   * Commits already tagged ids to file. Not implemented correctly now (should persist the updated set)
+   */
+  private def commitTagged() = {
+    val fw = new java.io.FileWriter("tagged.dat")
+    try fw.write(tagged_in_current_topic_list.mkString("\n"))
+    finally fw.close()
+  }
+
+  // Fetching thread running in the background
+  val fetcher_thread = new java.lang.Thread(new java.lang.Runnable {
    def run() {
      while (consumerIter.hasNext()) {
-        System.out.println("MSG -> " + new String(consumerIter.next().message))
+        // Back off while the queue is full (assumed intent of maximumQueueSize; guard not in the commit)
+        while (Q.size >= maximumQueueSize) java.lang.Thread.sleep(100)
+        val msgoffset = consumerIter.next()

+        // Try parsing - if the format is incorrect, log the offset
+        try {
+          val msg = JsonBuffer.parse(new String(msgoffset.message))
+          val uuid = msg.uuid.as[String]
+          if (!tagged_in_current_topic.contains(uuid)) {
+            val entry = scala.collection.mutable.HashMap[String, AnyRef]()
+            entry += "title" -> msg.title.as[String]
+            entry += "summary" -> msg.summary.as[String]
+            entry += "text" -> msg.text.as[String]
+            // Hand the parsed entry to the actor via Q (assumed intent; entry was otherwise unused)
+            Q.enqueue(entry.toMap)
+          }
+        }
+        catch {
+          // TODO: improve logging
+          case e: Exception => println("Failed parsing consumer message offset=" + msgoffset.offset.toString)
+        }
      }
    }
  })

  fetcher_thread.setDaemon(true)
  fetcher_thread.start()
  def receive = {
    // ... (body unchanged, collapsed in the diff view)

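The commitTagged helper above is flagged in its own doc comment as incomplete: it rewrites the startup snapshot (tagged_in_current_topic_list) rather than the live tagged_in_current_topic set, so ids tagged after startup would be lost on the next commit. A minimal corrected sketch (assumed intent, not part of this commit):

private def commitTagged(): Unit = {
  // Persist the live set of tagged ids, not the startup snapshot
  val fw = new java.io.FileWriter("tagged.dat")
  try fw.write(tagged_in_current_topic.mkString("\n"))
  finally fw.close()
}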