1+ import sys
2+ from urllib .parse import urlparse
3+
4+ import gitpod
5+ import gitpod .lib as util
6+ from gitpod import AsyncGitpod
7+
8+
9+ async def handle_pat_auth (client : AsyncGitpod , user_id : str , runner_id : str , host : str ) -> None :
10+ scm_info = {
11+ "github.com" : {
12+ "create_url" : f"https://{ host } /settings/tokens/new?scopes=repo,workflow,read:user,user:email&description=gitpod" ,
13+ "docs_url" : "https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens" ,
14+ "scopes" : "repo, workflow, read:user, user:email"
15+ },
16+ "gitlab.com" : {
17+ "create_url" : f"https://{ host } /-/user_settings/personal_access_tokens?scopes=api,read_user,read_repository&name=gitpod" ,
18+ "docs_url" : "https://docs.gitlab.com/ee/user/profile/personal_access_tokens.html" ,
19+ "scopes" : "api, read_user, read_repository"
20+ },
21+ "dev.azure.com" : {
22+ "create_url" : None ,
23+ "docs_url" : "https://docs.microsoft.com/en-us/azure/devops/organizations/accounts/use-personal-access-tokens-to-authenticate" ,
24+ "scopes" : "Code (Read & Write)"
25+ }
26+ }.get (host , {
27+ "create_url" : None ,
28+ "docs_url" : "https://www.gitpod.io/docs/flex/source-control" ,
29+ "scopes" : "repository access"
30+ })
31+
32+ print ("\n To create a Personal Access Token:" )
33+ create_url = scm_info ["create_url" ] # type: ignore
34+ if create_url :
35+ print (f"1. Visit: { create_url } " )
36+ else :
37+ print (f"1. Go to { host } > Settings > Developer Settings" )
38+ print (f"2. Create a new token with the following scopes: { scm_info ['scopes' ]} " ) # type: ignore
39+ print (f"3. Copy the generated token" )
40+ print (f"\n For detailed instructions, visit: { scm_info ['docs_url' ]} " ) # type: ignore
41+
42+ pat = input ("\n Enter your Personal Access Token: " ).strip ()
43+ if not pat :
44+ return
45+
46+ await util .set_scm_pat (client , user_id , runner_id , host , pat )
47+
48+ async def verify_context_url (client : AsyncGitpod , context_url : str , runner_id : str ) -> None :
49+ """Verify and handle authentication for a repository context URL."""
50+ host = urlparse (context_url ).hostname
51+ if host is None :
52+ print ("Error: Invalid context URL" )
53+ sys .exit (1 )
54+
55+ resp = await client .users .get_authenticated_user ()
56+ assert resp is not None
57+ user = resp .user
58+ assert user is not None
59+ user_id = user .id
60+ assert user_id is not None
61+
62+ # Main authentication loop
63+ first_attempt = True
64+ while True :
65+ try :
66+ # Try to access the context URL
67+ await client .runners .parse_context_url (context_url = context_url , runner_id = runner_id )
68+ print ("\n ✓ Authentication verified successfully" )
69+ return
70+
71+ except gitpod .APIError as e :
72+ if e .code != "failed_precondition" :
73+ raise e
74+
75+ # Show authentication required message only on first attempt
76+ if first_attempt :
77+ print (f"\n Authentication required for { host } " )
78+ first_attempt = False
79+
80+ # Get authentication options for the host
81+ auth_resp = await client .runners .check_authentication_for_host (
82+ host = host ,
83+ runner_id = runner_id
84+ )
85+
86+ # Handle re-authentication case
87+ if auth_resp .authenticated and not first_attempt :
88+ print ("\n It looks like you are already authenticated." )
89+ if input ("Would you like to re-authenticate? (y/n): " ).lower ().strip () != 'y' :
90+ print ("\n Authentication cancelled" )
91+ sys .exit (1 )
92+ else :
93+ print ("\n Retrying authentication..." )
94+ continue
95+
96+ auth_methods : list [tuple [str , str ]] = []
97+ if auth_resp .authentication_url :
98+ auth_methods .append (("OAuth" , "Recommended" ))
99+ if auth_resp .pat_supported :
100+ auth_methods .append (("Personal Access Token (PAT)" , "" ))
101+
102+ if not auth_methods :
103+ print (f"\n Error: No authentication method available for { host } " )
104+ sys .exit (1 )
105+
106+ # Present authentication options
107+ if len (auth_methods ) > 1 :
108+ print ("\n Available authentication methods:" )
109+ for i , (method , note ) in enumerate (auth_methods , 1 ):
110+ note_text = f" ({ note } )" if note else ""
111+ print (f"{ i } . { method } { note_text } " )
112+
113+ choice = input (f"\n Choose authentication method (1-{ len (auth_methods )} ): " ).strip ()
114+ try :
115+ method_index = int (choice ) - 1
116+ if not 0 <= method_index < len (auth_methods ):
117+ raise ValueError ()
118+ except ValueError :
119+ method_index = 0 # Default to OAuth if invalid input
120+ else :
121+ method_index = 0
122+
123+ # Handle chosen authentication method
124+ chosen_method = auth_methods [method_index ][0 ]
125+ if chosen_method == "Personal Access Token (PAT)" :
126+ await handle_pat_auth (client , user_id , runner_id , host )
127+ else :
128+ print (f"\n Please visit the following URL to authenticate:" )
129+ print (f"{ auth_resp .authentication_url } " )
130+ print ("\n Waiting for authentication to complete..." )
131+ input ("Press Enter after completing authentication in your browser..." )
0 commit comments