#!/usr/bin/env python3
"""Add RPMs to a yum/dnf repository stored in an S3 bucket.

The script optionally signs the given RPMs, builds repository metadata for
them with ``createrepo_c``, merges that metadata with whatever ``repodata/``
already exists under the bucket prefix (via ``mergerepo_c``), signs the
resulting ``repomd.xml`` with ``gpg`` and uploads everything back to S3.

External tools invoked: ``createrepo_c``, ``mergerepo_c``, ``gpg``,
``rpmsign``/``rpm`` and (when a passphrase is supplied) ``expect``.
"""
import argparse
import logging
import os
import shutil
import subprocess
import sys
import tempfile

import boto3

# Region used when the bucket location cannot be determined or is reported
# as null (which is how S3 reports buckets living in us-east-1).
DEFAULT_REGION = "us-east-1"

# GetBucketLocation may still return the legacy constraint "EU" for buckets
# created in eu-west-1; it is not a usable region name.
_LEGACY_REGION_ALIASES = {"EU": "eu-west-1"}

# Environment variables used to hand the file path and passphrase to the
# expect script without ever interpolating them into script or shell text.
_ENV_SIGN_TARGET = "S3REPO_SIGN_TARGET"
_ENV_SIGN_PASS = "S3REPO_SIGN_PASS"


class S3Grabber:
    """Thin wrapper around one S3 bucket/prefix that holds an RPM repository.

    Attributes are set in ``__init__``:
        bucket_name: name of the bucket (text before the first ``/``).
        prefix: key prefix inside the bucket ("" when none was given).
        visibility: canned ACL applied to every uploaded object.
        region: region name resolved by ``get_bucket_region``.
        bucket: boto3 ``Bucket`` resource used for all transfers.
    """

    def __init__(self, bucket_arg, visibility, aws_access_key, aws_secret_key):
        """Resolve the bucket's region and open a boto3 bucket resource.

        Args:
            bucket_arg: ``"bucket"`` or ``"bucket/some/prefix"``.
            visibility: canned ACL string passed as ``ACL`` on upload.
            aws_access_key: AWS access key id.
            aws_secret_key: AWS secret access key.
        """
        bucket_name, _, prefix = bucket_arg.partition("/")
        self.bucket_name = bucket_name
        self.prefix = prefix
        self.visibility = visibility
        self.region = self.get_bucket_region(
            aws_access_key, aws_secret_key, self.bucket_name)
        session = boto3.session.Session(
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
            region_name=self.region,
        )
        s3 = session.resource('s3')
        self.bucket = s3.Bucket(self.bucket_name)

    def get_bucket_region(self, aws_access_key, aws_secret_key, bucket_name):
        """Return the region name of ``bucket_name``.

        Falls back to ``DEFAULT_REGION`` when the lookup fails (the error is
        logged, not raised) or when S3 reports a null/empty location.

        Args:
            aws_access_key: AWS access key id.
            aws_secret_key: AWS secret access key.
            bucket_name: bucket to look up.

        Returns:
            A region name string.
        """
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key,
        )
        try:
            response = s3_client.get_bucket_location(Bucket=bucket_name)
        except Exception as e:
            # Deliberate best-effort: any lookup failure degrades to the
            # default region instead of aborting the run.
            logging.error("Unable to get region for bucket: %s", e)
            return DEFAULT_REGION
        # The key is present with value None for us-east-1 buckets, so a
        # dict.get() default would never apply; test the value instead.
        location = response.get('LocationConstraint')
        if not location:
            return DEFAULT_REGION
        return _LEGACY_REGION_ALIASES.get(location, location)

    def _key(self, *parts):
        """Join ``parts`` under ``self.prefix`` into a '/'-separated S3 key."""
        return os.path.join(self.prefix, *parts).replace("\\", "/")

    def repodata_info(self):
        """List the objects stored under ``<prefix>/repodata/``.

        Returns:
            A list of boto3 object summaries. Keys ending in ``/``
            (zero-byte "folder" placeholders) are excluded, so an empty list
            means there is no existing metadata file to merge with.
        """
        # Trailing slash keeps sibling prefixes such as "repodata-old/" from
        # being mistaken for repository metadata.
        prefix = self._key("repodata") + "/"
        return [
            obj for obj in self.bucket.objects.filter(Prefix=prefix)
            if not obj.key.endswith("/")
        ]

    def download_repodata(self, local_dir):
        """Download every repodata object into ``<local_dir>/repodata/``.

        Each object is saved under the basename of its key, so the remote
        layout is flattened into a single directory.

        Args:
            local_dir: directory that will receive the ``repodata`` folder.
        """
        target_dir = os.path.join(local_dir, "repodata")
        os.makedirs(target_dir, exist_ok=True)
        for obj in self.repodata_info():
            local_path = os.path.join(target_dir, os.path.basename(obj.key))
            logging.info("Downloading Repodata item: s3://%s/%s -> %s",
                         self.bucket_name, obj.key, local_path)
            self.bucket.download_file(obj.key, local_path)

    def upload_file(self, local_path, remote_key):
        """Upload one file to ``<prefix>/<remote_key>`` with the stored ACL.

        Args:
            local_path: file to upload.
            remote_key: key relative to ``self.prefix``.
        """
        s3_key = self._key(remote_key)
        logging.info("Uploading %s -> s3://%s/%s",
                     local_path, self.bucket_name, s3_key)
        self.bucket.upload_file(
            local_path, s3_key, ExtraArgs={'ACL': self.visibility})

    def upload_dir(self, local_dir, remote_subdir, password=None):
        """Sign ``repomd.xml`` (if present) and upload a directory tree.

        Every file found under ``local_dir`` is uploaded to
        ``<prefix>/<remote_subdir>/<path relative to local_dir>``. When
        ``local_dir`` contains ``repomd.xml`` it is signed first, so the
        detached signature is uploaded together with the metadata. Note that
        this signing happens regardless of the ``--sign`` command-line flag.

        Args:
            local_dir: directory to upload.
            remote_subdir: destination sub-prefix below ``self.prefix``.
            password: passphrase forwarded to ``sign_repo``; ``None`` lets
                gpg prompt for it.
        """
        repomd_path = os.path.join(local_dir, "repomd.xml")
        if os.path.exists(repomd_path):
            logging.info("Signing repomd.xml before upload...")
            sign_repo(repomd_path, password)

        for root, _, files in os.walk(local_dir):
            for name in files:
                local_path = os.path.join(root, name)
                rel = os.path.relpath(local_path, local_dir)
                s3_key = self._key(remote_subdir, rel)
                logging.info("Uploading file: %s -> s3://%s/%s",
                             local_path, self.bucket_name, s3_key)
                self.bucket.upload_file(
                    local_path, s3_key, ExtraArgs={'ACL': self.visibility})


def _run_expect_sign(spawn_words, target_path, password):
    """Run a signing command under ``expect`` and answer its passphrase prompt.

    The file path and passphrase are passed through environment variables
    and read inside Tcl, so neither is ever parsed as shell or Tcl source;
    quotes, ``$`` or brackets in them cannot break or alter the command.

    Args:
        spawn_words: trusted, constant command words to spawn; the target
            path is appended as the final argument.
        target_path: file to sign.
        password: passphrase sent when the prompt appears.

    Raises:
        subprocess.CalledProcessError: if expect (which exits with the
            spawned command's status) returns non-zero.
    """
    script = "\n".join([
        "set timeout 60",
        "set target $env(%s)" % _ENV_SIGN_TARGET,
        "set passphrase $env(%s)" % _ENV_SIGN_PASS,
        # Drop the passphrase from the environment before spawning so the
        # signing process does not inherit it.
        "unset env(%s)" % _ENV_SIGN_PASS,
        "spawn %s $target" % " ".join(spawn_words),
        'expect -re "Enter passphrase.*"',
        'send -- "$passphrase\\r"',
        "expect eof",
        # Propagate the spawned command's exit status as expect's own.
        "lassign [wait] _ _ _ code",
        "exit $code",
    ])
    env = dict(os.environ)
    env[_ENV_SIGN_TARGET] = target_path
    env[_ENV_SIGN_PASS] = password
    subprocess.run(["expect", "-c", script], check=True, env=env)


def sign_repo(repomd_path, password=None):
    """Create an ASCII-armored detached gpg signature for ``repomd_path``.

    Args:
        repomd_path: path of the ``repomd.xml`` file to sign.
        password: passphrase to feed to gpg non-interactively; when falsy,
            gpg is run directly and may prompt.

    Raises:
        subprocess.CalledProcessError: if the signing command fails.
    """
    if password:
        _run_expect_sign(
            ["gpg", "--pinentry-mode", "loopback", "--force-v3-sigs",
             "--verbose", "--detach-sign", "--armor"],
            repomd_path,
            password,
        )
    else:
        logging.info("Signing %s (interactive passphrase).", repomd_path)
        subprocess.run(
            ["gpg", "--detach-sign", "--armor", repomd_path], check=True)


def sign_rpm(rpm_path, password=None):
    """Sign an RPM file in place.

    Args:
        rpm_path: path of the RPM to sign.
        password: passphrase to feed to ``rpmsign`` non-interactively; when
            falsy, ``rpm --addsign`` is run directly and may prompt.

    Raises:
        subprocess.CalledProcessError: if the signing command fails.
    """
    if password:
        _run_expect_sign(["rpmsign", "--addsign"], rpm_path, password)
    else:
        logging.info("Signing %s (interactive passphrase).", rpm_path)
        subprocess.run(["rpm", "--addsign", rpm_path], check=True)


def _build_new_repo(rpmfiles, new_repo_dir, options):
    """Optionally sign the RPMs, copy them to ``new_repo_dir``, run createrepo_c.

    Returns:
        Path of the generated ``repodata`` directory.

    Raises:
        FileNotFoundError: if createrepo_c produced no ``repomd.xml``.
        subprocess.CalledProcessError: if signing or createrepo_c fails.
    """
    for rpm in rpmfiles:
        if options.sign:
            sign_rpm(rpm, options.sign_pass)
        local_dest = os.path.join(new_repo_dir, os.path.basename(rpm))
        logging.info("Copying new RPM: %s -> %s", rpm, local_dest)
        shutil.copy(rpm, local_dest)

    logging.info("Running createrepo_c for new RPMs only.")
    subprocess.run(
        ["createrepo_c", "--checksum", "sha256", new_repo_dir], check=True)

    repodata_new = os.path.join(new_repo_dir, "repodata")
    if not os.path.exists(os.path.join(repodata_new, "repomd.xml")):
        logging.error("repomd.xml not found in new repo.")
        raise FileNotFoundError(
            "No repomd.xml after createrepo_c for new RPMs.")
    return repodata_new


def _merge_repos(old_repo_dir, new_repo_dir, merged_repo_dir):
    """Merge old and new metadata with mergerepo_c; return the merged repodata dir."""
    logging.info(
        "Merging old repo metadata with new repo using mergerepo_c.")
    cmd_merge = [
        "mergerepo_c",
        f"--repo={old_repo_dir}",
        f"--repo={new_repo_dir}",
        "--all",
        "--omit-baseurl",
        "-o", merged_repo_dir,
    ]
    logging.info("Running command: %s", " ".join(cmd_merge))
    subprocess.run(cmd_merge, check=True)
    return os.path.join(merged_repo_dir, "repodata")


def update_repodata(rpmfiles, options):
    """Add ``rpmfiles`` to the S3 repository described by ``options``.

    Steps: download existing repodata (if any), build metadata for the new
    RPMs, merge with the old metadata when present, upload the RPMs, then
    upload the (signed) metadata. All intermediate files live in a temporary
    directory that is removed when the function returns or raises.

    Args:
        rpmfiles: paths of the RPM files to add.
        options: object with attributes ``bucket``, ``visibility``,
            ``aws_access_key``, ``aws_secret_key``, ``sign`` and
            ``sign_pass`` (as produced by the parser in ``main``).

    Raises:
        FileNotFoundError: if createrepo_c produced no ``repomd.xml``.
        subprocess.CalledProcessError: if any external tool fails.
    """
    logging.info("RPM files: %s", rpmfiles)

    grabber = S3Grabber(
        bucket_arg=options.bucket,
        visibility=options.visibility,
        aws_access_key=options.aws_access_key,
        aws_secret_key=options.aws_secret_key,
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        old_repo_dir = os.path.join(tmpdir, "old_repo")
        new_repo_dir = os.path.join(tmpdir, "new_repo")
        merged_repo_dir = os.path.join(tmpdir, "merged_repo")
        for directory in (old_repo_dir, new_repo_dir, merged_repo_dir):
            os.makedirs(directory, exist_ok=True)

        repodata = grabber.repodata_info()
        if repodata:
            logging.info(
                "Found existing repodata in S3. Downloading for merge.")
            grabber.download_repodata(old_repo_dir)
        else:
            logging.info(
                "No existing repodata on S3. We'll create a fresh repo.")

        repodata_new = _build_new_repo(rpmfiles, new_repo_dir, options)

        if repodata:
            repodata_to_upload = _merge_repos(
                old_repo_dir, new_repo_dir, merged_repo_dir)
        else:
            repodata_to_upload = repodata_new

        # Upload packages before metadata so the published repodata never
        # references an RPM that is not in the bucket yet.
        for rpm in rpmfiles:
            basename = os.path.basename(rpm)
            grabber.upload_file(
                os.path.join(new_repo_dir, basename), basename)

        if repodata:
            logging.info("Uploading merged repodata to S3...")
        else:
            logging.info("Uploading new repo (no old repo to merge).")
        grabber.upload_dir(repodata_to_upload, "repodata", options.sign_pass)


def main():
    """Parse command-line arguments, configure logging and update the repo."""
    parser = argparse.ArgumentParser(
        description="Keep multiple versions in S3-based repo without renaming, optionally sign RPMs."
    )
    parser.add_argument("-b", "--bucket", required=True,
                        help="S3 bucket/prefix")
    parser.add_argument("--visibility", default="private", help="S3 ACL")
    parser.add_argument("--aws-access-key", required=True,
                        help="AWS Access Key ID")
    parser.add_argument("--aws-secret-key", required=True,
                        help="AWS Secret Access Key")
    parser.add_argument("--sign", action="store_true",
                        help="Sign RPMs with rpmsign")
    parser.add_argument("--sign-pass", default=None,
                        help="Passphrase for signing")
    parser.add_argument("rpmfiles", nargs="+", help="List of RPMs to add")
    options = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")

    update_repodata(options.rpmfiles, options)


if __name__ == "__main__":
    main()