-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtweet_stream.py
116 lines (97 loc) · 4.27 KB
/
tweet_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Tweet Stream Class
import os
import sys
import json
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
# Logging setup
import logging
import logstash
from config.conf import logstash_host, logstash_port
import time
# Module logger at INFO level, with a python-logstash TCP handler attached
# that points at logstash_host:logstash_port (imported from config.conf).
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.addHandler(logstash.TCPLogstashHandler(logstash_host, logstash_port, version=1))
# Global state shared between TweetStream and the `listener` callbacks.
# The values are stored as attributes of this module object itself, so that
# listener.on_data and TweetStream.get_tweets read and write the same values.
# NOTE(review): because this state is module-wide, two TweetStream instances
# streaming at the same time would clobber each other's values.
this = sys.modules[__name__]
# Tweets accepted so far in the current collection; reset by get_tweets().
this.count = 0
# Target number of tweets; also the default for TweetStream(num_tweets=...).
this.num_tweets = 100
# Texts of the accepted tweets.
this.tweets = []
# Track terms: passed to the filter call and matched against tweet text.
this.trends = ["#"]
# time.time() at the start of the current collection; on_data stops the
# stream once more than 60 seconds have passed since this value.
this.st_time = time.time()
class listener(StreamListener):
    """Tweepy stream listener that gathers tweet texts into module-level state.

    Accepted texts are appended to ``this.tweets`` and counted in
    ``this.count``. Streaming stops (a callback returns ``False``) in any of
    these cases:

    - ``this.num_tweets`` tweets have been collected;
    - more than 60 seconds have passed since ``this.st_time``;
    - the stream reports a read timeout;
    - the stream reports an HTTP error.
    """

    def on_data(self, data):
        """Handle one raw message from the stream.

        :param data: raw JSON text of a single stream message.
        :returns: ``True`` to keep streaming, ``False`` to disconnect.
        """
        # This check only runs when a message arrives. A silent connection is
        # covered by the Stream's own read timeout (see on_timeout).
        if time.time() - this.st_time > 60:
            log.error("[Tweet stream] on data timing out. Couldn't find tweets within 60 seconds")
            return False
        # Target already reached (or num_tweets <= 0): stop without parsing.
        if this.count >= this.num_tweets:
            return False
        message = json.loads(data)
        # Limit notices, deletions, etc. are not tweets and carry no 'text'.
        if isinstance(message, dict) and 'text' in message:
            text = message['text']
            folded = text.casefold()
            # any() is essential: a bare list comprehension is truthy whenever
            # trends is non-empty, which would accept every tweet.
            # The comparison is case-insensitive to mirror how statuses/filter
            # matches `track`.
            if any(trend.casefold() in folded for trend in this.trends):
                this.tweets.append(text)
                this.count += 1
        # Disconnect as soon as the target count is reached, rather than
        # waiting for (and collecting) one more tweet.
        return this.count < this.num_tweets

    def on_error(self, status):
        """Log a non-200 HTTP status from the streaming endpoint and stop.

        Returning ``False`` tells tweepy not to reconnect. Retrying (with
        back-off, which is mandatory on 420) would keep
        ``TweetStream.get_tweets`` blocked past its 60-second budget.
        """
        log.error(status)
        return False

    def on_timeout(self):
        """Log a stream read timeout and stop instead of reconnecting."""
        log.error("[Tweet stream] timing out. Couldn't find tweets within 60 seconds")
        return False

    def on_disconnect(self, notice):
        """Log a disconnect notice from Twitter.

        NOTE(review): tweepy's base ``StreamListener.on_data`` is what routes
        disconnect messages to this method. Because ``on_data`` is overridden
        in this class, this method is presumably never called. Confirm for
        the tweepy version in use.
        """
        log.error("[Tweet stream] disconnected. Couldn't find tweets within 60 seconds")
class TweetStream:
    '''Collect tweets from the Twitter statuses/filter streaming endpoint.

    For security, the API keys associated with your Twitter app should come
    from environment variables.

    num_tweets = number of tweets you want to get back from calling the
    statuses/filter Twitter endpoint. get_tweets() uses it when it is called
    without an explicit num_tweets.
    '''

    def __init__(self, streamConsumerKey, streamConsumerSecret, streamAccessTokenKey, streamAccessTokenSecret,
                 num_tweets=this.num_tweets):
        """Store the credentials and build the tweepy stream client.

        :param streamConsumerKey: app consumer (API) key.
        :param streamConsumerSecret: app consumer (API) secret.
        :param streamAccessTokenKey: user access token.
        :param streamAccessTokenSecret: user access token secret.
        :param num_tweets: default number of tweets for get_tweets().
        :raises EnvironmentError: if any of the four credentials is empty or None.
        :raises Exception: if the tweepy stream client could not be built.
        """
        log.info(f'Initializing {__name__} with "{num_tweets}" as number of tweets to search')
        self.num_tweets = num_tweets
        self.tweets = this.tweets
        # Credentials are supplied by the caller (see the class docstring).
        log.info('Loading comsumer keys from configuration file')
        self.consumer_key = streamConsumerKey
        self.consumer_secret = streamConsumerSecret
        self.access_token = streamAccessTokenKey
        self.access_secret = streamAccessTokenSecret
        # All four values are needed by OAuthHandler/set_access_token below.
        # `not all(...)` also rejects None, which len() would turn into a
        # TypeError.
        credentials = [self.consumer_key, self.consumer_secret,
                       self.access_token, self.access_secret]
        if not all(credentials):
            raise EnvironmentError('At least one Twitter API key is not defined in config.conf')
        log.info('Twitter API key and secret key loaded')
        log.info('Initializing Twitter client')
        self.client = self._initialize_client()
        if self.client is None:
            raise Exception('Failed to initialize TweetStream')

    def _initialize_client(self):
        '''Build the tweepy Stream client.

        :returns: a ``Stream`` wired to a new ``listener`` with a 60-second
            timeout, or ``None`` if construction raised (the error is logged).
        '''
        try:
            # OAuth 1.0a user context: consumer key/secret plus access token.
            auth = OAuthHandler(self.consumer_key, self.consumer_secret)
            auth.set_access_token(self.access_token, self.access_secret)
            client = Stream(auth, listener(), timeout=60)
            log.info('Successfully initialized Twitter client')
        except Exception as ex:
            log.error(f'Connection or access token retrievel error: {ex}')
            client = None
        return client

    def get_tweets(self, bounding_boxes, trends, num_tweets=None):
        '''Stream tweets for the given locations and track terms.

        :param bounding_boxes: passed as ``locations`` to ``Stream.filter``.
            Presumably a flat list of lon/lat pairs whose length is a
            multiple of 4; confirm against callers.
        :param trends: track terms. They are sent to the endpoint and also
            matched against tweet text by the listener.
        :param num_tweets: how many tweets to collect. Defaults to the
            ``num_tweets`` given to the constructor.
        :returns: list of collected tweet texts. The list is empty if
            streaming raised; the error is logged and suppressed.
        '''
        if num_tweets is None:
            num_tweets = self.num_tweets
        # Reset the module-level state that the listener callbacks read and update.
        this.tweets = []
        this.num_tweets = num_tweets
        this.trends = trends
        this.st_time = time.time()
        this.count = 0
        self.trends = this.trends
        self.bounding_boxes = bounding_boxes
        log.info(
            f"Calling statuses/filter with bounding box location {self.bounding_boxes} and tracks (trends) {this.trends}")
        try:
            # Synchronous call (no is_async). It returns once the stream
            # stops, e.g. when a listener callback returns False.
            self.client.filter(locations=self.bounding_boxes, track=self.trends)
        except Exception as ex:
            log.error(f'No tweets extracted; suppressing error: {ex}')
            this.tweets = []
        # this.tweets was rebound above, so re-point the instance attribute
        # at the list that was just filled.
        self.tweets = this.tweets
        return this.tweets