Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions charon/cmd/cmd_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@
default=False
)
@option("--dryrun", "-n", is_flag=True, default=False)
@option(
"--sign_result_loc",
"-l",
default="/tmp/sign",
help="""
The local save path for oras to pull the radas signature result.
Sign request will use this path to download the signature result,
Upload will use the file on this path to generate the corresponding .asc files
""",
)
@command()
def upload(
repo: str,
Expand All @@ -150,7 +160,8 @@ def upload(
sign_key: str = "redhatdevel",
debug=False,
quiet=False,
dryrun=False
dryrun=False,
sign_result_loc="/tmp/sign"
):
"""Upload all files from a released product REPO to Ronda
Service. The REPO points to a product released tarball which
Expand Down Expand Up @@ -221,7 +232,8 @@ def upload(
key=sign_key,
dry_run=dryrun,
manifest_bucket_name=manifest_bucket_name,
config=config
config=config,
sign_result_loc=sign_result_loc
)
if not succeeded:
sys.exit(1)
Expand Down
44 changes: 35 additions & 9 deletions charon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
import os
from typing import Dict, List, Optional
Expand All @@ -34,13 +35,20 @@ def __init__(self, data: Dict):
self.__client_key: str = data.get("client_key", None)
self.__client_key_pass_file: str = data.get("client_key_pass_file", None)
self.__root_ca: str = data.get("root_ca", "/etc/pki/tls/certs/ca-bundle.crt")
self.__quay_radas_registry_config: Optional[str] = data.get(
"quay_radas_registry_config", None
)
self.__radas_sign_timeout_retry_count: int = data.get("radas_sign_timeout_retry_count", 10)
self.__radas_sign_timeout_retry_interval: int = data.get(
"radas_sign_timeout_retry_interval", 60
)

def validate(self) -> bool:
if not self.__umb_host:
logger.error("Missing host name setting for UMB!")
return False
if not self.__result_queue:
logger.error("Missing the queue setting to receive siging result in UMB!")
logger.error("Missing the queue setting to receive signing result in UMB!")
return False
if not self.__request_queue:
logger.error("Missing the queue setting to send signing request in UMB!")
Expand All @@ -57,10 +65,17 @@ def validate(self) -> bool:
if self.__root_ca and not os.access(self.__root_ca, os.R_OK):
logger.error("The root ca file is not valid!")
return False
if self.__quay_radas_registry_config and not os.access(
self.__quay_radas_registry_config, os.R_OK
):
self.__quay_radas_registry_config = None
logger.warning(
"The quay registry config for oras is not valid, will ignore the registry config!"
)
return True

def umb_target(self) -> str:
return f'amqps://{self.__umb_host}:{self.__umb_host_port}'
return f"amqps://{self.__umb_host}:{self.__umb_host_port}"

def result_queue(self) -> str:
return self.__result_queue
Expand All @@ -77,7 +92,7 @@ def client_key(self) -> str:
def client_key_password(self) -> str:
pass_file = self.__client_key_pass_file
if os.access(pass_file, os.R_OK):
with open(pass_file, 'r') as f:
with open(pass_file, "r") as f:
return f.read()
elif pass_file:
logger.warning("The key password file is not accessible. Will ignore the password.")
Expand All @@ -86,6 +101,15 @@ def client_key_password(self) -> str:
def root_ca(self) -> str:
return self.__root_ca

def quay_radas_registry_config(self) -> Optional[str]:
return self.__quay_radas_registry_config

def radas_sign_timeout_retry_count(self) -> int:
return self.__radas_sign_timeout_retry_count

def radas_sign_timeout_retry_interval(self) -> int:
return self.__radas_sign_timeout_retry_interval


class CharonConfig(object):
"""CharonConfig is used to store all configurations for charon
Expand All @@ -102,9 +126,10 @@ def __init__(self, data: Dict):
self.__ignore_signature_suffix: Dict = data.get("ignore_signature_suffix", None)
self.__signature_command: str = data.get("detach_signature_command", None)
self.__aws_cf_enable: bool = data.get("aws_cf_enable", False)
self.__radas_config__: Optional[RadasConfig] = None
radas_config: Dict = data.get("radas", None)
if radas_config:
self.__radas_config__: RadasConfig = RadasConfig(radas_config)
self.__radas_config__ = RadasConfig(radas_config)

def get_ignore_patterns(self) -> List[str]:
return self.__ignore_patterns
Expand Down Expand Up @@ -133,22 +158,23 @@ def get_detach_signature_command(self) -> str:
def is_aws_cf_enable(self) -> bool:
return self.__aws_cf_enable

Copy link
Member

@ligangty ligangty May 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking we should add a __is_radas_enabled field here for convenient, which set to self.__radas_config__ and self.__radas_config__.validate(). So then we can use this with a get method is_radas_enabled() for unified way in other place to check if radas is usable.

def get_radas_config(self) -> RadasConfig:
def is_radas_enabled(self) -> bool:
return bool(self.__radas_config__ and self.__radas_config__.validate())

def get_radas_config(self) -> Optional[RadasConfig]:
return self.__radas_config__


def get_config(cfgPath=None) -> CharonConfig:
config_file_path = cfgPath
if not config_file_path or not os.path.isfile(config_file_path):
config_file_path = os.path.join(os.getenv("HOME", ""), ".charon", CONFIG_FILE)
data = read_yaml_from_file_path(config_file_path, 'schemas/charon.json')
data = read_yaml_from_file_path(config_file_path, "schemas/charon.json")
return CharonConfig(data)


def get_template(template_file: str) -> str:
template = os.path.join(
os.getenv("HOME", ''), ".charon/template", template_file
)
template = os.path.join(os.getenv("HOME", ""), ".charon/template", template_file)
if os.path.isfile(template):
with open(template, encoding="utf-8") as file_:
return file_.read()
Expand Down
2 changes: 2 additions & 0 deletions charon/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,5 @@
DEFAULT_ERRORS_LOG = "errors.log"

DEFAULT_REGISTRY = "localhost"
DEFAULT_RADAS_SIGN_TIMEOUT_RETRY_COUNT = 10
DEFAULT_RADAS_SIGN_TIMEOUT_RETRY_INTERVAL = 60
41 changes: 35 additions & 6 deletions charon/pkgs/maven.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from charon.utils.files import HashType
import charon.pkgs.indexing as indexing
import charon.pkgs.signature as signature
import charon.pkgs.radas_signature_handler as radas_signature
from charon.utils.files import overwrite_file, digest, write_manifest
from charon.utils.archive import extract_zip_all
from charon.utils.strings import remove_prefix
Expand Down Expand Up @@ -274,7 +275,8 @@ def handle_maven_uploading(
key=None,
dry_run=False,
manifest_bucket_name=None,
config=None
config=None,
sign_result_loc="/tmp/sign"
) -> Tuple[str, bool]:
""" Handle the maven product release tarball uploading process.
* repo is the location of the tarball in filesystem
Expand Down Expand Up @@ -408,11 +410,38 @@ def handle_maven_uploading(
if cf_enable:
cf_invalidate_paths.extend(archetype_files)

# 10. Generate signature file if contain_signature is set to True
if gen_sign:
conf = get_config(config)
if not conf:
sys.exit(1)
# 10. Generate signature file if radas sign is enabled,
# or do detached sign if contain_signature is set to True
conf = get_config(config)
if not conf:
sys.exit(1)

if conf.is_radas_enabled():
logger.info("Start generating radas signature files for s3 bucket %s\n", bucket_name)
(_failed_metas, _generated_signs) = radas_signature.generate_radas_sign(
top_level=top_level, sign_result_loc=sign_result_loc
)
if not _generated_signs:
logger.error(
"No sign result files were downloaded, "
"please make sure the sign process is already done and without timeout")
return (tmp_root, False)

failed_metas.extend(_failed_metas)
generated_signs.extend(_generated_signs)
logger.info("Radas signature files generation done.\n")

logger.info("Start upload radas signature files to s3 bucket %s\n", bucket_name)
_failed_metas = s3_client.upload_signatures(
meta_file_paths=generated_signs,
target=(bucket_name, prefix),
product=None,
root=top_level
)
failed_metas.extend(_failed_metas)
logger.info("Radas signature files uploading done.\n")

elif gen_sign:
suffix_list = __get_suffix(PACKAGE_TYPE_MAVEN, conf)
command = conf.get_detach_signature_command()
artifacts = [s for s in valid_mvn_paths if not s.endswith(tuple(suffix_list))]
Expand Down
72 changes: 72 additions & 0 deletions charon/pkgs/oras_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Copyright (C) 2022 Red Hat, Inc. (https://github.com/Commonjava/charon)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import oras.client
import logging
from charon.config import get_config
from typing import List
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


class OrasClient:
"""
Wrapper for oras‑py’s OrasClient, deciding whether to login based on config.
"""

def __init__(self):
self.conf = get_config()
self.client = oras.client.OrasClient()

def login_if_needed(self, registry: str) -> None:
"""
If quay_radas_registry_config is provided, call login to authenticate.
"""
if not registry.startswith("http://") and not registry.startswith("https://"):
registry = "https://" + registry
registry = urlparse(registry).netloc

rconf = self.conf.get_radas_config() if self.conf else None
if rconf and rconf.quay_radas_registry_config():
logger.info("Logging in to registry: %s", registry)
res = self.client.login(
hostname=registry,
config_path=rconf.quay_radas_registry_config(),
)
logger.info(res)
else:
logger.info("Registry config is not provided, skip login.")

def pull(self, result_reference_url: str, sign_result_loc: str) -> List[str]:
"""
Call oras‑py’s pull method to pull the remote file to local.
Args:
result_reference_url (str):
Reference of the remote file (e.g. “quay.io/repository/signing/radas@hash”).
sign_result_loc (str):
Local save path (e.g. “/tmp/sign”).
"""
files = []
try:
self.login_if_needed(registry=result_reference_url)
files = self.client.pull(target=result_reference_url, outdir=sign_result_loc)
logger.info("Pull file from %s to %s", result_reference_url, sign_result_loc)
except Exception as e:
logger.error(
"Failed to pull file from %s to %s: %s", result_reference_url, sign_result_loc, e
)
return files
Loading