Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,19 @@ TEST-**.xml
# Mac OS .DS_Store, which is a file that stores custom attributes of its containing folder
.DS_Store
*.env*

# # Métricas
# /metrics-before-radon
# /metrics-before-pylint
# /metrics-after-pylint
# /metrics-before-codecarbon
# /metrics-after-codecarbon
# /metrics-before-pytest

# extract_metrics_before_radon.py
# extract_metrics_before_pylint.py
# extract_metrics_after_pylint.py
# extract_score_before_pylint.py
# extract_metrics_before_codecarbon.py
# extract_metrics_after_codecarbon.py
# extract_metrics_before_pytest.py
78 changes: 43 additions & 35 deletions bot/exts/backend/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,7 @@ async def on_command_error(self, ctx: Context, e: errors.CommandError) -> None:
)

if isinstance(e, errors.CommandNotFound) and not getattr(ctx, "invoked_from_error_handler", False):
# We might not invoke a command from the error handler, but it's easier and safer to ensure
# this is always set rather than trying to get it exact, and shouldn't cause any issues.
ctx.invoked_from_error_handler = True

# All errors from attempting to execute these commands should be handled by the error handler.
# We wrap non CommandErrors in CommandInvokeError to mirror the behaviour of normal commands.
try:
if await self.try_silence(ctx):
return
if await self.try_run_fixed_codeblock(ctx):
return
await self.try_get_tag(ctx)
except Exception as err:
log.info("Re-handling error raised by command in error handler")
if isinstance(err, errors.CommandError):
await self.on_command_error(ctx, err)
else:
await self.on_command_error(ctx, errors.CommandInvokeError(err))
await self._handle_command_not_found(ctx, e)
elif isinstance(e, errors.UserInputError):
log.debug(debug_message)
await self.handle_user_input_error(ctx, e)
Expand All @@ -124,30 +107,55 @@ async def on_command_error(self, ctx: Context, e: errors.CommandError) -> None:
log.debug(debug_message)
await ctx.send(e)
elif isinstance(e, errors.CommandInvokeError):
if isinstance(e.original, ResponseCodeError):
await self.handle_api_error(ctx, e.original)
elif isinstance(e.original, LockedResourceError):
await ctx.send(f"{e.original} Please wait for it to finish and try again later.")
elif isinstance(e.original, InvalidInfractedUserError):
await ctx.send(f"Cannot infract that user. {e.original.reason}")
elif isinstance(e.original, Forbidden):
try:
await handle_forbidden_from_block(e.original, ctx.message)
except Forbidden:
await self.handle_unexpected_error(ctx, e.original)
else:
await self.handle_unexpected_error(ctx, e.original)
await self._handle_command_invoke_error(ctx, e)
elif isinstance(e, errors.ConversionError):
if isinstance(e.original, ResponseCodeError):
await self.handle_api_error(ctx, e.original)
else:
await self.handle_unexpected_error(ctx, e.original)
await self._handle_conversion_error(ctx, e)
elif isinstance(e, errors.DisabledCommand):
log.debug(debug_message)
else:
# ExtensionError
await self.handle_unexpected_error(ctx, e)

async def _handle_command_not_found(self, ctx: Context, e: errors.CommandError) -> None:
"""Handle CommandNotFound errors by attempting silence, codeblock, or tag commands."""
ctx.invoked_from_error_handler = True

try:
if await self.try_silence(ctx):
return
if await self.try_run_fixed_codeblock(ctx):
return
await self.try_get_tag(ctx)
except Exception as err:
log.info("Re-handling error raised by command in error handler")
if isinstance(err, errors.CommandError):
await self.on_command_error(ctx, err)
else:
await self.on_command_error(ctx, errors.CommandInvokeError(err))

async def _handle_command_invoke_error(self, ctx: Context, e: errors.CommandInvokeError) -> None:
"""Handle CommandInvokeError by dispatching on the underlying exception type."""
if isinstance(e.original, ResponseCodeError):
await self.handle_api_error(ctx, e.original)
elif isinstance(e.original, LockedResourceError):
await ctx.send(f"{e.original} Please wait for it to finish and try again later.")
elif isinstance(e.original, InvalidInfractedUserError):
await ctx.send(f"Cannot infract that user. {e.original.reason}")
elif isinstance(e.original, Forbidden):
try:
await handle_forbidden_from_block(e.original, ctx.message)
except Forbidden:
await self.handle_unexpected_error(ctx, e.original)
else:
await self.handle_unexpected_error(ctx, e.original)

async def _handle_conversion_error(self, ctx: Context, e: errors.ConversionError) -> None:
"""Handle ConversionError by dispatching on the underlying exception type."""
if isinstance(e.original, ResponseCodeError):
await self.handle_api_error(ctx, e.original)
else:
await self.handle_unexpected_error(ctx, e.original)

async def try_silence(self, ctx: Context) -> bool:
"""
Attempt to invoke the silence or unsilence command if invoke with matches a pattern.
Expand Down
154 changes: 110 additions & 44 deletions bot/exts/filtering/_filter_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import typing
from collections.abc import Callable, Coroutine, Iterable
from dataclasses import dataclass, field, replace
from dataclasses import dataclass, field, replace as dataclass_replace
from enum import Enum, auto

import discord
Expand All @@ -25,60 +27,124 @@ class Event(Enum):


@dataclass
class FilterContext:
"""A dataclass containing the information that should be filtered, and output information of the filtering."""

# Input context
event: Event # The type of event
author: User | Member | None # Who triggered the event
channel: TextChannel | VoiceChannel | StageChannel | Thread | DMChannel | None # The channel involved
content: str | Iterable # What actually needs filtering. The Iterable type depends on the filter list.
message: Message | None # The message involved
embeds: list[Embed] = field(default_factory=list) # Any embeds involved
attachments: list[discord.Attachment | FileAttachment] = field(default_factory=list) # Any attachments sent.
class FilterSource:
"""The source/sender metadata for a filtering context."""

event: Event
author: User | Member | None
channel: TextChannel | VoiceChannel | StageChannel | Thread | DMChannel | None
message: Message | None
before_message: Message | None = None
message_cache: MessageCache | None = None
# Output context
dm_content: str = "" # The content to DM the invoker
dm_embed: str = "" # The embed description to DM the invoker
send_alert: bool = False # Whether to send an alert for the moderators
alert_content: str = "" # The content of the alert
alert_embeds: list[Embed] = field(default_factory=list) # Any embeds to add to the alert
action_descriptions: list[str] = field(default_factory=list) # What actions were taken
matches: list[str] = field(default_factory=list) # What exactly was found
notification_domain: str = "" # A domain to send the user for context
filter_info: dict[Filter, str] = field(default_factory=dict) # Additional info from a filter.
messages_deletion: bool = False # Whether the messages were deleted. Can't upload deletion log otherwise.
blocked_exts: set[str] = field(default_factory=set) # Any extensions blocked (used for snekbox)
potential_phish: dict[FilterList, set[str]] = field(default_factory=dict)
# Additional actions to perform


@dataclass
class FilterContent:
"""The content being filtered."""

content: str | Iterable
embeds: list[Embed] = field(default_factory=list)
attachments: list[discord.Attachment | FileAttachment] = field(default_factory=list)


@dataclass
class FilterNotifications:
"""DM and alert content produced by filtering."""

dm_content: str = ""
dm_embed: str = ""
send_alert: bool = False
alert_content: str = ""
alert_embeds: list[Embed] = field(default_factory=list)
notification_domain: str = ""
action_descriptions: list[str] = field(default_factory=list)


@dataclass
class FilterActions:
"""Side effects and deletion metadata produced by filtering."""

additional_actions: list[Callable[[FilterContext], Coroutine]] = field(default_factory=list)
related_messages: set[Message] = field(default_factory=set) # Deletion will include these.
messages_deletion: bool = False
related_messages: set[Message] = field(default_factory=set)
related_channels: set[TextChannel | Thread | DMChannel] = field(default_factory=set)
uploaded_attachments: dict[int, list[str]] = field(default_factory=dict) # Message ID to attachment URLs.
upload_deletion_logs: bool = True # Whether it's allowed to upload deletion logs.
uploaded_attachments: dict[int, list[str]] = field(default_factory=dict)
upload_deletion_logs: bool = True


@dataclass
class FilterResults:
"""Filter match results and tracking data."""

matches: list[str] = field(default_factory=list)
filter_info: dict[Filter, str] = field(default_factory=dict)
blocked_exts: set[str] = field(default_factory=set)
potential_phish: dict[FilterList, set[str]] = field(default_factory=dict)


def __post_init__(self):
# If it's in the context of a DM channel, self.channel won't be None, but self.channel.guild will.
self.in_guild = self.channel is None or self.channel.guild is not None
class FilterContext:
"""A context object containing the information that should be filtered, and output information of the filtering.

Attributes are delegated to sub-objects for organization:
- ``source``: event, author, channel, message, before_message, message_cache
- ``content``: content, embeds, attachments
- ``notifications``: dm_content, dm_embed, send_alert, alert_content, alert_embeds, notification_domain, action_descriptions
- ``actions``: additional_actions, messages_deletion, related_messages, related_channels, uploaded_attachments, upload_deletion_logs
- ``results``: matches, filter_info, blocked_exts, potential_phish
"""

def __init__(self, source, content, notifications=None, actions=None, results=None):
self._source = source
self._content = content
self._notifications = notifications or FilterNotifications()
self._actions = actions or FilterActions()
self._results = results or FilterResults()
self.in_guild = source.channel is None or source.channel.guild is not None

def __getattr__(self, name):
for obj in (self._source, self._content, self._notifications, self._actions, self._results):
if hasattr(obj, name):
return getattr(obj, name)
raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'")

def __setattr__(self, name, value):
if name.startswith('_') or name == 'in_guild':
object.__setattr__(self, name, value)
return
for obj in (self._source, self._content, self._notifications, self._actions, self._results):
if hasattr(obj, name):
setattr(obj, name, value)
return
object.__setattr__(self, name, value)

@classmethod
def from_message(
cls, event: Event, message: Message, before: Message | None = None, cache: MessageCache | None = None
) -> FilterContext:
"""Create a filtering context from the attributes of a message."""
return cls(
event,
message.author,
message.channel,
message.content,
message,
message.embeds,
message.attachments,
before,
cache
)
source = FilterSource(event, message.author, message.channel, message, before, cache)
content = FilterContent(message.content, message.embeds, message.attachments)
return cls(source, content)

def replace(self, **changes) -> FilterContext:
"""Return a new context object assigning new values to the specified fields."""
return replace(self, **changes)
sub_objects = {
'_source': self._source,
'_content': self._content,
'_notifications': self._notifications,
'_actions': self._actions,
'_results': self._results,
}
sub_changes = {}
for key, value in changes.items():
for attr_name, obj in sub_objects.items():
if hasattr(obj, key):
sub_changes.setdefault(attr_name, {})[key] = value
break
return FilterContext(
source=dataclass_replace(self._source, **sub_changes.get('_source', {})),
content=dataclass_replace(self._content, **sub_changes.get('_content', {})),
notifications=dataclass_replace(self._notifications, **sub_changes.get('_notifications', {})),
actions=dataclass_replace(self._actions, **sub_changes.get('_actions', {})),
results=dataclass_replace(self._results, **sub_changes.get('_results', {})),
)
4 changes: 2 additions & 2 deletions bot/exts/filtering/_filter_lists/antispam.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pydis_core.utils import scheduling
from pydis_core.utils.logging import get_logger

from bot.exts.filtering._filter_context import FilterContext
from bot.exts.filtering._filter_context import FilterContent, FilterContext, FilterSource
from bot.exts.filtering._filter_lists.filter_list import ListType, SubscribingAtomicList, UniquesListBase
from bot.exts.filtering._filters.antispam import antispam_filter_types
from bot.exts.filtering._filters.filter import Filter, UniqueFilter
Expand Down Expand Up @@ -158,7 +158,7 @@ async def send_alert(self, antispam_list: AntispamList) -> None:
return

ctx, *other_contexts = self.contexts
new_ctx = FilterContext(ctx.event, ctx.author, ctx.channel, ctx.content, ctx.message)
new_ctx = FilterContext(FilterSource(ctx.event, ctx.author, ctx.channel, ctx.message), FilterContent(ctx.content))
all_descriptions_counts = Counter(reduce(
add, (other_ctx.action_descriptions for other_ctx in other_contexts), ctx.action_descriptions
))
Expand Down
Loading
Loading