feat: allow import/reimport to wait for deduplication to complete#15007
valentijnscholten wants to merge 12 commits into
This pull request has conflicts, please resolve those before we can evaluate the pull request.
Introduce a third import/reimport post-processing execution mode and make it configurable per request and via the user profile.

- async (default): dispatch post-processing to the background, respond immediately
- async_wait (new): dispatch to the background but wait for deduplication to finish before responding, so scan_added notifications and the returned statistics reflect the deduplicated state
- sync: run post-processing inline in the web process

Replace the legacy UserContactInfo.block_execution boolean with an import_execution_mode CharField; a data migration maps block_execution=True to the 'sync' mode. wants_block_execution() now derives from the sync mode, preserving global foreground-execution semantics for all async tasks.

Add a request field import_execution_mode (ImportScanSerializer/ReImportScanSerializer) resolved against the profile (request override wins), and a deduplication_complete boolean in the import/reimport response.

The post_process_findings_batch task now stores its result (ignore_result=False) so async_wait can join on it via AsyncResult.get(); the join is bounded by the new DD_IMPORT_ASYNC_WAIT_TIMEOUT setting.
notify_scan_added used the importer's in-memory finding lists, whose duplicate flag is stale because deduplication runs on separately-fetched instances. Once deduplication has completed (sync or async_wait, signalled by deduplication_complete), refresh the duplicate flag from the database and split each action list into "real" and duplicate findings:

- findings_new -> net-new (excludes deduplicated)
- findings_new_duplicate / findings_reactivated_duplicate / findings_untouched_duplicate
- finding_count -> recomputed to exclude new/reactivated duplicates (so an all-duplicate import sends scan_added_empty)

In plain async mode (dedup not awaited) the lists are left untouched, preserving historical behavior. Mail and webhook scan_added templates render the new duplicate sections.

Note: import/reimport response statistics are already dedup-accurate once awaited. Test.statistics / Test_Import.statistics query Finding and annotate the duplicate count directly; only the notification used stale in-memory data.
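The split described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Finding`, `split_by_duplicate`, `build_scan_added_context`, the `event` key, and the `duplicate_ids` argument (standing in for the database refresh) are all hypothetical, and the incoming `finding_count` is taken as given rather than derived.

```python
from dataclasses import dataclass


@dataclass
class Finding:
    """Stand-in for the real model; only the fields this sketch needs."""
    id: int
    duplicate: bool = False


def split_by_duplicate(findings, duplicate_ids):
    """Refresh each finding's duplicate flag and split into (real, duplicates).

    duplicate_ids stands in for a fresh database lookup of the ids that are
    currently marked as duplicate (an assumption of this sketch).
    """
    real, dupes = [], []
    for finding in findings:
        finding.duplicate = finding.id in duplicate_ids
        (dupes if finding.duplicate else real).append(finding)
    return real, dupes


def build_scan_added_context(new, reactivated, untouched, finding_count,
                             duplicate_ids, deduplication_complete):
    """Build a notification context; lists stay untouched unless dedup finished."""
    context = {
        "findings_new": new,
        "findings_reactivated": reactivated,
        "findings_untouched": untouched,
        "finding_count": finding_count,
    }
    if deduplication_complete:
        new_real, new_dupes = split_by_duplicate(new, duplicate_ids)
        react_real, react_dupes = split_by_duplicate(reactivated, duplicate_ids)
        same_real, same_dupes = split_by_duplicate(untouched, duplicate_ids)
        context.update({
            "findings_new": new_real,
            "findings_new_duplicate": new_dupes,
            "findings_reactivated": react_real,
            "findings_reactivated_duplicate": react_dupes,
            "findings_untouched": same_real,
            "findings_untouched_duplicate": same_dupes,
            # Only new/reactivated duplicates are removed from the headline count.
            "finding_count": finding_count - len(new_dupes) - len(react_dupes),
        })
    # Hypothetical key: which notification event would be sent.
    context["event"] = "scan_added" if context["finding_count"] > 0 else "scan_added_empty"
    return context
```

With two new findings that both turn out to be duplicates, the awaited path yields an empty `findings_new` and the `scan_added_empty` event, while the plain async path keeps both findings in `findings_new`.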
Move wait_for_post_processing() ahead of the test_added notification dispatch (not just notify_scan_added) so both notifications sent during a fresh import are emitted after deduplication has completed in async_wait mode. The reimporter already orders the await before its single notification.
The mode governs whether the request waits for deduplication, so name it for that. Rename the UserContactInfo field, the request/serializer field, the ImporterOptions attribute and validator, the resolver, and the IMPORT_EXECUTION_MODE_* constants to DEDUPLICATION_EXECUTION_MODE_*. Add a RenameField migration (0271) rather than mutating the add/remove migrations introduced earlier on this branch.
…NC_WAIT_TIMEOUT, default 60s
…ion_mode for import dedup

block_execution gates every async task (notifications, jira, grading, deletes, reindex, ...) via we_want_async, so it must remain. Restore it as the global "run all async tasks in the foreground" flag and make deduplication_execution_mode an independent field that only controls how import/reimport deduplication post-processing is dispatched and awaited.

- Restore the block_execution field; wants_block_execution() reads it again.
- resolve_deduplication_execution_mode() falls back to block_execution -> sync.
- Replace the destructive remove migration (0270) with a seed-only data migration that maps block_execution=True -> deduplication_execution_mode 'sync'; keep both.
- Restore fixtures, the profile form field, and the user detail view (both fields).
- Revert the test sweep: tests that need global foreground use block_execution=True again; the dedicated deduplication_execution_mode tests are updated for the split.
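A rough sketch of the resolution order this commit describes. The function name `resolve_mode`, its parameters, and the exact ordering between the profile value and the block_execution fallback are assumptions made for illustration; the real resolver is resolve_deduplication_execution_mode() and may differ.

```python
MODE_ASYNC = "async"
MODE_ASYNC_WAIT = "async_wait"
MODE_SYNC = "sync"
VALID_MODES = {MODE_ASYNC, MODE_ASYNC_WAIT, MODE_SYNC}


def resolve_mode(request_mode=None, profile_mode=None, block_execution=False):
    """Pick the deduplication execution mode for one import/reimport.

    Assumed order: explicit request value, then profile value, then the
    legacy block_execution flag (mapped to sync), then the async default.
    """
    if request_mode is not None:
        if request_mode not in VALID_MODES:
            # The API is reported to answer an invalid mode with a 400.
            raise ValueError(f"invalid deduplication_execution_mode: {request_mode!r}")
        return request_mode
    if profile_mode in VALID_MODES:
        return profile_mode
    if block_execution:
        return MODE_SYNC
    return MODE_ASYNC
```

For example, `resolve_mode("async_wait", "sync", True)` returns `"async_wait"` because the request override wins, and `resolve_mode(None, None, True)` returns `"sync"` through the block_execution fallback.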
…ort/reimport

The API resolves deduplication_execution_mode in the serializer, but the UI import/reimport views built their context straight from the form and never resolved it, so UI imports silently defaulted to async regardless of the user's profile. Resolve it from the profile in both UI process_form paths.
Option B keeps block_execution as a checkbox in the profile form, so the integration helper toggles id_block_execution again instead of the deduplication_execution_mode select (which no longer existed under that id).
Collapse the add/rename pair into a single AddField that creates deduplication_execution_mode with its final name, and keep the seed as a separate data migration, per the convention of keeping schema and data migrations apart. Drops the interim rename migration.
…E_LOCATIONS

The CI 'true' matrix variant enables V3_FEATURE_LOCATIONS, where loading dojo_testdata.json (containing Endpoints) raises NotImplementedError. Decorate the test classes with @versioned_fixtures so the locations fixture is used in that mode, matching the other import/reimport test suites.
Covers the 'async_wait' execution mode: post-processing is dispatched to a background worker and the request joins on it before responding. The dedup queries run in the worker (separate connection), not the web request, so the only web-side delta over the plain async path is the post-dedup notification refresh SELECT (+1): 109->110 first import, 89->90 second. Does not use CELERY_TASK_ALWAYS_EAGER (that would run the task inline on the request connection and wrongly count worker queries). The dedup batch is dispatched async and the join (AsyncResult.get) is mocked to return instantly, simulating a finished worker. Adds an optional dedup_mode param to _deduplication_performance.
69aa97b to
506fbbc
Conflicts have been resolved. A maintainer will review the pull request shortly.
This pull request has conflicts, please resolve those before we can evaluate the pull request.
Maffooch left a comment
Review (code + local e2e). Reviewed the diff and exercised it end-to-end against the running stack.
Verification (all green):
- unittests.test_import_execution_mode → 20 passed; unittests.test_importers_performance → 11 passed / 6 skipped (incl. the new test_deduplication_performance_pghistory_async_wait).
- ruff --config ruff.toml → clean on all changed non-migration files.
- Live e2e against a real Celery worker + broker + django-db result backend (not eager): async_wait import → deduplication_complete=true with correct stats; async → false; invalid mode → 400. So the cross-process AsyncResult.get() join genuinely works with the global CELERY_TASK_IGNORE_RESULT=True, confirming the open question in the helper.py NOTE (see inline comment there).
- Server-rendered UI: view_user shows the new row and the profile form dropdown persists the setting.
One blocking item (the migration collision, #1) and four non-blocking suggestions inline below. Nice feature — the duplicate-aware notification split is a clear improvement.
| class Migration(migrations.Migration): | ||
| dependencies = [ | ||
| ('dojo', '0268_release_authorization_to_pro'), |
Blocking: migration number collision. dev already ships 0269_normalize_blank_finding_components, which also declares dependencies = [('dojo', '0268_release_authorization_to_pro')]. With this PR there are two 0269 migrations on the same parent → Django reports "Conflicting migrations detected; multiple leaf nodes in the migration graph" on migrate / makemigrations --check, and the merge fails (this is the conflicts-detected label).
Fix: renumber both files and re-chain them after the current leaf:
- 0269_usercontactinfo_deduplication_execution_mode → 0270_..., with dependencies = [('dojo', '0269_normalize_blank_finding_components')]
- 0270_seed_deduplication_execution_mode → 0271_..., with dependencies = [('dojo', '0270_usercontactinfo_deduplication_execution_mode')]
The seed migration itself is fine and idempotent.
| if result is None or not hasattr(result, "get"): | ||
| continue | ||
| try: | ||
| result.get(timeout=timeout, propagate=False) |
Non-blocking: the timeout is applied per batch, not per import. This loop does result.get(timeout=timeout) once for each dispatched batch, so worst-case total wait is N_batches × timeout. The PR body, 2.60.md, and the model help_text all say "maximum wait time is 1 minute" — a large import (many batches) with a stuck/overloaded worker can block the request for many minutes. It still degrades safely (never a true hang), so non-blocking, but the advertised bound is wrong.
Mitigation: track a single deadline so the whole join is bounded by one timeout:
    import time

    # One deadline shared by all batches, so the whole join is bounded by a single timeout.
    deadline = time.monotonic() + timeout
    for result in results:
        if result is None or not hasattr(result, "get"):
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            success = False
            break
        try:
            result.get(timeout=remaining, propagate=False)
        except Exception as e:
            logger.warning("async_wait: error/timeout waiting for post-processing: %s", e)
            success = False
| # appears to have worked with the global CELERY_TASK_IGNORE_RESULT=True and a | ||
| # Redis broker. Needs verification against a real broker/worker setup; if join | ||
| # works without it, this override can be removed to avoid storing extra results. | ||
| @app.task(ignore_result=False) |
Non-blocking: this stores a result row for every batch of every import, in all modes. The result is only consumed in async_wait, but flipping the task default to ignore_result=False makes plain async imports (the default) write rows to the django-db result backend, retained for CELERY_RESULT_EXPIRES (24h). On a busy instance that's a steady stream of otherwise-useless celery_taskmeta rows.
Re: the NOTE's open question — I verified live that the join does work with this override against a real worker + django-db backend + global CELERY_TASK_IGNORE_RESULT=True. The remaining concern is just the storage cost for the non-async_wait modes.
Mitigation: leave the task default alone and request the result only when you'll await it, scoping it to the dispatch in post_processing_dispatch_kwargs / the apply_async call for async_wait, e.g. apply_async(..., ignore_result=False). (Whether .get() still works without the override under your broker is worth one quick check; if it does, the per-call override can be dropped entirely.)
| mode = DEDUPLICATION_EXECUTION_MODE_ASYNC | ||
| # An explicit force_sync from a non-serializer caller still wins. | ||
| if kwargs.get("force_sync"): | ||
| mode = DEDUPLICATION_EXECUTION_MODE_SYNC |
Non-blocking: an explicit async_wait is silently downgraded to sync when force_sync=True is also passed. A caller supplying both deduplication_execution_mode='async_wait' and force_sync=True ends up in sync with no warning. The main API/UI paths don't combine them (the serializer/views resolve a clean mode without force_sync), so low risk — but it's a surprising precedence for internal/test callers.
Mitigation: only let force_sync promote to sync when no explicit mode was supplied, e.g. apply the force_sync override before defaulting, or skip it when kwargs.get("deduplication_execution_mode") is a valid mode. At minimum, document that force_sync wins over an explicit mode.
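One way to read this mitigation, as a minimal sketch: an explicit, valid mode is honored first, and force_sync only promotes to sync when no such mode was supplied. The function name `pick_mode` and the literal mode strings are illustrative assumptions, not the PR's identifiers.

```python
VALID_MODES = ("async", "async_wait", "sync")


def pick_mode(**kwargs):
    """Choose the mode so that force_sync never overrides an explicit mode."""
    explicit = kwargs.get("deduplication_execution_mode")
    if explicit in VALID_MODES:
        # An explicitly supplied valid mode wins, even if force_sync is set.
        return explicit
    if kwargs.get("force_sync"):
        # No explicit mode: force_sync promotes the default to sync.
        return "sync"
    return "async"
```

Under this ordering, `pick_mode(deduplication_execution_mode="async_wait", force_sync=True)` stays `"async_wait"` instead of being downgraded to `"sync"`.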
| @versioned_fixtures | ||
| @override_settings(CELERY_TASK_ALWAYS_EAGER=True) |
Non-blocking: no regression guard for the real cross-process join. This suite (and the perf test, which mocks AsyncResult.get) runs under CELERY_TASK_ALWAYS_EAGER=True, which executes the dispatched task inline on the request's connection — so the actual worker→request join with CELERY_TASK_IGNORE_RESULT=True is never exercised by CI. I confirmed it works by hand in this review, but the behavior the helper.py NOTE worries about is exactly what isn't covered.
Mitigation: consider an opt-in integration test (real broker/worker, e.g. under the integration-test harness) that asserts async_wait blocks until dedup completes and returns deduplication_complete=true. The override_settings/eager docstring here is accurate about why eager is used — just call out that the cross-process path is verified elsewhere/manually.
Summary
While thinking about what a v3 import API could look like, I realized we're missing one piece of the puzzle in the current (re)import.
By default deduplication runs as an async celery task. The results are not awaited, so the (re)import process can finish before deduplication is complete.
Consequences are:
- scan_added contains incorrect statistics
- delta statistics

With the recent optimizations on both import and deduplication, the chance of these incorrect statistics has gone down. However, the most confusing case happens when deduplication is only partly finished. In that case the statistics will contain some duplicates, but not all of them. Example report: #13777
Pro also suffers from this when not using the background-import. Also the pattern in this PR could be used for other Pro features like the current asynchronous prioritization.
The solution in this PR is to introduce a new deduplication_execution_mode enum that controls deduplication execution:

- async (default): perform deduplication in the background and do not wait for the results.
- sync (block_execution==True): perform deduplication in the foreground, in-line with the (re)import.
- async_wait: perform deduplication in the background and wait for the results.

async_wait is the new option and could become the default in the future. The block_execution user profile variable is removed in this PR as it is obsolete. Data is migrated.

Using block_execution==True could achieve a similar result, but it would slow the import a lot more because deduplication would run in the foreground. It would also make everything synchronous, including notifications, pushing to JIRA, etc., causing further slowdowns.

The new deduplication_execution_mode can be set in the user profile, but also in the API requests.

In most scenarios the async_wait setting will not lead to a significantly longer-running import, maybe only 1 or 2 seconds extra.
The exception is on very busy/overloaded instances where the sync deduplication jobs are queueing up.
For this reason the maximum wait time is 1 minute. After 1 minute the (re)import no longer waits and returns statistics as-is.
A response variable deduplication_complete will be False in this case.

Details
- Configurable per user via deduplication_execution_mode (async / async_wait / sync) and overridable per request through a deduplication_execution_mode field on the import and reimport endpoints. The request value takes precedence over the profile, otherwise it falls back to async.
- async_wait (new): deduplication/post-processing is still dispatched to the background, but the request waits for deduplication to finish before responding, so scan_added notifications and the returned statistics reflect the deduplicated state. JIRA push, product grading and other non-deduplication tasks remain asynchronous and are not awaited. async (default, return immediately) and sync (run inline) round out the modes.
- Independent of the block_execution flag, which is retained as the global "run all async tasks (notifications, JIRA, grading, ...) in the foreground" switch. A data migration seeds deduplication_execution_mode='sync' for users who had block_execution=True, and the resolver falls back to block_execution → sync, so import behavior is unchanged for them.
- post_process_findings_batch now stores its result (ignore_result=False) so async_wait can join on the dispatched batch. The wait is bounded by the new DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT setting (default 60s) so a missing or stuck worker degrades to responding anyway rather than hanging.
- Add a deduplication_complete boolean to the import/reimport response indicating whether deduplication had finished by the time the response was produced (true for sync and for async_wait when it completed within the timeout; false for async).
- Make scan_added notifications deduplication-aware: once deduplication has completed, refresh the duplicate status of the affected findings from the database and split the new / reactivated / untouched lists into real and duplicate sub-lists, exclude duplicates from the headline count (so an all-duplicate import sends scan_added_empty), and surface the duplicate groups in the mail and webhook templates. In plain async mode the lists are left untouched, preserving historical behavior.
- The UI import/reimport views resolve deduplication_execution_mode from the user's profile (the API resolves it in the serializer), so UI imports honor the profile setting instead of always defaulting to async. (No per-import selector is added to the UI form in this change.)
- Show deduplication_execution_mode in the user profile form and the user detail view, and add 2.60 upgrade notes.
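The bounded wait and the resulting deduplication_complete flag can be illustrated with a small standard-library sketch. It uses concurrent.futures.Future objects as stand-ins for the dispatched Celery batches, and applies one shared deadline to the whole join; `wait_for_batches` and `build_response` are hypothetical names, and the PR's actual join may apply the timeout differently.

```python
import time
from concurrent.futures import Future


def wait_for_batches(futures, timeout):
    """Return True only if every batch finished before one shared deadline."""
    deadline = time.monotonic() + timeout
    for future in futures:
        # Never pass a negative timeout; 0 means "only succeed if already done".
        remaining = max(deadline - time.monotonic(), 0.0)
        try:
            future.result(timeout=remaining)
        except Exception:
            # A timeout or a failed batch both mean deduplication is not confirmed.
            return False
    return True


def build_response(statistics, futures, timeout=60.0):
    """Attach the deduplication_complete flag to a response-like dict."""
    return {
        "statistics": statistics,
        "deduplication_complete": wait_for_batches(futures, timeout),
    }
```

A batch that never finishes makes `wait_for_batches` return `False` after roughly `timeout` seconds in total, regardless of how many batches are pending, which corresponds to the response reporting deduplication_complete as False.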