Skip to content

Commit 8cfd0fc

Browse files
Cron in Batches (#237)
* 1. Changed cron temporarily to run trigger in batches (#236) --------- Co-authored-by: jaanbaaz <[email protected]> Co-authored-by: jaanbaaz <[email protected]>
1 parent 7c8f991 commit 8cfd0fc

File tree

2 files changed

+135
-101
lines changed

2 files changed

+135
-101
lines changed

app.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,9 @@ async def register(discord_userdata):
248248
async def event_handler():
249249
try:
250250
data = await request.json
251-
logger.info(f"Webhook Recieved - {data}")
251+
252+
logger.info(f"Webhook Received - {data}")
253+
252254
secret_key = os.getenv("WEBHOOK_SECRET")
253255

254256
verification_result, error_message = await verify_github_webhook(request,secret_key)
@@ -418,10 +420,12 @@ async def start_scheduler():
418420

419421
@app.route('/trigger-cron')
420422
async def trigger_cron():
423+
from_date = request.args.get('from')
424+
to_date = request.args.get('to')
421425
cronjob = CronJob()
422-
# return await cronjob.main()
423-
asyncio.create_task(cronjob.main())
426+
asyncio.create_task(cronjob.main(from_date, to_date))
424427
return 'cron started'
425428

429+
426430
if __name__ == '__main__':
427431
app.run()

cronjob/cronjob.py

+128-98
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
# from jwt import JWT
66
import os
77
from dotenv import load_dotenv
8-
import logging
98
import sys
109
import time
1110

1211
from sqlalchemy import NullPool
1312
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
1413
from sqlalchemy.orm import sessionmaker
1514
from utils.jwt_generator import GenerateJWT
15+
from utils.logging_file import logger
1616
from utils.new_jwt_generator import NewGenerateJWT
1717

1818
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
@@ -52,7 +52,7 @@ def get_github_jwt(self):
5252
pem_file.close()
5353
return encoded_jwt
5454
except Exception as e:
55-
logging.error(f"In get_github_jwt: {e}")
55+
logger.error(f"In get_github_jwt: {e}")
5656
return None
5757

5858

@@ -79,7 +79,8 @@ async def get_installations(self, token_headers: dict):
7979
elif installations_response.status_code == 401:
8080
if installations_response.json().get("message",
8181
None) == '\'Expiration time\' claim (\'exp\') must be a numeric value representing the future time at which the assertion expires':
82-
logging.info("JWT expired at get_installation stage")
82+
83+
logger.info("JWT expired at get_installation stage")
8384
return -1
8485

8586

@@ -97,7 +98,8 @@ async def get_access_token(self, token_headers: dict, installation_id: str):
9798
elif access_token_response.status_code == 401:
9899
if access_token_response.json().get("message",
99100
None) == '\'Expiration time\' claim (\'exp\') must be a numeric value representing the future time at which the assertion expires':
100-
logging.info("JWT expired at get_access_token stage")
101+
102+
logger.info("JWT expired at get_access_token stage")
101103
return -1
102104
else:
103105
return None
@@ -117,7 +119,8 @@ async def get_repos(self, token: str):
117119
repo_data = repo_response.json()
118120
return repo_data.get('repositories', [])
119121

120-
async def get_issues(self, token: str, since: datetime, repo_fullname: str):
122+
123+
async def get_issues(self, token: str, since: datetime, repo_fullname: str, to_date=None):
121124
page = 1
122125
all_issues = []
123126
while True:
@@ -127,50 +130,70 @@ async def get_issues(self, token: str, since: datetime, repo_fullname: str):
127130
"Authorization": f"Bearer {token}",
128131
"X-GitHub-Api-Version": "2022-11-28"
129132
}
130-
payload = {"labels": "c4gt community",
131-
"since": since.isoformat()}
133+
134+
payload = {
135+
"labels": "c4gt community",
136+
"since": since.isoformat(),
137+
"direction": "asc"
138+
}
132139
async with httpx.AsyncClient() as client:
133-
issues_response = await client.get(url=get_issue_url,
134-
headers=token_headers,
135-
params=payload
136-
)
140+
issues_response = await client.get(url=get_issue_url, headers=token_headers, params=payload)
137141
page_issues = issues_response.json()
138-
if len(page_issues)>0:
142+
143+
# Filter based on created_at if to_date is provided
144+
if to_date:
145+
page_issues = [issue for issue in page_issues if
146+
datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00')) <= to_date]
147+
148+
if len(page_issues) > 0:
149+
all_issues += page_issues
139150
page += 1
140-
all_issues = all_issues + page_issues
141151
else:
142152
break
143-
rate_limit = await self.get_rate_limits(token)
144-
print(rate_limit)
153+
154+
rate_limit = await self.get_rate_limits(token)
155+
print(rate_limit)
156+
145157
return all_issues
146158

147-
async def get_issue_comments(self, issue_comment_url, since: datetime, **kwargs):
148-
page=1
159+
async def get_issue_comments(self, issue_comment_url, since: datetime, to_date=None, **kwargs):
160+
page = 1
161+
149162
all_comments = []
150163
token = kwargs.get("token", None)
151164
while True:
152165
comments_url = f"{issue_comment_url}?state=all&page={page}&per_page=100"
153-
payload = {"since": since.isoformat()}
154-
166+
payload = {
167+
"since": since.isoformat(),
168+
"direction": "asc"
169+
}
170+
155171
async with httpx.AsyncClient() as client:
156-
if token is not None:
172+
if token:
157173
token_headers = {
158174
"Accept": "application/vnd.github+json",
159175
"Authorization": f"Bearer {token}",
160176
"X-GitHub-Api-Version": "2022-11-28"
161177
}
162-
issue_comment_response = await client.get(url=comments_url,
163-
headers=token_headers,
164-
params=payload)
178+
response = await client.get(url=comments_url, headers=token_headers, params=payload)
165179
else:
166-
issue_comment_response = await client.get(url=comments_url,
167-
)
168-
issue_comments_data = issue_comment_response.json()
180+
response = await client.get(url=comments_url, params=payload)
181+
182+
issue_comments_data = response.json()
183+
184+
# Filter based on created_at if to_date is provided
185+
if to_date:
186+
issue_comments_data = [
187+
comment for comment in issue_comments_data
188+
if datetime.fromisoformat(comment['created_at'].replace('Z', '+00:00')) <= to_date
189+
]
190+
169191
if len(issue_comments_data) > 0:
170-
page += 1
171192
all_comments += issue_comments_data
193+
page += 1
172194
else:
173195
break
196+
174197
return all_comments
175198

176199
async def get_pull_requests(self, token: str, repo_fullname, since: datetime):
@@ -198,119 +221,122 @@ async def get_pull_requests(self, token: str, repo_fullname, since: datetime):
198221
return all_prs
199222

200223

201-
async def main(self):
224+
async def main(self, from_date=None, to_date=None):
202225
start_time = time.time()
203-
action_types = ["labeled"]
226+
logger.info(f"Cron triggered")
204227
engine = create_async_engine(get_postgres_uri(), echo=False, poolclass=NullPool)
205-
async_session = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
206-
issue_handler = IssuesHandler()
207-
pr_handler = Pull_requestHandler()
208228
jwt_token = self.jwt_generator.__call__()
209-
# jwt_token = self.get_github_jwt()
229+
210230
jwt_headers = {
211231
"Accept": "application/vnd.github+json",
212232
"Authorization": f"Bearer {jwt_token}",
213233
"X-GitHub-Api-Version": "2022-11-28"
214234
}
235+
215236
installations = await self.get_installations(jwt_headers)
216237
access_tokens = {installation.get('id'): await self.get_access_token(jwt_headers, installation.get('id')) for
217-
installation in installations}
218-
# print(access_tokens)
238+
installation in installations}
239+
240+
219241
all_issue_ids = set()
220242
all_comment_ids = set()
221243
all_pr_id = set()
222244

223-
original_issue = await self.postgres_client.readAll("issues")
224-
original_prs = await self.postgres_client.readAll("pr_history")
225-
original_orgs = await self.postgres_client.readAll("community_orgs")
226-
245+
# Parse date params
246+
since = datetime.now() - timedelta(days=1) # Default to 1 day before
247+
if from_date:
248+
since = datetime.fromisoformat(from_date)
249+
to_date = datetime.fromisoformat(to_date).replace(tzinfo=timezone.utc) if to_date else None
227250

228251
for installation in installations:
229-
time.sleep(1)
230-
# token = await self.get_access_token(jwt_headers, installation.get('id'))
252+
logger.info(f"Installation: ", installation)
231253
token = access_tokens.get(installation.get('id'))
232254
if not token:
233-
print(f"Error in ")
255+
print(f"Error in installation {installation.get('id')}")
234256
continue
257+
235258
repos = await self.get_repos(token)
236259
for repo in repos:
237260
repo_name = repo.get("full_name")
238-
since = (datetime.now() - timedelta(days=1))
239-
issues = await self.get_issues(token, since, repo_name)
240-
241-
# process issues
242-
processed_issues = await self.process_cron_issues(issues,
243-
all_issue_ids,
244-
all_comment_ids,
245-
token=token)
246-
247-
#process prs
248-
pull_requests = await self.get_pull_requests(token,
249-
repo_name,
250-
since)
261+
logger.info(f"Repository: {repo_name}")
262+
issues = await self.get_issues(token, since, repo_name, to_date)
263+
264+
# Pass from_date and to_date to process_cron_issues
265+
processed_issues = await self.process_cron_issues(
266+
issues,
267+
all_issue_ids,
268+
all_comment_ids,
269+
from_date=since,
270+
to_date=to_date,
271+
token=token
272+
)
273+
274+
# pull_requests = await self.get_pull_requests(token, repo_name, since)
275+
# processed_prs = await self.process_cron_prs(pull_requests, all_pr_id)
251276

252-
processed_prs = await self.process_cron_prs(pull_requests, all_pr_id)
253-
print('finished cron')
254-
255-
256-
#purge remaining issues, comments
257277
await self.purge_issues_comments(all_issue_ids, all_comment_ids)
258278

259-
260-
new_issues_length = len(all_issue_ids)
261-
new_prs_length = len(all_pr_id)
262-
new_orgs = await self.postgres_client.readAll("community_orgs")
263-
new_orgs_length = len(new_orgs)
264-
#share report
265-
266-
original_issue_length = len(original_issue)
267-
original_pr_length = len(original_prs)
268-
original_orgs_length = len(original_orgs)
269279
end_time = time.time()
270-
271280
time_taken = end_time - start_time
272-
# await self.send_discord_report(original_issue_length, new_issues_length, original_pr_length, new_prs_length, original_orgs_length, new_orgs_length, time_taken)
273-
274-
async def process_cron_issues(self, issues, issue_ids_list, all_comment_ids, **kwargs):
281+
# await self.send_discord_report(
282+
# len(original_issue),
283+
# len(all_issue_ids),
284+
# len(original_prs),
285+
# len(all_pr_id),
286+
# len(original_orgs),
287+
# len(await self.postgres_client.readAll("community_orgs")),
288+
# time_taken
289+
# )
290+
291+
async def process_cron_issues(self, issues, issue_ids_list, all_comment_ids, from_date=None, to_date=None,
292+
**kwargs):
275293
try:
276294
token = kwargs.get("token", None)
277295
issue_handler = IssuesHandler()
278-
296+
279297
for issue in issues:
280298
try:
281-
time.sleep(1)
299+
logger.info(f"Issue: {issue.get('html_url')}")
282300
issue_ids_list.add(issue["id"])
283301
state = f'{issue["state"]}ed'
284302
state = state.replace('eded', 'ed')
285-
data={
286-
"action":state,
287-
"issue":issue
303+
data = {
304+
"action": state,
305+
"issue": issue
288306
}
289307
if token is not None:
290-
await issue_handler.handle_event(data=data,
291-
postgres_client='client',
292-
token=token)
308+
await issue_handler.handle_event(
309+
data=data,
310+
postgres_client='client',
311+
token=token
312+
)
293313
else:
294-
await issue_handler.handle_event(data=data,
295-
postgres_client='client')
296-
297-
298-
#process issue comments
299-
since = (datetime.now() - timedelta(days=1))
300-
all_comments = await self.get_issue_comments(issue["comments_url"],
301-
since=since,
302-
token=token)
303-
processed_comments = await self.process_cron_issue_comments(issue, all_comments, all_comment_ids)
304-
# rate_limts = await self.get_rate_limits(token)
305-
# print(rate_limts)
314+
await issue_handler.handle_event(
315+
data=data,
316+
postgres_client='client'
317+
)
318+
319+
# Process issue comments with from_date and to_date
320+
since = from_date or (datetime.now() - timedelta(days=1))
321+
all_comments = await self.get_issue_comments(
322+
issue["comments_url"],
323+
since=since,
324+
to_date=to_date,
325+
token=token
326+
)
327+
time.sleep(1)
328+
processed_comments = await self.process_cron_issue_comments(
329+
issue, all_comments, all_comment_ids
330+
)
331+
306332
except Exception as e:
307-
print("Exeption in issue - ", issue)
333+
print("Exception in issue - ", issue, e)
308334
continue
309-
335+
310336
return 'issues processed'
311337

312338
except Exception as e:
313-
print('Exception occured in process_cron_issues ', e)
339+
print('Exception occurred in process_cron_issues:', e)
314340
return e
315341

316342
async def get_issue_data(self, issue_url):
@@ -381,6 +407,7 @@ async def process_cron_prs(self, pull_requests, all_pr_id):
381407
pr_handler = Pull_requestHandler()
382408
for pr in pull_requests:
383409
try:
410+
logger.info(f"PR: {pr.get('html_url')}")
384411
all_pr_id.add(pr["id"])
385412
await pr_handler.handle_event(
386413
data={"action": "closed" if pr["state"] == "close" else "opened",
@@ -466,4 +493,7 @@ async def send_discord_report(self, original_issue_length, new_issues_length, or
466493

467494
if __name__ == '__main__':
468495
cronjob = CronJob()
469-
asyncio.run(cronjob.main())
496+
from_date= "2025-03-01T00:00:00"
497+
to_date = "2025-03-11T00:00:00"
498+
asyncio.run(cronjob.main(from_date,to_date))
499+

0 commit comments

Comments
 (0)