-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsentiment_analyzer.py
97 lines (87 loc) · 4.44 KB
/
sentiment_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from config.conf import DUMP_INTERVAL
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from Location_Service import *
from Aggregator import Aggregator
# Logging setup
import logging
import logstash
from config.conf import logstash_host, logstash_port, kafka_country_tweets_topic, weight
from tweet_stream import TweetStream
# Module logger: records are forwarded to the Logstash endpoint named in
# config.conf. The logger threshold is INFO, so log.debug(...) calls made
# through this logger are not emitted.
_logstash_handler = logstash.TCPLogstashHandler(logstash_host, logstash_port, version=1)
log = logging.getLogger(__name__)
log.addHandler(_logstash_handler)
log.setLevel(logging.INFO)
class SentimentAnalyzer:
    """Average VADER compound sentiment of tweets about a hashtag.

    Tweets are fetched either around a city's coordinates
    (``compute_sentiment``) or inside a country's bounding box
    (``compute_sentiment_for_country``). Each tweet is scored with VADER's
    ``compound`` value and the scores are averaged; averages inside
    ``[-_WEIGHTED_BAND, +_WEIGHTED_BAND]`` are multiplied by the configured
    ``weight``.
    """

    # Averages whose absolute value is at most this are multiplied by `weight`.
    _WEIGHTED_BAND = 0.25

    # VADER analyzer, created on first use and then reused. Constructing a
    # SentimentIntensityAnalyzer loads its lexicon, so building one per
    # sentence (as was done previously) repeats that work for every tweet.
    _analyzer = None

    def __init__(self, tr, tw_stream_config):
        """Set up the location/aggregation helpers and remember the tweet sources.

        Args:
            tr: tweet retriever; must provide ``get_tweets(hashtag, lat, lon)``.
            tw_stream_config: indexable whose items 0..3 are passed, in
                order, as the positional arguments of ``TweetStream``.
        """
        log.info('Initializing SentimentAnalyzer')
        self.ls = LocationService(DUMP_INTERVAL['dump_interval'])
        self.ag = Aggregator(csv_location='data/worldcities.csv', bb_location='data/bounding_box.json')
        self.tr = tr
        self.tw_stream_config = tw_stream_config

    def sentiment_analyzer(self, sentence):
        """Return the VADER ``compound`` score for ``sentence``."""
        if self._analyzer is None:
            # Stored on the instance; the class-level default stays None.
            self._analyzer = SentimentIntensityAnalyzer()
        return self._analyzer.polarity_scores(sentence)['compound']

    def _scaled_average(self, tweets):
        """Average the compound scores of a non-empty ``tweets`` collection.

        The average is multiplied by ``weight`` when it lies within
        ``[-_WEIGHTED_BAND, +_WEIGHTED_BAND]``; otherwise it is returned as is.
        """
        compound_sum = 0
        for tweet in tweets:
            compound_sum += self.sentiment_analyzer(tweet)
        avg_score = compound_sum / len(tweets)
        if -self._WEIGHTED_BAND <= avg_score <= self._WEIGHTED_BAND:
            return avg_score * weight
        return avg_score

    def compute_sentiment(self, country, city, hashtag):
        """Score ``hashtag`` using tweets retrieved around a city.

        Looks up the city's coordinates through the location service, asks
        the tweet retriever (``self.tr``) for tweets on ``hashtag`` at that
        latitude/longitude, and returns their scaled average score.

        Returns:
            The scaled average compound score, or ``0`` when the retriever
            returns ``None`` or an empty collection.
        """
        log.debug(f"computing sentiment for Country {country}, Hashtag {hashtag} pair")
        coords = self.ls.get_coordinates_for_city({'city': city, 'country': country})
        tweets = self.tr.get_tweets(hashtag, coords[LAT], coords[LON])
        if tweets is None or len(tweets) == 0:
            log.warning(f'No tweets found! Returning 0 sentiment for Country {country}, City {city}, LAT {coords[LAT]}, LON {coords[LON]} and Hashtag {hashtag}')
            return 0
        return self._scaled_average(tweets)

    def compute_sentiment_for_country(self, country_code, hashtag, city="Boulder", using_n_tweets=100,
                                      produce_on_kafka=None):
        """Score ``hashtag`` using tweets streamed from inside a country.

        Fetches the country's bounding box from the aggregator, opens a
        ``TweetStream`` with the stored stream configuration, and requests
        ``using_n_tweets`` tweets for ``hashtag`` within that box. Any
        exception raised while streaming is logged and treated as "no tweets".

        Args:
            country_code: key passed to ``Aggregator.get_country_bb``.
            hashtag: the single trend to stream tweets for.
            city: not used by this method; kept so existing callers that
                pass it keep working.
            using_n_tweets: forwarded to the stream as ``num_tweets``.
            produce_on_kafka: optional producer exposing
                ``send(topic, value=...)``. When given, the first tenth of
                the retrieved tweets is sent to ``kafka_country_tweets_topic``.

        Returns:
            The scaled average compound score, or ``0`` when no tweets were
            obtained.
        """
        log.debug(f"computing sentiment for Country {country_code}, Hashtag {hashtag} pair")
        country_bounding_box = self.ag.get_country_bb(country_code)
        tweets = []
        try:
            ts = TweetStream(self.tw_stream_config[0], self.tw_stream_config[1], self.tw_stream_config[2], self.tw_stream_config[3])
            tweets = ts.get_tweets(bounding_boxes=country_bounding_box, trends=[hashtag], num_tweets=using_n_tweets)
        except Exception as ex:
            # Best effort: a streaming failure falls through to the
            # "no tweets" handling below instead of propagating.
            log.error("[Country] Error occurred in retrieving tweets" + str(ex))

        if produce_on_kafka is not None and tweets is not None and len(tweets) > 0:
            # Forward a 10% sample (rounded down) of the tweets.
            total = len(tweets) // 10
            if total > 0:
                produce_on_kafka.send(kafka_country_tweets_topic, value=tweets[0:total])
            else:
                log.warning("[Country] - The tweet volume was less that 10")

        if tweets is None:
            log.error("[compute_sentiment_for_country] - no tweets from twitter")
            return 0
        if len(tweets) == 0:
            log.warning(
                f'[compute_sentiment_for_country] - No tweets found! Returning 0 sentiment for Country {country_code}, Hashtag {hashtag} pair')
            return 0
        return self._scaled_average(tweets)