diff --git a/docs/content/releases/os_upgrading/3.2.md b/docs/content/releases/os_upgrading/3.2.md new file mode 100644 index 00000000000..1d2df7bf376 --- /dev/null +++ b/docs/content/releases/os_upgrading/3.2.md @@ -0,0 +1,53 @@ +--- +title: 'Upgrading to DefectDojo Version 3.2.x' +toc_hide: true +weight: -20260701 +description: Findings can now carry multiple CWEs via a new Finding_CWE relationship; vulnerability ids gain an autodetected type. A migration creates the CWE table, adds the type column, de-duplicates vulnerability-id rows, and adds a uniqueness constraint. Existing hash codes are unaffected. +--- + +## Multiple CWEs per finding + +A finding could previously store only one CWE (the integer `cwe` field). This release adds a +dedicated `Finding_CWE` relationship so a finding can carry **multiple CWEs**. The primary CWE +stays on `Finding.cwe` (unchanged — legacy deduplication and hash codes still use it); additional +CWEs live in the relationship. + +CWE is modeled separately from vulnerability identifiers (CVE, GHSA, …) on purpose: a CWE is a +weakness *class*, not a vulnerability *instance* identifier, so it must not participate in +`hash_code`, vulnerability-id deduplication, or the `cve` field. Because of this separation, +**existing hash codes and deduplication are unaffected** by this change. + +CWEs are populated automatically on import and when a finding is created or edited (from the +finding's CWE field, plus any additional CWEs a parser supplies). The finding exposes them via +`finding.cwes` (primary first, deduplicated). + +## Vulnerability id type + +Each `Vulnerability_Id` gains an autodetected `vulnerability_id_type` — the identifier's leading +prefix (`CVE-2024-1234` → `CVE`, `GHSA-…` → `GHSA`, `RUSTSEC-…` → `RUSTSEC`). It is derived +structurally (no registry) and stored (indexed) so identifiers can be filtered and grouped by type +efficiently. It is `NULL` when there is no non-numeric prefix. + +## Database migration + +A migration (`0273_finding_cwe_and_vulnerability_id_type`) runs automatically on upgrade and: + +- creates the `Finding_CWE` table (unique per `(finding, cwe)`); +- adds the indexed `vulnerability_id_type` column to the vulnerability-id table; +- removes duplicate `(finding, vulnerability_id)` rows, keeping the earliest of each — such + duplicates are unintended, and consolidating them allows a uniqueness constraint to be added; +- adds a unique constraint on `(finding, vulnerability_id)`. + +### What you need to do + +The migration is applied automatically. New and edited findings populate their CWE relationship +automatically. To backfill `Finding_CWE` rows for **existing** findings, run the command after +upgrading: + +``` +manage.py migrate_cwe +``` + +The command is idempotent (safe to re-run). + +For more information, check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/3.2.0). diff --git a/dojo/db_migrations/0273_finding_cwe_and_vulnerability_id_type.py b/dojo/db_migrations/0273_finding_cwe_and_vulnerability_id_type.py new file mode 100644 index 00000000000..912ab5b9689 --- /dev/null +++ b/dojo/db_migrations/0273_finding_cwe_and_vulnerability_id_type.py @@ -0,0 +1,48 @@ +import django.db.models.deletion +from django.db import migrations, models + + +def dedupe_vulnerability_ids(apps, schema_editor): + """Remove duplicate (finding, vulnerability_id) rows, keeping the lowest id, so the + unique constraint below can be added. Postgres.""" + schema_editor.execute( + """ + DELETE FROM dojo_vulnerability_id a + USING dojo_vulnerability_id b + WHERE a.finding_id = b.finding_id + AND a.vulnerability_id = b.vulnerability_id + AND a.id > b.id + """, + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ("dojo", "0272_reencrypt_tool_config_credentials_aes_gcm"), + ] + + operations = [ + migrations.CreateModel( + name="Finding_CWE", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("cwe", models.IntegerField(db_index=True)), + ("finding", models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, to="dojo.finding")), + ], + ), + migrations.AddConstraint( + model_name="finding_cwe", + constraint=models.UniqueConstraint(fields=("finding", "cwe"), name="unique_finding_cwe"), + ), + migrations.AddField( + model_name="vulnerability_id", + name="vulnerability_id_type", + field=models.CharField(blank=True, db_index=True, editable=False, max_length=20, null=True), + ), + migrations.RunPython(dedupe_vulnerability_ids, migrations.RunPython.noop), + migrations.AddConstraint( + model_name="vulnerability_id", + constraint=models.UniqueConstraint(fields=("finding", "vulnerability_id"), name="unique_finding_vulnerability_id"), + ), + ] diff --git a/dojo/finding/api/serializer.py b/dojo/finding/api/serializer.py index 360f08b7926..efebfb798ff 100644 --- a/dojo/finding/api/serializer.py +++ b/dojo/finding/api/serializer.py @@ -16,11 +16,13 @@ from dojo.authorization.authorization import user_has_permission from dojo.celery_dispatch import dojo_dispatch_task from dojo.finding.helper import ( + save_cwes, save_endpoints_template, save_vulnerability_ids, save_vulnerability_ids_template, ) from dojo.finding.models import BurpRawRequestResponse +from dojo.finding.vulnerability_id import cwe_number from dojo.jira import services as jira_services from dojo.jira.api.serializers import JIRAIssueSerializer from dojo.location.models import LocationFindingReference @@ -32,6 +34,7 @@ Endpoint, Engagement, Finding, + Finding_CWE, Finding_Group, Finding_Template, Note_Type, @@ -298,6 +301,29 @@ class Meta: fields = ["vulnerability_id"] +class CweField(serializers.Field): + + """Serialize a CWE as the canonical ``CWE-`` string; accept ``"CWE-79"`` or ``"79"`` on write.""" + + def to_representation(self, value): + return f"CWE-{value}" + + def to_internal_value(self, data): + number = cwe_number(data) + if number is None: + msg = "Enter a CWE number, e.g. 89 or CWE-89." + raise serializers.ValidationError(msg) + return number + + +class FindingCweSerializer(serializers.ModelSerializer): + cwe = CweField() + + class Meta: + model = Finding_CWE + fields = ["cwe"] + + class FindingSerializer(serializers.ModelSerializer): mitigated = serializers.DateTimeField(required=False, allow_null=True) mitigated_by = serializers.PrimaryKeyRelatedField(required=False, allow_null=True, queryset=User.objects.all()) @@ -321,6 +347,9 @@ class FindingSerializer(serializers.ModelSerializer): vulnerability_ids = VulnerabilityIdSerializer( source="vulnerability_id_set", many=True, required=False, ) + cwes = FindingCweSerializer( + source="finding_cwe_set", many=True, required=False, + ) reporter = serializers.PrimaryKeyRelatedField( required=False, queryset=User.objects.all(), ) @@ -417,6 +446,13 @@ def update(self, instance, validated_data): logger.debug("SETTING CVE FROM VULNERABILITY_ID_SET: %s", parsed_vulnerability_ids[0]) validated_data["cve"] = parsed_vulnerability_ids[0] + # CWEs (mirror vulnerability_ids): the first entry is the primary Finding.cwe; the rest + # become Finding_CWE rows via save_cwes() below. + parsed_cwes = None + if (cwes := validated_data.pop("finding_cwe_set", None)) is not None: + parsed_cwes = [entry["cwe"] for entry in cwes] + validated_data["cwe"] = parsed_cwes[0] if parsed_cwes else 0 + # Save the reporter on the finding if reporter_id := validated_data.get("reporter"): instance.reporter = reporter_id @@ -445,6 +481,11 @@ def update(self, instance, validated_data): instance, validated_data, ) + # Sync the CWE relation (separate from vulnerability ids) after the new cwe is applied. + if parsed_cwes is not None: + instance.unsaved_cwes = parsed_cwes[1:] + save_cwes(instance) + if settings.V3_FEATURE_LOCATIONS and locations is not None: for location_ref in instance.locations.all(): location_ref.location.disassociate_from_finding(instance) @@ -561,6 +602,9 @@ class FindingCreateSerializer(serializers.ModelSerializer): vulnerability_ids = VulnerabilityIdSerializer( source="vulnerability_id_set", many=True, required=False, ) + cwes = FindingCweSerializer( + source="finding_cwe_set", many=True, required=False, + ) reporter = serializers.PrimaryKeyRelatedField( required=False, queryset=User.objects.all(), ) @@ -601,6 +645,12 @@ def create(self, validated_data): validated_data["cve"] = parsed_vulnerability_ids[0] # validated_data["unsaved_vulnerability_ids"] = parsed_vulnerability_ids + # CWEs (mirror vulnerability_ids): first entry is the primary cwe, the rest are extras. + parsed_cwes = None + if (cwes := validated_data.pop("finding_cwe_set", None)) is not None: + parsed_cwes = [entry["cwe"] for entry in cwes] + validated_data["cwe"] = parsed_cwes[0] if parsed_cwes else 0 + # super.create() doesn't accept unsaved_vulnerability_ids or dedupe_option=False, so call save directly. new_finding = Finding(**validated_data) new_finding.unsaved_vulnerability_ids = parsed_vulnerability_ids or [] @@ -617,6 +667,9 @@ def create(self, validated_data): new_finding.reviewers.set(reviewers) if parsed_vulnerability_ids: save_vulnerability_ids(new_finding, parsed_vulnerability_ids) + if parsed_cwes is not None: + new_finding.unsaved_cwes = parsed_cwes[1:] + save_cwes(new_finding) if push_to_jira: jira_services.push(new_finding) diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py index bf91b610b6d..dfb7747ddca 100644 --- a/dojo/finding/deduplication.py +++ b/dojo/finding/deduplication.py @@ -340,11 +340,11 @@ def build_candidate_scope_queryset(test, mode="deduplication", service=None): queryset = Finding.objects.filter(scope_q) if settings.V3_FEATURE_LOCATIONS: - prefetch_list = ["locations__location__url", "vulnerability_id_set", "found_by"] + prefetch_list = ["locations__location__url", "vulnerability_id_set", "finding_cwe_set", "found_by"] else: # TODO: Delete this after the move to Locations # Base prefetches for both modes - prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"] + prefetch_list = ["endpoints", "vulnerability_id_set", "finding_cwe_set", "found_by"] # Prefetch all endpoint statuses with their endpoint for reimport mode. # The non-special filtering (excluding false_positive, out_of_scope, risk_accepted) diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index d83d032b176..b78cfb8e356 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -26,6 +26,7 @@ do_false_positive_history_batch, get_finding_models_for_deduplication, ) +from dojo.finding.vulnerability_id import resolve_vulnerability_id_type from dojo.jira import services as jira_services from dojo.location.models import Location from dojo.location.status import FindingLocationStatus @@ -36,6 +37,7 @@ Engagement, FileUpload, Finding, + Finding_CWE, Finding_Group, JIRA_Instance, Notes, @@ -1004,7 +1006,7 @@ def save_vulnerability_ids(finding, vulnerability_ids, *, delete_existing: bool Vulnerability_Id.objects.filter(finding=finding).delete() Vulnerability_Id.objects.bulk_create([ - Vulnerability_Id(finding=finding, vulnerability_id=vid) + Vulnerability_Id(finding=finding, vulnerability_id=vid, vulnerability_id_type=resolve_vulnerability_id_type(vid)) for vid in vulnerability_ids ]) @@ -1015,6 +1017,28 @@ def save_vulnerability_ids(finding, vulnerability_ids, *, delete_existing: bool finding.cve = None +def save_cwes(finding, *, delete_existing: bool = True): + """ + Persist the finding's CWEs as Finding_CWE rows. + + The primary Finding.cwe plus any parser-supplied unsaved_cwes. CWE is a weakness class, + kept separate from vulnerability ids. + """ + cwe_numbers = [] + if finding.cwe and finding.cwe > 0: + cwe_numbers.append(finding.cwe) + cwe_numbers += [cwe for cwe in (getattr(finding, "unsaved_cwes", None) or []) if cwe and cwe > 0] + cwe_numbers = list(dict.fromkeys(cwe_numbers)) + + if delete_existing: + Finding_CWE.objects.filter(finding=finding).delete() + + Finding_CWE.objects.bulk_create( + [Finding_CWE(finding=finding, cwe=cwe) for cwe in cwe_numbers], + ignore_conflicts=True, + ) + + def save_vulnerability_ids_template(finding_template, vulnerability_ids): """Save vulnerability IDs as newline-separated string in TextField.""" # Remove duplicates and empty strings diff --git a/dojo/finding/models.py b/dojo/finding/models.py index 366ca3618e3..e7ed020a652 100644 --- a/dojo/finding/models.py +++ b/dojo/finding/models.py @@ -22,6 +22,7 @@ from titlecase import titlecase from dojo.base_models.base import BaseModel +from dojo.finding.vulnerability_id import resolve_vulnerability_id_type # get_current_date/tomorrow/copy_model_util are defined early in dojo.models, before the # re-export that loads this module — so this resolves despite the partial circular load, and @@ -525,6 +526,9 @@ def __init__(self, *args, **kwargs): self.unsaved_tags = None self.unsaved_files = None self.unsaved_vulnerability_ids = None + # Extra CWE numbers a parser wants to attach in addition to the primary self.cwe. + # Persisted as Finding_CWE rows (multiple CWEs per finding). None = none supplied. + self.unsaved_cwes = None def __str__(self): return self.title @@ -680,6 +684,11 @@ def copy(self, test=None): copy.found_by.set(old_found_by) # Assign any tags copy.tags.set(old_tags) + # Copy the vulnerability ids and CWEs (relation rows aren't copied by copy_model_util) + for vulnerability_id in self.vulnerability_id_set.all(): + Vulnerability_Id.objects.create(finding=copy, vulnerability_id=vulnerability_id.vulnerability_id) + for finding_cwe in self.finding_cwe_set.all(): + Finding_CWE.objects.create(finding=copy, cwe=finding_cwe.cwe) return copy @@ -1334,6 +1343,15 @@ def vulnerability_ids(self): # Remove duplicates return list(dict.fromkeys(vulnerability_ids)) + @cached_property + def cwes(self): + # All CWEs for this finding in canonical CWE- form: the primary self.cwe plus any + # additional Finding_CWE rows (multiple CWEs per finding), primary first, deduplicated. + cwe_numbers = [row.cwe for row in self.finding_cwe_set.all()] + if self.cwe and self.cwe > 0: + cwe_numbers.insert(0, self.cwe) + return [f"CWE-{cwe_number}" for cwe_number in dict.fromkeys(cwe_numbers)] + @property def violates_sla(self): return (self.sla_expiration_date and self.sla_expiration_date < timezone.now().date()) @@ -1357,10 +1375,43 @@ def set_hash_code(self, dedupe_option): class Vulnerability_Id(models.Model): finding = models.ForeignKey("dojo.Finding", editable=False, on_delete=models.CASCADE) vulnerability_id = models.TextField(max_length=50, blank=False, null=False) + # Autodetected from the id prefix (CVE, GHSA, ...); NULL when there is no non-numeric + # prefix. Denormalized/indexed so type-scoped queries (e.g. GROUP BY type) stay cheap. + vulnerability_id_type = models.CharField(max_length=20, null=True, blank=True, editable=False, db_index=True) + + class Meta: + constraints = [ + models.UniqueConstraint(fields=["finding", "vulnerability_id"], name="unique_finding_vulnerability_id"), + ] def __str__(self): return self.vulnerability_id + def save(self, *args, **kwargs): + # bulk_create paths set the type at construction; this covers save()/get_or_create. + self.vulnerability_id_type = resolve_vulnerability_id_type(self.vulnerability_id) + super().save(*args, **kwargs) + + def get_absolute_url(self): + return reverse("view_finding", args=[str(self.finding.id)]) + + +class Finding_CWE(models.Model): + # A CWE weakness associated with a finding. Separate from Vulnerability_Id because a CWE is a + # weakness class, not a vulnerability instance identifier — it must not participate in + # hash_code, vulnerability-id deduplication, or the cve field. The primary CWE stays on + # Finding.cwe; this relation lets a finding carry multiple CWEs. + finding = models.ForeignKey("dojo.Finding", editable=False, on_delete=models.CASCADE) + cwe = models.IntegerField(db_index=True) + + class Meta: + constraints = [ + models.UniqueConstraint(fields=["finding", "cwe"], name="unique_finding_cwe"), + ] + + def __str__(self): + return f"CWE-{self.cwe}" + def get_absolute_url(self): return reverse("view_finding", args=[str(self.finding.id)]) diff --git a/dojo/finding/ui/forms.py b/dojo/finding/ui/forms.py index 79f3e317059..3ca5c6410b0 100644 --- a/dojo/finding/ui/forms.py +++ b/dojo/finding/ui/forms.py @@ -8,6 +8,7 @@ from dojo.endpoint.utils import validate_endpoints_to_add from dojo.finding.queries import get_authorized_findings +from dojo.finding.vulnerability_id import cwe_number, parse_cwes from dojo.jira import services as jira_services from dojo.location.models import Location from dojo.location.utils import validate_locations_to_add @@ -42,6 +43,36 @@ "You may enter one vulnerability id per line.", widget=forms.widgets.Textarea(attrs={"rows": "3", "cols": "400"})) +cwes_field = forms.CharField(max_length=500, + required=False, + label="CWEs", + help_text="CWE numbers associated with this finding. You may enter one per line (e.g. 89 or CWE-89). The first is the primary CWE.", + widget=forms.widgets.Textarea(attrs={"rows": "3", "cols": "400"})) + + +class CweFormMixin: + + """ + Persist the 'cwes' textarea as the primary Finding.cwe plus extra Finding_CWE rows. + + Mirrors how the 'vulnerability_ids' textarea maps to cve + Vulnerability_Id rows. + """ + + def clean_cwes(self): + value = self.cleaned_data.get("cwes", "") + invalid = [token for token in value.replace(",", "\n").split() if cwe_number(token) is None] + if invalid: + msg = f"Invalid CWE(s): {', '.join(invalid)}. Enter numbers like 89 or CWE-89, one per line." + raise forms.ValidationError(msg) + return value + + def save(self, commit=True): # noqa: FBT002 + cwes = parse_cwes(self.cleaned_data.get("cwes")) + self.instance.cwe = cwes[0] if cwes else 0 + self.instance.unsaved_cwes = cwes[1:] + return super().save(commit=commit) + + EFFORT_FOR_FIXING_INVALID_CHOICE = _("Select valid choice: Low,Medium,High") @@ -183,11 +214,11 @@ def __init__(self, *args, **kwargs): self.fields["accepted_findings"].queryset = get_authorized_findings("edit") -class AddFindingForm(forms.ModelForm): +class AddFindingForm(CweFormMixin, forms.ModelForm): title = forms.CharField(max_length=1000) date = forms.DateField(required=True, widget=forms.TextInput(attrs={"class": "datepicker", "autocomplete": "off"})) - cwe = forms.IntegerField(required=False) + cwes = cwes_field vulnerability_ids = vulnerability_ids_field cvssv3 = forms.CharField(label="CVSS3 Vector", max_length=117, required=False, widget=forms.TextInput(attrs={"class": "cvsscalculator", "data-toggle": "dropdown", "aria-haspopup": "true", "aria-expanded": "false"})) cvssv3_score = forms.FloatField(label="CVSS3 Score", required=False, max_value=10.0, min_value=0.0) @@ -219,7 +250,7 @@ class AddFindingForm(forms.ModelForm): "invalid_choice": EFFORT_FOR_FIXING_INVALID_CHOICE}) # the only reliable way without hacking internal fields to get predicatble ordering is to make it explicit - field_order = ("title", "date", "cwe", "vulnerability_ids", "severity", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", "impact", "request", "response", "steps_to_reproduce", + field_order = ("title", "date", "cwes", "vulnerability_ids", "severity", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", "impact", "request", "response", "steps_to_reproduce", "severity_justification", "endpoints", "endpoints_to_add", "references", "active", "verified", "false_p", "duplicate", "out_of_scope", "risk_accepted", "under_defect_review") @@ -279,15 +310,15 @@ def clean_tags(self): class Meta: model = Finding - exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "inherited_tags", + exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "cwe", "inherited_tags", "review_requested_by", "is_mitigated", "jira_creation", "jira_change", "endpoints", "sla_start_date") -class AdHocFindingForm(forms.ModelForm): +class AdHocFindingForm(CweFormMixin, forms.ModelForm): title = forms.CharField(max_length=1000) date = forms.DateField(required=True, widget=forms.TextInput(attrs={"class": "datepicker", "autocomplete": "off"})) - cwe = forms.IntegerField(required=False) + cwes = cwes_field vulnerability_ids = vulnerability_ids_field cvss_info = forms.CharField( @@ -327,7 +358,7 @@ class AdHocFindingForm(forms.ModelForm): "invalid_choice": EFFORT_FOR_FIXING_INVALID_CHOICE}) # the only reliable way without hacking internal fields to get predicatble ordering is to make it explicit - field_order = ("title", "date", "cwe", "vulnerability_ids", "severity", "cvss_info", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", + field_order = ("title", "date", "cwes", "vulnerability_ids", "severity", "cvss_info", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", "impact", "request", "response", "steps_to_reproduce", "severity_justification", "endpoints", "endpoints_to_add", "references", "active", "verified", "false_p", "duplicate", "out_of_scope", "risk_accepted", "under_defect_review", "sla_start_date", "sla_expiration_date") @@ -385,16 +416,16 @@ def clean_tags(self): class Meta: model = Finding - exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "inherited_tags", + exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "cwe", "inherited_tags", "review_requested_by", "is_mitigated", "jira_creation", "jira_change", "endpoints", "sla_start_date", "sla_expiration_date") -class PromoteFindingForm(forms.ModelForm): +class PromoteFindingForm(CweFormMixin, forms.ModelForm): title = forms.CharField(max_length=1000) date = forms.DateField(required=True, widget=forms.TextInput(attrs={"class": "datepicker", "autocomplete": "off"})) - cwe = forms.IntegerField(required=False) + cwes = cwes_field vulnerability_ids = vulnerability_ids_field cvss_info = forms.CharField( @@ -423,7 +454,7 @@ class PromoteFindingForm(forms.ModelForm): references = forms.CharField(widget=forms.Textarea, required=False) # the onyl reliable way without hacking internal fields to get predicatble ordering is to make it explicit - field_order = ("title", "group", "date", "sla_start_date", "sla_expiration_date", "cwe", "vulnerability_ids", "severity", "cvss_info", "cvssv3", + field_order = ("title", "group", "date", "sla_start_date", "sla_expiration_date", "cwes", "vulnerability_ids", "severity", "cvss_info", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", "impact", "request", "response", "steps_to_reproduce", "severity_justification", "endpoints", "endpoints_to_add", "references", "active", "mitigated", "mitigated_by", "verified", "false_p", "duplicate", "out_of_scope", "risk_accept", "under_defect_review") @@ -469,16 +500,16 @@ def clean_tags(self): class Meta: model = Finding - exclude = ("reporter", "url", "numerical_severity", "active", "false_p", "verified", "endpoint_status", "cve", "inherited_tags", + exclude = ("reporter", "url", "numerical_severity", "active", "false_p", "verified", "endpoint_status", "cve", "cwe", "inherited_tags", "duplicate", "out_of_scope", "under_review", "reviewers", "review_requested_by", "is_mitigated", "jira_creation", "jira_change", "planned_remediation_date", "planned_remediation_version", "effort_for_fixing") -class FindingForm(forms.ModelForm): +class FindingForm(CweFormMixin, forms.ModelForm): title = forms.CharField(max_length=1000) group = forms.ModelChoiceField(required=False, queryset=Finding_Group.objects.none(), help_text="The Finding Group to which this finding belongs, leave empty to remove the finding from the group. Groups can only be created via Bulk Edit for now.") date = forms.DateField(required=True, widget=forms.TextInput(attrs={"class": "datepicker", "autocomplete": "off"})) - cwe = forms.IntegerField(required=False) + cwes = cwes_field vulnerability_ids = vulnerability_ids_field cvss_info = forms.CharField( @@ -522,7 +553,7 @@ class FindingForm(forms.ModelForm): "invalid_choice": EFFORT_FOR_FIXING_INVALID_CHOICE}) # the only reliable way without hacking internal fields to get predicatble ordering is to make it explicit - field_order = ("title", "group", "date", "sla_start_date", "sla_expiration_date", "cwe", "vulnerability_ids", "severity", "cvss_info", "cvssv3", + field_order = ("title", "group", "date", "sla_start_date", "sla_expiration_date", "cwes", "vulnerability_ids", "severity", "cvss_info", "cvssv3", "cvssv3_score", "cvssv4", "cvssv4_score", "description", "mitigation", "impact", "request", "response", "steps_to_reproduce", "severity_justification", "endpoints", "endpoints_to_add", "references", "active", "mitigated", "mitigated_by", "verified", "false_p", "duplicate", "out_of_scope", "risk_accept", "under_defect_review") @@ -537,6 +568,10 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + # Pre-fill all CWEs (primary first, CWE- form) on edit; mirrors the vulnerability_ids field. + if self.instance and self.instance.pk: + self.fields["cwes"].initial = "\n".join(self.instance.cwes) + if settings.V3_FEATURE_LOCATIONS: self.fields["endpoints"].queryset = Location.objects.filter(products__product=self.instance.test.engagement.product) if self.instance and self.instance.pk: @@ -634,7 +669,7 @@ def _post_clean(self): class Meta: model = Finding - exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "inherited_tags", + exclude = ("reporter", "url", "numerical_severity", "under_review", "reviewers", "cve", "cwe", "inherited_tags", "review_requested_by", "is_mitigated", "jira_creation", "jira_change", "sonarqube_issue", "endpoints", "endpoint_status") diff --git a/dojo/finding/ui/views.py b/dojo/finding/ui/views.py index e4ed5327e1c..1436edd98fe 100644 --- a/dojo/finding/ui/views.py +++ b/dojo/finding/ui/views.py @@ -939,6 +939,7 @@ def process_finding_form(self, request: HttpRequest, finding: Finding, context: self.process_burp_request_response(new_finding, context) # Save the vulnerability IDs finding_helper.save_vulnerability_ids(new_finding, context["form"].cleaned_data["vulnerability_ids"].split()) + finding_helper.save_cwes(new_finding) # Add a success message messages.add_message( request, diff --git a/dojo/finding/vulnerability_id.py b/dojo/finding/vulnerability_id.py new file mode 100644 index 00000000000..a8ed8dae217 --- /dev/null +++ b/dojo/finding/vulnerability_id.py @@ -0,0 +1,39 @@ +"""Pure helpers for vulnerability identifiers (no model imports, safe to import anywhere).""" + + +def cwe_number(value) -> int | None: + """``"CWE-79"`` / ``"79"`` / ``79`` -> ``79`` (case-insensitive); anything else -> ``None``.""" + if value is None: + return None + token = str(value).strip().upper().removeprefix("CWE-") + return int(token) if token.isdigit() else None + + +def parse_cwes(text: str | None) -> list[int]: + """ + Parse CWE numbers from user text (one per line or comma-separated). + + Accepts ``89`` or ``CWE-89`` (case-insensitive); ignores anything non-numeric. + """ + result = [] + for token in (text or "").replace(",", "\n").split(): + number = cwe_number(token) + if number is not None: + result.append(number) + return result + + +def resolve_vulnerability_id_type(vulnerability_id: str | None) -> str | None: + """ + Autodetect the type from the id's leading prefix (the part before the first ``-``). + + Structural, no registry: ``CVE-2024-1 -> "CVE"``, ``GHSA-... -> "GHSA"``, + ``RUSTSEC-2021-0001 -> "RUSTSEC"``, ``ALINUX2-SA-... -> "ALINUX2"``. Returns the uppercased + prefix, or ``None`` when there is no non-numeric prefix (bare numbers / UUIDs / no dash). + """ + if not vulnerability_id: + return None + prefix = str(vulnerability_id).strip().split("-", 1)[0].strip() + if not prefix or prefix.isdigit(): + return None + return prefix.upper() diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index fe015b610b0..1f31cb4784a 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -12,6 +12,7 @@ import dojo.finding.helper as finding_helper import dojo.risk_acceptance.helper as ra_helper +from dojo.finding.vulnerability_id import resolve_vulnerability_id_type from dojo.importers.options import ImporterOptions from dojo.jira.services import is_keep_in_sync from dojo.location.models import Location @@ -27,6 +28,7 @@ Endpoint, FileUpload, Finding, + Finding_CWE, Test, Test_Import, Test_Import_Finding_Action, @@ -80,6 +82,8 @@ def __init__( ImporterOptions.__init__(self, *args, **kwargs) self.pending_vulnerability_ids: list[Vulnerability_Id] = [] self.pending_vuln_id_deletes: list[int] = [] + self.pending_cwes: list[Finding_CWE] = [] + self.pending_cwe_deletes: list[int] = [] self.pending_burp_rr: list[BurpRawRequestResponse] = [] def check_child_implementation_exception(self): @@ -791,23 +795,54 @@ def store_vulnerability_ids( vulnerability_ids_to_process = list(dict.fromkeys(finding.unsaved_vulnerability_ids or [])) vulnerability_ids_to_process = [x for x in vulnerability_ids_to_process if x.strip()] self.pending_vulnerability_ids.extend([ - Vulnerability_Id(finding=finding, vulnerability_id=vid) + Vulnerability_Id(finding=finding, vulnerability_id=vid, vulnerability_id_type=resolve_vulnerability_id_type(vid)) for vid in vulnerability_ids_to_process ]) if vulnerability_ids_to_process: finding.cve = vulnerability_ids_to_process[0] else: finding.cve = None + self.store_cwes(finding) return finding + def finding_cwe_numbers(self, finding: Finding) -> list[int]: + """Primary Finding.cwe plus any parser-supplied unsaved_cwes, positive and deduplicated.""" + cwe_numbers = [] + if finding.cwe and finding.cwe > 0: + cwe_numbers.append(finding.cwe) + cwe_numbers += [cwe for cwe in (getattr(finding, "unsaved_cwes", None) or []) if cwe and cwe > 0] + return list(dict.fromkeys(cwe_numbers)) + + def store_cwes(self, finding: Finding) -> None: + """Accumulate Finding_CWE rows for bulk insert at the batch boundary (via flush_vulnerability_ids).""" + self.pending_cwes.extend([ + Finding_CWE(finding=finding, cwe=cwe) for cwe in self.finding_cwe_numbers(finding) + ]) + + def reconcile_cwes(self, finding: Finding) -> None: + """Accumulate a delete+insert of Finding_CWE rows for a reimported finding when its CWEs changed.""" + new_cwes = set(self.finding_cwe_numbers(finding)) + # finding_cwe_set is prefetched on reimport candidates (build_candidate_scope_queryset). + existing_cwes = {row.cwe for row in finding.finding_cwe_set.all()} + if existing_cwes == new_cwes: + return + self.pending_cwe_deletes.append(finding.id) + self.pending_cwes.extend([Finding_CWE(finding=finding, cwe=cwe) for cwe in new_cwes]) + def flush_vulnerability_ids(self) -> None: - """Delete stale and bulk-insert accumulated Vulnerability_Id objects, then clear buffers.""" + """Delete stale and bulk-insert accumulated Vulnerability_Id / Finding_CWE objects, then clear buffers.""" if self.pending_vuln_id_deletes: Vulnerability_Id.objects.filter(finding_id__in=self.pending_vuln_id_deletes).delete() self.pending_vuln_id_deletes.clear() if self.pending_vulnerability_ids: Vulnerability_Id.objects.bulk_create(self.pending_vulnerability_ids, batch_size=1000) self.pending_vulnerability_ids.clear() + if self.pending_cwe_deletes: + Finding_CWE.objects.filter(finding_id__in=self.pending_cwe_deletes).delete() + self.pending_cwe_deletes.clear() + if self.pending_cwes: + Finding_CWE.objects.bulk_create(self.pending_cwes, batch_size=1000, ignore_conflicts=True) + self.pending_cwes.clear() def process_files( self, diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 9defa8352f2..e0664ca4d37 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -13,6 +13,7 @@ find_candidates_for_deduplication_unique_id, find_candidates_for_reimport_legacy, ) +from dojo.finding.vulnerability_id import resolve_vulnerability_id_type from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.base_location_manager import LocationHandler from dojo.importers.options import ImporterOptions @@ -966,6 +967,10 @@ def reconcile_vulnerability_ids( vulnerability_ids_to_process = list(dict.fromkeys(finding.unsaved_vulnerability_ids or [])) vulnerability_ids_to_process = [x for x in vulnerability_ids_to_process if x.strip()] + # Reconcile CWEs independently of the vulnerability_ids early-exit below (CWEs may change + # while vulnerability_ids do not, and vice versa). + self.reconcile_cwes(finding) + # Use prefetched data directly without triggering queries existing_vuln_ids = {v.vulnerability_id for v in finding.vulnerability_id_set.all()} new_vuln_ids = set(vulnerability_ids_to_process) @@ -981,7 +986,7 @@ def reconcile_vulnerability_ids( # Accumulate delete + insert for batch flush self.pending_vuln_id_deletes.append(finding.id) self.pending_vulnerability_ids.extend([ - Vulnerability_Id(finding=finding, vulnerability_id=vid) + Vulnerability_Id(finding=finding, vulnerability_id=vid, vulnerability_id_type=resolve_vulnerability_id_type(vid)) for vid in vulnerability_ids_to_process ]) if vulnerability_ids_to_process: diff --git a/dojo/management/commands/migrate_cwe.py b/dojo/management/commands/migrate_cwe.py new file mode 100644 index 00000000000..64492138c08 --- /dev/null +++ b/dojo/management/commands/migrate_cwe.py @@ -0,0 +1,34 @@ +import logging + +from django.core.management.base import BaseCommand + +from dojo.models import Finding, Finding_CWE +from dojo.utils import mass_model_updater + +logger = logging.getLogger(__name__) + + +def create_finding_cwe(finding): + if finding.cwe and finding.cwe > 0: + # Unique (finding, cwe) constraint makes this idempotent. + Finding_CWE.objects.get_or_create(finding=finding, cwe=finding.cwe) + + +class Command(BaseCommand): + + """This management command creates Finding_CWE rows from the cwe field for all findings.""" + + help = "Usage: manage.py migrate_cwe" + + def handle(self, *args, **options): + + logger.info("Starting migration of cwes for Findings") + findings = Finding.objects.filter(cwe__gt=0) + mass_model_updater( + Finding, + findings, + create_finding_cwe, + fields=None, + page_size=100, + log_prefix="creating finding cwes: ", + ) diff --git a/dojo/models.py b/dojo/models.py index c4d3d0aaa68..582708eaf64 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -390,6 +390,7 @@ class Meta: CWE, # noqa: F401 -- re-export BurpRawRequestResponse, # noqa: F401 -- re-export Finding, + Finding_CWE, # noqa: F401 -- re-export Finding_Group, # noqa: F401 -- re-export Finding_Template, Vulnerability_Id, # noqa: F401 -- re-export diff --git a/dojo/product/ui/views.py b/dojo/product/ui/views.py index 5d6924ae033..6fd06517051 100644 --- a/dojo/product/ui/views.py +++ b/dojo/product/ui/views.py @@ -1494,6 +1494,7 @@ def process_forms(self, request: HttpRequest, test: Test, context: dict): if all_forms_valid: # if we're removing the "duplicate" in the edit finding screen finding_helper.save_vulnerability_ids(finding, context["form"].cleaned_data["vulnerability_ids"].split()) + finding_helper.save_cwes(finding) # Push things to jira if needed finding.save(push_to_jira=push_to_jira) # Save the burp req resp diff --git a/dojo/templates/dojo/ad_hoc_findings.html b/dojo/templates/dojo/ad_hoc_findings.html index d088b5c0570..bd9fc1af544 100644 --- a/dojo/templates/dojo/ad_hoc_findings.html +++ b/dojo/templates/dojo/ad_hoc_findings.html @@ -64,7 +64,7 @@

Github

elem.id = "req" } - if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && !$(elem).hasClass('select2-search__field')) { + if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && elem.name != 'cwes' && !$(elem).hasClass('select2-search__field')) { var mde = new EasyMDE({ spellChecker: false, inputStyle: "contenteditable", diff --git a/dojo/templates/dojo/add_findings.html b/dojo/templates/dojo/add_findings.html index d515bf7ee28..38f83dc23cd 100644 --- a/dojo/templates/dojo/add_findings.html +++ b/dojo/templates/dojo/add_findings.html @@ -111,7 +111,7 @@

JIRA

elem.id = "req" } - if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && !$(elem).hasClass('select2-search__field')) { + if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && elem.name != 'cwes' && !$(elem).hasClass('select2-search__field')) { var mde = new EasyMDE({ spellChecker: false, inputStyle: "contenteditable", diff --git a/dojo/templates/dojo/edit_finding.html b/dojo/templates/dojo/edit_finding.html index c6726860709..24267a368a5 100644 --- a/dojo/templates/dojo/edit_finding.html +++ b/dojo/templates/dojo/edit_finding.html @@ -171,7 +171,7 @@

GitHub

elem.id = "req" } - if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && !$(elem).hasClass('select2-search__field')) { + if (elem.name != 'endpoints_to_add' && elem.name != 'vulnerability_ids' && elem.name != 'cwes' && !$(elem).hasClass('select2-search__field')) { var mde = new EasyMDE({ spellChecker: false, inputStyle: "contenteditable", diff --git a/dojo/templates/dojo/view_finding.html b/dojo/templates/dojo/view_finding.html index 19d94fe2942..9f4fc6b682b 100755 --- a/dojo/templates/dojo/view_finding.html +++ b/dojo/templates/dojo/view_finding.html @@ -464,6 +464,27 @@

{% endif %} {% endwith %} + {% with finding.cwes|slice:"1:" as additional_cwes %} + {% if additional_cwes %} +
+ + + + + + + +
Additional CWEs
+ {% for cwe in additional_cwes %} + + {{ cwe }} + {% if not forloop.last %}, {% endif %} + {% endfor %} +
+
+ {% endif %} + {% endwith %} + {% if finding.static_finding or finding.line > 0 %} {% if finding.sast_source_object or finding.sast_sink_object or finding.sast_source_file_path or finding.sast_source_line > 0 %} {# For tools that give information on both source (start) and sink (end) of the attack vector #} diff --git a/unittests/test_vulnerability_id_cwe.py b/unittests/test_vulnerability_id_cwe.py new file mode 100644 index 00000000000..9302caa9471 --- /dev/null +++ b/unittests/test_vulnerability_id_cwe.py @@ -0,0 +1,112 @@ +"""CWE-as-a-separate-relationship (Finding_CWE), the API, and vulnerability-id type autodetection.""" +from django.test import SimpleTestCase +from rest_framework.authtoken.models import Token +from rest_framework.test import APIClient + +from dojo.finding.helper import save_cwes +from dojo.finding.vulnerability_id import cwe_number, parse_cwes, resolve_vulnerability_id_type +from dojo.models import Finding, Finding_CWE, User +from unittests.dojo_test_case import DojoAPITestCase, DojoTestCase, versioned_fixtures + + +class TestVulnerabilityIdTypeResolver(SimpleTestCase): + + def test_autodetect(self): + self.assertEqual(resolve_vulnerability_id_type("CVE-2024-1234"), "CVE") + self.assertEqual(resolve_vulnerability_id_type("GHSA-9v3m-xxxx"), "GHSA") + self.assertEqual(resolve_vulnerability_id_type("RUSTSEC-2021-0001"), "RUSTSEC") + self.assertEqual(resolve_vulnerability_id_type("ALINUX2-SA-2021"), "ALINUX2") + self.assertEqual(resolve_vulnerability_id_type("cve-2024-1"), "CVE") + # No non-numeric prefix -> None + self.assertIsNone(resolve_vulnerability_id_type("1234")) + self.assertIsNone(resolve_vulnerability_id_type("")) + self.assertIsNone(resolve_vulnerability_id_type(None)) + + def test_cwe_number(self): + self.assertEqual(cwe_number("CWE-79"), 79) + self.assertEqual(cwe_number("cwe-79"), 79) + self.assertEqual(cwe_number("79"), 79) + self.assertEqual(cwe_number(79), 79) + self.assertIsNone(cwe_number("foo")) + self.assertIsNone(cwe_number("CWE-")) + self.assertIsNone(cwe_number(None)) + + def test_parse_cwes(self): + self.assertEqual(parse_cwes("79\n89\nCWE-22"), [79, 89, 22]) + self.assertEqual(parse_cwes("79, 89"), [79, 89]) + self.assertEqual(parse_cwes("cwe-79"), [79]) + self.assertEqual(parse_cwes("not-a-cwe\n89"), [89]) + self.assertEqual(parse_cwes(""), []) + self.assertEqual(parse_cwes(None), []) + + +@versioned_fixtures +class TestFindingCwe(DojoTestCase): + fixtures = ["dojo_testdata.json"] + + def setUp(self): + self.finding = Finding.objects.get(id=2) + Finding_CWE.objects.filter(finding=self.finding).delete() + + def _stored_cwes(self): + return set(Finding_CWE.objects.filter(finding=self.finding).values_list("cwe", flat=True)) + + def test_primary_cwe_saved_and_exposed(self): + self.finding.cwe = 79 + save_cwes(self.finding) + self.assertEqual(self._stored_cwes(), {79}) + # cwes property returns canonical CWE- strings + self.assertEqual(Finding.objects.get(id=2).cwes, ["CWE-79"]) + + def test_multiple_cwes(self): + self.finding.cwe = 79 + self.finding.unsaved_cwes = [89, 89, 22] # includes a duplicate + save_cwes(self.finding) + self.assertEqual(self._stored_cwes(), {79, 89, 22}) + # primary first, deduplicated, CWE- form + self.assertEqual(Finding.objects.get(id=2).cwes, ["CWE-79", "CWE-89", "CWE-22"]) + + def test_no_cwe_stores_nothing(self): + self.finding.cwe = 0 + save_cwes(self.finding) + self.assertEqual(self._stored_cwes(), set()) + self.assertEqual(Finding.objects.get(id=2).cwes, []) + + def test_copy_finding_copies_cwes(self): + self.finding.cwe = 79 + self.finding.unsaved_cwes = [89] + save_cwes(self.finding) + copy = self.finding.copy() + self.assertEqual({79, 89}, set(copy.finding_cwe_set.values_list("cwe", flat=True))) + + +@versioned_fixtures +class TestFindingCwesAPI(DojoAPITestCase): + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + self.system_settings(enable_jira=True) + self.testuser = User.objects.get(username="admin") + self.testuser.usercontactinfo.block_execution = True + self.testuser.usercontactinfo.save() + token = Token.objects.get(user=self.testuser) + self.client = APIClient() + self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self.client.force_login(self.get_test_admin()) + + def test_finding_create_with_cwes(self): + # cwes mirror vulnerability_ids: nested [{"cwe": "CWE-79"}], first is the primary Finding.cwe + finding_details = self.get_finding_api(2) + del finding_details["id"] + finding_details.pop("cve", None) + finding_details.pop("cwe", None) + new_cwes = [{"cwe": "CWE-79"}, {"cwe": "CWE-89"}] + finding_details["cwes"] = new_cwes + response = self.post_new_finding_api(finding_details) + # response echoes the CWEs in CWE- form + self.assertEqual({"CWE-79", "CWE-89"}, {entry["cwe"] for entry in response.get("cwes")}) + finding = Finding.objects.get(id=response.get("id")) + # primary cwe (int) set from the first entry; both stored as Finding_CWE rows + self.assertEqual(79, finding.cwe) + self.assertEqual({79, 89}, set(finding.finding_cwe_set.values_list("cwe", flat=True)))