Draft
Changes from all commits — 36 commits
74c6563
initial batching code
valentijnscholten Oct 7, 2025
01842eb
fix dedupe_inside_engagement
valentijnscholten Oct 13, 2025
ccc5ad1
all tests working incl sarif with internal dupes
valentijnscholten Oct 13, 2025
01c4911
cleanup
valentijnscholten Oct 15, 2025
53b2258
deduplication: add more importer unit tests
valentijnscholten Oct 19, 2025
4f6992d
deduplication: add more importer unit tests
valentijnscholten Oct 19, 2025
15a06e6
deduplication: log hash_code_fields_always
valentijnscholten Oct 19, 2025
8bb5292
view_finding: show unique_id_from_tool with hash_code
valentijnscholten Oct 18, 2025
b2ea7eb
view_finding: show unique_id_from_tool with hash_code
valentijnscholten Oct 18, 2025
99bafd3
uncomment tests
valentijnscholten Oct 19, 2025
4d470f0
add more assessments
valentijnscholten Oct 19, 2025
5d2768f
fix duplicate finding links
valentijnscholten Oct 19, 2025
8b272d9
Merge remote-tracking branch 'upstream/dev' into dedupe-batching
valentijnscholten Oct 21, 2025
cdabfea
split per algo, move into new file
valentijnscholten Oct 21, 2025
7f2f661
align logging
valentijnscholten Oct 21, 2025
301c3c3
better method name and param order
valentijnscholten Oct 21, 2025
18db8c9
Merge remote-tracking branch 'upstream/dev' into dedupe-batching
valentijnscholten Oct 21, 2025
e73ac73
ruff apps.py
valentijnscholten Oct 21, 2025
0945279
update task/query counts
valentijnscholten Oct 21, 2025
d9dad18
update comments, parameters names
valentijnscholten Oct 21, 2025
a1da692
finetune uidorhash logic
valentijnscholten Oct 21, 2025
2c6f941
fix tests to import from deduplication.py
valentijnscholten Oct 21, 2025
0efac0c
ruff unit tests
valentijnscholten Oct 21, 2025
76b78d6
simplify base queryset building
valentijnscholten Oct 21, 2025
58d6934
deduplication logic: add cross scanner unique_id tests
valentijnscholten Oct 22, 2025
74a8b2d
hook old per finding dedupe to batch dedupe code
valentijnscholten Oct 22, 2025
95974ca
fix and make uid_or_hash_code matching identical to old dedupe
valentijnscholten Oct 22, 2025
9a876e3
UNIQUE_ID_OR_HASH_CODE: dont stop after one candidate
valentijnscholten Oct 23, 2025
92a92ca
UNIQUE_ID_OR_HASH_CODE: dont stop after one candidate in Batch mode
valentijnscholten Oct 23, 2025
bf04c3c
uid_or_hash_code: fix self/older check
valentijnscholten Oct 23, 2025
a2f4b20
notifications test: replace hardcoded ids with references
valentijnscholten Oct 23, 2025
70031e2
optimize prefetching
valentijnscholten Oct 23, 2025
716e8b7
update query counts in test
valentijnscholten Oct 23, 2025
f347703
Merge remote-tracking branch 'upstream/dev' into dedupe-batching
valentijnscholten Nov 1, 2025
182d5c3
complete merge
valentijnscholten Nov 1, 2025
934cdba
Merge remote-tracking branch 'upstream/dev' into dedupe-batching
valentijnscholten Nov 6, 2025
529 changes: 529 additions & 0 deletions dojo/finding/deduplication.py

Large diffs are not rendered by default.
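
dojo/finding/deduplication.py is too large for the diff view to render inline. As orientation only, here is a minimal sketch of what the batching entrypoint could look like, inferred from the call sites and the "single queries per algorithm; fallback to per-finding" comment in dojo/finding/helper.py below; every name except dedupe_batch_of_findings and do_dedupe_finding is hypothetical, and the hash_code branch is deliberately simplified.

# Orientation sketch only -- the real 529-line module is not rendered in this diff.
# Everything except dedupe_batch_of_findings and do_dedupe_finding is hypothetical,
# and the candidate scope is simplified (the real code also handles the
# unique_id_from_tool algorithms and "deduplicate inside engagement").
from collections import defaultdict

from django.conf import settings

from dojo.models import Finding


def dedupe_batch_of_findings(findings):
    """Deduplicate a batch of newly imported findings, issuing one candidate
    query per dedupe algorithm instead of one query per finding."""
    per_algorithm = defaultdict(list)
    for finding in findings:
        # Test.deduplication_algorithm resolves to hash_code, unique_id_from_tool,
        # unique_id_from_tool_or_hash_code or legacy for this test's tool config
        per_algorithm[finding.test.deduplication_algorithm].append(finding)

    for algorithm, batch in per_algorithm.items():
        if algorithm == settings.DEDUPE_ALGO_HASH_CODE:
            # Single query: all existing findings in the product that share any
            # hash_code present in this batch
            candidates = (
                Finding.objects.filter(
                    test__engagement__product=batch[0].test.engagement.product,
                    hash_code__in={f.hash_code for f in batch if f.hash_code},
                )
                .exclude(id__in=[f.id for f in batch])
                .order_by("id")
            )
            mark_batch_duplicates(batch, candidates)  # hypothetical helper
        else:
            # Fall back to the pre-existing per-finding deduplication
            # (do_dedupe_finding is defined elsewhere in the same module)
            for finding in batch:
                do_dedupe_finding(finding)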

59 changes: 58 additions & 1 deletion dojo/finding/helper.py
@@ -3,6 +3,7 @@
from time import strftime

from django.conf import settings
from django.db.models import Prefetch
from django.db.models.query_utils import Q
from django.db.models.signals import post_delete, pre_delete
from django.db.utils import IntegrityError
@@ -17,6 +18,7 @@
from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id
from dojo.endpoint.utils import save_endpoints_to_add
from dojo.file_uploads.helper import delete_related_files
from dojo.finding.deduplication import dedupe_batch_of_findings, do_dedupe_finding
from dojo.models import (
Endpoint,
Endpoint_Status,
@@ -35,7 +37,6 @@
from dojo.utils import (
calculate_grade,
close_external_issue,
do_dedupe_finding,
do_false_positive_history,
get_current_user,
mass_model_updater,
@@ -457,6 +458,62 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option
jira_helper.push_to_jira(finding.finding_group)


@dojo_async_task(signature=True)
@app.task
def post_process_findings_batch_signature(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True,
issue_updater_option=True, push_to_jira=False, user=None, **kwargs):
return post_process_findings_batch(finding_ids, dedupe_option, rules_option, product_grading_option,
issue_updater_option, push_to_jira, user, **kwargs)


@dojo_async_task
@app.task
def post_process_findings_batch(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True,
issue_updater_option=True, push_to_jira=False, user=None, **kwargs):

if not finding_ids:
return

system_settings = System_Settings.objects.get()

# use list() to force a complete query execution and related objects to be loaded once
findings = list(
Finding.objects.filter(id__in=finding_ids)
.select_related("test", "test__engagement", "test__engagement__product", "test__test_type")
.prefetch_related(
"endpoints",
# Prefetch duplicates of each new finding to avoid N+1 when set_duplicate iterates
Prefetch(
"original_finding",
queryset=Finding.objects.only("id", "duplicate_finding_id").order_by("-id"),
),
),
)

if not findings:
logger.debug(f"no findings found for batch deduplication with IDs: {finding_ids}")
return

# Batch dedupe with single queries per algorithm; fallback to per-finding for anything else
if dedupe_option and system_settings.enable_deduplication:
dedupe_batch_of_findings(findings)

# Non-status changing tasks
if issue_updater_option:
for finding in findings:
tool_issue_updater.async_tool_issue_update(finding)

if product_grading_option and system_settings.enable_product_grade:
calculate_grade(findings[0].test.engagement.product)

if push_to_jira:
for finding in findings:
if finding.has_jira_issue or not finding.finding_group:
jira_helper.push_to_jira(finding)
else:
jira_helper.push_to_jira(finding.finding_group)


@receiver(pre_delete, sender=Finding)
def finding_pre_delete(sender, instance, **kwargs):
logger.debug("finding pre_delete: %d", instance.id)
59 changes: 30 additions & 29 deletions dojo/importers/default_importer.py
@@ -157,10 +157,9 @@ def process_findings(
parsed_findings: list[Finding],
**kwargs: dict,
) -> list[Finding]:
# Progressive batching for chord execution
post_processing_task_signatures = []
current_batch_number = 1
max_batch_size = 1024
# Batched post-processing (no chord): dispatch a task per 1000 findings or on final finding
batch_finding_ids: list[int] = []
batch_max_size = 1000

"""
Saves findings in memory that were parsed from the scan report into the database.
@@ -233,32 +232,34 @@
finding = self.process_vulnerability_ids(finding)
# Categorize this finding as a new one
new_findings.append(finding)
# all data is already saved on the finding, we only need to trigger post processing

# We create a signature for the post processing task so we can decide to apply it async or sync
# all data is already saved on the finding, we only need to trigger post processing in batches
push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
post_processing_task_signature = finding_helper.post_process_finding_save_signature(
finding,
dedupe_option=True,
rules_option=True,
product_grading_option=False,
issue_updater_option=True,
push_to_jira=push_to_jira,
)

post_processing_task_signatures.append(post_processing_task_signature)

# Check if we should launch a chord (batch full or end of findings)
if we_want_async(async_user=self.user) and post_processing_task_signatures:
post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
post_processing_task_signatures,
current_batch_number,
max_batch_size,
is_final_finding,
)
else:
# Execute task immediately for synchronous processing
post_processing_task_signature()
batch_finding_ids.append(finding.id)

# If batch is full or we're at the end, dispatch one batched task
if len(batch_finding_ids) >= batch_max_size or is_final_finding:
finding_ids_batch = list(batch_finding_ids)
batch_finding_ids.clear()
if we_want_async(async_user=self.user):
finding_helper.post_process_findings_batch_signature(
finding_ids_batch,
dedupe_option=True,
rules_option=True,
product_grading_option=True,
issue_updater_option=True,
push_to_jira=push_to_jira,
)()
else:
finding_helper.post_process_findings_batch(
finding_ids_batch,
dedupe_option=True,
rules_option=True,
product_grading_option=True,
issue_updater_option=True,
push_to_jira=push_to_jira,
)

# No chord: tasks are dispatched immediately above per batch

for (group_name, findings) in group_names_to_findings_dict.items():
finding_helper.add_findings_to_auto_group(
58 changes: 31 additions & 27 deletions dojo/importers/default_reimporter.py
@@ -183,9 +183,7 @@ def process_findings(
self.unchanged_items = []
self.group_names_to_findings_dict = {}
# Progressive batching for chord execution
post_processing_task_signatures = []
current_batch_number = 1
max_batch_size = 1024
# No chord: we dispatch per 1000 findings or on the final finding

logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.")
logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings")
@@ -205,6 +203,9 @@
continue
cleaned_findings.append(sanitized)

batch_finding_ids: list[int] = []
batch_max_size = 1000

for idx, unsaved_finding in enumerate(cleaned_findings):
is_final = idx == len(cleaned_findings) - 1
# Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
@@ -255,31 +256,34 @@
finding,
unsaved_finding,
)
# all data is already saved on the finding, we only need to trigger post processing

# Execute post-processing task immediately if async, otherwise execute synchronously
# all data is already saved on the finding, we only need to trigger post processing in batches
push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)

post_processing_task_signature = finding_helper.post_process_finding_save_signature(
finding,
dedupe_option=True,
rules_option=True,
product_grading_option=False,
issue_updater_option=True,
push_to_jira=push_to_jira,
)
post_processing_task_signatures.append(post_processing_task_signature)

# Check if we should launch a chord (batch full or end of findings)
if we_want_async(async_user=self.user) and post_processing_task_signatures:
post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
post_processing_task_signatures,
current_batch_number,
max_batch_size,
is_final,
)
else:
post_processing_task_signature()
batch_finding_ids.append(finding.id)

# If batch is full or we're at the end, dispatch one batched task
if len(batch_finding_ids) >= batch_max_size or is_final:
finding_ids_batch = list(batch_finding_ids)
batch_finding_ids.clear()
if we_want_async(async_user=self.user):
finding_helper.post_process_findings_batch_signature(
finding_ids_batch,
dedupe_option=True,
rules_option=True,
product_grading_option=True,
issue_updater_option=True,
push_to_jira=push_to_jira,
)()
else:
finding_helper.post_process_findings_batch(
finding_ids_batch,
dedupe_option=True,
rules_option=True,
product_grading_option=True,
issue_updater_option=True,
push_to_jira=push_to_jira,
)

# No chord: tasks are dispatched immediately above per batch

self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items))
# due to #3958 we can have duplicates inside the same report
6 changes: 4 additions & 2 deletions dojo/management/commands/dedupe.py
@@ -2,11 +2,13 @@

from django.core.management.base import BaseCommand

from dojo.finding.deduplication import (
do_dedupe_finding,
do_dedupe_finding_task,
)
from dojo.models import Finding, Product
from dojo.utils import (
calculate_grade,
do_dedupe_finding,
do_dedupe_finding_task,
get_system_setting,
mass_model_updater,
)