-
Notifications
You must be signed in to change notification settings - Fork 10
Linter: enable attribute-defined-outside-init rule #528
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d6fe487
ff068d9
0e9e975
bae79cb
bd9a33f
141e479
9913bc8
48b2bfd
0c2e7fb
7020ed2
9b94bb6
38e3945
95a11ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,10 +15,11 @@ def update_service_config(connection_manager, res): | |
| logger.debug("Updating blocking, setting blocking to : %s", res["block"]) | ||
| connection_manager.block = bool(res["block"]) | ||
|
|
||
| connection_manager.conf.update( | ||
| endpoints=res.get("endpoints", []), | ||
| last_updated_at=res.get("configUpdatedAt", get_unixtime_ms()), | ||
| blocked_uids=res.get("blockedUserIds", []), | ||
| bypassed_ips=res.get("allowedIPAddresses", []), | ||
| received_any_stats=res.get("receivedAnyStats", True), | ||
| connection_manager.conf.set_endpoints(res.get("endpoints", [])) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replacing a single atomic update with multiple unsynchronized set_* calls can cause other threads to observe partially-updated connection_manager.conf (race condition). Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
| connection_manager.conf.set_last_updated_at( | ||
| res.get("configUpdatedAt", get_unixtime_ms()) | ||
| ) | ||
| connection_manager.conf.set_blocked_user_ids(res.get("blockedUserIds", [])) | ||
| connection_manager.conf.set_bypassed_ips(res.get("allowedIPAddresses", [])) | ||
| if res.get("receivedAnyStats", True): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handling of 'receivedAnyStats' changed from explicitly setting to only enabling when true, which alters prior semantics and can leave the flag unchanged when false. Details🔧 How do I fix it? More info - Comment |
||
| connection_manager.conf.enable_received_stats() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,40 +6,16 @@ | |
| from aikido_zen.helpers.match_endpoints import match_endpoints | ||
|
|
||
|
|
||
| # noinspection PyAttributeOutsideInit | ||
| class ServiceConfig: | ||
| """Class holding the config of the connection_manager""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| endpoints, | ||
| last_updated_at: int, | ||
| blocked_uids, | ||
| bypassed_ips, | ||
| received_any_stats: bool, | ||
| ): | ||
| # Init the class using update function : | ||
| self.update( | ||
| endpoints, last_updated_at, blocked_uids, bypassed_ips, received_any_stats | ||
| ) | ||
|
|
||
| def update( | ||
| self, | ||
| endpoints, | ||
| last_updated_at: int, | ||
| blocked_uids, | ||
| bypassed_ips, | ||
| received_any_stats: bool, | ||
| ): | ||
| self.last_updated_at = last_updated_at | ||
| self.received_any_stats = bool(received_any_stats) | ||
| self.blocked_uids = set(blocked_uids) | ||
| self.set_endpoints(endpoints) | ||
| self.set_bypassed_ips(bypassed_ips) | ||
| def __init__(self): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing the atomic update(...) and replacing it with multiple setters allows partially-applied config changes and introduces thread-safety/race condition risk. Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
| self.endpoints = [] | ||
| self.bypassed_ips = IPMatcher() | ||
| self.blocked_uids = set() | ||
| self.last_updated_at = -1 | ||
| self.received_any_stats = False | ||
|
|
||
| def set_endpoints(self, endpoints): | ||
| """Sets non-graphql endpoints""" | ||
|
|
||
| self.endpoints = [ | ||
| endpoint for endpoint in endpoints if not endpoint.get("graphql") | ||
| ] | ||
|
|
@@ -66,11 +42,20 @@ def get_endpoints(self, route_metadata): | |
| return match_endpoints(route_metadata, self.endpoints) | ||
|
|
||
| def set_bypassed_ips(self, bypassed_ips): | ||
| """Creates an IPMatcher from the given bypassed ip set""" | ||
| """Creates a new IPMatcher from the given bypassed ip set""" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment documents what the method does instead of why (explain rationale for creating a new IPMatcher rather than describing mechanics). Details🔧 How do I fix it? More info - Comment |
||
| self.bypassed_ips = IPMatcher() | ||
| for ip in bypassed_ips: | ||
| self.bypassed_ips.add(ip) | ||
|
|
||
| def is_bypassed_ip(self, ip): | ||
| """Checks if the IP is on the bypass list""" | ||
| return self.bypassed_ips.has(ip) | ||
|
|
||
| def set_blocked_user_ids(self, blocked_user_ids): | ||
| self.blocked_uids = set(blocked_user_ids) | ||
|
|
||
| def enable_received_any_stats(self): | ||
| self.received_any_stats = True | ||
|
|
||
| def set_last_updated_at(self, last_updated_at: int): | ||
| self.last_updated_at = last_updated_at | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,7 +50,7 @@ def __init__(self, context_obj=None, body=None, req=None, source=None): | |
| self.parsed_userinput = {} | ||
| self.xml = {} | ||
| self.outgoing_req_redirects = [] | ||
| self.set_body(body) | ||
| self.body = Context.parse_body_object(body) | ||
| self.headers: Headers = Headers() | ||
| self.cookies = {} | ||
| self.query = {} | ||
|
|
@@ -107,26 +107,28 @@ def set_cookies(self, cookies): | |
| self.cookies = cookies | ||
|
|
||
| def set_body(self, body): | ||
| try: | ||
| self.set_body_internal(body) | ||
| except Exception as e: | ||
| logger.debug("Exception occurred whilst setting body: %s", e) | ||
| self.body = Context.parse_body_object(body) | ||
|
|
||
| def set_body_internal(self, body): | ||
| @staticmethod | ||
| def parse_body_object(body): | ||
| """Sets the body and checks if it's possibly JSON""" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring for parse_body_object describes what the function does rather than why (redundant "what" comment). Details🔧 How do I fix it? More info - Comment |
||
| self.body = body | ||
| if isinstance(self.body, (str, bytes)) and len(body) == 0: | ||
| # Make sure that empty bodies like b"" don't get sent. | ||
| self.body = None | ||
| if isinstance(self.body, bytes): | ||
| self.body = self.body.decode("utf-8") # Decode byte input to string. | ||
| if not isinstance(self.body, str): | ||
| return | ||
| if self.body.strip()[0] in ["{", "[", '"']: | ||
| # Might be JSON, but might not have been parsed correctly by server because of wrong headers | ||
| parsed_body = json.loads(self.body) | ||
| if parsed_body: | ||
| self.body = parsed_body | ||
| try: | ||
| if isinstance(body, (str, bytes)) and len(body) == 0: | ||
| # Make sure that empty bodies like b"" don't get sent. | ||
| return None | ||
| if isinstance(body, bytes): | ||
| body = body.decode("utf-8") # Decode byte input to string. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function parameter 'body' is reassigned (body = body.decode(...)), which obscures the original argument value. Details✨ AI Reasoning 🔧 How do I fix it? More info - Comment |
||
| if not isinstance(body, str): | ||
| return body | ||
| if body.strip()[0] in ["{", "[", '"']: | ||
| # Might be JSON, but might not have been parsed correctly by server because of wrong headers | ||
| parsed_body = json.loads(body) | ||
| if parsed_body: | ||
| return parsed_body | ||
| return body | ||
| except Exception as e: | ||
| logger.debug("Exception occurred whilst parsing body: %s", e) | ||
| return body | ||
|
|
||
| def get_route_metadata(self): | ||
| """Returns a route_metadata object""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replacing explicit ServiceConfig(...) with ServiceConfig() changes initial received_any_stats default and can invert report_initial_stats behavior.
Details
🔧 How do I fix it?
Trace execution paths carefully. Ensure precondition checks happen before using values, validate ranges before checking impossible conditions, and don't check for states that the code has already ruled out.
More info - Comment
@AikidoSec feedback: [FEEDBACK]to get better review comments in the future.