9
9
from socketsecurity .core .classes import Comment
10
10
from socketsecurity .core .scm_comments import Comments
11
11
12
- # Declare all globals with initial None values
13
- github_sha : Optional [str ] = None
14
- github_api_url : Optional [str ] = None
15
- github_ref_type : Optional [str ] = None
16
- github_event_name : Optional [str ] = None
17
- github_workspace : Optional [str ] = None
18
- github_repository : Optional [str ] = None
19
- github_ref_name : Optional [str ] = None
20
- github_actor : Optional [str ] = None
21
- default_branch : Optional [str ] = None
22
- github_env : Optional [str ] = None
23
- pr_number : Optional [str ] = None
24
- pr_name : Optional [str ] = None
25
- is_default_branch : bool = False
26
- commit_message : Optional [str ] = None
27
- committer : Optional [str ] = None
28
- gh_api_token : Optional [str ] = None
29
- github_repository_owner : Optional [str ] = None
30
- event_action : Optional [str ] = None
31
-
32
- github_variables = [
33
- "GITHUB_SHA" ,
34
- "GITHUB_API_URL" ,
35
- "GITHUB_REF_TYPE" ,
36
- "GITHUB_EVENT_NAME" ,
37
- "GITHUB_WORKSPACE" ,
38
- "GITHUB_REPOSITORY" ,
39
- "GITHUB_REF_NAME" ,
40
- "DEFAULT_BRANCH" ,
41
- "PR_NUMBER" ,
42
- "PR_NAME" ,
43
- "COMMIT_MESSAGE" ,
44
- "GITHUB_ACTOR" ,
45
- "GITHUB_ENV" ,
46
- "GH_API_TOKEN" ,
47
- "GITHUB_REPOSITORY_OWNER" ,
48
- "EVENT_ACTION"
49
- ]
50
12
51
13
@dataclass
52
14
class GithubConfig :
@@ -67,6 +29,7 @@ class GithubConfig:
67
29
token : str
68
30
owner : str
69
31
event_action : Optional [str ]
32
+ headers : dict
70
33
71
34
@classmethod
72
35
def from_env (cls ) -> 'GithubConfig' :
@@ -76,13 +39,19 @@ def from_env(cls) -> 'GithubConfig':
76
39
log .error ("Unable to get Github API Token from GH_API_TOKEN" )
77
40
sys .exit (2 )
78
41
42
+ repository = os .getenv ('GITHUB_REPOSITORY' , '' )
43
+ owner = os .getenv ('GITHUB_REPOSITORY_OWNER' , '' )
44
+ if '/' in repository :
45
+ owner = repository .split ('/' )[0 ]
46
+ repository = repository .split ('/' )[1 ]
47
+
79
48
return cls (
80
49
sha = os .getenv ('GITHUB_SHA' , '' ),
81
50
api_url = os .getenv ('GITHUB_API_URL' , '' ),
82
51
ref_type = os .getenv ('GITHUB_REF_TYPE' , '' ),
83
52
event_name = os .getenv ('GITHUB_EVENT_NAME' , '' ),
84
53
workspace = os .getenv ('GITHUB_WORKSPACE' , '' ),
85
- repository = os . getenv ( 'GITHUB_REPOSITORY' , '' ). split ( '/' )[ - 1 ] ,
54
+ repository = repository ,
86
55
ref_name = os .getenv ('GITHUB_REF_NAME' , '' ),
87
56
default_branch = os .getenv ('DEFAULT_BRANCH' , '' ).lower () == 'true' ,
88
57
pr_number = os .getenv ('PR_NUMBER' ),
@@ -91,206 +60,140 @@ def from_env(cls) -> 'GithubConfig':
91
60
actor = os .getenv ('GITHUB_ACTOR' , '' ),
92
61
env = os .getenv ('GITHUB_ENV' , '' ),
93
62
token = token ,
94
- owner = os .getenv ('GITHUB_REPOSITORY_OWNER' , '' ),
95
- event_action = os .getenv ('EVENT_ACTION' )
63
+ owner = owner ,
64
+ event_action = os .getenv ('EVENT_ACTION' ),
65
+ headers = {
66
+ 'Authorization' : f"Bearer { token } " ,
67
+ 'User-Agent' : 'SocketPythonScript/0.0.1' ,
68
+ "accept" : "application/json"
69
+ }
96
70
)
97
71
98
72
99
- for env in github_variables :
100
- var_name = env .lower ()
101
- globals ()[var_name ] = os .getenv (env ) or None
102
- if var_name == "default_branch" :
103
- if default_branch is None or default_branch .lower () == "false" :
104
- is_default_branch = False
105
- else :
106
- is_default_branch = True
107
- if var_name != "gh_api_token" :
108
- value = globals ()[var_name ] = os .getenv (env ) or None
109
- log .debug (f"{ env } ={ value } " )
73
+ class Github :
74
+ def __init__ (self , config : Optional [GithubConfig ] = None ):
75
+ self .config = config or GithubConfig .from_env ()
110
76
111
- headers = {
112
- 'Authorization' : f"Bearer { gh_api_token } " ,
113
- 'User-Agent' : 'SocketPythonScript/0.0.1' ,
114
- "accept" : "application/json"
115
- }
77
+ if not self .config .token :
78
+ log .error ("Unable to get Github API Token" )
79
+ sys .exit (2 )
116
80
81
+ def check_event_type (self ) -> str :
82
+ if self .config .event_name .lower () == "push" :
83
+ if not self .config .pr_number :
84
+ return "main"
85
+ return "diff"
86
+ elif self .config .event_name .lower () == "pull_request" :
87
+ if self .config .event_action and self .config .event_action .lower () in ['opened' , 'synchronize' ]:
88
+ return "diff"
89
+ log .info (f"Pull Request Action { self .config .event_action } is not a supported type" )
90
+ sys .exit (0 )
91
+ elif self .config .event_name .lower () == "issue_comment" :
92
+ return "comment"
93
+
94
+ log .error (f"Unknown event type { self .config .event_name } " )
95
+ sys .exit (0 )
96
+
97
+ def post_comment (self , body : str ) -> None :
98
+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/{ self .config .pr_number } /comments"
99
+ payload = json .dumps ({"body" : body })
100
+ do_request (
101
+ path = path ,
102
+ payload = payload ,
103
+ method = "POST" ,
104
+ headers = self .config .headers ,
105
+ base_url = self .config .api_url
106
+ )
117
107
118
- class Github :
119
- commit_sha : str
120
- api_url : str
121
- ref_type : str
122
- event_name : str
123
- workspace : str
124
- repository : str
125
- ref_name : str
126
- default_branch : str
127
- is_default_branch : bool
128
- pr_number : int
129
- pr_name : str
130
- commit_message : str
131
- committer : str
132
- github_env : str
133
- api_token : str
134
- project_id : int
135
- event_action : str
108
+ def update_comment (self , body : str , comment_id : str ) -> None :
109
+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } "
110
+ payload = json .dumps ({"body" : body })
111
+ do_request (
112
+ path = path ,
113
+ payload = payload ,
114
+ method = "PATCH" ,
115
+ headers = self .config .headers ,
116
+ base_url = self .config .api_url
117
+ )
136
118
137
- def __init__ (self ):
138
- self .commit_sha = github_sha
139
- self .api_url = github_api_url
140
- self .ref_type = github_ref_type
141
- self .event_name = github_event_name
142
- self .workspace = github_workspace
143
- self .repository = github_repository
144
- if "/" in self .repository :
145
- self .repository = self .repository .rsplit ("/" )[1 ]
146
- self .branch = github_ref_name
147
- self .default_branch = default_branch
148
- self .is_default_branch = is_default_branch
149
- self .pr_number = pr_number
150
- self .pr_name = pr_name
151
- self .commit_message = commit_message
152
- self .committer = github_actor
153
- self .github_env = github_env
154
- self .api_token = gh_api_token
155
- self .project_id = 0
156
- self .event_action = event_action
157
- if self .api_token is None :
158
- print ("Unable to get Github API Token from GH_API_TOKEN" )
159
- sys .exit (2 )
119
+ def write_new_env (self , name : str , content : str ) -> None :
120
+ with open (self .config .env , "a" ) as f :
121
+ new_content = content .replace ("\n " , "\\ n" )
122
+ f .write (f"{ name } ={ new_content } " )
160
123
161
- @staticmethod
162
- def check_event_type () -> str :
163
- if github_event_name .lower () == "push" :
164
- if pr_number is None or pr_number == "" or pr_number == "0" :
165
- event_type = "main"
166
- else :
167
- event_type = "diff"
168
- elif github_event_name .lower () == "pull_request" :
169
- if event_action is not None and event_action != "" and (
170
- event_action .lower () == "opened" or event_action .lower () == 'synchronize' ):
171
- event_type = "diff"
172
- else :
173
- log .info (f"Pull Request Action { event_action } is not a supported type" )
174
- sys .exit (0 )
175
- elif github_event_name .lower () == "issue_comment" :
176
- event_type = "comment"
124
+ def get_comments_for_pr (self ) -> dict :
125
+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/{ self .config .pr_number } /comments"
126
+ raw_comments = Comments .process_response (
127
+ do_request (path , headers = self .config .headers , base_url = self .config .api_url )
128
+ )
129
+
130
+ comments = {}
131
+ if "error" not in raw_comments :
132
+ for item in raw_comments :
133
+ comment = Comment (** item )
134
+ comments [comment .id ] = comment
135
+ comment .body_list = comment .body .split ("\n " )
177
136
else :
178
- event_type = None
179
- log .error (f"Unknown event type { github_event_name } " )
180
- sys .exit (0 )
181
- return event_type
137
+ log .error (raw_comments )
138
+
139
+ return Comments .check_for_socket_comments (comments )
182
140
183
- @staticmethod
184
141
def add_socket_comments (
185
- security_comment : str ,
186
- overview_comment : str ,
187
- comments : dict ,
188
- new_security_comment : bool = True ,
189
- new_overview_comment : bool = True
142
+ self ,
143
+ security_comment : str ,
144
+ overview_comment : str ,
145
+ comments : dict ,
146
+ new_security_comment : bool = True ,
147
+ new_overview_comment : bool = True
190
148
) -> None :
191
- existing_overview_comment = comments .get ("overview" )
192
- existing_security_comment = comments .get ("security" )
193
149
if new_overview_comment :
194
150
log .debug ("New Dependency Overview comment" )
195
- if existing_overview_comment is not None :
151
+ if overview := comments . get ( "overview" ) :
196
152
log .debug ("Previous version of Dependency Overview, updating" )
197
- existing_overview_comment : Comment
198
- Github .update_comment (overview_comment , str (existing_overview_comment .id ))
153
+ self .update_comment (overview_comment , str (overview .id ))
199
154
else :
200
155
log .debug ("No previous version of Dependency Overview, posting" )
201
- Github .post_comment (overview_comment )
156
+ self .post_comment (overview_comment )
157
+
202
158
if new_security_comment :
203
159
log .debug ("New Security Issue Comment" )
204
- if existing_security_comment is not None :
160
+ if security := comments . get ( "security" ) :
205
161
log .debug ("Previous version of Security Issue comment, updating" )
206
- existing_security_comment : Comment
207
- Github .update_comment (security_comment , str (existing_security_comment .id ))
162
+ self .update_comment (security_comment , str (security .id ))
208
163
else :
209
164
log .debug ("No Previous version of Security Issue comment, posting" )
210
- Github .post_comment (security_comment )
211
-
212
- @staticmethod
213
- def post_comment (body : str ) -> None :
214
- repo = github_repository .rsplit ("/" , 1 )[1 ]
215
- path = f"repos/{ github_repository_owner } /{ repo } /issues/{ pr_number } /comments"
216
- payload = {
217
- "body" : body
218
- }
219
- payload = json .dumps (payload )
220
- do_request (path , payload = payload , method = "POST" , headers = headers , base_url = github_api_url )
221
-
222
- @staticmethod
223
- def update_comment (body : str , comment_id : str ) -> None :
224
- repo = github_repository .rsplit ("/" , 1 )[1 ]
225
- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } "
226
- payload = {
227
- "body" : body
228
- }
229
- payload = json .dumps (payload )
230
- do_request (path , payload = payload , method = "PATCH" , headers = headers , base_url = github_api_url )
165
+ self .post_comment (security_comment )
231
166
232
- @staticmethod
233
- def write_new_env (name : str , content : str ) -> None :
234
- file = open (github_env , "a" )
235
- new_content = content .replace ("\n " , "\\ n" )
236
- env_output = f"{ name } ={ new_content } "
237
- file .write (env_output )
238
-
239
- @staticmethod
240
- def get_comments_for_pr (repo : str , pr : str ) -> dict :
241
- path = f"repos/{ github_repository_owner } /{ repo } /issues/{ pr } /comments"
242
- raw_comments = Comments .process_response (do_request (path , headers = headers , base_url = github_api_url ))
243
- comments = {}
244
- if "error" not in raw_comments :
245
- for item in raw_comments :
246
- comment = Comment (** item )
247
- comments [comment .id ] = comment
248
- for line in comment .body .split ("\n " ):
249
- comment .body_list .append (line )
250
- else :
251
- log .error (raw_comments )
252
- socket_comments = Comments .check_for_socket_comments (comments )
253
- return socket_comments
254
-
255
- @staticmethod
256
- def remove_comment_alerts (comments : dict ):
257
- security_alert = comments .get ("security" )
258
- if security_alert is not None :
259
- security_alert : Comment
167
+ def remove_comment_alerts (self , comments : dict ) -> None :
168
+ if security_alert := comments .get ("security" ):
260
169
new_body = Comments .process_security_comment (security_alert , comments )
261
- Github .handle_ignore_reactions (comments )
262
- Github .update_comment (new_body , str (security_alert .id ))
263
-
264
- @staticmethod
265
- def handle_ignore_reactions (comments : dict ) -> None :
266
- for comment in comments ["ignore" ]:
267
- comment : Comment
268
- if "SocketSecurity ignore" in comment .body :
269
- if not Github .comment_reaction_exists (comment .id ):
270
- Github .post_reaction (comment .id )
271
-
272
- @staticmethod
273
- def post_reaction (comment_id : int ) -> None :
274
- repo = github_repository .rsplit ("/" , 1 )[1 ]
275
- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } /reactions"
276
- payload = {
277
- "content" : "+1"
278
- }
279
- payload = json .dumps (payload )
280
- do_request (path , payload = payload , method = "POST" , headers = headers , base_url = github_api_url )
170
+ self .handle_ignore_reactions (comments )
171
+ self .update_comment (new_body , str (security_alert .id ))
172
+
173
+ def handle_ignore_reactions (self , comments : dict ) -> None :
174
+ for comment in comments .get ("ignore" , []):
175
+ if "SocketSecurity ignore" in comment .body and not self .comment_reaction_exists (comment .id ):
176
+ self .post_reaction (comment .id )
177
+
178
+ def post_reaction (self , comment_id : int ) -> None :
179
+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } /reactions"
180
+ payload = json .dumps ({"content" : "+1" })
181
+ do_request (
182
+ path = path ,
183
+ payload = payload ,
184
+ method = "POST" ,
185
+ headers = self .config .headers ,
186
+ base_url = self .config .api_url
187
+ )
281
188
282
- @staticmethod
283
- def comment_reaction_exists (comment_id : int ) -> bool :
284
- repo = github_repository .rsplit ("/" , 1 )[1 ]
285
- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } /reactions"
189
+ def comment_reaction_exists (self , comment_id : int ) -> bool :
190
+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } /reactions"
286
191
try :
287
- response = do_request (path , headers = headers , base_url = github_api_url )
288
- data = response .json ()
289
- for reaction in data :
290
- content = reaction .get ("content" )
291
- if content is not None and content == ":thumbsup:" :
192
+ response = do_request (path , headers = self .config .headers , base_url = self .config .api_url )
193
+ for reaction in response .json ():
194
+ if reaction .get ("content" ) == ":thumbsup:" :
292
195
return True
293
196
except Exception as error :
294
- log .error (f"Unable to get reaction for { comment_id } for PR { pr_number } " )
197
+ log .error (f"Unable to get reaction for { comment_id } for PR { self . config . pr_number } " )
295
198
log .error (error )
296
199
return False
0 commit comments