diff --git a/.gitignore b/.gitignore index 1cdf995c5f4..857df7a34da 100644 --- a/.gitignore +++ b/.gitignore @@ -153,3 +153,5 @@ docs/.hugo_build.lock # claude etc MEMORY.md .claude/ +CLAUDE.md +CLAUDE.local.md diff --git a/docker-compose.override.integration_tests.yml b/docker-compose.override.integration_tests.yml index a281ae880a8..6ee96a896c4 100644 --- a/docker-compose.override.integration_tests.yml +++ b/docker-compose.override.integration_tests.yml @@ -46,6 +46,12 @@ services: environment: DD_DATABASE_URL: ${DD_TEST_DATABASE_URL:-postgresql://defectdojo:defectdojo@postgres:5432/test_defectdojo} DD_V3_FEATURE_LOCATIONS: ${DD_V3_FEATURE_LOCATIONS:-False} + # Delay deduplication batches so the async_wait integration test can + # deterministically distinguish a blocking join (async_wait) from a + # non-blocking one (async). Scoped by _FILTER to that test's findings so + # other dedupe tests are unaffected. Integration-test stack only; never prod. + DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY: 10 + DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER: "async_wait finding" initializer: environment: PYTHONWARNINGS: error # We are strict about Warnings during testing diff --git a/docs/content/en/open_source/upgrading/2.60.md b/docs/content/en/open_source/upgrading/2.60.md index e7811aa0b99..529dd8ce54c 100644 --- a/docs/content/en/open_source/upgrading/2.60.md +++ b/docs/content/en/open_source/upgrading/2.60.md @@ -2,6 +2,45 @@ title: 'Upgrading to DefectDojo Version 2.60.x' toc_hide: true weight: -20260601 -description: No special instructions. +description: New deduplication execution mode for import/reimport. --- -There are no special instructions for upgrading to 2.60.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.60.0) for the contents of the release. 
+ +## Deduplication execution mode for import/reimport + +This release adds a new `deduplication_execution_mode` setting that controls how +import/reimport deduplication post-processing is dispatched and whether the API +response waits for it. It can be set per user (profile) and overridden per request +on the import and reimport endpoints. + +Modes: + +- `async` (default): deduplication and the rest of post-processing are dispatched + to the background and the response returns immediately. This is the historical + behavior; nothing changes for existing users. +- `async_wait`: post-processing is still dispatched to the background, but the + request waits for deduplication to finish before responding. As a result the + `scan_added` notification and the statistics in the import/reimport response + reflect the deduplicated state (findings that turned out to be duplicates are + no longer counted/listed as new). JIRA push, product grading and other + non-deduplication tasks remain asynchronous and are not awaited. +- `sync`: import deduplication runs inline in the web request. + +The wait in `async_wait` is bounded by the new `DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT` +environment variable (default `60` seconds). If no worker picks up the work within +the timeout, the request responds anyway (degrading to the `async` outcome) rather +than hanging. + +The import/reimport response now also includes a `deduplication_complete` boolean +indicating whether deduplication had finished by the time the response was produced. + +### Relationship to `block_execution` + +The existing `block_execution` profile flag is unchanged. It remains the global +switch that forces **all** of a user's asynchronous tasks (notifications, JIRA +push, product grading, deduplication, ...) to run in the foreground. +`deduplication_execution_mode` is independent and narrower — it only affects +import/reimport deduplication post-processing. 
A user who has `block_execution` +enabled continues to get fully synchronous imports; the upgrade migration seeds +their `deduplication_execution_mode` to `sync` so behavior is unchanged. + +No action is required to upgrade. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.60.0) for the contents of the release. diff --git a/docs/content/triage_findings/finding_deduplication/about_deduplication.md b/docs/content/triage_findings/finding_deduplication/about_deduplication.md index 7050711dcd1..e8e3cc2ee6a 100644 --- a/docs/content/triage_findings/finding_deduplication/about_deduplication.md +++ b/docs/content/triage_findings/finding_deduplication/about_deduplication.md @@ -108,6 +108,18 @@ The endpoints also have to match for the findings to be considered duplicates, s - Dedupe is triggered on import/reimport and during certain updates run via Celery in the background. +### Import/reimport deduplication execution mode + +For import and reimport you can control how deduplication post-processing is dispatched and whether the API response waits for it. Set it per user on the profile page (**Deduplication execution mode**), or override it per request with the `deduplication_execution_mode` field on the import/reimport endpoints (the request value takes precedence over the profile). + +- `async` (default): deduplication and the rest of post-processing run in the background and the response returns immediately. Historical behavior; the response is produced before findings are deduplicated. +- `async_wait`: post-processing is still dispatched to the background, but the request waits for deduplication to finish before responding. The `scan_added` notification and the statistics in the response then reflect the deduplicated state (findings that turned out to be duplicates are no longer counted/listed as new). JIRA push, product grading and other non-deduplication tasks remain asynchronous and are not awaited. 
The wait is bounded by `DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT` (default `60` seconds); if no worker picks up the work in time, the request responds anyway rather than hanging. +- `sync`: import deduplication runs inline in the web request. + +The import/reimport response includes a `deduplication_complete` boolean indicating whether deduplication had finished by the time the response was produced (`true` for `sync` and for a completed `async_wait`, `false` for `async`). + +This is independent of the global `block_execution` profile flag, which forces **all** of a user's asynchronous tasks (notifications, JIRA push, product grading, deduplication, ...) to the foreground. When no execution mode is set, `block_execution=True` falls back to `sync`. + ## Service field and its impact - By default, `HASH_CODE_FIELDS_ALWAYS = ["service"]`, meaning the `service` associated with a finding is appended to the hash for all scanners. diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 667dc9c4348..80485f7e8c4 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -24,12 +24,14 @@ from dojo.importers.default_reimporter import DefaultReImporter from dojo.location.models import Location from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_CHOICES, IMPORT_ACTIONS, SEVERITIES, SEVERITY_CHOICES, STATS_FIELDS, App_Analysis, Development_Environment, + Dojo_User, DojoMeta, Endpoint, Engagement, @@ -431,6 +433,16 @@ class CommonImportScanSerializer(serializers.Serializer): allow_null=True, default=None, queryset=User.objects.all(), ) push_to_jira = serializers.BooleanField(default=False) + deduplication_execution_mode = serializers.ChoiceField( + required=False, + allow_null=True, + choices=DEDUPLICATION_EXECUTION_MODE_CHOICES, + help_text="Override how import post-processing (deduplication, jira push, grading, ...) is executed for " + "this request. 'async' dispatches post-processing to the background and responds immediately (default). 
" + "'async_wait' dispatches to the background but waits for deduplication to finish before responding, so " + "notifications and the returned statistics reflect the deduplicated state. 'sync' runs everything inline. " + "If omitted, falls back to the user's profile setting (deduplication_execution_mode).", + ) environment = serializers.CharField(required=False) build_id = serializers.CharField( required=False, help_text="ID of the build that was scanned.", @@ -476,6 +488,14 @@ class CommonImportScanSerializer(serializers.Serializer): help_text=_("Also referred to as 'Organization' ID."), ) statistics = ImportStatisticsSerializer(read_only=True, required=False) + deduplication_complete = serializers.BooleanField( + read_only=True, + required=False, + help_text="Whether deduplication had finished by the time this response was produced. " + "True for 'sync' and for 'async_wait' when deduplication completed within the timeout; " + "False for 'async' (deduplication is still running in the background) or when an " + "'async_wait' import timed out waiting for it.", + ) pro = serializers.ListField(read_only=True, required=False) apply_tags_to_findings = serializers.BooleanField( help_text="If set to True, the tags will be applied to the findings", @@ -534,6 +554,7 @@ def process_scan( data["product_id"] = test.engagement.product.id data["product_type_id"] = test.engagement.product.prod_type.id data["statistics"] = {"after": test.statistics} + data["deduplication_complete"] = importer.deduplication_complete duration = time.perf_counter() - start_time LargeScanSizeProductAnnouncement(response_data=data, duration=duration) ScanTypeProductAnnouncement(response_data=data, scan_type=context.get("scan_type")) @@ -632,6 +653,14 @@ def setup_common_context(self, data: dict) -> dict: if eng_end_date: context["target_end"] = context.get("engagement_end_date") + # Resolve the effective import execution mode: request override (if any) + # takes precedence over the user's profile 
setting, otherwise default async. + request = self.context.get("request") + user = getattr(request, "user", None) + context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode( + user, data.get("deduplication_execution_mode"), + ) + return context @@ -805,11 +834,11 @@ def process_scan( try: logger.debug(f"process_scan called with context: {context}") start_time = time.perf_counter() + processor = None if test := context.get("test"): statistics_before = test.statistics - context["test"], _, _, _, _, _, test_import = self.get_reimporter( - **context, - ).process_scan( + processor = self.get_reimporter(**context) + context["test"], _, _, _, _, _, test_import = processor.process_scan( context.pop("scan", None), ) if test_import: @@ -821,9 +850,10 @@ def process_scan( # Do not close old findings when creating a brand new test: there are no # existing findings to compare against, and close_old_findings would # incorrectly close findings from other tests in the same scope. 
- context["test"], _, _, _, _, _, _ = self.get_importer( + processor = self.get_importer( **{**context, "close_old_findings": False}, - ).process_scan( + ) + context["test"], _, _, _, _, _, _ = processor.process_scan( context.pop("scan", None), ) else: @@ -842,6 +872,8 @@ def process_scan( if statistics_delta: data["statistics"]["delta"] = statistics_delta data["statistics"]["after"] = test.statistics + if processor is not None: + data["deduplication_complete"] = processor.deduplication_complete duration = time.perf_counter() - start_time LargeScanSizeProductAnnouncement(response_data=data, duration=duration) ScanTypeProductAnnouncement(response_data=data, scan_type=context.get("scan_type")) diff --git a/dojo/celery_dispatch.py b/dojo/celery_dispatch.py index 38a3b4fa06c..445cf99fc1b 100644 --- a/dojo/celery_dispatch.py +++ b/dojo/celery_dispatch.py @@ -61,10 +61,11 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur - Inject `async_user_id` if missing. - Capture and inject pghistory context if available. - - Respect `force_sync=True` (foreground execution) and user `block_execution`. + - Respect `force_sync=True` (foreground execution) and the user's + block_execution flag. - Respect `force_async=True` (background execution even when the caller - would otherwise run synchronously, e.g. user has `block_execution`). - `force_async` wins over `force_sync` and `block_execution`. + would otherwise run synchronously, e.g. user has block_execution). + `force_async` wins over `force_sync` and block_execution. - Support `countdown=` for async dispatch. Returns: @@ -75,6 +76,11 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur from dojo.decorators import dojo_async_task_counter, we_want_async # noqa: PLC0415 circular import countdown = cast("int", kwargs.pop("countdown", 0)) + # Per-dispatch result storage. 
The task default is `ignore_result` (global + # CELERY_TASK_IGNORE_RESULT=True), so AsyncResult.get() is a no-op. Callers + # that need to join on the result later (e.g. import 'async_wait' mode) pass + # ignore_result=False to force this one dispatch to store its result. + ignore_result = kwargs.pop("ignore_result", None) injected = _inject_async_user(kwargs) injected = _inject_pghistory_context(injected) @@ -83,7 +89,10 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur if we_want_async(*sig.args, func=getattr(sig, "type", None), **sig_kwargs): # DojoAsyncTask.apply_async tracks async dispatch. Avoid double-counting here. - return sig.apply_async(countdown=countdown) + apply_kwargs = {"countdown": countdown} + if ignore_result is not None: + apply_kwargs["ignore_result"] = ignore_result + return sig.apply_async(**apply_kwargs) # Track foreground execution as a "created task" as well (matches historical dojo_async_task behavior) dojo_async_task_counter.incr(str(sig.task), args=sig.args, kwargs=sig_kwargs) diff --git a/dojo/db_migrations/0273_usercontactinfo_deduplication_execution_mode.py b/dojo/db_migrations/0273_usercontactinfo_deduplication_execution_mode.py new file mode 100644 index 00000000000..cb57c34b2fe --- /dev/null +++ b/dojo/db_migrations/0273_usercontactinfo_deduplication_execution_mode.py @@ -0,0 +1,16 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dojo', '0272_reencrypt_tool_config_credentials_aes_gcm'), + ] + + operations = [ + migrations.AddField( + model_name='usercontactinfo', + name='deduplication_execution_mode', + field=models.CharField(blank=True, choices=[('async', 'Async (do not wait)'), ('async_wait', 'Async, wait for deduplication'), ('sync', 'Synchronous (block)')], help_text="Controls how import/reimport deduplication post-processing is executed. 'Async' dispatches it to the background and returns immediately (default). 
'Async, wait for deduplication' dispatches to the background but waits for deduplication to finish before responding, so notifications and statistics reflect the deduplicated state. 'Synchronous' runs the import deduplication inline. Can be overridden per request. Independent of block_execution, which forces all async tasks (notifications, jira, ...) to the foreground.", max_length=20, null=True), + ), + ] diff --git a/dojo/db_migrations/0274_seed_deduplication_execution_mode.py b/dojo/db_migrations/0274_seed_deduplication_execution_mode.py new file mode 100644 index 00000000000..cf9a239c707 --- /dev/null +++ b/dojo/db_migrations/0274_seed_deduplication_execution_mode.py @@ -0,0 +1,30 @@ +from django.db import migrations + + +def seed_deduplication_execution_mode(apps, schema_editor): + """ + Seed the new import deduplication execution mode from the legacy block_execution flag. + + block_execution remains the global "run all async tasks in the foreground" switch; + users who had it enabled get the synchronous deduplication mode so import behavior is + unchanged for them. 
+ """ + UserContactInfo = apps.get_model("dojo", "UserContactInfo") + UserContactInfo.objects.filter(block_execution=True).update(deduplication_execution_mode="sync") + + +def unseed_deduplication_execution_mode(apps, schema_editor): + """Reverse: clear the seeded synchronous mode.""" + UserContactInfo = apps.get_model("dojo", "UserContactInfo") + UserContactInfo.objects.filter(deduplication_execution_mode="sync").update(deduplication_execution_mode=None) + + +class Migration(migrations.Migration): + + dependencies = [ + ('dojo', '0273_usercontactinfo_deduplication_execution_mode'), + ] + + operations = [ + migrations.RunPython(seed_deduplication_execution_mode, unseed_deduplication_execution_mode), + ] diff --git a/dojo/engagement/ui/views.py b/dojo/engagement/ui/views.py index 475ff9cf5f9..aeca7c2ad99 100644 --- a/dojo/engagement/ui/views.py +++ b/dojo/engagement/ui/views.py @@ -948,6 +948,10 @@ def process_form( "create_finding_groups_for_all_findings": form.cleaned_data.get("create_finding_groups_for_all_findings", None), "environment": self.get_development_environment(environment_name=form.cleaned_data.get("environment")), }) + # Honor the user's profile deduplication_execution_mode for UI imports. The API resolves + # this in the serializer; the UI has no per-import selector, so fall back to the profile + # (or block_execution) instead of silently defaulting to async. + context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode(request.user) # Create the engagement if necessary self.create_engagement(context) # close_old_findings_product_scope is a modifier of close_old_findings. 
diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index d83d032b176..8ce40ed6765 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -2,7 +2,7 @@ from contextlib import suppress from datetime import datetime from itertools import batched -from time import strftime +from time import sleep, strftime from django.conf import settings from django.db import transaction @@ -470,6 +470,20 @@ def post_process_findings_batch( force_sync=False, **kwargs, ): + # Test-only hook: when DEDUPLICATION_BATCH_PROCESS_TEST_DELAY > 0 (set only in + # the integration-test stack) block this batch so the async_wait integration + # test can deterministically distinguish 'async_wait' (which joins on this + # task) from 'async' (which does not). Default 0 -> no effect in production. + # DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER (a finding-title prefix) scopes + # the delay to that one test's findings so unrelated dedupe tests are not slowed. + if (test_delay := settings.DEDUPLICATION_BATCH_PROCESS_TEST_DELAY) > 0: + delay_filter = settings.DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER + if not delay_filter or Finding.objects.filter(id__in=finding_ids, title__istartswith=delay_filter).exists(): + logger.warning( + "post_process_findings_batch: TEST-ONLY delay of %ss for %d finding(s) (filter=%r)", + test_delay, len(finding_ids) if finding_ids else 0, delay_filter, + ) + sleep(test_delay) logger.debug( f"post_process_findings_batch called: finding_ids_count={len(finding_ids) if finding_ids else 0}, " diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index fe015b610b0..1df5ad6d931 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -16,6 +16,8 @@ from dojo.jira.services import is_keep_in_sync from dojo.location.models import Location from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, # Import History States IMPORT_CLOSED_FINDING, 
IMPORT_CREATED_FINDING, @@ -81,6 +83,85 @@ def __init__( self.pending_vulnerability_ids: list[Vulnerability_Id] = [] self.pending_vuln_id_deletes: list[int] = [] self.pending_burp_rr: list[BurpRawRequestResponse] = [] + # Handles for async post-processing tasks to await in 'async_wait' mode. + # Set after ImporterOptions.__init__ so it stays out of field_names + # (and the compress/decompress cycle used for async dispatch). + self.post_processing_results = [] + # Whether deduplication is known to be finished by the time the response + # is built. True for 'sync' (ran inline) and for 'async_wait' when all + # batches completed within the timeout; False for 'async' (dispatched, + # not awaited) or when an 'async_wait' join timed out/errored. + self.deduplication_complete = False + + def post_processing_dispatch_kwargs(self, **kwargs): + """ + Translate the resolved import execution mode into the force flags that + dojo_dispatch_task understands: + - SYNC: run inline in the web process (force_sync). + - ASYNC_WAIT: guarantee background dispatch (force_async) so we get a + handle to await, regardless of the user's profile mode. + - ASYNC (default): preserve historical behavior, honoring any externally + supplied force_sync and the user's sync mode via we_want_async. + """ + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_SYNC: + return {"force_sync": True} + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + return {"force_async": True} + return {"force_sync": kwargs.get("force_sync", False)} + + def record_post_processing_result(self, result): + """ + Remember an async post-processing dispatch handle so it can be awaited + later when running in the 'async_wait' execution mode. No-op for the + other modes (no handle is recorded by the caller). 
+ """ + if not hasattr(self, "post_processing_results"): + self.post_processing_results = [] + if result is not None: + self.post_processing_results.append(result) + + def wait_for_post_processing(self): + """ + Block until the deduplication (and other batch) post-processing tasks + dispatched during this import have finished, so notifications and the + returned statistics reflect the deduplicated state. + + Only relevant in the 'async_wait' execution mode; bounded by + settings.DEDUPLICATION_ASYNC_WAIT_TIMEOUT so a stuck/missing worker degrades + to the historical (respond-anyway) behavior instead of hanging. + """ + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_SYNC: + # Batches ran inline during process_findings, so dedup is already done. + self.deduplication_complete = True + return + if self.deduplication_execution_mode != DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + # 'async': post-processing was dispatched but is not awaited. + self.deduplication_complete = False + return + results = getattr(self, "post_processing_results", None) or [] + if not results: + # Nothing was dispatched (e.g. empty import) — dedup is trivially done. 
+ self.deduplication_complete = True + return + timeout = getattr(settings, "DEDUPLICATION_ASYNC_WAIT_TIMEOUT", 60) + logger.debug("async_wait: waiting for %d post-processing task(s) (timeout=%ss)", len(results), timeout) + start = time.monotonic() + success = True + for result in results: + if result is None or not hasattr(result, "get"): + continue + try: + result.get(timeout=timeout, propagate=False) + except Exception as e: + logger.warning( + "async_wait: error/timeout after %.2fs waiting for post-processing task: %s", + time.monotonic() - start, e, + ) + success = False + elapsed = time.monotonic() - start + logger.debug("async_wait: waited %.2fs for %d post-processing task(s) (success=%s)", elapsed, len(results), success) + self.deduplication_complete = success + self.post_processing_results = [] def check_child_implementation_exception(self): """ @@ -880,10 +961,49 @@ def notify_scan_added( new_findings = [] logger.debug("Scan added notifications") + # When deduplication has finished (synchronous mode, or async_wait after the + # join), the in-memory findings still carry their pre-dedup duplicate=False + # flag because deduplication runs on separately-fetched instances. Refresh the + # flag from the database and split each list into "real" and duplicate findings + # so the notification reflects post-dedup reality instead of counting/listing + # deduplicated findings as brand new. In plain async mode dedup has not run yet, + # so we leave the lists untouched (best-effort, historical behavior). 
+ findings_new_duplicate: list[Finding] = [] + findings_reactivated_duplicate: list[Finding] = [] + findings_untouched_duplicate: list[Finding] = [] + if getattr(self, "deduplication_complete", False): + all_ids = [f.id for f in (*new_findings, *findings_reactivated, *findings_untouched)] + duplicate_ids = set() + if all_ids: + duplicate_ids = set( + Finding.objects.filter(id__in=all_ids, duplicate=True).values_list("id", flat=True), + ) + + def _split(findings): + kept, duplicates = [], [] + for finding in findings: + if finding.id in duplicate_ids: + # refresh the in-memory flag so any template logic is correct + finding.duplicate = True + duplicates.append(finding) + else: + kept.append(finding) + return kept, duplicates + + new_findings, findings_new_duplicate = _split(new_findings) + findings_reactivated, findings_reactivated_duplicate = _split(findings_reactivated) + findings_untouched, findings_untouched_duplicate = _split(findings_untouched) + # Recompute the headline count to exclude findings that turned out to be + # duplicates of an existing finding (they are not genuinely new activity). 
+ updated_count = len(new_findings) + len(findings_reactivated) + len(findings_mitigated) + new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) + findings_new_duplicate = sorted(findings_new_duplicate, key=lambda x: x.numerical_severity) + findings_reactivated_duplicate = sorted(findings_reactivated_duplicate, key=lambda x: x.numerical_severity) + findings_untouched_duplicate = sorted(findings_untouched_duplicate, key=lambda x: x.numerical_severity) title = ( f"Created/Updated {updated_count} findings for {test.engagement.product}: {test.engagement.name}: {test}" @@ -900,6 +1020,11 @@ def notify_scan_added( engagement=test.engagement, product=test.engagement.product, findings_untouched=findings_untouched, + # Findings deduplicated during post-processing, split by their import action. + # Populated only once deduplication has completed (sync / async_wait). 
+ findings_new_duplicate=findings_new_duplicate, + findings_reactivated_duplicate=findings_reactivated_duplicate, + findings_untouched_duplicate=findings_untouched_duplicate, url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,)), ) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 6cdcb699024..377cee94a51 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -12,6 +12,7 @@ from dojo.importers.options import ImporterOptions from dojo.jira import services as jira_services from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Engagement, Finding, Test, @@ -134,6 +135,10 @@ def process_scan( new_findings=new_findings, closed_findings=closed_findings, ) + # In 'async_wait' mode, block until background deduplication has finished + # so notifications and statistics reflect the deduplicated state. + self.wait_for_post_processing() + # Send out some notifications to the user logger.debug("IMPORT_SCAN: Generating notifications") dojo_dispatch_task( @@ -292,7 +297,7 @@ def _process_findings_internal( batch_finding_ids.clear() logger.debug("process_findings: dispatching batch with push_to_jira=%s (batch_size=%d, is_final=%s)", push_to_jira, len(finding_ids_batch), is_final_finding) - dojo_dispatch_task( + result = dojo_dispatch_task( finding_helper.post_process_findings_batch, finding_ids_batch, dedupe_option=True, @@ -300,8 +305,13 @@ def _process_findings_internal( product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira, - force_sync=kwargs.get("force_sync", False), + # 'async_wait' joins on this dispatch via AsyncResult.get(), so its + # result must be stored despite the global CELERY_TASK_IGNORE_RESULT. 
+ **({"ignore_result": False} if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT else {}), + **self.post_processing_dispatch_kwargs(**kwargs), ) + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + self.record_post_processing_result(result) # No chord: tasks are dispatched immediately above per batch diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 9defa8352f2..70bb9007250 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -18,6 +18,7 @@ from dojo.importers.options import ImporterOptions from dojo.jira import services as jira_services from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Development_Environment, Finding, Notes, @@ -142,6 +143,9 @@ def process_scan( ) # Send out som notifications to the user logger.debug("REIMPORT_SCAN: Generating notifications") + # In 'async_wait' mode, block until background deduplication has finished + # so notifications and statistics reflect the deduplicated state. + self.wait_for_post_processing() updated_count = ( len(closed_findings) + len(reactivated_findings) + len(new_findings) ) @@ -458,7 +462,7 @@ def _process_findings_internal( batch_findings.clear() finding_ids_batch = list(batch_finding_ids) batch_finding_ids.clear() - dojo_dispatch_task( + result = dojo_dispatch_task( finding_helper.post_process_findings_batch, finding_ids_batch, dedupe_option=True, @@ -467,8 +471,13 @@ def _process_findings_internal( issue_updater_option=True, push_to_jira=push_to_jira, jira_instance_id=getattr(self.jira_instance, "id", None), - force_sync=kwargs.get("force_sync", False), + # 'async_wait' joins on this dispatch via AsyncResult.get(), so its + # result must be stored despite the global CELERY_TASK_IGNORE_RESULT. 
+ **({"ignore_result": False} if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT else {}), + **self.post_processing_dispatch_kwargs(**kwargs), ) + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + self.record_post_processing_result(result) # No chord: tasks are dispatched immediately above per batch diff --git a/dojo/importers/options.py b/dojo/importers/options.py index 02cecff1113..d25f1a68b29 100644 --- a/dojo/importers/options.py +++ b/dojo/importers/options.py @@ -12,6 +12,9 @@ from dojo.jira.services import get_instance as get_jira_instance from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_SYNC, + DEDUPLICATION_EXECUTION_MODES, Development_Environment, Dojo_User, Endpoint, @@ -68,6 +71,7 @@ def load_base_options( self.engagement: Engagement | None = self.validate_engagement(*args, **kwargs) self.environment: Development_Environment | None = self.validate_environment(*args, **kwargs) self.group_by: str = self.validate_group_by(*args, **kwargs) + self.deduplication_execution_mode: str = self.validate_deduplication_execution_mode(*args, **kwargs) self.import_type: str = self.validate_import_type(*args, **kwargs) self.lead: Dojo_User | None = self.validate_lead(*args, **kwargs) self.minimum_severity: str = self.validate_minimum_severity(*args, **kwargs) @@ -345,6 +349,25 @@ def validate_do_not_reactivate( **kwargs, ) + def validate_deduplication_execution_mode( + self, + *args: list, + **kwargs: dict, + ) -> str: + mode = self.validate( + "deduplication_execution_mode", + expected_types=[str], + required=False, + default=DEDUPLICATION_EXECUTION_MODE_ASYNC, + **kwargs, + ) + if mode not in DEDUPLICATION_EXECUTION_MODES: + mode = DEDUPLICATION_EXECUTION_MODE_ASYNC + # An explicit force_sync from a non-serializer caller still wins. 
+ if kwargs.get("force_sync"): + mode = DEDUPLICATION_EXECUTION_MODE_SYNC + return mode + def validate_commit_hash( self, *args: list, diff --git a/dojo/middleware.py b/dojo/middleware.py index a576244312a..127ab462a09 100644 --- a/dojo/middleware.py +++ b/dojo/middleware.py @@ -281,8 +281,8 @@ def _drain_search_context_to_async(objects, source): for model_name, pk_list in model_groups.items(): batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)] # force_async=True keeps indexing off the request path even for users - # with block_execution=True — index updates are slow and never need - # to be synchronous from the user's perspective. + # with block_execution=True — index updates are slow and + # never need to be synchronous from the user's perspective. for i, batch in enumerate(batches, 1): logger.debug(f"{source}: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances") dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch, force_async=True) diff --git a/dojo/models.py b/dojo/models.py index c4d3d0aaa68..a58fa3c5a74 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -123,7 +123,16 @@ def __call__(self, model_instance, filename): from dojo.regulations.models import Regulation # noqa: E402, F401, I001 -- re-export; user/system_settings block intentionally out-of-order (load-order) -from dojo.user.models import Contact, Dojo_User, UserContactInfo # noqa: E402, F401 -- must precede system_settings (middleware load-order) +from dojo.user.models import ( # noqa: E402, F401 -- must precede system_settings (middleware load-order) + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_CHOICES, + DEDUPLICATION_EXECUTION_MODE_SYNC, + DEDUPLICATION_EXECUTION_MODES, + Contact, + Dojo_User, + UserContactInfo, +) from dojo.system_settings.models import System_Settings # noqa: E402, F401 -- re-export diff --git a/dojo/settings/settings.dist.py 
b/dojo/settings/settings.dist.py index 3fcf12b78d4..735d7fa67b5 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -96,6 +96,16 @@ DD_CELERY_BROKER_PARAMS=(str, ""), DD_CELERY_BROKER_TRANSPORT_OPTIONS=(str, ""), DD_CELERY_TASK_IGNORE_RESULT=(bool, True), + # Max seconds the 'async_wait' deduplication execution mode will wait for + # background deduplication/post-processing to finish before responding anyway. + DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT=(int, 60), + # Test-only: artificial delay (seconds) injected at the start of + # post_process_findings_batch so integration tests can deterministically + # observe that 'async_wait' blocks on deduplication while 'async' does not. + # Must stay 0 in production. The _FILTER (a finding-title prefix) scopes the + # delay to a single test's findings so unrelated dedupe tests are not slowed. + DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY=(int, 0), + DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER=(str, ""), DD_CELERY_RESULT_BACKEND=(str, "django-db"), DD_CELERY_RESULT_EXPIRES=(int, 86400), DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")), @@ -862,6 +872,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param params=env("DD_CELERY_BROKER_PARAMS"), ) CELERY_TASK_IGNORE_RESULT = env("DD_CELERY_TASK_IGNORE_RESULT") +DEDUPLICATION_ASYNC_WAIT_TIMEOUT = env("DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT") +DEDUPLICATION_BATCH_PROCESS_TEST_DELAY = env("DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY") +DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER = env("DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY_FILTER") CELERY_RESULT_BACKEND = env("DD_CELERY_RESULT_BACKEND") CELERY_TIMEZONE = TIME_ZONE CELERY_RESULT_EXPIRES = env("DD_CELERY_RESULT_EXPIRES") diff --git a/dojo/templates/dojo/view_user.html b/dojo/templates/dojo/view_user.html index 4f1d0a3b04d..75c30fd6edf 100644 --- a/dojo/templates/dojo/view_user.html +++ b/dojo/templates/dojo/view_user.html @@ -288,6 +288,12 @@

+ + {% trans "Deduplication execution mode" %} + + {{ user.usercontactinfo.get_deduplication_execution_mode_display|default:_("Async (do not wait)") }} + + {% trans "Date Joined" %} {{ user.date_joined }} diff --git a/dojo/templates_classic/dojo/view_user.html b/dojo/templates_classic/dojo/view_user.html index e1fe9917722..ed9e90ee8cc 100644 --- a/dojo/templates_classic/dojo/view_user.html +++ b/dojo/templates_classic/dojo/view_user.html @@ -288,6 +288,12 @@

+ + {% trans "Deduplication execution mode" %} + + {{ user.usercontactinfo.get_deduplication_execution_mode_display|default:_("Async (do not wait)") }} + + {% trans "Date Joined" %} {{ user.date_joined }} diff --git a/dojo/templates_classic/notifications/mail/scan_added.tpl b/dojo/templates_classic/notifications/mail/scan_added.tpl index 263585246e0..567d80ee47e 100644 --- a/dojo/templates_classic/notifications/mail/scan_added.tpl +++ b/dojo/templates_classic/notifications/mail/scan_added.tpl @@ -26,6 +26,17 @@ {% endfor %}

+ {% if findings_new_duplicate %} +

+

+ {% blocktranslate %}New findings detected as duplicates{% endblocktranslate %} ({{ findings_new_duplicate | length }})
+ {% for finding in findings_new_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% blocktranslate %}Reactivated findings{% endblocktranslate %} ({{ findings_reactivated | length }})
@@ -37,6 +48,17 @@ {% endfor %}

+ {% if findings_reactivated_duplicate %} +

+

+ {% blocktranslate %}Reactivated findings detected as duplicates{% endblocktranslate %} ({{ findings_reactivated_duplicate | length }})
+ {% for finding in findings_reactivated_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% blocktranslate %}Closed findings{% endblocktranslate %} ({{ findings_mitigated | length }})
@@ -59,6 +81,17 @@ {% endfor %}

+ {% if findings_untouched_duplicate %} +

+

+ {% blocktranslate %}Existing findings detected as duplicates{% endblocktranslate %} ({{ findings_untouched_duplicate | length }})
+ {% for finding in findings_untouched_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% trans "Kind regards" %},

diff --git a/dojo/templates_classic/notifications/webhooks/scan_added.tpl b/dojo/templates_classic/notifications/webhooks/scan_added.tpl index b42096bfba2..0f68a72eb12 100644 --- a/dojo/templates_classic/notifications/webhooks/scan_added.tpl +++ b/dojo/templates_classic/notifications/webhooks/scan_added.tpl @@ -8,5 +8,11 @@ findings: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_reactivated %} mitigated: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_mitigated %} - untouched: + untouched: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_untouched %} + new_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_new_duplicate %} + reactivated_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_reactivated_duplicate %} + untouched_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_untouched_duplicate %} diff --git a/dojo/test/ui/views.py b/dojo/test/ui/views.py index b89c53cd4e3..b549a1c2236 100644 --- a/dojo/test/ui/views.py +++ b/dojo/test/ui/views.py @@ -44,6 +44,7 @@ from dojo.location.models import Location from dojo.models import ( BurpRawRequestResponse, + Dojo_User, Endpoint, Finding, Finding_Group, @@ -950,6 +951,10 @@ def process_form( "close_old_findings": form.cleaned_data.get("close_old_findings", None), "create_finding_groups_for_all_findings": form.cleaned_data.get("create_finding_groups_for_all_findings", None), }) + # Honor the user's profile deduplication_execution_mode for UI reimports. The API resolves + # this in the serializer; the UI has no per-import selector, so fall back to the profile + # (or block_execution) instead of silently defaulting to async. 
+ context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode(request.user) # Override the form values of active and verified if activeChoice := form.cleaned_data.get("active", None): if activeChoice == "force_to_true": diff --git a/dojo/user/models.py b/dojo/user/models.py index 7d88c731fce..4a597ed5277 100644 --- a/dojo/user/models.py +++ b/dojo/user/models.py @@ -6,6 +6,28 @@ User = get_user_model() +# Import post-processing execution modes. +# - ASYNC: post-processing (dedup, jira, grading, ...) runs in the background; +# the API responds immediately (default, historical behavior). +# - ASYNC_WAIT: post-processing is dispatched to the background as usual, but the +# request waits for the deduplication batches to finish before responding, so +# notifications and the returned statistics reflect the deduplicated state. +# - SYNC: post-processing runs inline in the web process (also the fallback when only legacy block_execution is set). +DEDUPLICATION_EXECUTION_MODE_ASYNC = "async" +DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT = "async_wait" +DEDUPLICATION_EXECUTION_MODE_SYNC = "sync" +DEDUPLICATION_EXECUTION_MODES = ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, +) +DEDUPLICATION_EXECUTION_MODE_CHOICES = ( + (DEDUPLICATION_EXECUTION_MODE_ASYNC, _("Async (do not wait)")), + (DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, _("Async, wait for deduplication")), + (DEDUPLICATION_EXECUTION_MODE_SYNC, _("Synchronous (block)")), +) + + # proxy class for convenience and UI class Dojo_User(User): class Meta: @@ -21,8 +43,31 @@ def __str__(self): @staticmethod def wants_block_execution(user): # this return False if there is no user, i.e. in celery processes, unittests, etc. + # block_execution is the global "run all async tasks in the foreground" switch and + # governs every dojo_dispatch_task/dojo_async_task call (notifications, jira, grading, + # deduplication, ...).
It is distinct from deduplication_execution_mode, which only + # controls how import/reimport deduplication post-processing is dispatched/awaited. return hasattr(user, "usercontactinfo") and user.usercontactinfo.block_execution + @staticmethod + def resolve_deduplication_execution_mode(user, override=None): + """ + Resolve the effective import/reimport deduplication execution mode. + + Priority: explicit request override > user profile deduplication_execution_mode > + legacy block_execution (which forces everything sync) > default async. + Returns one of DEDUPLICATION_EXECUTION_MODE_ASYNC / _ASYNC_WAIT / _SYNC. + """ + if override in DEDUPLICATION_EXECUTION_MODES: + return override + info = getattr(user, "usercontactinfo", None) + if info is not None: + if info.deduplication_execution_mode in DEDUPLICATION_EXECUTION_MODES: + return info.deduplication_execution_mode + if info.block_execution: + return DEDUPLICATION_EXECUTION_MODE_SYNC + return DEDUPLICATION_EXECUTION_MODE_ASYNC + @staticmethod def force_password_reset(user): return hasattr(user, "usercontactinfo") and user.usercontactinfo.force_password_reset @@ -63,6 +108,21 @@ class UserContactInfo(models.Model): slack_username = models.CharField(blank=True, null=True, max_length=150, help_text=_("Email address associated with your slack account"), verbose_name=_("Slack Email Address")) slack_user_id = models.CharField(blank=True, null=True, max_length=25) block_execution = models.BooleanField(default=False, help_text=_("Instead of async deduping a finding the findings will be deduped synchronously and will 'block' the user until completion.")) + deduplication_execution_mode = models.CharField( + max_length=20, + choices=DEDUPLICATION_EXECUTION_MODE_CHOICES, + null=True, + blank=True, + help_text=_( + "Controls how import/reimport deduplication post-processing is executed. " + "'Async' dispatches it to the background and returns immediately (default). 
" + "'Async, wait for deduplication' dispatches to the background but waits for " + "deduplication to finish before responding, so notifications and statistics " + "reflect the deduplicated state. 'Synchronous' runs the import deduplication " + "inline. Can be overridden per request. Independent of block_execution, which " + "forces all async tasks (notifications, jira, ...) to the foreground.", + ), + ) force_password_reset = models.BooleanField(default=False, help_text=_("Forces this user to reset their password on next login.")) ui_use_tailwind = models.BooleanField(default=False, verbose_name=_("Use new UI (beta)"), help_text=_("Opt in to the new Tailwind-based UI. Leave off for the classic UI.")) token_last_reset = models.DateTimeField(null=True, blank=True, help_text=_("Timestamp of the most recent API token reset for this user.")) diff --git a/dojo/user/ui/forms.py b/dojo/user/ui/forms.py index d32c3da187e..4affc20cc97 100644 --- a/dojo/user/ui/forms.py +++ b/dojo/user/ui/forms.py @@ -80,7 +80,7 @@ class Meta: # Swap order: password_last_reset before token_last_reset field_order = [ "title", "phone_number", "cell_number", "twitter_username", "github_username", - "slack_username", "ui_use_tailwind", "block_execution", "force_password_reset", "reset_api_token", + "slack_username", "ui_use_tailwind", "block_execution", "deduplication_execution_mode", "force_password_reset", "reset_api_token", "password_last_reset", "token_last_reset", ] diff --git a/tests/base_test_class.py b/tests/base_test_class.py index d9043a43fd5..d94b5020aed 100644 --- a/tests/base_test_class.py +++ b/tests/base_test_class.py @@ -10,7 +10,7 @@ from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.support import expected_conditions -from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support.ui import Select, WebDriverWait # import time logging.basicConfig( @@ -354,6 +354,24 @@ def 
enable_block_execution(self): def disable_block_execution(self): self.set_block_execution(block_execution=False) + def set_deduplication_execution_mode(self, mode="async"): + # Set the admin user's (ourselves) deduplication_execution_mode profile + # field. "async_wait" makes import/reimport block until background + # deduplication has finished before returning. + logger.info("setting deduplication execution mode to: %s", mode) + driver = self.driver + driver.get(self.base_url + "profile") + select = Select(driver.find_element(By.ID, "id_deduplication_execution_mode")) + if select.first_selected_option.get_attribute("value") != mode: + select.select_by_value(mode) + # save settings + driver.find_element(By.CSS_SELECTOR, "input.btn.btn-primary").click() + # check it persisted after reload + driver.get(self.base_url + "profile") + select = Select(driver.find_element(By.ID, "id_deduplication_execution_mode")) + self.assertEqual(select.first_selected_option.get_attribute("value"), mode) + return driver + def enable_deduplication(self): return self.enable_system_setting("id_enable_deduplication") diff --git a/tests/dedupe_test.py b/tests/dedupe_test.py index d0ffee6fad2..0893bf9b034 100644 --- a/tests/dedupe_test.py +++ b/tests/dedupe_test.py @@ -1,3 +1,4 @@ +import json import logging import os import sys @@ -5,6 +6,7 @@ import unittest from pathlib import Path +import requests from base_test_class import BaseTestCase, on_exception_html_source_logger, set_suite_settings from product_test import ProductTest from selenium.common.exceptions import TimeoutException @@ -510,6 +512,187 @@ def test_check_service(self): self.check_nb_duplicates(0) +class ImportAsyncWaitApiTest(unittest.TestCase): + + """ + Deterministic API test for the import 'async_wait' deduplication mode. 
+ + The Selenium async_wait test (removed) was only a UI smoke test: with few + findings and several page navigations between the import and the duplicate + check, the background dedupe finished regardless of whether the import + actually blocked, so it could not fail when the cross-process join is broken. + The eager unit tests (CELERY_TASK_ALWAYS_EAGER) and the mocked perf test can't + catch it either. + + This test runs against the real docker-compose stack: a separate celeryworker + process + broker, with the global CELERY_TASK_IGNORE_RESULT in effect. It is + the only coverage that can fail when the join is a no-op. + + Determinism comes from DD_DEDUPLICATION_BATCH_PROCESS_TEST_DELAY, set on the + integration-test celeryworker (see docker-compose.override.integration_tests.yml): + each deduplication batch for this test's findings sleeps a few seconds before + doing any work. 'async_wait' blocks on the worker round-trip, so by the time + the import response returns the delayed batch has finished and every duplicate + is already marked -> count == NUM_FINDINGS. A broken/no-op join would not block, + so the count would still be 0 at response time -> the test fails. The injected + delay makes that distinction independent of worker speed (a plain large-report + race would not, since dedup overlaps the import and can finish before the + response on a fast worker). + + The 'async' counterpart only asserts deduplication_complete is False: its + worker task races the import's own DB commit, so the marked count at response + time is not deterministic -- the count guarantee is asserted on async_wait. + """ + + # Small: determinism comes from the worker-side dedup delay, not report size. 
+ NUM_FINDINGS = 25 + + @classmethod + def setUpClass(cls): + cls.base_url = os.environ["DD_BASE_URL"].rstrip("/") + cls.api = cls.base_url + "/api/v2" + resp = requests.post( + cls.api + "/api-token-auth/", + data={ + "username": os.environ["DD_ADMIN_USER"], + "password": os.environ["DD_ADMIN_PASSWORD"], + }, + timeout=30, + ) + resp.raise_for_status() + cls.headers = {"Authorization": "Token " + resp.json()["token"]} + # Deduplication must be enabled globally for the mode to do anything. + cls._patch("/system_settings/1/", {"enable_deduplication": True}) + + # --- thin API helpers ------------------------------------------------- + @classmethod + def _get(cls, path, **params): + r = requests.get(cls.api + path, headers=cls.headers, params=params, timeout=60) + r.raise_for_status() + return r.json() + + @classmethod + def _post(cls, path, payload): + r = requests.post(cls.api + path, headers=cls.headers, json=payload, timeout=60) + r.raise_for_status() + return r.json() + + @classmethod + def _patch(cls, path, payload): + r = requests.patch(cls.api + path, headers=cls.headers, json=payload, timeout=60) + r.raise_for_status() + return r.json() + + # --- fixtures --------------------------------------------------------- + def _make_engagement(self, name): + """Create a product + dedup-on-engagement engagement, return its id.""" + suffix = f"{os.getpid()}-{name}" + prod_types = self._get("/product_types/", limit=1)["results"] + prod_type_id = ( + prod_types[0]["id"] if prod_types + else self._post("/product_types/", {"name": f"async_wait pt {suffix}"})["id"] + ) + product = self._post("/products/", { + "name": f"async_wait prod {suffix}", + "description": "async_wait integration test", + "prod_type": prod_type_id, + }) + engagement = self._post("/engagements/", { + "name": f"async_wait eng {suffix}", + "product": product["id"], + "target_start": "2020-01-01", + "target_end": "2030-01-01", + "deduplication_on_engagement": True, + "engagement_type": "CI/CD", + }) + 
return engagement["id"] + + def _generic_report(self): + """ + A Generic Findings Import report of NUM_FINDINGS unique findings. + + Re-importing the identical content into a second test of the same + dedup-on-engagement engagement marks all NUM_FINDINGS as duplicates. + """ + findings = [ + { + "title": f"async_wait finding {i}", + "severity": "High", + "description": f"async_wait dedup finding number {i}", + } + for i in range(self.NUM_FINDINGS) + ] + return json.dumps({"findings": findings}) + + def _import(self, engagement_id, mode): + """POST /import-scan and return (response_json, test_id).""" + report = self._generic_report() + resp = requests.post( + self.api + "/import-scan/", + headers=self.headers, + data={ + "scan_type": "Generic Findings Import", + "engagement": engagement_id, + "minimum_severity": "Info", + "active": True, + "verified": False, + "deduplication_execution_mode": mode, + }, + files={"file": ("report.json", report, "application/json")}, + timeout=120, + ) + self.assertEqual(resp.status_code, 201, resp.text) + body = resp.json() + return body, body["test"] + + def _duplicates_marked(self, test_id): + """Count findings flagged duplicate in a test, WITHOUT any wait/retry.""" + return self._get("/findings/", test=test_id, duplicate=True, limit=1)["count"] + + # --- tests ------------------------------------------------------------ + def test_async_wait_blocks_until_dedupe_complete(self): + """async_wait: response reports completion AND all dupes already marked.""" + engagement_id = self._make_engagement("wait") + # First import populates the engagement. + self._import(engagement_id, "async_wait") + # Second identical import deduplicates against the first. 
+ body, test_id = self._import(engagement_id, "async_wait") + + self.assertTrue( + body.get("deduplication_complete"), + f"async_wait did not report deduplication_complete: {body}", + ) + # No sleep/retry: async_wait must have blocked past the worker-side dedup + # delay until dedupe finished, so every finding is already marked duplicate. + marked = self._duplicates_marked(test_id) + self.assertEqual( + marked, self.NUM_FINDINGS, + f"async_wait returned with only {marked}/{self.NUM_FINDINGS} duplicates marked " + "-> the cross-process join did not block until deduplication finished", + ) + + def test_async_does_not_block(self): + """ + Control: plain async must NOT report deduplication as complete. + + Counterpart to the async_wait test: the same import in 'async' mode does + not await deduplication, so its response reports deduplication_complete + False. (We deliberately do not assert the duplicate count here: the async + worker task races the import's own DB commit, so how many are marked at + response time is not deterministic -- the meaningful, deterministic signal + is the flag. The duplicate-count guarantee is asserted on async_wait above, + where the worker-side delay makes it robust.) 
+ """ + engagement_id = self._make_engagement("async") + self._import(engagement_id, "async") + body, _test_id = self._import(engagement_id, "async") + + self.assertFalse( + body.get("deduplication_complete"), + f"async unexpectedly reported deduplication_complete: {body}", + ) + + def add_dedupe_tests_to_suite(suite, *, jira=False, github=False, block_execution=False): suite.addTest(BaseTestCase("test_login")) set_suite_settings(suite, jira=jira, github=github, block_execution=block_execution) @@ -570,6 +753,10 @@ def suite(): suite = unittest.TestSuite() add_dedupe_tests_to_suite(suite, jira=False, github=False, block_execution=False) add_dedupe_tests_to_suite(suite, jira=True, github=True, block_execution=True) + # Deterministic real-worker guard for 'async_wait' (independent of jira/github, + # so added once rather than per add_dedupe_tests_to_suite run). + suite.addTest(ImportAsyncWaitApiTest("test_async_wait_blocks_until_dedupe_complete")) + suite.addTest(ImportAsyncWaitApiTest("test_async_does_not_block")) return suite diff --git a/unittests/test_import_execution_mode.py b/unittests/test_import_execution_mode.py new file mode 100644 index 00000000000..b240056a2af --- /dev/null +++ b/unittests/test_import_execution_mode.py @@ -0,0 +1,253 @@ +from unittest.mock import patch + +from django.test import override_settings + +from dojo.importers.default_importer import DefaultImporter +from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, + Development_Environment, + Dojo_User, + Engagement, + Finding, + Test, + UserContactInfo, +) + +from .dojo_test_case import DojoAPITestCase, DojoTestCase, get_unit_tests_path, versioned_fixtures + + +@versioned_fixtures +class ImportExecutionModeResolverTest(DojoTestCase): + + """resolve_deduplication_execution_mode: request override > profile mode > legacy block_execution > default async.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + self.user =
Dojo_User.objects.get(username="admin") + UserContactInfo.objects.filter(user=self.user).delete() + + def _set_profile(self, *, mode=None): + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"deduplication_execution_mode": mode}, + ) + self.user.refresh_from_db() + + def test_default_is_async(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_request_override_wins_over_profile(self): + self._set_profile(mode=DEDUPLICATION_EXECUTION_MODE_SYNC) + self.assertEqual( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + Dojo_User.resolve_deduplication_execution_mode(self.user, DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT), + ) + + def test_profile_mode_used_when_no_override(self): + self._set_profile(mode=DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_empty_profile_falls_back_to_async(self): + self._set_profile(mode=None) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_invalid_override_ignored(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user, "garbage")) + + def test_no_user(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(None)) + + def test_block_execution_falls_back_to_sync(self): + # legacy global block_execution flag implies synchronous deduplication + UserContactInfo.objects.update_or_create(user=self.user, defaults={"block_execution": True}) + self.user.refresh_from_db() + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_SYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_mode_takes_precedence_over_block_execution(self): + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"block_execution": True, 
"deduplication_execution_mode": DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT}, + ) + self.user.refresh_from_db() + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_wants_block_execution_reads_block_execution_not_mode(self): + # wants_block_execution is the global switch and is independent of the dedup mode + UserContactInfo.objects.update_or_create(user=self.user, defaults={"block_execution": True}) + self.user.refresh_from_db() + self.assertTrue(Dojo_User.wants_block_execution(self.user)) + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"block_execution": False, "deduplication_execution_mode": DEDUPLICATION_EXECUTION_MODE_SYNC}, + ) + self.user.refresh_from_db() + # a 'sync' dedup mode alone does NOT force global foreground execution + self.assertFalse(Dojo_User.wants_block_execution(self.user)) + + +@versioned_fixtures +class ImporterDispatchKwargsTest(DojoTestCase): + + """deduplication_execution_mode -> dojo_dispatch_task force flags.""" + + fixtures = ["dojo_testdata.json"] + + def _importer(self, mode, **extra): + return DefaultImporter( + scan_type="ZAP Scan", + engagement=Engagement.objects.first(), + environment=Development_Environment.objects.first(), + deduplication_execution_mode=mode, + **extra, + ) + + def test_sync_mode_forces_sync(self): + self.assertEqual({"force_sync": True}, self._importer(DEDUPLICATION_EXECUTION_MODE_SYNC).post_processing_dispatch_kwargs()) + + def test_async_wait_mode_forces_async(self): + self.assertEqual({"force_async": True}, self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT).post_processing_dispatch_kwargs()) + + def test_async_mode_preserves_external_force_sync(self): + importer = self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC) + self.assertEqual({"force_sync": False}, importer.post_processing_dispatch_kwargs()) + self.assertEqual({"force_sync": True}, importer.post_processing_dispatch_kwargs(force_sync=True)) + + 
def test_invalid_mode_defaults_to_async(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, self._importer("nonsense").deduplication_execution_mode) + + def test_external_force_sync_promotes_to_sync_mode(self): + importer = self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC, force_sync=True) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_SYNC, importer.deduplication_execution_mode) + + +@versioned_fixtures +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +class ImportExecutionModeAPITest(DojoAPITestCase): + + """ + End-to-end: the import endpoints accept and honor deduplication_execution_mode. + + CELERY_TASK_ALWAYS_EAGER runs dispatched tasks inline against the test DB, so + 'async_wait' can actually join its deduplication batch (a real broker/worker + runs against a different DB and could never see the test transaction's data). + """ + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + self.login_as_admin() + + def _payload(self, mode): + return { + "minimum_severity": "Low", + "scan_type": "ZAP Scan", + "engagement": 1, + "deduplication_execution_mode": mode, + } + + def test_import_async_wait_returns_statistics(self): + # NOTE: this assertion is not actually meaningful under CELERY_TASK_ALWAYS_EAGER: + # eager .get() returns an inline EagerResult, so deduplication_complete is True + # whether or not the real cross-process join works. It cannot fail when the join + # is broken. The genuine guarantee (async_wait blocks until dedupe finishes) is + # covered by the real-worker integration test, ImportAsyncWaitApiTest in + # tests/dedupe_test.py. Kept here only for endpoint/field plumbing coverage and + # future reference if this ever runs against a non-eager worker. 
+ with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT) + payload["file"] = testfile + result = self.import_scan(payload, 201) + self.assertIn("statistics", result) + self.assertIn("after", result["statistics"]) + # async_wait joins deduplication, so it must report completion + self.assertTrue(result["deduplication_complete"]) + + def test_import_async_does_not_await_deduplication(self): + with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload(DEDUPLICATION_EXECUTION_MODE_ASYNC) + payload["file"] = testfile + result = self.import_scan(payload, 201) + self.assertFalse(result["deduplication_complete"]) + + def test_import_rejects_invalid_mode(self): + with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload("not-a-mode") + payload["file"] = testfile + self.import_scan(payload, 400) + + +@versioned_fixtures +class NotificationDeduplicationRefreshTest(DojoTestCase): + + """notify_scan_added refreshes duplicate status from the DB once dedup is complete.""" + + fixtures = ["dojo_testdata.json"] + + def _importer(self): + test = Test.objects.first() + importer = DefaultImporter( + scan_type="ZAP Scan", + engagement=test.engagement, + environment=Development_Environment.objects.first(), + ) + return importer, test + + @patch("dojo.importers.base_importer.create_notification") + def test_deduplicated_new_findings_excluded_when_complete(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = True + + real = Finding(test=test, title="real finding", severity="High") + real.save() + dupe = Finding(test=test, title="dupe finding", severity="High") + dupe.save() + # Simulate background deduplication having flagged the second finding. 
+ Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=2, new_findings=[real, dupe]) + + kwargs = mock_notify.call_args.kwargs + self.assertEqual([f.id for f in kwargs["findings_new"]], [real.id]) + self.assertEqual([f.id for f in kwargs["findings_new_duplicate"]], [dupe.id]) + # headline count excludes the deduplicated finding + self.assertEqual(kwargs["finding_count"], 1) + self.assertEqual(kwargs["event"], "scan_added") + + @patch("dojo.importers.base_importer.create_notification") + def test_async_mode_does_not_refresh(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = False # plain async: dedup not awaited + + dupe = Finding(test=test, title="async dupe", severity="High") + dupe.save() + Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=1, new_findings=[dupe]) + + kwargs = mock_notify.call_args.kwargs + # historical behavior: duplicate still listed/counted as new + self.assertEqual([f.id for f in kwargs["findings_new"]], [dupe.id]) + self.assertEqual(kwargs["findings_new_duplicate"], []) + self.assertEqual(kwargs["finding_count"], 1) + + @patch("dojo.importers.base_importer.create_notification") + def test_all_new_findings_duplicate_yields_empty_event(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = True + + dupe = Finding(test=test, title="only dupe", severity="Low") + dupe.save() + Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=1, new_findings=[dupe]) + + kwargs = mock_notify.call_args.kwargs + self.assertEqual(kwargs["findings_new"], []) + self.assertEqual([f.id for f in kwargs["findings_new_duplicate"]], [dupe.id]) + self.assertEqual(kwargs["finding_count"], 0) + # net-new is zero -> empty scan notification + self.assertEqual(kwargs["event"], "scan_added_empty") diff --git 
a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index 47d30a99824..7a226f089a8 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -403,7 +403,7 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr ) # Deduplication is enabled in the tests above, but to properly test it we must run the same import twice and capture the results. - def _deduplication_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, *, check_duplicates=True): + def _deduplication_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, *, check_duplicates=True, dedup_mode=None): """ Test method to measure deduplication performance by importing the same scan twice. The second import should result in all findings being marked as duplicates. @@ -444,6 +444,7 @@ def _deduplication_performance(self, expected_num_queries1, expected_num_async_t "verified": True, "scan_type": STACK_HAWK_SCAN_TYPE, "engagement": engagement, + **({"deduplication_execution_mode": dedup_mode} if dedup_mode else {}), } importer = DefaultImporter(**import_options) _, _, len_new_findings1, len_closed_findings1, _, _, _ = importer.process_scan(scan) @@ -471,6 +472,7 @@ def _deduplication_performance(self, expected_num_queries1, expected_num_async_t "verified": True, "scan_type": STACK_HAWK_SCAN_TYPE, "engagement": engagement, + **({"deduplication_execution_mode": dedup_mode} if dedup_mode else {}), } importer = DefaultImporter(**import_options) _, _, len_new_findings2, len_closed_findings2, _, _, _ = importer.process_scan(scan) @@ -551,6 +553,41 @@ def test_deduplication_performance_pghistory_no_async(self): expected_num_async_tasks2=2, ) + @override_settings(ENABLE_AUDITLOG=True) + def test_deduplication_performance_pghistory_async_wait(self): + """ + Deduplication performance in the 
'async_wait' execution mode: post-processing is + dispatched to a background worker, then the request joins on the result before + responding. The dedup queries run in the worker (a separate connection), NOT in + the web request, so the only web-side cost over the plain async path is +2: the + post-dedup notification refresh SELECT, plus one result-backend write from the + per-dispatch ignore_result=False that lets the request join via AsyncResult.get(). + + We do not use CELERY_TASK_ALWAYS_EAGER here — that would run the dispatched task + inline on the request's connection and wrongly count the worker's dedup queries. + Instead the dedup batch is dispatched async (not executed in-process) and the join + (AsyncResult.get) is mocked to return immediately, simulating a worker that has + finished. deduplication_complete is therefore True (so the refresh runs), but the + findings are not actually deduplicated in-test, so check_duplicates is False. + """ + configure_audit_system() + configure_pghistory_triggers() + + # Enable deduplication + self.system_settings(enable_deduplication=True) + + # Simulate the background worker's post-processing having completed so the join + # returns instantly without executing dedup on the request's DB connection. + with patch("celery.result.AsyncResult.get", return_value=None): + self._deduplication_performance( + expected_num_queries1=94, + expected_num_async_tasks1=2, + expected_num_queries2=74, + expected_num_async_tasks2=2, + dedup_mode="async_wait", + check_duplicates=False, + ) + @tag("performance") @override_settings(V3_FEATURE_LOCATIONS=True)