From 730ae43370290817158db5fe675a94b91e419296 Mon Sep 17 00:00:00 2001 From: yma Date: Tue, 27 May 2025 10:46:43 +0800 Subject: [PATCH 1/2] Add Unit tests for RADAS signing results parse and generation --- charon/pkgs/maven.py | 2 +- charon/pkgs/radas_signature_handler.py | 4 + tests/test_radas_sign_handler.py | 162 +++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 tests/test_radas_sign_handler.py diff --git a/charon/pkgs/maven.py b/charon/pkgs/maven.py index 4ca1be0d..9d895365 100644 --- a/charon/pkgs/maven.py +++ b/charon/pkgs/maven.py @@ -423,7 +423,7 @@ def handle_maven_uploading( ) if not _generated_signs: logger.error( - "No sign result files were downloaded, " + "No sign result files were generated, " "please make sure the sign process is already done and without timeout") return (tmp_root, False) diff --git a/charon/pkgs/radas_signature_handler.py b/charon/pkgs/radas_signature_handler.py index 46794f49..5d25212d 100644 --- a/charon/pkgs/radas_signature_handler.py +++ b/charon/pkgs/radas_signature_handler.py @@ -103,6 +103,10 @@ def generate_radas_sign(top_level: str, sign_result_loc: str) -> Tuple[List[str] """ Generate .asc files based on RADAS sign result json file """ + if not os.path.isdir(sign_result_loc): + logger.error("Sign result loc dir does not exist: %s", sign_result_loc) + return [], [] + files = [ os.path.join(sign_result_loc, f) for f in os.listdir(sign_result_loc) diff --git a/tests/test_radas_sign_handler.py b/tests/test_radas_sign_handler.py new file mode 100644 index 00000000..f0b90351 --- /dev/null +++ b/tests/test_radas_sign_handler.py @@ -0,0 +1,162 @@ +""" +Copyright (C) 2022 Red Hat, Inc. (https://github.com/Commonjava/charon) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging +import unittest +import tempfile +import os +import json +import shutil +import builtins +from unittest import mock +from charon.utils.files import overwrite_file +from charon.pkgs.radas_signature_handler import generate_radas_sign + +logger = logging.getLogger(__name__) + + +class RadasSignHandlerTest(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.__prepare_sign_result_file() + + def tearDown(self) -> None: + super().tearDown() + self.__clear_sign_result_file() + + def test_multi_sign_files_generation(self): + self.__prepare_artifacts() + failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_loc) + self.assertEqual(failed, []) + expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc") + expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc") + self.assertEqual(len(generated), 2) + self.assertIn(expected_asc1, generated) + self.assertIn(expected_asc2, generated) + + with open(expected_asc1) as f: + content1 = f.read() + with open(expected_asc2) as f: + content2 = f.read() + self.assertIn("signature1@hash", content1) + self.assertIn("signature2@hash", content2) + + def test_sign_files_generation_with_missing_artifacts(self): + failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_loc) + self.assertEqual(failed, []) + expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc") + expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc") + self.assertEqual(generated, []) + self.assertFalse(os.path.exists(expected_asc1)) + self.assertFalse(os.path.exists(expected_asc2)) + + def test_sign_files_generation_with_failure(self): + self.__prepare_artifacts() + expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc") + expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc") + + # simulate expected_asc1 can not open to write properly + real_open = builtins.open + with mock.patch("builtins.open") as mock_open: + def side_effect(path, *args, **kwargs): + # this is for pylint check + mode = "r" + if len(args) > 0: + mode = args[0] + elif "mode" in kwargs: + mode = kwargs["mode"] + if path == expected_asc1 and "w" in mode: + raise IOError("mock write error") + return real_open(path, *args, **kwargs) + mock_open.side_effect = side_effect + failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_loc) + + self.assertEqual(len(failed), 1) + self.assertNotIn(expected_asc1, generated) + self.assertIn(expected_asc2, generated) + + def test_sign_files_generation_with_missing_result(self): + self.__prepare_artifacts() + # simulate missing pull result by removing the sign result file loc + shutil.rmtree(self.__sign_result_loc) + + failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_loc) + self.assertEqual(failed, []) + expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc") + expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc") + self.assertEqual(generated, []) + self.assertFalse(os.path.exists(expected_asc1)) + self.assertFalse(os.path.exists(expected_asc2)) + + def test_sign_files_generation_with_not_single_results(self): + self.__prepare_artifacts() + another_result_file = os.path.join(self.__sign_result_loc, "result2.json") + overwrite_file(another_result_file, "test_json") + + failed, generated = generate_radas_sign(self.__repo_dir, self.__sign_result_loc) + self.assertEqual(failed, []) + expected_asc1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar.asc") + expected_asc2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar.asc") + self.assertEqual(generated, []) + self.assertFalse(os.path.exists(expected_asc1)) + self.assertFalse(os.path.exists(expected_asc2)) + + def __prepare_sign_result_file(self): + self.__sign_result_loc = tempfile.mkdtemp() + self.__sign_result_file = os.path.join(self.__sign_result_loc, "result.json") + self.__repo_dir = os.path.join(tempfile.mkdtemp(), "maven-repository") + data = { + "request-id": "request-id", + "file-reference": "quay.io/org/maven-zip@hash", + "result": [ + { + "file": "maven-repository/foo/bar/1.0/foo-bar-1.0.jar", + "signature": ( + "-----BEGIN PGP SIGNATURE-----" + "signature1@hash" + "-----END PGP SIGNATURE-----" + ), + "checksum": "sha256:sha256-content", + }, + { + "file": "maven-repository/foo/bar/2.0/foo-bar-2.0.jar", + "signature": ( + "-----BEGIN PGP SIGNATURE-----" + "signature2@hash" + "-----END PGP SIGNATURE-----" + ), + "checksum": "sha256:sha256-content", + }, + ], + } + json_str = json.dumps(data, indent=2) + overwrite_file(self.__sign_result_file, json_str) + + def __prepare_artifacts(self): + os.makedirs(os.path.join(self.__repo_dir, "foo/bar/1.0"), exist_ok=True) + os.makedirs(os.path.join(self.__repo_dir, "foo/bar/2.0"), exist_ok=True) + artifact1 = os.path.join(self.__repo_dir, "foo/bar/1.0/foo-bar-1.0.jar") + artifact2 = os.path.join(self.__repo_dir, "foo/bar/2.0/foo-bar-2.0.jar") + with open(artifact1, "w") as f: + f.write("dummy1") + with open(artifact2, "w") as f: + f.write("dummy2") + + def __clear_sign_result_file(self): + if os.path.exists(self.__sign_result_loc): + shutil.rmtree(self.__sign_result_loc) + if os.path.exists(self.__repo_dir): + shutil.rmtree(self.__repo_dir) From 110279872edc07c92ce2af3ed87fcf2517dcff32 Mon Sep 17 00:00:00 2001 From: yma Date: Tue, 27 May 2025 11:08:33 +0800 Subject: [PATCH 2/2] Change 'result' to 'results' ref signing/radas-nonprod test samples --- charon/pkgs/radas_signature_handler.py | 2 +- tests/test_radas_sign_handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/charon/pkgs/radas_signature_handler.py b/charon/pkgs/radas_signature_handler.py index 5d25212d..e670619c 100644 --- a/charon/pkgs/radas_signature_handler.py +++ b/charon/pkgs/radas_signature_handler.py @@ -160,7 +160,7 @@ async def generate_single_sign_file( failed_paths.append(signature_path) logger.error("Failed to write .asc file for %s: %s", artifact_path, e) - result = data.get("result", []) + result = data.get("results", []) return __do_path_cut_and(path_handler=generate_single_sign_file, data=result) diff --git a/tests/test_radas_sign_handler.py b/tests/test_radas_sign_handler.py index f0b90351..b667eaea 100644 --- a/tests/test_radas_sign_handler.py +++ b/tests/test_radas_sign_handler.py @@ -121,7 +121,7 @@ def __prepare_sign_result_file(self): data = { "request-id": "request-id", "file-reference": "quay.io/org/maven-zip@hash", - "result": [ + "results": [ { "file": "maven-repository/foo/bar/1.0/foo-bar-1.0.jar", "signature": (