# scraper.py
import requests
import datetime
from timeit import default_timer
import pickle
import itertools
import asyncio
from concurrent.futures import ThreadPoolExecutor
import random
import time
from ratelimit import limits, sleep_and_retry
import os
import argparse
"""First we will get all reddit submissions to a given subreddit, in each day. Since there is a limit of 500 results per request, let's hope the number of submissions is smaller than that. All we need is the submission id and content."""
parser = argparse.ArgumentParser()
parser.add_argument("--subreddit",type=str,default="SuicideWatch",help="subreddit to scrape")
opt = parser.parse_args()
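# For reference, the script is driven entirely by this one flag; a typical
# invocation (file name taken from this repository) would be:
#   python scraper.py --subreddit SuicideWatch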
datadir = os.getcwd() + "/data/"
os.makedirs(datadir, exist_ok=True)  # make sure the output directory exists before pickling into it
dt_curr = int(datetime.datetime(2017,1,1,0,0).timestamp())
dt_last = int(datetime.datetime(2018,1,1,0,0).timestamp())
post_endpoint = 'https://api.pushshift.io/reddit/search/submission/'
post_parameters = {
    'after': dt_curr,
    'before': dt_last,
    'fields': ('author', 'selftext', 'id', 'created_utc', 'num_comments'),
    'size': 500,
    'sort': 'asc'
}
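# For illustration only (values are made up): each element of response.json()['data']
# should be a dict holding just the requested fields, roughly of the form
#   {'author': 'some_user', 'selftext': '...', 'id': 'abc123',
#    'created_utc': 1483250000, 'num_comments': 3}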
print('Collecting posts...')
subreddit = opt.subreddit
post_parameters['subreddit'] = subreddit
post2data = dict()
nresults = 100  # sentinel so the paging loop below runs at least once; it keeps paging while a full page of 100 results comes back
@sleep_and_retry
@limits(calls=6, period=4)  # throttle to at most 6 requests every 4 seconds
def fetch(session, post_parameters):
    # Request one page of submissions, retrying on non-200 responses.
    for attempts in range(100):
        response = session.get(post_endpoint, params=post_parameters)
        if response.status_code != 200:
            print('Error at timestamp {}. Retrying...'.format(post_parameters['after']))
        else:
            return response
outname = datadir + subreddit + '_post2data.pkl'
if os.path.exists(outname):
    with open(outname, 'rb') as infile:
        post2data = pickle.load(infile)
else:
    # Keep paging while the API returns a full page of results.
    while nresults == 100:
        print('Getting posts created after', datetime.datetime.fromtimestamp(post_parameters['after']))
        with requests.Session() as session:
            response = fetch(session, post_parameters)
        data = response.json()['data']
        nresults = len(data)
        for post in data:
            post_id = post['id']
            del post['id']
            post2data[post_id] = post
            post_parameters['after'] = post['created_utc']
    with open(outname, 'wb') as outfile:
        pickle.dump(post2data, outfile)
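# Quick sanity check: post2data maps submission id -> the remaining requested fields.
print('Collected {} submissions'.format(len(post2data)))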
"""Now we will collect the ids of the comments associated with each post."""
base_url = 'https://api.pushshift.io/reddit/submission/comment_ids/'
post_ids = [post_id for post_id, post_data in post2data.items() if post_data['num_comments']>0]
nposts = len(post_ids)
post2comments = dict()
print('Collecting comment ids...')
start_time = default_timer()
@sleep_and_retry
@limits(calls=6, period=6)  # throttle to at most 6 requests every 6 seconds
def fetch(session, i):
    # Fetch the comment ids of the i-th post, retrying on non-200 responses.
    post_id = post_ids[i]
    if i % 10 == 0:
        print('(Elapsed {}s) Processing post # {} of {}'.format(int(default_timer()-start_time), i, nposts))
    for attempts in range(100):
        response = session.get(base_url + post_id)
        if response.status_code == 200:
            break
        else:
            print('Error (too many requests). Retrying after some random amount of time.')
            time.sleep(random.random())
    return post_id, response
async def get_data_asynchronous():
    with ThreadPoolExecutor(max_workers=6) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            # Get the event loop
            loop = asyncio.get_event_loop()
            # Create one task per post; the executor runs the blocking
            # `fetch` call for each post id in a worker thread.
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, ind)  # Allows us to pass in multiple arguments to `fetch`
                )
                for ind in range(nposts)
            ]
            # Await the responses and keep the successful ones
            for post_id, response in await asyncio.gather(*tasks):
                if response.status_code != 200:
                    print('Error at url {}'.format(response.url))
                else:
                    post2comments[post_id] = response.json()['data']
outname = datadir + subreddit + '_post2comments.pkl'
if os.path.exists(outname):
    with open(outname, 'rb') as infile:
        post2comments = pickle.load(infile)
else:
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)
    with open(outname, 'wb') as outfile:
        pickle.dump(post2comments, outfile)
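# post2comments maps each submission id to the list of its comment ids,
# e.g. {'abc123': ['def456', 'ghi789']} (illustrative ids); flatten it next.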
comment_ids = list(itertools.chain.from_iterable(post2comments.values()))
print(len(comment_ids))
print(len(post2comments))
"""Now we will collect the comments."""
comment_endpoint='https://api.pushshift.io/reddit/comment/search'
# comment_parameters = {
# 'fields': ('author','body','link_id','parent_id','id','created_utc'),
# 'sort': 'asc'
# }
print('Collecting comments...')
base_url = comment_endpoint+'?sort=asc&fields=author,body,link_id,parent_id,id,created_utc&ids='
comment2data = dict()
ncomments = len(comment_ids)
start_time = default_timer()
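# Illustrative batched request (ids are made up): the comment ids are joined with
# commas and appended to the query string, up to 1000 per request, e.g.
#   https://api.pushshift.io/reddit/comment/search?sort=asc&fields=...&ids=def456,ghi789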
@sleep_and_retry
@limits(calls=6, period=6)  # throttle to at most 6 requests every 6 seconds
def fetch(session, i):
    # Fetch a batch of up to 1000 comments by id, starting at index i.
    full_url = base_url + ','.join(comment_ids[i:min(i+1000, ncomments)])
    if i % 10000 == 0:
        print('(Elapsed {}s) Processing comment # {} of {}'.format(int(default_timer()-start_time), i, ncomments))
    for attempts in range(100):
        response = session.get(full_url)
        if response.status_code == 200:
            break
        else:
            print('Error (too many requests). Retrying after some random amount of time.')
            time.sleep(random.random())
    return response
async def get_data_asynchronous():
    with ThreadPoolExecutor(max_workers=6) as executor:
        with requests.Session() as session:
            # Set any session parameters here before calling `fetch`
            # Get the event loop
            loop = asyncio.get_event_loop()
            # Create one task per batch of 1000 comment ids; the executor runs
            # the blocking `fetch` call for each batch in a worker thread.
            tasks = [
                loop.run_in_executor(
                    executor,
                    fetch,
                    *(session, i)  # Allows us to pass in multiple arguments to `fetch`
                )
                for i in range(0, ncomments, 1000)
            ]
            # Await the responses and keep the successful ones
            for response in await asyncio.gather(*tasks):
                if response.status_code != 200:
                    print('Error at url {}'.format(response.url))
                else:
                    for comment in response.json()['data']:
                        comment_id = comment['id']
                        del comment['id']
                        comment2data[comment_id] = comment
outname = datadir + subreddit + '_comment2data.pkl'
if os.path.exists(outname):
    with open(outname, 'rb') as infile:
        comment2data = pickle.load(infile)
else:
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_data_asynchronous())
    loop.run_until_complete(future)
    with open(outname, 'wb') as outfile:
        pickle.dump(comment2data, outfile)
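# Minimal sketch of how the saved pickles can be reloaded later (assumes the same
# datadir layout used above; comment2data maps comment id -> the requested fields):
#   with open(datadir + subreddit + '_comment2data.pkl', 'rb') as f:
#       comment2data = pickle.load(f)
print('Collected {} comments for {} posts'.format(len(comment2data), len(post2comments)))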