import os import pyalpm import logging import shutil import libarchive from logging import getLogger from builder.lib.serializable import SerializableDict from builder.lib.context import ArchBuilderContext from builder.lib.config import ArchBuilderConfigError from builder.lib.subscript import resolve_simple_values log = getLogger(__name__) def log_cb(level, line): if level & pyalpm.LOG_ERROR: ll = logging.ERROR elif level & pyalpm.LOG_WARNING: ll = logging.WARNING else: return log.log(ll, line.strip()) def dl_cb(filename, ev, data): match ev: case 0: log.debug(f"pacman downloading {filename}") case 2: log.warning(f"pacman retry download {filename}") case 3: log.info(f"pacman downloaded {filename}") def progress_cb(target, percent, n, i): if len(target) <= 0 or percent != 0: return log.info(f"processing {target} ({i}/{n})") class PacmanRepoServer(SerializableDict): url: str = None name: str = None mirror: bool = False def __init__( self, name: str = None, url: str = None, mirror: bool = None ): if url is not None: self.url = url if name is not None: self.name = name if mirror is not None: self.mirror = mirror class PacmanRepo(SerializableDict): name: str = None priority: int = 10000 servers: list[PacmanRepoServer] = None def __init__( self, name: str = None, priority: int = None, servers: list[PacmanRepoServer] = None ): if name is not None: self.name = name if priority is not None: self.priority = priority if servers is not None: self.servers = servers else: self.servers = [] def add_server( self, name: str = None, url: str = None, mirror: bool = None ): self.servers.append(PacmanRepoServer( name=name, url=url, mirror=mirror, )) class Pacman: handle: pyalpm.Handle ctx: ArchBuilderContext root: str databases: dict[str: pyalpm.DB] config: dict caches: list[str] repos: list[PacmanRepo] def append_repos(self, lines: list[str]): """ Add all databases into config """ for repo in self.repos: lines.append(f"[{repo.name}]\n") for server in repo.servers: if server.mirror: lines.append(f"# Mirror {server.name}\n") log.debug(f"use mirror {server.name} url {server.url}") else: lines.append("# Original Repo\n") log.debug(f"use original repo url {server.url}") lines.append(f"Server = {server.url}\n") def append_config(self, lines: list[str]): """ Add basic pacman config for host """ siglevel = ("Required DatabaseOptional" if self.ctx.gpgcheck else "Never") lines.append("[options]\n") for cache in self.caches: lines.append(f"CacheDir = {cache}\n") lines.append(f"RootDir = {self.root}\n") lines.append(f"GPGDir = {self.handle.gpgdir}\n") lines.append(f"LogFile = {self.handle.logfile}\n") lines.append("HoldPkg = pacman glibc\n") lines.append(f"Architecture = {self.ctx.tgt_arch}\n") lines.append("UseSyslog\n") lines.append("Color\n") lines.append("CheckSpace\n") lines.append("VerbosePkgLists\n") lines.append("ParallelDownloads = 5\n") lines.append(f"SigLevel = {siglevel}\n") lines.append("LocalFileSigLevel = Optional\n") self.append_repos(lines) def init_keyring(self): """ Initialize pacman keyring """ path = os.path.join(self.ctx.work, "rootfs") keyring = os.path.join(path, "etc/pacman.d/gnupg") if not self.ctx.gpgcheck: return if os.path.exists(os.path.join(keyring, "trustdb.gpg")): log.debug("skip initialize pacman keyring when exists") return log.info("initializing pacman keyring") self.pacman_key(["--init"]) def init_config(self): """ Create host pacman.conf """ config = os.path.join(self.ctx.work, "pacman.conf") if os.path.exists(config): os.remove(config) log.info(f"generate pacman config {config}") lines = [] self.append_config(lines) log.debug("config content: %s", "\t".join(lines).strip()) log.debug(f"writing {config}") with open(config, "w") as f: f.writelines(lines) def pacman_key(self, args: list[str]): """ Call pacman-key for rootfs """ if not self.ctx.gpgcheck: raise RuntimeError("GPG check disabled") keyring = os.path.join(self.root, "etc/pacman.d/gnupg") config = os.path.join(self.ctx.work, "pacman.conf") cmds = ["pacman-key"] cmds.append(f"--gpgdir={keyring}") cmds.append(f"--config={config}") cmds.extend(args) ret = self.ctx.run_external(cmds) if ret != 0: raise OSError(f"pacman-key failed with {ret}") def pacman(self, args: list[str]): """ Call pacman for rootfs """ config = os.path.join(self.ctx.work, "pacman.conf") cmds = ["pacman"] cmds.append("--noconfirm") cmds.append(f"--root={self.root}") cmds.append(f"--config={config}") cmds.extend(args) ret = self.ctx.run_external(cmds) if ret != 0: raise OSError(f"pacman failed with {ret}") def load_databases(self): """ Add all databases and load them """ for mirror in self.repos: # register database if mirror.name not in self.databases: self.databases[mirror.name] = self.handle.register_syncdb( mirror.name, pyalpm.SIG_DATABASE_MARGINAL_OK ) db = self.databases[mirror.name] # add databases servers servers: list[str] = [] for server in mirror.servers: servers.append(server.url) db.servers = servers # update database now via pyalpm log.info(f"updating database {mirror.name}") db.update(False) self.init_config() self.refresh() def lookup_package(self, name: str) -> list[pyalpm.Package]: """ Lookup pyalpm package by name """ # pass a filename, load it directly if ".pkg.tar." in name: pkg = self.handle.load_pkg(name) if pkg is None: raise RuntimeError(f"load package {name} failed") return [pkg] s = name.split("/") if len(s) == 2: # use DATABASE/PACKAGE, find it in database if s[0] not in self.databases and s[0] != "local": raise ValueError(f"database {s[0]} not found") db = (self.handle.get_localdb() if s[0] == "local" else self.databases[s[0]]) pkg = db.get_pkg(s[1]) if pkg: return [pkg] raise ValueError(f"package {s[1]} not found") elif len(s) == 1: # use PACKAGE, find it in all databases or find as group # try find it as group pkg = pyalpm.find_grp_pkgs(self.databases.values(), name) if len(pkg) > 0: return pkg # try find it as package for dbn in self.databases: db = self.databases[dbn] pkg = db.get_pkg(name) if pkg: return [pkg] raise ValueError(f"package {name} not found") raise ValueError(f"bad package name {name}") def init_cache(self): """ Initialize pacman cache folder """ host_cache = "/var/cache/pacman/pkg" # host cache work_cache = os.path.join(self.ctx.work, "packages") # workspace cache root_cache = os.path.join(self.root, "var/cache/pacman/pkg") # rootfs cache self.caches.clear() # host cache is existing, use host cache folder if os.path.exists(host_cache): self.caches.append(host_cache) self.caches.append(work_cache) self.caches.append(root_cache) os.makedirs(work_cache, mode=0o0755, exist_ok=True) os.makedirs(root_cache, mode=0o0755, exist_ok=True) def add_repo(self, repo: PacmanRepo): if not repo or not repo.name or len(repo.servers) <= 0: raise ArchBuilderConfigError("bad repo") self.repos.append(repo) self.repos.sort(key=lambda r: r.priority) def init_repos(self): """ Initialize mirrors """ if "repo" not in self.config: raise ArchBuilderConfigError("no repos found in config") mirrors = self.ctx.get("mirrors", []) for repo in self.config["repo"]: if "name" not in repo: raise ArchBuilderConfigError("repo name not set") # never add local into database if repo["name"] == "local" or "/" in repo["name"]: raise ArchBuilderConfigError("bad repo name") # create pacman repo instance pacman_repo = PacmanRepo(name=repo["name"]) if "priority" in repo: pacman_repo.priority = repo["priority"] originals: list[str] = [] servers: list[str] = [] # add all original repo url if "server" in repo: servers.append(repo["server"]) if "servers" in repo: servers.extend(repo["server"]) if len(servers) <= 0: raise ArchBuilderConfigError("no any original repo url found") # resolve original repo url for server in servers: originals.append(resolve_simple_values(server, { "arch": self.ctx.tgt_arch, "repo": repo["name"], })) # add repo mirror url for mirror in mirrors: if "name" not in mirror: raise ArchBuilderConfigError("mirror name not set") if "repos" not in mirror: raise ArchBuilderConfigError("repos list not set") for repo in mirror["repos"]: if "original" not in repo: raise ArchBuilderConfigError("original url not set") if "mirror" not in repo: raise ArchBuilderConfigError("mirror url not set") for original in originals: if original.startswith(repo["original"]): path = original[len(repo["original"]):] real_url = repo["mirror"] + path pacman_repo.add_server( name=mirror["name"], url=real_url, mirror=True, ) # add original url for original in originals: pacman_repo.add_server( url=original, mirror=False ) self.add_repo(pacman_repo) def __init__(self, ctx: ArchBuilderContext): """ Initialize pacman context """ self.ctx = ctx if "pacman" not in ctx.config: raise ArchBuilderConfigError("no pacman found in config") self.config = ctx.config["pacman"] self.root = ctx.get_rootfs() db = os.path.join(self.root, "var/lib/pacman") self.handle = pyalpm.Handle(self.root, db) self.handle.arch = ctx.tgt_arch self.handle.logfile = os.path.join(self.ctx.work, "pacman.log") self.handle.gpgdir = os.path.join(self.root, "etc/pacman.d/gnupg") self.handle.logcb = log_cb self.handle.dlcb = dl_cb self.handle.progresscb = progress_cb self.databases = {} self.caches = [] self.repos = [] self.init_cache() self.init_repos() for cache in self.caches: self.handle.add_cachedir(cache) self.init_config() def uninstall(self, pkgs: list[str]): """ Uninstall packages via pacman """ if len(pkgs) == 0: return ps = " ".join(pkgs) log.info(f"removing packages {ps}") args = ["--needed", "--remove"] args.extend(pkgs) self.pacman(args) def install( self, pkgs: list[str], /, force: bool = False, asdeps: bool = False, nodeps: bool = False, ): """ Install packages via pacman """ if len(pkgs) == 0: return core_db = "var/lib/pacman/sync/core.db" if not os.path.exists(os.path.join(self.root, core_db)): self.refresh() ps = " ".join(pkgs) log.info(f"installing packages {ps}") args = ["--sync"] if not force: args.append("--needed") if asdeps: args.append("--asdeps") if nodeps: args.extend(["--nodeps", "--nodeps"]) args.extend(pkgs) self.pacman(args) def download(self, pkgs: list[str]): """ Download packages via pacman """ if len(pkgs) == 0: return core_db = "var/lib/pacman/sync/core.db" if not os.path.exists(os.path.join(self.root, core_db)): self.refresh() log.info("downloading packages %s", " ".join(pkgs)) args = ["--sync", "--downloadonly", "--nodeps", "--nodeps"] args.extend(pkgs) self.pacman(args) def install_local(self, files: list[str]): """ Install a local packages via pacman """ if len(files) == 0: return log.info("installing local packages %s", " ".join(files)) args = ["--needed", "--upgrade"] args.extend(files) self.pacman(args) def refresh(self, /, force: bool = False): """ Update local databases via pacman """ log.info("refresh pacman database") args = ["--sync", "--refresh"] if force: args.append("--refresh") self.pacman(args) def recv_keys(self, keys: str | list[str]): """ Receive a key via pacman-key """ args = ["--recv-keys"] if type(keys) is str: args.append(keys) elif type(keys) is list: if len(keys) <= 0: return args.extend(keys) else: raise TypeError("bad keys type") self.pacman_key(args) def lsign_key(self, key: str): """ Local sign a key via pacman-key """ self.pacman_key(["--lsign-key", key]) def pouplate_keys( self, names: str | list[str] = None, folder: str = None ): """ Populate all keys via pacman-key """ args = ["--populate"] if folder: args.extend(["--populate-from", folder]) if names is None: pass elif type(names) is str: args.append(names) elif type(names) is list: args.extend(names) else: raise TypeError("bad names type") self.pacman_key(args) def find_package_file(self, pkg: pyalpm.Package) -> str | None: """ Find out pacman package archive file in cache """ for cache in self.caches: p = os.path.join(cache, pkg.filename) if os.path.exists(p): return p return None def trust_keyring_pkg(self, pkg: pyalpm.Package): """ Trust a keyring package from file without install it """ if not self.ctx.gpgcheck: return names: list[str] = [] target = os.path.join(self.ctx.work, "keyrings") keyring = "usr/share/pacman/keyrings/" # find out file path path = self.find_package_file(pkg) # cleanup keyring extract folder if os.path.exists(target): shutil.rmtree(target) os.makedirs(target, mode=0o0755) if path is None: raise RuntimeError( f"package {pkg.name} not found" ) # open keyring package to extract log.debug(f"processing keyring package {pkg.name}") with libarchive.file_reader(path) as archive: for file in archive: pn: str = file.pathname if not pn.startswith(keyring): continue # get the filename of file fn = pn[len(keyring):] if len(fn) <= 0: continue # add keyring name to populate if fn.endswith(".gpg"): names.append(fn[:-4]) # extract file dest = os.path.join(target, fn) log.debug(f"extracting {pn} to {dest}") with open(dest, "wb") as f: for block in file.get_blocks(file.size): f.write(block) fd = f.fileno() os.fchmod(fd, file.mode) os.fchown(fd, file.uid, file.gid) # trust extracted keyring self.pouplate_keys(names, target) def add_trust_keyring_pkg(self, pkgnames: list[str]): """ Trust a keyring package from file without install it """ if not self.ctx.gpgcheck: return if len(pkgnames) <= 0: return self.download(pkgnames) for pkgname in pkgnames: pkgs = self.lookup_package(pkgname) for pkg in pkgs: self.trust_keyring_pkg(pkg)