Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
from vulnerabilities.pipelines.v2_importers import openssl_importer as openssl_importer_v2
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import (
postgresql_live_importer as postgresql_live_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import (
project_kb_msr2019_importer as project_kb_msr2019_importer_v2,
)
Expand Down Expand Up @@ -196,3 +199,9 @@
for key, value in IMPORTERS_REGISTRY.items()
if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo
]

LIVE_IMPORTERS_REGISTRY = create_registry(
[
postgresql_live_importer_v2.PostgreSQLLiveImporterPipeline,
]
)
77 changes: 77 additions & 0 deletions vulnerabilities/pipelines/v2_importers/postgresql_live_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
#

import logging
from typing import Iterable

from packageurl import PackageURL
from univers.versions import InvalidVersion
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.pipelines.v2_importers.postgresql_importer import PostgreSQLImporterPipeline

logger = logging.getLogger(__name__)


class PostgreSQLLiveImporterPipeline(PostgreSQLImporterPipeline):
"""
Live importer for PostgreSQL that filters the batch output to a single PURL.
"""

pipeline_id = "postgresql_live_importer_v2"
supported_types = ["generic"]

@classmethod
def steps(cls):
return (
cls.get_purl_inputs,
cls.collect_and_store_advisories,
)

def get_purl_inputs(self):
purl = self.inputs.get("purl")
if not purl:
raise ValueError("PURL is required for PostgreSQLLiveImporterPipeline")

if isinstance(purl, str):
purl = PackageURL.from_string(purl)

if not isinstance(purl, PackageURL):
raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance")

if purl.type not in self.supported_types:
raise ValueError(
f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}"
)

if not purl.version:
raise ValueError(f"PURL: {purl!s} is expected to have a version")
self.purl = purl

def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
for advisory in super().collect_advisories():
if self._advisory_related_purl(advisory):
yield advisory

def _advisory_related_purl(self, advisory: AdvisoryDataV2) -> bool:
if not advisory.affected_packages:
return False

try:
package_version = SemverVersion(self.purl.version)
except InvalidVersion as e:
logger.debug(f"Invalid PURL version {self.purl.version!r}: {e}")
return False

for ap in advisory.affected_packages:
if (ap.affected_version_range and package_version in ap.affected_version_range) or (
ap.fixed_version_range and package_version in ap.fixed_version_range
):
return True

return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
#

import pytest
import requests
from packageurl import PackageURL

from vulnerabilities.pipelines.v2_importers.postgresql_live_importer import (
PostgreSQLLiveImporterPipeline,
)

HTML_BASE = """
<html>
<body>
<table>
<tbody>
<tr>
<td>
<span class="nobr"><a href="/support/security/CVE-2022-1234/">CVE-2022-1234</a></span><br>
<a href="/about/news/postgresql-175-169-1513-1418-and-1321-released-3072/">Announcement</a><br>
</td>
<td>{affected}</td>
<td>{fixed}</td>
<td><a href="/vector?vector=CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H">9.8</a></td>
<td>{summary}</td>
</tr>
</tbody>
</table>
</body>
</html>
"""


class DummyResponse:
def __init__(self, content):
self.content = content.encode("utf-8")


def test_affected_version(monkeypatch):
html = HTML_BASE.format(affected="10.0, 10.1", fixed="10.2", summary="Issue affects all")
monkeypatch.setattr(requests, "get", lambda url: DummyResponse(html))

purl = PackageURL(type="generic", name="postgresql", version="10.1")
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())
assert [adv.to_dict() for adv in advisories] == [
{
"advisory_id": "CVE-2022-1234",
"affected_packages": [
{
"affected_version_range": "vers:generic/10.0.0|10.1.0",
"fixed_by_commit_patches": [],
"fixed_version_range": "vers:generic/10.2.0",
"introduced_by_commit_patches": [],
"package": {
"name": "postgresql",
"namespace": "",
"qualifiers": "",
"subpath": "",
"type": "generic",
"version": "",
},
}
],
"aliases": [],
"date_published": None,
"patches": [],
"references": [
{
"reference_id": "",
"reference_type": "",
"url": "https://www.postgresql.org/support/security/CVE-2022-1234/",
},
{
"reference_id": "",
"reference_type": "",
"url": "https://www.postgresql.org/about/news/postgresql-175-169-1513-1418-and-1321-released-3072/",
},
],
"severities": [
{
"scoring_elements": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
"system": "cvssv3",
"value": "9.8",
}
],
"summary": "Issue affects all",
"url": "https://www.postgresql.org/support/security/",
"weaknesses": [],
}
]


def test_unaffected_version(monkeypatch):
html = HTML_BASE.format(affected="10.0, 10.1", fixed="10.2", summary="Issue affects all")
monkeypatch.setattr(requests, "get", lambda url: DummyResponse(html))

purl = PackageURL(type="generic", name="postgresql", version="14.3")
pipeline = PostgreSQLLiveImporterPipeline(purl=purl)
pipeline.get_purl_inputs()
advisories = list(pipeline.collect_advisories())

assert len(advisories) == 0


def test_invalid_purl():
pipeline = PostgreSQLLiveImporterPipeline()

pipeline.inputs = {"purl": "pkg:pypi/postgresql@10.1"}
with pytest.raises(ValueError):
pipeline.get_purl_inputs()