"""Upload a pacman package and its signature to a repo-updater server."""

from argparse import ArgumentParser
from logging import warning, info, basicConfig, INFO
from os.path import exists, basename
from sys import argv, exit, stdout
from typing import Optional

from pyalpm import Handle
from requests import get, post, put, delete, RequestException, Response

default_server = "https://repo-updater.classfun.cn"


def FormatSize(size: int) -> str:
    """Format a byte count as a human readable string with two decimals.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit is reached, so the reported number always matches the unit.
    """
    units = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB']
    value = size
    for unit in units[:-1]:
        if value < 1024:
            return "{:.2f} {}".format(value, unit)
        value /= 1024
    # Anything that survived every division is expressed in the last unit.
    return "{:.2f} {}".format(value, units[-1])


def GetReasonText(res: Response, default: str = "Unknown") -> str:
    """Return the short plain-text body of a response, or ``default``.

    The body is only used when the response declares a ``text/plain``
    content type and a ``Content-Length`` below 128; in every other case
    (including missing or malformed headers) ``default`` is returned.
    """
    if "text/plain" not in res.headers.get("Content-Type", ""):
        return default
    length = res.headers.get("Content-Length")
    if length is None:
        return default
    try:
        too_long = int(length) >= 128
    except ValueError:
        # A non-numeric Content-Length gives no usable size information.
        return default
    if too_long:
        return default
    return res.text.strip()


def UploadFile(url: str, file: str, code: int = 201):
    """PUT the content of ``file`` to ``url``.

    Raises:
        RequestException: if the response status differs from ``code``.
    """
    with open(file, "rb") as f:
        res = put(url, data=f.read())
    text = GetReasonText(res)
    if res.status_code != code:
        raise RequestException(
            "upload %s status not %d: %d (%s)" %
            (file, code, res.status_code, text)
        )


def _DeleteRemoteFile(url: str):
    """Best-effort removal of an uploaded file; failures are only logged."""
    try:
        delete(url)
    except RequestException as err:
        warning("cleanup of %s failed: %s", url, err)


def UploadPackage(
    pkg: str,
    sign: Optional[str] = None,
    arch: Optional[str] = None,
    server: str = default_server,
    handle: Optional[Handle] = None
):
    """Upload a package with its signature and ask the server to update.

    Args:
        pkg: path of the package file.
        sign: path of the signature file, defaults to ``pkg + ".sig"``.
        arch: target repo architecture, defaults to the package's own
            architecture (which must not be ``any``).
        server: base URL of the updater server.
        handle: alpm handle used to read the package metadata; a handle on
            ``/`` with ``/var/lib/pacman`` is created when omitted.

    Raises:
        FileNotFoundError: package or signature file does not exist.
        IOError: the package could not be opened.
        ValueError: architecture or file types are not acceptable.
        RequestException: the server answered with an unexpected status.
    """
    if sign is None:
        sign = pkg + ".sig"
    if handle is None:
        handle = Handle("/", "/var/lib/pacman")
    if not exists(pkg):
        raise FileNotFoundError("Target package file %s not found" % pkg)
    if not exists(sign):
        raise FileNotFoundError("Target signature file %s not found" % sign)
    if pkg + ".sig" != sign:
        warning("signature filename mismatch with package file name")

    alpm = handle.load_pkg(pkg)
    if not alpm:
        raise IOError("Open package %s failed" % pkg)
    info("package name: %s", alpm.name)
    info("package version: %s", alpm.version)
    info("package architecture: %s", alpm.arch)
    info("package packager: %s", alpm.packager)
    info("package size: %s", FormatSize(alpm.size))
    info("package installed size: %s", FormatSize(alpm.isize))
    info("package url: %s", alpm.url)
    name = "%s-%s-%s" % (alpm.name, alpm.version, alpm.arch)

    # Resolve the target architecture before talking to the server so a
    # local problem does not cost a network round trip.
    if arch is None:
        arch = alpm.arch
    if arch == "any":
        raise ValueError("Unable to detect target architecture")

    res = get("%s/api/info" % server)
    if res.status_code != 200:
        raise RequestException("status not 200: %d" % res.status_code)
    base = res.json()
    pkg_ext = next((i for i in base["pkg_exts"] if pkg.endswith(i)), None)
    sign_ext = next((i for i in base["sign_exts"] if sign.endswith(i)), None)
    if pkg_ext is None:
        raise ValueError("Unknown package type")
    if sign_ext is None:
        raise ValueError("Unknown signature type")
    if arch not in base["arch"]:
        raise ValueError("Target architecture not found")

    pkg_name = name + pkg_ext
    sign_name = name + sign_ext
    if pkg_name != basename(pkg):
        warning("package filename mismatch with metadata")
    info("upload as %s", pkg_name)

    pkg_url = "%s/%s" % (server, pkg_name)
    sign_url = "%s/%s" % (server, sign_name)
    try:
        UploadFile(sign_url, sign)
        UploadFile(pkg_url, pkg)
        res = post(
            "%s/api/update" % server,
            json={"arch": arch, "target": pkg_name},
            headers={"Content-Type": "application/json"}
        )
        text = GetReasonText(res)
        if res.status_code != 200:
            raise RequestException(
                "update status not 200: %d (%s)" %
                (res.status_code, text)
            )
        info("upload done")
    finally:
        # Always try to remove both uploaded files; a failing cleanup must
        # neither skip the other file nor hide an error raised above.
        _DeleteRemoteFile(pkg_url)
        _DeleteRemoteFile(sign_url)


def main(args: list) -> int:
    """Parse command line ``args`` (including argv[0]) and upload."""
    prs = ArgumentParser("Renegade Project Arch Linux Repo Uploader")
    prs.add_argument(
        "-a", "--arch",
        help="Target repo architecture", required=False
    )
    prs.add_argument(
        "-p", "--pkg",
        help="Package tarball file", required=True
    )
    prs.add_argument(
        "-s", "--sign",
        help="Package signature file", required=False
    )
    prs.add_argument(
        "-u", "--url",
        help="Updater Server URL", required=False, default=default_server
    )
    ps = prs.parse_args(args[1:])
    basicConfig(level=INFO, stream=stdout)
    UploadPackage(ps.pkg, ps.sign, ps.arch, ps.url)
    return 0


if __name__ == '__main__':
    exit(main(argv))