Skip to content

Commit

Permalink
Refactoring MantisShrimp, added RabbitMQ, #149, #133
Browse files Browse the repository at this point in the history
  • Loading branch information
kudkudak committed May 16, 2014
1 parent 4048f75 commit 99b3b66
Show file tree
Hide file tree
Showing 20 changed files with 521 additions and 274 deletions.
6 changes: 6 additions & 0 deletions mantis_shrimp/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ lazy val stampleRootProject = Project(id = "mantis_shrimp",base = file("."))

name := "mantis_shrimp"

// Exclude both NER tagger sources from compilation. Assigning the same key twice
// with := keeps only the last assignment, so the two file names are combined
// into a single filter here.
excludeFilter in unmanagedSources := HiddenFileFilter || "FourClassNERTagger.scala" || "ThreeClassNERTagger.scala"

version := "1.0"

scalaVersion := "2.10.2"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.rabbitmq" % "amqp-client" % "2.7.1"

libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1"

libraryDependencies += "org.monifu" %% "monifu-core" % "0.5"
Expand Down
48 changes: 0 additions & 48 deletions mantis_shrimp/scripts/example_usage_kafka.py

This file was deleted.

107 changes: 107 additions & 0 deletions mantis_shrimp/scripts/import_dataset_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python
import pika
import sys

"""
Simple script pushing datasets to RabbitMQ
"""

import fnmatch
import os
import sys
from optparse import OptionParser
from nltk.tokenize import *
import logging
import codecs
# Try switching off the pika logger. This is best effort only: a failure here
# must not stop the script, but interpreter-level signals such as
# KeyboardInterrupt and SystemExit are no longer swallowed.
try:
    logging.getLogger("pika").setLevel(logging.ERROR)
except Exception:
    pass

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer

sys.path.append(os.path.join(os.path.dirname(__file__), '../../don_corleone/'))

from don_utils import get_configuration
import json


#

# Directory two levels above the one that contains this script.
don_dir = os.path.abspath(os.path.join(__file__, os.pardir, os.pardir, os.pardir))
# Base data directory; the default dataset location is resolved beneath it.
don_data_dir = os.path.join(don_dir, "data")

def document_to_word_list(text):
    """Split ``text`` into sentences, tokenize each one, and return all words as one flat list."""
    flat_words = []
    for sentence in sent_tokenize(text):
        # Extend in place so the per-sentence token lists are flattened as we go.
        flat_words.extend(word_tokenize(sentence))
    return flat_words

def get_documents(root_dir=os.path.join(don_data_dir, "Reuters115"), encoding="iso-8859-1"):
    """Read every file found below ``root_dir`` and return the decoded contents.

    root_dir -- directory walked recursively with os.walk
    encoding -- codec used to decode the raw contents of each file

    Returns a list with one decoded document per file, in walk order.
    """
    print(don_data_dir)
    documents = []
    for root, dirnames, filenames in os.walk(root_dir):
        # The index restarts for every directory visited; it is only used for logging.
        for index, filename in enumerate(filenames):
            print("Reading id  %s  filename  %s" % (index, filename))
            # The context manager closes each handle as soon as the file is read.
            with open(os.path.join(root, filename), "r") as handle:
                documents.append(handle.read().decode(encoding))
    return documents

if __name__ == "__main__":
    # Command-line interface: dataset directory, dataset name and file encoding.
    parser = OptionParser()
    parser.add_option(
        '-r',
        '--root_dir',
        dest='root_dir',
        default=os.path.join(don_data_dir, "Reuters115"),
        help='Data directory'
    )
    # NOTE(review): options.name is parsed but not used anywhere below - confirm
    # whether the dataset name should be part of the published message.
    parser.add_option(
        '-n',
        '--name',
        dest='name',
        default='mock_dataset_1',
        help='Dataset name'
    )
    parser.add_option(
        '-e',
        '--file_encoding',
        dest='file_encoding',
        default="iso-8859-1",
        help='File encoding - check by unix command file'
    )
    (options, args) = parser.parse_args()

    # NOTE(review): this message reports the configured "kafka" host and port,
    # while the connection below is opened to RabbitMQ on 'localhost' - confirm
    # which endpoint is intended.
    print("Connecting to  {0}:{1}".format(get_configuration("kafka", "host"), get_configuration("kafka", "port")))

    # NOTE(review): credentials are hard-coded - consider reading them from configuration.
    credentials = pika.PlainCredentials('admin', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost', credentials=credentials))

    # Close the connection even when declaring the queue or publishing fails.
    try:
        channel = connection.channel()

        channel.queue_declare(queue='mantis_totag')

        for doc_id, document in enumerate(get_documents(options.root_dir, options.file_encoding)):

            print("Sending  %s" % doc_id)

            words = document_to_word_list(document)
            # The title is the first ten tokens; the whole document goes into "summary".
            news = {"uuid": str(doc_id), "title": (u" ".join(words[0:10])).encode("utf-8"), "summary": document.encode("utf8"), "text": ""}
            message = json.dumps(news).encode("utf-8")

            channel.basic_publish(exchange='',
                                  routing_key='mantis_totag',
                                  body=message)
    finally:
        connection.close()
2 changes: 2 additions & 0 deletions mantis_shrimp/src/main/scala/mantisshrimp/AkkaMessages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package mantisshrimp
// Messages for the Akka system

/** Pairs a `words` string with the `tag` assigned to it. */
case class MantisTag(words: String, tag: String)

/** Carries the tags produced for the item identified by `uuid`. */
case class Tagged(uuid: String, x: Seq[MantisTag])

/** Carries the `uuid` of an item that has already been tagged. */
case class AlreadyTagged(uuid: String)

/** Wraps a news item, represented as a mutable map, that has arrived. */
case class ItemArrive(x: scala.collection.mutable.Map[String, AnyRef])

/** Wraps an item, represented as a mutable map; presumably a request to tag it - confirm against the tagger actors. */
case class Tag(x: scala.collection.mutable.Map[String, AnyRef])

/**
 * Parameterless message; presumably asks an actor for its type name - confirm against the receivers.
 *
 * Declared with an explicit empty parameter list: a case class without one is
 * deprecated in Scala 2.10 and rejected by later compilers.
 */
case class GetType()

/** Carries a `name`; presumably used to register an actor - confirm against the receivers. */
case class Register(name: String)


object MantisLiterals{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ object DonCorleoneUtils{
// }
//TODO: why reflection fails sometimes?
/**
 * Fetches one configuration value from the Don Corleone service.
 *
 * Builds a `get_configuration` query from the service name, the configuration
 * name and this node's id, requests it over HTTP from `don_url` (with the
 * `http://` prefix stripped), parses the response as JSON and returns its
 * `result` member rendered with `toString()`.
 *
 * @param service_name value sent as the `service_name` query parameter
 * @param config_name  value sent as the `config_name` query parameter
 * @return the `result` member of the parsed JSON response, as a string
 */
def get_configuration_string(service_name:String, config_name: String): String = {
  // Strip the scheme once; both the log line and the request use the bare host.
  val host = this.don_url.replace("http://", "")
  val request_url = s"get_configuration?service_name=$service_name&config_name=$config_name&node_id=${this.config.node_id.as[String]}"
  println("Connecting to " + (host / request_url).toString())
  val value = JsonBuffer.parse((Http / host / request_url).slurp[Char])

  value.result.toString()
}

// //Demo functions: TODO: convert to tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package mantisshrimp
import ner.SevenClassNERTagger


class Mantis7ClassNERTagger extends BasicTaggerActor {
class Mantis7ClassNERTagger extends MantisTagger {
val nerTagger = new SevenClassNERTagger()

override def tag(x: scala.collection.mutable.Map[String, AnyRef]): Tuple2[String, Seq[MantisTag]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,38 @@ package mantisshrimp

import akka.actor.Actor

/**
* Created by staszek on 4/21/14.
*/

/**
 * Base trait for actors that supply news items.
 *
 * Replies to the `"get_news"` message with an `ItemArrive` wrapping the result
 * of `getNews()`, and passes the uuid of every `AlreadyTagged` message to
 * `handleAlreadyTagged`.
 */
trait MantisNewsFetcher extends Actor {

  /** Produces a news item as a mutable map; implemented by inheriting classes. */
  def getNews(): scala.collection.mutable.Map[String, AnyRef]

  /** Called with the uuid carried by an `AlreadyTagged` message; implemented by inheriting classes. */
  def handleAlreadyTagged(uuid: String): Unit

  /** Type name of this actor; override in inheriting classes. */
  def getType(): String = "NewsFetcher"

  def receive: Receive = {
    case "get_news" =>
      sender ! ItemArrive(getNews())
    case AlreadyTagged(uuid) =>
      handleAlreadyTagged(uuid)
  }
}


/*
* Basic class for tagger
*/


class BasicTaggerActor extends Actor{
trait MantisTagger extends Actor{

/*
* Override in inheriting classes
Expand All @@ -25,7 +47,7 @@ class BasicTaggerActor extends Actor{
* Override in inheriting classes
*/
def getType(): String={
return "BasicTagger"
return "Tagger"
}

def receive = {
Expand Down
Loading

0 comments on commit 99b3b66

Please sign in to comment.