diff --git a/upload-pkg.py b/upload-pkg.py new file mode 100644 index 0000000..f718c80 --- /dev/null +++ b/upload-pkg.py @@ -0,0 +1,123 @@ +from pyalpm import Handle +from sys import argv, exit, stdout +from os.path import exists, basename +from logging import warning, info, basicConfig, INFO +from requests import get, post, put, delete, RequestException, Response +from argparse import ArgumentParser + +default_server = "https://repo-updater.classfun.cn" + + +def FormatSize(size: int) -> str: + units = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'GB'] + unit = units[0] + for unit in units: + if size < 1024: + break + size /= 1024 + return "{:.2f} {}".format(size, unit) + + +def GetReasonText(res: Response, default: str = "Unknown") -> str: + if "text/plain" not in res.headers["Content-Type"]: + return default + length = res.headers["Content-Length"] + if length is None or int(length) >= 128: + return default + return res.text.strip() + + +def UploadFile(url: str, file: str, code: int = 201): + with open(file, "rb") as f: + res = put(url, f.read()) + text = GetReasonText(res) + if res.status_code != code: + raise RequestException( + "upload %s status not %d: %d (%s)" % + (file, code, res.status_code, text) + ) + + +def UploadPackage( + pkg: str, + sign: str = None, + arch: str = None, + server: str = default_server, + handle: Handle = None +): + if sign is None: + sign = pkg + ".sig" + if handle is None: + handle = Handle("/", "/var/lib/pacman") + if not exists(pkg): + raise FileNotFoundError("Target package file %s not found" % pkg) + if not exists(sign): + raise FileNotFoundError("Target signature file %s not found" % pkg) + if pkg + ".sig" != sign: + warning("signature filename mismatch with package file name") + alpm = handle.load_pkg(pkg) + if not alpm: + raise IOError("Open package %s failed" % pkg) + info("package name: %s" % alpm.name) + info("package version: %s" % alpm.version) + info("package architecture: %s" % alpm.arch) + info("package packager: %s" % alpm.packager) + info("package size: %s" % FormatSize(alpm.size)) + info("package installed size: %s" % FormatSize(alpm.isize)) + info("package url: %s" % alpm.url) + name = "%s-%s-%s" % (alpm.name, alpm.version, alpm.arch) + res = get("%s/api/info" % server) + if arch is None: + arch = alpm.arch + if arch == "any": + raise ValueError("Unable to detect target architecture") + if res.status_code != 200: + raise RequestException("status not 200: %d" % res.status_code) + base = res.json() + pkg_ext = next((i for i in base["pkg_exts"] if pkg.endswith(i)), None) + sign_ext = next((i for i in base["sign_exts"] if sign.endswith(i)), None) + if pkg_ext is None: + raise ValueError("Unknown package type") + if sign_ext is None: + raise ValueError("Unknown signature type") + if arch not in base["arch"]: + raise ValueError("Target architecture not found") + pkg_name = name + pkg_ext + sign_name = name + sign_ext + if pkg_name != basename(pkg): + warning("package filename mismatch with metadata") + info("upload as %s" % pkg_name) + try: + UploadFile("%s/%s" % (server, sign_name), sign) + UploadFile("%s/%s" % (server, pkg_name), pkg) + res = post( + "%s/api/update" % server, + json={"arch": arch, "target": pkg_name}, + headers={"Content-Type": "application/json"} + ) + text = GetReasonText(res) + if res.status_code != 200: + raise RequestException( + "update status not 201: %d (%s)" % + (res.status_code, text) + ) + info("upload done") + finally: + delete("%s/%s" % (server, pkg_name)) + delete("%s/%s" % (server, sign_name)) + + +def main(args: list) -> int: + prs = ArgumentParser("Renegade Project Arch Linux Repo Uploader") + prs.add_argument("-a", "--arch", help="Target repo architecture", required=False) + prs.add_argument("-p", "--pkg", help="Package tarball file", required=True) + prs.add_argument("-s", "--sign", help="Package signature file", required=False) + prs.add_argument("-u", "--url", help="Updater Server URL", required=False, default=default_server) + ps = prs.parse_args(args[1:]) + basicConfig(level=INFO, stream=stdout) + UploadPackage(ps.pkg, ps.sign, ps.arch, ps.url) + return 0 + + +if __name__ == '__main__': + exit(main(argv))