Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions ci/src/dependencies/integration/github/github_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import logging
import os
import traceback
Expand All @@ -7,6 +8,7 @@
import requests
from github import Github
from github.GithubException import GithubException
from integration.github.github_dependabot import GHDependabotAlert, GHDependabotSearchQuery
from integration.github.github_dependency_submission import GHSubDetector, GHSubJob, GHSubRequest
from integration.github.github_workflow_config import GithubWorklow

Expand Down Expand Up @@ -69,6 +71,20 @@ def run_workflow(self, workflow: GithubWorklow) -> bool:
logging.debug(f"Could not run workflow {workflow}.\nReason: {traceback.format_exc()}")
return False

def get_dependabot_alerts(self, query: GHDependabotSearchQuery) -> typing.List[GHDependabotAlert]:
    """Return all dependabot alerts of the repository described by *query*.

    The SDK only accepts a single state/severity/ecosystem value per call, so
    every combination of the query's filter values is requested separately and
    the results are merged. Pagination is handled transparently by the SDK.
    """
    repository = self.github.get_repo(f"{query.owner}/{query.repo}", lazy=True)
    filter_combinations = itertools.product(query.get_states(), query.get_severities(), query.get_ecosystems())
    return [
        GHDependabotAlert(raw_alert.html_url, raw_alert.security_vulnerability.severity)
        for state, severity, ecosystem in filter_combinations
        for raw_alert in repository.get_dependabot_alerts(state, severity, ecosystem)
    ]

@staticmethod
def submit_dependencies(toml_lock_filenames: typing.List[typing.Tuple[str, str]]) -> None:
def get_sha():
Expand Down
94 changes: 94 additions & 0 deletions ci/src/dependencies/integration/github/github_dependabot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import typing
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, List

from github.GithubObject import NotSet


# NOTE: deliberately NOT decorated with @dataclass. Applying @dataclass to an
# Enum injects a field-less __eq__ (making ALL members compare equal) and sets
# __hash__ = None (making members unusable in sets/dicts).
class GHDependabotAlertState(Enum):
    """Lifecycle state of a dependabot alert (GitHub API `state` filter)."""

    AUTO_DISMISSED = auto()
    DISMISSED = auto()
    FIXED = auto()
    OPEN = auto()

    def __repr__(self):
        # Render only the member name so queries/log lines stay compact.
        return self.name


# NOTE: deliberately NOT decorated with @dataclass. Applying @dataclass to an
# Enum injects a field-less __eq__ (making ALL members compare equal) and sets
# __hash__ = None (making members unusable in sets/dicts).
class GHDependabotAlertSeverity(Enum):
    """Severity of a dependabot alert (GitHub API `severity` filter)."""

    LOW = auto()
    MEDIUM = auto()
    HIGH = auto()
    CRITICAL = auto()

    @staticmethod
    def from_str(severity: str) -> "GHDependabotAlertSeverity":
        """Parse a case-insensitive severity string (e.g. "high") into a member.

        Raises RuntimeError if *severity* is None and KeyError if the string
        does not name a known severity.
        """
        if severity is None:
            raise RuntimeError("severity cannot be None")
        return GHDependabotAlertSeverity.__members__[severity.upper()]

    def __repr__(self):
        # Render only the member name so queries/log lines stay compact.
        return self.name


# NOTE: deliberately NOT decorated with @dataclass. Applying @dataclass to an
# Enum injects a field-less __eq__ (making ALL members compare equal) and sets
# __hash__ = None (making members unusable in sets/dicts).
class GHDependabotAlertEcosystem(Enum):
    """Package ecosystem of a dependabot alert (GitHub API `ecosystem` filter)."""

    COMPOSER = auto()
    GO = auto()
    MAVEN = auto()
    NPM = auto()
    NUGET = auto()
    PIP = auto()
    PUB = auto()
    RUBYGEMS = auto()
    RUST = auto()

    def __repr__(self):
        # Render only the member name so queries/log lines stay compact.
        return self.name


@dataclass
class GHDependabotSearchQuery:
    """Filter set for retrieving the dependabot alerts of one repository.

    A filter list that is None (or empty) means "no filter" for that
    dimension; the corresponding get_* accessor then yields [NotSet], which
    the PyGithub SDK interprets as "unfiltered".
    """

    owner: str
    repo: str
    state: typing.Optional[List[GHDependabotAlertState]] = None
    severity: typing.Optional[List[GHDependabotAlertSeverity]] = None
    ecosystem: typing.Optional[List[GHDependabotAlertEcosystem]] = None

    def __post_init__(self):
        # Validate eagerly so a malformed query fails at construction time.
        # Explicit raises instead of `assert` so validation survives `python -O`.
        if not self.owner:
            raise ValueError("owner must be a non-empty string")
        if not self.repo:
            raise ValueError("repo must be a non-empty string")
        for filter_values in (self.state, self.severity, self.ecosystem):
            if filter_values is not None and len(filter_values) != len({v.name for v in filter_values}):
                # Duplicates would only cause redundant API calls downstream.
                raise ValueError(f"filter list contains duplicate values: {filter_values}")

    @staticmethod
    def __get_field(filter_values: typing.Optional[List[Any]]) -> typing.List[typing.Union[str, NotSet]]:
        # Map "no filter" to the SDK's NotSet sentinel; otherwise lowercase the
        # member names because the GitHub API expects lowercase filter values.
        if not filter_values:
            return [NotSet]
        return [v.name.lower() for v in filter_values]

    def get_states(self) -> typing.List[typing.Union[str, NotSet]]:
        """States to query, as SDK-ready values ([NotSet] if unfiltered)."""
        return GHDependabotSearchQuery.__get_field(self.state)

    def get_severities(self) -> typing.List[typing.Union[str, NotSet]]:
        """Severities to query, as SDK-ready values ([NotSet] if unfiltered)."""
        return GHDependabotSearchQuery.__get_field(self.severity)

    def get_ecosystems(self) -> typing.List[typing.Union[str, NotSet]]:
        """Ecosystems to query, as SDK-ready values ([NotSet] if unfiltered)."""
        return GHDependabotSearchQuery.__get_field(self.ecosystem)


@dataclass
class GHDependabotAlert:
    """One dependabot alert reduced to the fields the scanners need.

    @dataclass is kept for the generated __eq__/__repr__; __init__ is written
    by hand because it accepts the severity as a raw API string and parses it
    into a GHDependabotAlertSeverity member.
    """

    # Link to the alert on github.com; opening it requires authorization.
    html_url: str
    severity: GHDependabotAlertSeverity

    def __init__(self, html_url: str, severity: str):
        """Build an alert from the raw SDK values.

        Raises ValueError on empty input (explicit raises instead of `assert`
        so validation survives `python -O`) and KeyError if *severity* does
        not name a known severity.
        """
        if not html_url:
            raise ValueError("html_url must be a non-empty string")
        if not severity:
            raise ValueError("severity must be a non-empty string")

        self.html_url = html_url
        self.severity = GHDependabotAlertSeverity.from_str(severity)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from data_source.jira_finding_data_source import JiraFindingDataSource
from integration.github.github_api import GithubApi
from model.ic import (
REPO_NAME,
get_ic_repo_ci_pipeline_base_url,
Expand Down Expand Up @@ -40,5 +41,6 @@
BazelRustDependencyManager(),
JiraFindingDataSource(finding_data_source_subscribers, app_owner_msg_subscriber=notifier),
scanner_subscribers,
github_api=GithubApi(),
)
scanner_job.do_release_scan(get_ic_repo_for_rust())
76 changes: 28 additions & 48 deletions ci/src/dependencies/scanner/dependency_scanner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import logging
import os
import pathlib
Expand All @@ -12,9 +11,14 @@
from data_source.findings_failover_data_store import FindingsFailoverDataStore
from integration.github.github_api import GithubApi
from integration.github.github_app import GithubApp
from integration.github.github_dependabot import (
GHDependabotAlertEcosystem,
GHDependabotAlertSeverity,
GHDependabotAlertState,
GHDependabotSearchQuery,
)
from model.finding import Finding
from model.repository import Repository
from model.security_risk import SecurityRisk
from scanner.manager.dependency_manager import DependencyManager
from scanner.process_executor import ProcessExecutor
from scanner.scanner_job_type import ScannerJobType
Expand All @@ -35,12 +39,14 @@ def __init__(
scanner_subscribers: typing.List[ScannerSubscriber],
failover_data_store: typing.Optional[FindingsFailoverDataStore] = None,
github_app: GithubApp = None,
github_api: GithubApi = None,
):
self.subscribers = scanner_subscribers
self.dependency_manager = dependency_manager
self.finding_data_source = finding_data_source
self.failover_data_store = failover_data_store
self.github_app = github_app
self.github_api = github_api
self.job_id = os.environ.get("CI_PIPELINE_ID", "CI_PIPELINE_ID")
self.root = PROJECT_ROOT

Expand Down Expand Up @@ -278,65 +284,39 @@ def do_merge_request_scan(self, repository: Repository):

def do_release_scan(self, repository: Repository):
should_fail_job = False
scanner_id = "DEPENDABOT"
try:
findings = self.dependency_manager.get_findings(repository.name, repository.projects[0], None)
failures: typing.List = []

if len(findings) == 0:
return

for finding in findings:
vulnerable_dependency = finding.vulnerable_dependency
jira_finding = self.finding_data_source.get_open_finding(
repository.name,
self.dependency_manager.get_scanner_id(),
vulnerable_dependency.id,
vulnerable_dependency.version,
# we only want to fail if there are open rust findings with severity HIGH or CRITICAL
findings = self.github_api.get_dependabot_alerts(
GHDependabotSearchQuery(
"dfinity",
repository.projects[0].path,
[GHDependabotAlertState.OPEN],
[GHDependabotAlertSeverity.HIGH, GHDependabotAlertSeverity.CRITICAL],
[GHDependabotAlertEcosystem.RUST],
)
if jira_finding:
if not jira_finding.risk:
failures.append(f"Risk assessment not done for {jira_finding.more_info}")

if (jira_finding.risk == SecurityRisk.HIGH or jira_finding.risk == SecurityRisk.CRITICAL) and (
not jira_finding.due_date
or jira_finding.due_date - int(datetime.datetime.utcnow().timestamp()) < 0
):
failures.append(
f"Risk for finding {jira_finding.more_info} crosses release threshold and due date for fixing it has passed"
)
else:
failures.append(f"New finding has been found {finding}")

if len(failures) == 0:
return

git_commit_sha = os.environ.get("CI_COMMIT_SHA", "CI_COMMIT_SHA")
exception = self.finding_data_source.commit_has_block_exception(CommitType.RELEASE_COMMIT, git_commit_sha)
)

if exception:
# no open findings -> all good
if len(findings) == 0:
return

# At this point, there are failures and there is no exceptions
# Job must be failed
# we have open rust findings -> fail job
for subscriber in self.subscribers:
subscriber.on_release_build_blocked(self.dependency_manager.get_scanner_id(), self.job_id)
logging.error("Release job failed with failures.")
logging.info(f"Release job failed with failures : {failures}")
subscriber.on_release_build_blocked(scanner_id, self.job_id)
# safe to print finding details because these contain only severity and link to dependabot finding which you can only open if you're authorized
logging.error(f"Release job failed with failures : {findings}")

sys.exit(1)
except Exception as err:
should_fail_job = True
logging.error(f"{self.dependency_manager.get_scanner_id()} for {repository.name} failed for {self.job_id}.")
logging.error(f"{scanner_id} for {repository.name} failed for {self.job_id}.")
logging.debug(
f"{self.dependency_manager.get_scanner_id()} for {repository.name} failed for {self.job_id} with error:\n{traceback.format_exc()}"
f"{scanner_id} for {repository.name} failed for {self.job_id} with error:\n{traceback.format_exc()}"
)
for subscriber in self.subscribers:
subscriber.on_scan_job_failed(
self.dependency_manager.get_scanner_id(), ScannerJobType.RELEASE_SCAN, self.job_id, str(err)
)
subscriber.on_scan_job_failed(scanner_id, ScannerJobType.RELEASE_SCAN, self.job_id, str(err))
finally:
if not should_fail_job:
for subscriber in self.subscribers:
subscriber.on_scan_job_succeeded(
self.dependency_manager.get_scanner_id(), ScannerJobType.RELEASE_SCAN, self.job_id
)
subscriber.on_scan_job_succeeded(scanner_id, ScannerJobType.RELEASE_SCAN, self.job_id)
Loading
Loading