Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patches for issues 41, 45, and 39 #53

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ gitfive/lib/__pycache__/
.DS_STORE
build/
dist/
gitfive.egg-info/
gitfive.egg-info/
venv/
.idea/
1 change: 0 additions & 1 deletion gitfive/gitfive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import sys


def main():
version = sys.version_info
if (version < (3, 10)):
Expand Down
62 changes: 62 additions & 0 deletions gitfive/lib/Testing/test_2FA
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import unittest
from unittest.mock import patch, AsyncMock
from gitfive.lib.objects import Credentials

class Test2FA(unittest.TestCase):
@patch('gitfive.lib.objects.httpx.AsyncClient.get')
@patch('gitfive.lib.objects.httpx.AsyncClient.post')
async def test_login_with_2fa_success(self, mock_post, mock_get):
# initial login page request
mock_get.return_value = AsyncMock(text='<html><form action="/session"><input name="authenticity_token" value="initial_token"/></form></html>')

# successful login submission response with a 2FA challenge
mock_post.side_effect = [
AsyncMock(status_code=302, cookies={"logged_in": ""}, headers={"location": "https://github.com/sessions/two-factor"}),
AsyncMock(status_code=200, cookies={"logged_in": "yes"}) # Mock the 2FA submission response
]

# 2FA page request with the form
mock_get.side_effect = [
AsyncMock(text='<html><form action="/session"><input name="authenticity_token" value="initial_token"/></form></html>'),
AsyncMock(text='<html><form action="/sessions/two-factor"><input name="authenticity_token" value="2fa_token"/></form></html>')
]

# simulate user entering the 2FA code
with patch('gitfive.objects.pwinput', return_value='123456'):
creds = Credentials()
await creds.login()

# verify the session is saved and the logged_in cookie is set to yes
self.assertEqual(creds.session.get("logged_in"), "yes")

# simulate user entering incorrect 2FA code
with patch('gitfive.lib.objects.pwinput', return_value='wrong_code'):
creds = Credentials()
with self.assertRaises(Exception) as context:
await creds.login()

# check correct exception is raised for incorrect 2FA token
self.assertTrue("2FA token is incorrect" in str(context.exception))

@patch('gitfive.lib.objects.httpx.AsyncClient.get')
@patch('gitfive.lib.objects.httpx.AsyncClient.post')
async def test_expired_2fa_token(self, mock_post, mock_get):
"""Test handling of expired 2FA token."""
mock_get.return_value = AsyncMock(text=self.auth_form_html)
# simulate server response for expired 2FA token
mock_post.side_effect = [
AsyncMock(status_code=302, headers={"location": "https://github.com/sessions/two-factor"}), # redirect to 2FA page
AsyncMock(status_code=408) # request Timeout status bc expired token
]

# simulate user entering expired 2FA code
with patch('gitfive.lib.objects.pwinput', return_value='expired_code'):
creds = Credentials()
with self.assertRaises(Exception) as context:
await creds.login()

# check appropriate message indicating token expiration
self.assertTrue("2FA token has expired" in str(context.exception))

if __name__ == '__main__':
unittest.main()
12 changes: 11 additions & 1 deletion gitfive/lib/github.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from bs4 import BeautifulSoup

from time import sleep
import re
import gitfive.config

Expand Down Expand Up @@ -27,8 +27,18 @@ async def create_repo(runner: GitfiveRunner, repo_name: str):
updated_headers["Origin"] = "https://github.com"

req = await runner.as_client.post("https://github.com/repositories", json=data, headers=updated_headers)
# Nonspecific issue - additional bandaid to prevent similar issues in the future.
# Checks for error 500 specifically, then implements a single attempt retry-after block to prevent
# immediate failure.
if req.status_code in [200, 302]:
return True
elif req.status_code == 500:
print(f'Couldn\'t create repo "{repo_name}".\nResponse code : {req.status_code}\nInteral Server Error.')
print("Retrying after 5s...")
sleep(5)
req = await runner.as_client.post("https://github.com/repositories", json=data, headers=updated_headers)
if req.status_code in [200, 302]:
return True
exit(f'Couldn\'t create repo "{repo_name}".\nResponse code : {req.status_code}\nResponse text : {req.text}')


Expand Down
18 changes: 15 additions & 3 deletions gitfive/lib/metamon.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,26 @@ async def start(runner: GitfiveRunner, emails: List[str]):
print(f"[METAMON] 🐙 Added commits in {round(time.time() - part_start, 2)}s !\n")

# Checking if repo is created
# If error 500, prints short summary. Need to test to determine "HTML response text" and exclude it w/ testing.
# Also implements a 5s retry-after instead of default 1s by prepending 4s wait.
# Implements a "500 counter", aborts on 5 failures due to 500 to prevent a hang from infinite loop.
err_ctr = 0
while True:
req = await runner.as_client.get(f"https://github.com/{runner.creds.username}/{temp_repo_name}/settings")
if req.status_code == 200:
break
elif req.status_code == 500:
print("[METAMON] 🐙 Error 500: retrying after 5s.")
err_ctr += 1
if err_ctr >= 5:
print("[METAMON] 🐙 Error 500: Retry exceeded. Unable to push.")
print("[METAMON] 🐙 Error 500: Quitting push attempt. Check authentication/github status?")
break
sleep(4)
sleep(1)

runner.tmprinter.out("[METAMON] 🐙 Pushing...")
repo.git.push('--set-upstream', repo.remote().name, 'mirage')
if err_ctr < 5:
runner.tmprinter.out("[METAMON] 🐙 Pushing...")
repo.git.push('--set-upstream', repo.remote().name, 'mirage')
else:
print("[METAMON] 🐙 No email found in commits.\n")

Expand Down
74 changes: 73 additions & 1 deletion gitfive/lib/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,58 @@ def out(self, text: str):
def clear(self):
self.rc.print(" " * self.max_len, end="\r")

import logging

class TwoFactorAuthentication:
def __init__(self, client):
self.client = client
self.logger = logging.getLogger(__name__)

async def fetch_2fa_form(self, url):
try:
response = await self.client.get(url)
response.raise_for_status()
return response.text
except httpx.HTTPStatusError as e:
self.logger.error(f"HTTP error occurred: {e}")
raise
except httpx.RequestError as e:
self.logger.error(f"Request error occurred: {e}")
raise

def parse_form(self, html_content):
soup = bs(html_content, 'html.parser')
form = soup.find("form", {"action": "/sessions/two-factor"})
if form is None:
self.logger.warning("The form with action '/sessions/two-factor' was not found.")
raise ValueError("The form with action '/sessions/two-factor' was not found in the HTML content.")

authenticity_token_input = form.find("input", {"name": "authenticity_token"})
if authenticity_token_input is None:
self.logger.warning("Authenticity token input not found.")
raise ValueError("The input field with name 'authenticity_token' was not found in the form.")

return authenticity_token_input.attrs["value"]

async def submit_2fa_code(self, authenticity_token, code):
data = {
"authenticity_token": authenticity_token,
"otp": code # might need to be changed depending on the forms requirements
}
try:
response = await self.client.post("https://github.com/sessions/two-factor", data=data)
response.raise_for_status()
if "logged_in" in response.cookies and response.cookies["logged_in"] == "yes":
return True
else:
return False
except httpx.HTTPStatusError as e:
self.logger.error(f"Failed to submit 2FA code, HTTP error: {e}")
raise
except httpx.RequestError as e:
self.logger.error(f"Network error when submitting 2FA code: {e}")
raise

class Credentials():
"""
Manages GitHub authentication.
Expand All @@ -53,6 +105,9 @@ def __init__(self):
self._as_client = httpx.AsyncClient(
# headers=config.headers, timeout=config.timeout, proxies="http://127.0.0.1:8282", verify=False
headers=config.headers, timeout=config.timeout

# self.async_client = async_client
# self.twofa_handler = TwoFactorAuthentication(self.async_client)
)

def load_creds(self):
Expand Down Expand Up @@ -306,7 +361,24 @@ async def login(self, force=False):
# unless we provide the same device_id that initiated 2FA procedure.
req = await self._as_client.get("https://github.com/sessions/two-factor")
body = bs(req.text, 'html.parser')
authenticity_token = body.find("form", {"action": "/sessions/two-factor"}).find("input", {"name": "authenticity_token"}).attrs["value"]

#authenticity_token = body.find("form", {"action": "/sessions/two-factor"}).find("input", {"name": "authenticity_token"}).attrs["value"]

form = body.find("form", {"action": "/sessions/two-factor"})
if form is None:
# Handle the absence of the expected form in the HTML
# This could involve logging an error or raising an exception
raise ValueError("The form with action '/sessions/two-factor' was not found in the HTML content.")

authenticity_token_input = form.find("input", {"name": "authenticity_token"})
if authenticity_token_input is None:
# Handle the absence of the input field within the form
# This could involve logging an error or raising an exception
raise ValueError("The input field with name 'authenticity_token' was not found in the form.")

authenticity_token = authenticity_token_input.attrs["value"]


msg = body.find("form", {"action": "/sessions/two-factor"}).find("div", {"class": "mt-3"}).text.strip().split("\n")[0]
rprint(f'[bold]🗨️ Github :[/bold] [italic]"{msg}"')
otp = pwinput("📱 Code => ")
Expand Down
30 changes: 19 additions & 11 deletions gitfive/lib/xray.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,18 +315,26 @@ async def analyze_ext_contribs(runner: GitfiveRunner):
total_count = data1.get("total_count")
runner.target.nb_ext_contribs = total_count


results = [data1]
if total_count > 100:
data2 = await runner.api.query(f"/search/commits?q=author:{runner.target.username.lower()} -user:{runner.target.username.lower()}&per_page=100&sort=author-date&order=desc")
results.append(data2)

if total_count > 200:
from math import ceil
middle_page = 10 # Max page ("Only the first 1000 search results are available")
if total_count <= 2000:
middle_page = ceil(ceil(total_count/100)/2)
data3 = await runner.api.query(f"/search/commits?q=author:{runner.target.username.lower()} -user:{runner.target.username.lower()}&per_page=100&sort=author-date&order=asc&page={middle_page}")
results.append(data3)
# need to test against private profile
# Reported behavior: Throws type error on comparison
# Expect now: If type is none, pass over all references and continue
# Test results: not tested yet
if total_count is not None:
if total_count > 100:
data2 = await runner.api.query(f"/search/commits?q=author:{runner.target.username.lower()} -user:{runner.target.username.lower()}&per_page=100&sort=author-date&order=desc")
results.append(data2)

if total_count > 200:
from math import ceil
middle_page = 10 # Max page ("Only the first 1000 search results are available")
if total_count <= 2000:
middle_page = ceil(ceil(total_count/100)/2)
data3 = await runner.api.query(f"/search/commits?q=author:{runner.target.username.lower()} -user:{runner.target.username.lower()}&per_page=100&sort=author-date&order=asc&page={middle_page}")
results.append(data3)
else:
raise Exception("User Profile private/not found.")

for data in results:
for item in data.get("items"):
Expand Down