diff --git a/mantis_shrimp/build.sbt b/mantis_shrimp/build.sbt index 35b4609..3059311 100644 --- a/mantis_shrimp/build.sbt +++ b/mantis_shrimp/build.sbt @@ -2,12 +2,18 @@ lazy val stampleRootProject = Project(id = "mantis_shrimp",base = file(".")) name := "mantis_shrimp" +excludeFilter in unmanagedSources := HiddenFileFilter || "FourClassNERTagger.scala" + +excludeFilter in unmanagedSources := HiddenFileFilter || "ThreeClassNERTagger.scala" + version := "1.0" scalaVersion := "2.10.2" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" +libraryDependencies += "com.rabbitmq" % "amqp-client" % "2.7.1" + libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1" libraryDependencies += "org.monifu" %% "monifu-core" % "0.5" diff --git a/mantis_shrimp/scripts/import_dataset_kafka.py b/mantis_shrimp/scripts/_obsolete_import_dataset_kafka.py similarity index 100% rename from mantis_shrimp/scripts/import_dataset_kafka.py rename to mantis_shrimp/scripts/_obsolete_import_dataset_kafka.py diff --git a/mantis_shrimp/scripts/latest_message_on_topic.py b/mantis_shrimp/scripts/_obsolete_latest_message_on_topic.py similarity index 100% rename from mantis_shrimp/scripts/latest_message_on_topic.py rename to mantis_shrimp/scripts/_obsolete_latest_message_on_topic.py diff --git a/mantis_shrimp/scripts/example_usage_kafka.py b/mantis_shrimp/scripts/example_usage_kafka.py deleted file mode 100755 index 9fef763..0000000 --- a/mantis_shrimp/scripts/example_usage_kafka.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -import threading, logging, time - -from kafka.client import KafkaClient -from kafka.consumer import SimpleConsumer -from kafka.producer import SimpleProducer - -class Producer(threading.Thread): - daemon = True - - def run(self): - client = KafkaClient("ocean-db.no-ip.biz:771") - producer = SimpleProducer(client) - - while True: - producer.send_messages('my-topic', "test") - producer.send_messages('my-topic', "\xc2Hola, mundo!") - - time.sleep(1) - - -class Consumer(threading.Thread): - daemon = True - - def run(self): - client = KafkaClient("ocean-db.no-ip.biz:771") - consumer = SimpleConsumer(client, "test-group", "my-topic") - - for message in consumer: - print(message) - -def main(): - threads = [ - Producer(), - Consumer() - ] - - for t in threads: - t.start() - - time.sleep(5) - -if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.DEBUG - ) - main() diff --git a/mantis_shrimp/scripts/import_dataset_rabbitmq.py b/mantis_shrimp/scripts/import_dataset_rabbitmq.py new file mode 100644 index 0000000..5e8a5e8 --- /dev/null +++ b/mantis_shrimp/scripts/import_dataset_rabbitmq.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +import pika +import sys + +""" +Simple script pushing datasets to kafka +""" + +import fnmatch +import os +import sys +from optparse import OptionParser +from nltk.tokenize import * +import logging +import codecs +# Try switching of kafka-python logger +try: + logging.getLogger("pika").setLevel(logging.ERROR) +except: + pass + +from sklearn.feature_extraction.text import CountVectorizer +from sklearn.feature_extraction.text import TfidfVectorizer + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../don_corleone/')) + +from don_utils import get_configuration +import json + + +# + +don_dir = os.path.abspath(os.path.join(__file__, "../../..")) +don_data_dir = os.path.join(don_dir, "data") + +def document_to_word_list(text): + 
""" Return concatenated list of bare words """ + text_sent = sent_tokenize(text) + words = [word_tokenize(sentence) for sentence in text_sent] + all_words = [] + for words_subset in words: all_words += words_subset #flattening + return all_words + +def get_documents(root_dir=os.path.join(don_data_dir, "Reuters115"), encoding="iso-8859-1"): + """ Reads all files in given directory""" + print don_data_dir + matches = [] + documents = [] + for root, dirnames, filenames in os.walk(root_dir): + for id, filename in enumerate(filenames): + print "Reading id ",id, " filename ",filename + documents.append(open(root+"/"+filename,"r").read().decode(encoding)) + return documents + +if __name__ == "__main__": + + parser = OptionParser() + parser.add_option( + '-r', + '--root_dir', + dest='root_dir', + default=os.path.join(don_data_dir, "Reuters115"), + help='Data directory' + ) + parser.add_option( + '-n', + '--name', + dest='name', + default='mock_dataset_1', + help='Dataset name' + ) + parser.add_option( + '-e', + '--file_encoding', + dest='file_encoding', + default="iso-8859-1", + help='File encoding - check by unix command file' + ) + (options, args) = parser.parse_args() + + print "Connecting to ","{0}:{1}".format(get_configuration("kafka","host"),get_configuration("kafka","port")) + + credentials = pika.PlainCredentials('admin', 'password') + connection = pika.BlockingConnection(pika.ConnectionParameters( + 'localhost', credentials=credentials)) + + + + channel = connection.channel() + + channel.queue_declare(queue='mantis_totag') + + + for id, d in enumerate(get_documents(options.root_dir, options.file_encoding)): + + print "Sending ", id + + words = document_to_word_list(d) + news = {"uuid":str(id), "title":(u" ".join(words[0:10])).encode("utf-8"), "summary":d.encode("utf8"), "text":""} + message = json.dumps(news).encode("utf-8") + + channel.basic_publish(exchange='', + routing_key='mantis_totag', + body=message) + + + connection.close() diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/AkkaMessages.scala b/mantis_shrimp/src/main/scala/mantisshrimp/AkkaMessages.scala index 5c9ce07..207fdab 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/AkkaMessages.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/AkkaMessages.scala @@ -6,9 +6,11 @@ package mantisshrimp //Messages for Akka system case class MantisTag(words: String, tag: String) case class Tagged(uuid: String, x: Seq[MantisTag]) +case class AlreadyTagged(uuid: String) case class ItemArrive(x: scala.collection.mutable.Map[String, AnyRef]) case class Tag(x: scala.collection.mutable.Map[String, AnyRef]) case class GetType +case class Register(name: String) object MantisLiterals{ diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/DonCorleoneUtils.scala b/mantis_shrimp/src/main/scala/mantisshrimp/DonCorleoneUtils.scala index 223b274..1629dc3 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/DonCorleoneUtils.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/DonCorleoneUtils.scala @@ -39,10 +39,11 @@ object DonCorleoneUtils{ // } //TODO: why reflection fails sometimes? 
def get_configuration_string(service_name:String, config_name: String): String = { - val request_url = "get_configuration?service_name="+service_name+"&config_name="+config_name+"&node_id="+this.config.node_id.as[String] - println("Connecting to "+(this.don_url.replaceAll("http://","") / request_url).toString()) - val value = JsonBuffer.parse((Http / this.don_url.replace("http://", "") / request_url).slurp[Char]).result - return value.toString() + val request_url = "get_configuration?service_name=" + service_name + "&config_name=" + config_name + "&node_id=" + this.config.node_id.as[String] + println("Connecting to " + (this.don_url.replaceAll("http://", "") / request_url).toString()) + val value = JsonBuffer.parse((Http / this.don_url.replace("http://", "") / request_url).slurp[Char]) + + return value.result.toString() } // //Demo functions: TODO: convert to tests diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/Mantis7ClassTagger.scala b/mantis_shrimp/src/main/scala/mantisshrimp/Mantis7ClassTagger.scala index afde567..0a00410 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/Mantis7ClassTagger.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/Mantis7ClassTagger.scala @@ -3,7 +3,7 @@ package mantisshrimp import ner.SevenClassNERTagger -class Mantis7ClassNERTagger extends BasicTaggerActor { +class Mantis7ClassNERTagger extends MantisTagger { val nerTagger = new SevenClassNERTagger() override def tag(x: scala.collection.mutable.Map[String, AnyRef]): Tuple2[String, Seq[MantisTag]] = { diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisTagger.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisActors.scala similarity index 56% rename from mantis_shrimp/src/main/scala/mantisshrimp/MantisTagger.scala rename to mantis_shrimp/src/main/scala/mantisshrimp/MantisActors.scala index 699a67b..4e28f33 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/MantisTagger.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/MantisActors.scala @@ -2,16 +2,38 @@ package mantisshrimp import akka.actor.Actor -/** - * Created by staszek on 4/21/14. - */ + +trait MantisNewsFetcher extends Actor{ + + /* + * Override in inhertiting classes + */ + def getNews(): scala.collection.mutable.Map[String, AnyRef] + + def handleAlreadyTagged(uuid: String): Unit + + /** + * Override in inheriting classes + */ + def getType(): String={ + return "NewsFetcher" + } + + def receive = { + case "get_news" => { + sender ! 
ItemArrive(getNews()) + } + case AlreadyTagged(uuid) => { + handleAlreadyTagged(uuid) + } + } +} + /* * Basic class for tagger */ - - -class BasicTaggerActor extends Actor{ +trait MantisTagger extends Actor{ /* * Override in inhertiting classes @@ -25,7 +47,7 @@ class BasicTaggerActor extends Actor{ * Override in inheriting classes */ def getType(): String={ - return "BasicTagger" + return "Tagger" } def receive = { diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisKafkaConsumer.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisKafkaConsumer.scala deleted file mode 100644 index a5ae980..0000000 --- a/mantis_shrimp/src/main/scala/mantisshrimp/MantisKafkaConsumer.scala +++ /dev/null @@ -1,162 +0,0 @@ -package mantisshrimp -//TODO: add checking Kafka spout - - - -import akka.actor.{Actor, Props} - -import java.util.Properties - -import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, FetchRequest} -import kafka.consumer.ConsumerConfig -import kafka.producer.KeyedMessage - -import kafka.common.OffsetOutOfRangeException - -import rapture.io._ -import rapture.json._ -import jsonParsers.scalaJson._ - - -import java.util.UUID -import kafka.consumer._ -import kafka.utils._ -import akka.actor.Actor - -import rapture.core._ -import rapture.json._ -import jsonParsers.scalaJson._ -import jsonParsers.scalaJson._ - -import scala.collection.mutable - -//import kafka.producer.{ProducerConfig, ProducerData, Producer} - -//Internal strategy for Rapture.io - I dont see this design choice.. -import strategy.throwExceptions - -import java.util.Properties - -import scala.collection.JavaConverters._ -import kafka.common.ErrorMapping -import kafka.consumer.ConsumerConfig -import java.nio.file.{Paths, Files} - -import akka.actor.{Actor, Props, ActorSystem} - -import java.lang.{Runnable, Thread} -/** - * Basic Kafka Actor. Note that it will be sufficient in most cases because - * We will create new topics for new big chunks of news. I do not want - * to focus on implementing this one single actor. - */ -class MantisKafkaFetcherBasic extends Actor { - val topic: String = "mantis_mock_1" - //Encoding for JSON parsing - implicit val enc = Encodings.`UTF-8` - //Stop fetching thread when exceedes - val maximumQueueSize = 100 - //Queue to store messages - val Q = new mutable.SynchronizedQueue[scala.collection.mutable.Map[String, AnyRef]]() - - // Prepare Kafka High Level Consumer. 
TODO: Create own wrapper around this - val props = new Properties() - props.put("group.id", topic+"_consumer") - props.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) - props.put("socket.timeout.ms", (ConsumerConfig.SocketTimeout).toString) - props.put("fetch.message.max.bytes", (1024 * 1024).toString) - props.put("fetch.min.bytes", (1).toString) - props.put("fetch.wait.max.ms", (100).toString) - props.put("auto.commit.enable", "true") - props.put("auto.commit.interval.ms", (ConsumerConfig.AutoCommitInterval).toString) - props.put("auto.offset.reset", "smallest") - props.put("zookeeper.connect", "ocean-db.no-ip.biz:2181") - props.put("consumer.timeout.ms", (-1).toString) - props.put("refresh.leader.backoff.ms", (ConsumerConfig.RefreshMetadataBackoffMs).toString) - - val consumerconfig = new ConsumerConfig(props) - val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig) - val topicMap = Map[String, Integer](topic -> 1) - val consumerMap = consumer.createMessageStreams(topicMap.asJava) - val streamz = consumerMap.get(topic) - val stream: KafkaStream[Array[Byte], Array[Byte]] = streamz.iterator().next() - val consumerIter:ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() - - // Fetch already tagged news (note this will be synchronized through offsets - // the more advanced version. - val tagged_in_current_topic_list = if(Files.exists(Paths.get("tagged.dat"))) - scala.io.Source.fromFile("tagged.dat").mkString.split("\\r?\\n").toList else - List() - var tagged_in_current_topic = scala.collection.mutable.Set[String]() - tagged_in_current_topic ++= tagged_in_current_topic - - /** - * Commits already tagged to file. Not implemented correctly now (should update list) - */ - private def commitTagged()={ - val fw = new java.io.FileWriter("tagged.dat") - try fw.write(tagged_in_current_topic_list.mkString("/n")) - finally fw.close() - } - - // Fetching thread running in background - val fetcher_thread = new java.lang.Thread(new java.lang.Runnable { - def run(){ - while(consumerIter.hasNext()){ - val msgoffset = consumerIter.next() - // Try parsing - if format is incorrect write error - try { - val msg = JsonBuffer.parse(new String(msgoffset.message)) - val uuid = msg.uuid.as[String] - //TODO: improve - if (!tagged_in_current_topic.contains(uuid)) { - var entry = scala.collection.mutable.HashMap[String, AnyRef]() - - //KafkaActor should act as a filter for garbage. It HAS to parse, and also - //has to improve quality. Those are Unicode decoded from UTF-8! - - entry += "title" -> msg.title.as[String].split("\\r?\\n").map(_.trim).mkString(" ") - entry += "summary" -> msg.summary.as[String].split("\\r?\\n").map(_.trim).mkString(" ") - entry += "text" -> msg.text.as[String].split("\\r?\\n").map(_.trim).mkString(" ") - entry += "uuid" -> msg.uuid.as[String] - Q.enqueue(entry) - } - - } - catch{ - // TODO: improve logging - case e: Exception => println("Failed parsing consumer message offset=" + msgoffset.offset.toString+" "+msgoffset.message.toString) - } - - if(Q.length % 1000 == 0){ - println("Already enqueued "+Q.length.toString+" news") - } - - while(Q.length > maximumQueueSize) - java.lang.Thread.sleep(1000) - - } - } - }) - fetcher_thread.setDaemon(true) - fetcher_thread.start - - - def receive = { - case "get_news" => { - //Wait for news to arrive - while(Q.isEmpty) - java.lang.Thread.sleep(100) - - //Ok - if(!Q.isEmpty) sender ! 
ItemArrive(Q.dequeue()) - } - } -} - - - - - - - diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisMaster.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisMaster.scala index d4c8724..ae91909 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/MantisMaster.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/MantisMaster.scala @@ -1,60 +1,23 @@ package mantisshrimp - - -import akka.actor.{Props, ActorSystem, Actor} -import scala.collection.mutable - -import monifu.concurrent._ +import akka.actor.Actor._ /** - * Created by staszek on 4/21/14. + * Created by staszek on 5/16/14. */ +class MantisMaster { + var hasTagger = false + var hasNewsFetcher = false + var hasNeo4jDumper = false -class MantisMaster extends Actor { - - val taggersCount:Int =0 - val kafkaFetchersCounter: Int = 0 - var taggers = List[akka.actor.ActorRef]() - var currentTagger = monifu.concurrent.atomic.AtomicInt(0) - - - def start { - //Define number of Taggers - val taggersCount = 5 - //Kakfa fetchers, only one possible - val kafkaFetchersCount = 1 - //Create ActorSystem - val system = ActorSystem("mantisshrimp") - - // Construct taggers - for(i <- 0 to taggersCount) taggers = system.actorOf(Props[Mantis7ClassNERTagger], name = ("taggerActor" + i.toString) ) :: taggers - - // KafkaFetcher thread - val kafkaFetcher = system.actorOf(Props[MantisKafkaFetcherBasic], name = ("kafkaFetcher0") ) - - // Run flow - kafkaFetcher ! "get_news" - } - def receive = { - case Tagged(uuid, x) => { - println(uuid + " tagged with " + x.mkString) - } - case ItemArrive(x) => { - //Should tag it with all taggers types. For now just call one in queue - println("News arrived") - - val currentTaggerLocal = currentTagger.getAndIncrement() % this.taggers.length + case Tag(x) => { - taggers(currentTaggerLocal) ! Tag(x) - - sender ! "get_news" } - case "start" => { - start + case GetType => { + } - } -} \ No newline at end of file + } +} diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherKafka.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherKafka.scala new file mode 100644 index 0000000..5e41cce --- /dev/null +++ b/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherKafka.scala @@ -0,0 +1,167 @@ +package mantisshrimp + +//TODO: add checking Kafka spout + + +import akka.actor.{Actor, Props} + + +import rapture.io._ +import rapture.json._ + + +import kafka.consumer._ + +import rapture.core._ +import rapture.json._ +import jsonParsers.scalaJson._ + +import scala.collection.mutable + +//import kafka.producer.{ProducerConfig, ProducerData, Producer} + +//Internal strategy for Rapture.io - I dont see this design choice.. + +import strategy.throwExceptions + +import java.util.Properties + +import scala.collection.JavaConverters._ +import kafka.consumer.ConsumerConfig +import java.nio.file.{Paths, Files} + +import akka.actor.{Actor, Props, ActorSystem} + + + + +import java.lang.{Runnable, Thread} + +/** + * Basic Kafka Actor. Note that it will be sufficient in most cases because + * We will create new topics for new big chunks of news. I do not want + * to focus on implementing this one single actor. 
+ */ +class MantisNewsFetcherKafka extends Actor with MantisNewsFetcher { + val topic: String = "mantis_mock_1" + val tagged_topic: String = topic + "_tagged" + + //Encoding for JSON parsing + implicit val enc = Encodings.`UTF-8` + //Stop fetching thread when exceedes + val maximumQueueSize = 100 + //Queue to store messages + val Q = new mutable.SynchronizedQueue[scala.collection.mutable.Map[String, AnyRef]]() + + val taggedS: mutable.HashSet[String] = new mutable.HashSet[String] with mutable.SynchronizedSet[String] + + + // Fetch already tagged news (note this will be synchronized through offsets + // the more advanced version. + val tagged_commited = if (Files.exists(Paths.get("tagged.dat"))) + scala.io.Source.fromFile("tagged.dat").mkString.split("\\r?\\n").toList + else + List() + var tagged = scala.collection.mutable.Set[String]() + tagged ++= tagged_commited + + /** + * Commits already tagged to file. Not implemented correctly now (should update list) + */ + private def commitTagged() = { + val fw = new java.io.FileWriter("tagged.dat") + try fw.write(tagged.mkString("/n")) + finally fw.close() + } + + // Fetching thread running in background + val fetcher_thread = new java.lang.Thread(new java.lang.Runnable { + def run() { + while (true) { + //Source: http://sillycat.iteye.com/blog/2015181 + // Prepare Kafka High Level Consumer. TODO: Create own wrapper around this + val props = new Properties() + props.put("group.id", topic + "_consumer") + props.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString) + props.put("socket.timeout.ms", (ConsumerConfig.SocketTimeout).toString) + props.put("fetch.message.max.bytes", (1024 * 1024).toString) + props.put("fetch.min.bytes", (1).toString) + props.put("fetch.wait.max.ms", (100).toString) + props.put("auto.commit.enable", "true") + props.put("auto.commit.interval.ms", (ConsumerConfig.AutoCommitInterval).toString) + + println("Connecting to zookeper "+DonCorleoneUtils.get_configuration_string("zookeeper","host") + ":" + DonCorleoneUtils.get_configuration_string("zookeeper","port")) + + props.put("auto.offset.reset", "smallest") + props.put("zookeeper.connect", + DonCorleoneUtils.get_configuration_string("zookeeper","host") + ":" + DonCorleoneUtils.get_configuration_string("zookeeper","port")) + props.put("consumer.timeout.ms", (-1).toString) + props.put("refresh.leader.backoff.ms", (ConsumerConfig.RefreshMetadataBackoffMs).toString) + + val consumerconfig = new ConsumerConfig(props) + val consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig) + val topicMap = Map[String, Integer](topic -> 1) + val consumerMap = consumer.createMessageStreams(topicMap.asJava) + val streamz = consumerMap.get(topic) + val stream: KafkaStream[Array[Byte], Array[Byte]] = streamz.iterator().next() + val consumerIter: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() + + val msgoffset = consumerIter.next() + // Try parsing - if format is incorrect write error + try { + val msg = JsonBuffer.parse(new String(msgoffset.message)) + val uuid = msg.uuid.as[String] + //TODO: improve + if (!tagged.contains(uuid)) { + var entry = scala.collection.mutable.HashMap[String, AnyRef]() + //KafkaActor should act as a filter for garbage. It HAS to parse, and also + //has to improve quality. Those are Unicode decoded from UTF-8! 
+ + entry += "title" -> msg.title.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "summary" -> msg.summary.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "text" -> msg.text.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "uuid" -> msg.uuid.as[String] + Q.enqueue(entry) + } + + } + catch { + case e: Exception => println("Failed parsing consumer message offset=" + msgoffset.offset.toString + " " + msgoffset.message.toString) + } + + if (Q.length % 1000 == 0) { + println("Already enqueued " + Q.length.toString + " news") + } + + while (Q.length > maximumQueueSize) + java.lang.Thread.sleep(1000) + + + + } + } + }) + fetcher_thread.setDaemon(true) + fetcher_thread.start + + override def getNews(): scala.collection.mutable.Map[String, AnyRef] = { + //Wait for news to arrive + while (Q.isEmpty) + java.lang.Thread.sleep(100) + + //Ok + return Q.dequeue() + } + + override def handleAlreadyTagged(uuid: String){ + tagged.add(uuid) + } + +} + + + + + + + diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherRabbitMQ.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherRabbitMQ.scala new file mode 100644 index 0000000..b2bea6d --- /dev/null +++ b/mantis_shrimp/src/main/scala/mantisshrimp/MantisNewsFetcherRabbitMQ.scala @@ -0,0 +1,119 @@ +package mantisshrimp + + +import com.rabbitmq.client._ +import akka.actor.{Actor, Props} +import rapture.io._ +import rapture.core._ +import rapture.json._ +import jsonParsers.scalaJson._ +import scala.collection.mutable +import strategy.throwExceptions +import java.nio.file.{Paths, Files} + +import akka.actor.{Actor, Props, ActorSystem} + + +object RabbitMQConnection { + + private val connection: Connection = null; + + /** + * Return a connection if one doesn't exist. Else create + * a new one + */ + def getConnection(): Connection = { + connection match { + case null => { + val factory = new ConnectionFactory(); + factory.setHost("localhost"); + factory.setUsername("admin"); + factory.setPassword("password"); + factory.newConnection(); + } + case _ => connection + } + } +} + +/** + * Fetching news from RabbitMQ + */ +class MantisNewsFetcherRabbitMQ extends Actor with MantisNewsFetcher { + val queue: String = "mantis_totag" + //Encoding for JSON parsing + implicit val enc = Encodings.`UTF-8` + //Stop fetching thread when exceedes + val maximumQueueSize = 100 + //Queue to store messages + val Q = new mutable.SynchronizedQueue[scala.collection.mutable.Map[String, AnyRef]]() + val rabbitMqMetaData: mutable.HashMap[String, QueueingConsumer.Delivery] = new mutable.HashMap[String, QueueingConsumer.Delivery] + + val connection = RabbitMQConnection.getConnection() + val fetchingChannel = connection.createChannel() + + + // Fetching thread running in background + val fetcher_thread = new java.lang.Thread(new java.lang.Runnable { + def run() { + val consumer = new QueueingConsumer(fetchingChannel) + + fetchingChannel.basicConsume(queue, false, consumer) + + while (true) { + val delivery = consumer.nextDelivery(); + + val msg_raw = new String(delivery.getBody()); + // Try parsing - if format is incorrect write error + try { + + val msg = JsonBuffer.parse(msg_raw) + val uuid = msg.uuid.as[String] + + var entry = scala.collection.mutable.HashMap[String, AnyRef]() + //KafkaActor should act as a filter for garbage. It HAS to parse, and also + //has to improve quality. Those are Unicode decoded from UTF-8! 
+ + entry += "title" -> msg.title.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "summary" -> msg.summary.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "text" -> msg.text.as[String].split("\\r?\\n").map(_.trim).mkString(" ") + entry += "uuid" -> msg.uuid.as[String] + + + rabbitMqMetaData(msg.uuid.as[String]) = delivery + + Q.enqueue(entry) + + + } + catch { + case e: Exception => println("Failed parsing consumer message "+msg_raw) + } + + if (Q.length % 1000 == 0) { + println("Already enqueued " + Q.length.toString + " news") + } + + while (Q.length > maximumQueueSize) + java.lang.Thread.sleep(1000) + } + } + }) + fetcher_thread.setDaemon(true) + fetcher_thread.start + + override def getNews(): scala.collection.mutable.Map[String, AnyRef] = { + //Wait for news to arrive + while (Q.isEmpty) + java.lang.Thread.sleep(100) + + //Ok + return Q.dequeue() + } + + override def handleAlreadyTagged(uuid: String){ + fetchingChannel.basicAck(rabbitMqMetaData(uuid).getEnvelope().getDeliveryTag(), false) + rabbitMqMetaData.remove(uuid) + } + +} diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/MantisTaggerCoordinator.scala b/mantis_shrimp/src/main/scala/mantisshrimp/MantisTaggerCoordinator.scala new file mode 100644 index 0000000..40b962e --- /dev/null +++ b/mantis_shrimp/src/main/scala/mantisshrimp/MantisTaggerCoordinator.scala @@ -0,0 +1,63 @@ +package mantisshrimp + + +import akka.actor.{Props, ActorSystem, Actor} +import scala.collection.mutable + +import monifu.concurrent._ + +/** + * Created by staszek on 4/21/14. + */ + + +/* +* Class that can own several different Taggers + */ +class MantisTaggerCoordinator extends Actor { + + val taggersCount:Int =0 + val kafkaFetchersCounter: Int = 0 + var taggers = List[akka.actor.ActorRef]() + var currentTagger = monifu.concurrent.atomic.AtomicInt(0) + val system = ActorSystem("mantisshrimp") + val kafkaFetcher: akka.actor.ActorRef = + system.actorOf(Props[MantisNewsFetcherRabbitMQ], name = ("kafkaFetcher0") ) + + def start { + //Define number of Taggers + val taggersCount = 5 + //Kakfa fetchers, only one possible + val kafkaFetchersCount = 1 + + // Construct taggers + for(i <- 0 to taggersCount) + taggers = system.actorOf(Props[Mantis7ClassNERTagger], name = ("taggerActor" + i.toString) ) :: taggers + + + // Run flow + kafkaFetcher ! "get_news" + } + + def receive = { + case Tagged(uuid, x) => { + println(uuid + " tagged with " + x.mkString + " from "+sender.path) + kafkaFetcher ! AlreadyTagged(uuid) + } + case ItemArrive(x) => { + //Should tag it with all taggers types. For now just call one in queue + + val currentTaggerLocal = currentTagger.getAndIncrement() % this.taggers.length + + taggers(currentTaggerLocal) ! Tag(x) + + + + sender ! "get_news" + } + case "start" => { + start + } + } + +} \ No newline at end of file diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/main.scala b/mantis_shrimp/src/main/scala/mantisshrimp/main.scala index 884bbef..5774faa 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/main.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/main.scala @@ -11,9 +11,10 @@ import ner._ object Main extends App{ + def runSystem = { val system = ActorSystem("mantisshrimp") - val mantisMaster = system.actorOf(Props[MantisMaster], name = "mantisMaster") + val mantisMaster = system.actorOf(Props[MantisTaggerCoordinator], name = "mantisMaster") mantisMaster ! 
"start" } diff --git a/mantis_shrimp/src/main/scala/ner/FourClassNERTagger.scala b/mantis_shrimp/src/main/scala/ner/FourClassNERTagger.scala index a2b1bd4..7fb9158 100644 --- a/mantis_shrimp/src/main/scala/ner/FourClassNERTagger.scala +++ b/mantis_shrimp/src/main/scala/ner/FourClassNERTagger.scala @@ -12,7 +12,7 @@ class FourClassNERTagger extends NERTagger { val serializedClassifier = "stanford_classifiers/english.conll.4class.distsim.crf.ser.gz" val classifier = CRFClassifier.getClassifierNoExceptions(serializedClassifier) - def tag(text: String): Seq[MantisTag] = { + override def tag(text: String): Seq[MantisTag] = { val keywords = ListBuffer[(String, String)]() val tag_list = ListBuffer[MantisTag]() diff --git a/mantis_shrimp/src/main/scala/ner/SevenClassNERTagger.scala b/mantis_shrimp/src/main/scala/ner/SevenClassNERTagger.scala index 1715d78..a14f669 100644 --- a/mantis_shrimp/src/main/scala/ner/SevenClassNERTagger.scala +++ b/mantis_shrimp/src/main/scala/ner/SevenClassNERTagger.scala @@ -11,7 +11,7 @@ class SevenClassNERTagger extends NERTagger { val serializedClassifier = "stanford_classifiers/english.muc.7class.distsim.crf.ser.gz" val classifier = CRFClassifier.getClassifierNoExceptions(serializedClassifier) - def tag(text: String): Seq[MantisTag] = { + override def tag(text: String): Seq[MantisTag] = { val keywords = ListBuffer[(String, String)]() val tag_list = ListBuffer[MantisTag]() diff --git a/mantis_shrimp/src/main/scala/ner/ThreeClassNERTagger.scala b/mantis_shrimp/src/main/scala/ner/ThreeClassNERTagger.scala index 2697e00..eba4c0d 100644 --- a/mantis_shrimp/src/main/scala/ner/ThreeClassNERTagger.scala +++ b/mantis_shrimp/src/main/scala/ner/ThreeClassNERTagger.scala @@ -13,7 +13,7 @@ class ThreeClassNERTagger extends NERTagger { val serializedClassifier = "stanford_classifiers/english.all.3class.distsim.crf.ser.gz" val classifier = CRFClassifier.getClassifierNoExceptions(serializedClassifier) - def tag(text: String): Seq[MantisTag] = { + override def tag(text: String): Seq[MantisTag] = { val keywords = ListBuffer[(String, String)]() val tag_list = ListBuffer[MantisTag]() diff --git a/scala_test/Hello.scala b/scala_test/Hello.scala new file mode 100644 index 0000000..a7a9a6a --- /dev/null +++ b/scala_test/Hello.scala @@ -0,0 +1,5 @@ +object Main { + def main(a: Array[String]){ + println("Hello") + } +} \ No newline at end of file diff --git a/tests/spidercrab_1000_test.py b/tests/spidercrab_1000_test.py index 558e8ef..c1d6743 100755 --- a/tests/spidercrab_1000_test.py +++ b/tests/spidercrab_1000_test.py @@ -50,6 +50,7 @@ def run_slave(): files = [ TEMP_SPIDERCRAB_CONFIG, TEMP_SPIDERCRAB_STATS_EXPORT, + DATA_FILE ] NUMBER_OF_SLAVES = 10