add more comments

Signed-off-by: BigfootACA <>
This commit is contained in:
BigfootACA 2024-05-20 09:56:42 +08:00
parent 970a0a5cde
commit 067ee2d341
30 changed files with 828 additions and 36 deletions

View File

@ -6,41 +6,75 @@ log = getLogger(__name__)
def write_fstab(ctx: ArchBuilderContext):
Generate fstab and write to rootfs
"generate fstab:\n\t%s",
# WORKSPACE/TARGET/rootfs/etc/fstab
path = os.path.join(ctx.get_rootfs(), "etc/fstab")
with open_config(path) as f:
def mount_all(ctx: ArchBuilderContext):
Mount all filesystems in fstab for build
path = ctx.get_mount()
root = ctx.get_rootfs()
# ensure WORKSPACE/TARGET/mount is existing
if not os.path.exists(path):
os.mkdir(path, mode=0o0755)
# the first item must be ROOT (sorted by ctx.fstab.resort())
if ctx.fstab[0].target != "/":
raise RuntimeError("no root to mount")
for mnt in ctx.fstab:
# do not change original item
m = mnt.clone()
# skip virtual source device
if m.source == "none": continue
# we should mount virtual device
# original: /dev/mmcblk0p1, PARTLABEL=linux
# we need: /dev/loop0, /dev/loop1
# see
if m.source not in ctx.fsmap:
raise RuntimeError(f"source {m.source} cannot map to host")
m.source = ctx.fsmap[m.source]
if == "/": in_mnt, in_root = path, root
if == "/":
# process ROOT resolve unneeded
in_mnt, in_root = path, root
# resolve to ROOT and MOUNT
# /boot
# in_mnt: WORKSPACE/TARGET/mount/boot
# in_root: WORKSPACE/TARGET/rootfs/boot
folder =[1:]
in_mnt = os.path.join(path, folder)
in_root = os.path.join(root, folder)
elif m.fstype == "swap" or == "none": continue
elif m.fstype == "swap" or == "none":
# skip mount virtual fs and swap
else: raise RuntimeError(f"target {} cannot map to host")
if in_mnt:
# ensure mount target is exists = in_mnt
if not os.path.exists(in_mnt):
os.makedirs(in_mnt, mode=0o0755)
if in_root and not os.path.exists(in_root):
# ensure the folder is also exists in rootfs
os.makedirs(in_root, mode=0o0755)
# invoke real mount
ctx.mounted.insert(0, m)

View File

@ -27,6 +27,9 @@ def get_prop(
path: bool = False,
multi: bool = False,
) -> str | None:
Get a config value for grub
value = ctx.get(f"kernel.{name}", None)
if name in cfg: value = cfg[name]
if value is None: return None
@ -34,6 +37,7 @@ def get_prop(
value = [value]
if len(value) == 0: return None
if path:
# must starts with /
for i in range(len(value)):
if not value[i].startswith("/"):
value[i] = "/" + value[i]
@ -43,6 +47,9 @@ def get_prop(
def fstype_to_mod(name: str) -> str:
Map filesystem type to GRUB2 modules name
match name:
case "ext3": return "ext2"
case "ext4": return "ext2"
@ -51,38 +58,72 @@ def fstype_to_mod(name: str) -> str:
case "fat16": return "fat"
case "fat32": return "fat"
case "msdos": return "fat"
# TODO: add more filesystems
case _: return name
def gen_menuentry(ctx: ArchBuilderContext, cfg: dict) -> str:
Generate a menuentry config for grub
ret = ""
# menuentry name (default to Linux)
name = cfg["name"] if "name" in cfg else "Linux"
# kernel image path
kernel = get_prop(ctx, "kernel", cfg, True)
# initramfs image path (supports multiples)
initramfs = get_prop(ctx, "initramfs", cfg, True, True)
# device tree blob path (supports multiples)
devicetree = get_prop(ctx, "devicetree", cfg, True, True)
# kernel command line
cmdline = get_prop(ctx, "cmdline", cfg, False, True)
# the folder to place these files
path = get_prop(ctx, "path", cfg, False, False)
if kernel is None: raise ArchBuilderConfigError("no kernel for grub")
if cmdline is None: cmdline = ""
ret += f"menuentry '{name}' {{\n"
# if path set: load filesystem module and search to set root
if path:
# find out the mount point in fstab
fs = ctx.fstab.find_target(path)
if fs is None or len(fs) == 0 or fs[0] is None:
raise ArchBuilderConfigError(f"mountpoint {path} not found")
dev = fs[0].source
# map to virtual file system
if dev in ctx.fsmap: dev = ctx.fsmap[dev]
# get the filesystem UUID to search
uuid = blkid.get_tag_value(None, "UUID", dev)
if uuid is None: raise RuntimeError(f"cannot detect uuid for {path}")
# load filesystem module and search target
ret += "\tinsmod %s\n" % fstype_to_mod(fs[0].fstype)
ret += f"\tsearch --no-floppy --fs-uuid --set=root {uuid}\n"
# add device tree blob field
if devicetree:
ret += "\techo 'Loading Device Tree...'\n"
ret += f"\tdevicetree {devicetree}\n"
# add kernel path field and kernel command line
ret += "\techo 'Loading Kernel...'\n"
ret += f"\tlinux {kernel} {cmdline}\n"
# add initramfs field
if initramfs:
ret += "\techo 'Loading Initramfs...'\n"
ret += f"\tinitrd {initramfs}\n"
# boot into linux (not add 'boot' command, its imply by menuentry)
ret += "\techo 'Booting...'\n"
ret += f"}}\n"
return ret
@ -90,18 +131,26 @@ def gen_menuentry(ctx: ArchBuilderContext, cfg: dict) -> str:
def gen_basic(ctx: ArchBuilderContext) -> str:
ret = ""
# load generic modules
ret += "insmod part_gpt\n"
ret += "insmod part_msdos\n"
ret += "insmod all_video\n"
# setup console and serial
ret += "terminal_input console\n"
ret += "terminal_output console\n"
ret += "if serial --unit=0 --speed=115200; then\n"
ret += "\tterminal_input --append console\n"
ret += "\tterminal_output --append console\n"
ret += "fi\n"
# set grub timeout seconds
ret += "set timeout_style=menu\n"
timeout = ctx.get("bootloader.timeout", 5)
ret += f"set timeout={timeout}\n"
# find out the default entry
default = 0
items = ctx.get("bootloader.items", [])
for idx in range(len(items)):
@ -109,10 +158,14 @@ def gen_basic(ctx: ArchBuilderContext) -> str:
if "default" in item and item["default"]:
default = idx
ret += f"set default={default}\n"
return ret
def mkconfig(ctx: ArchBuilderContext) -> str:
Generate a full grub config for current rootfs
ret = ""
ret += gen_basic(ctx)
for item in ctx.get("bootloader.items", []):
@ -121,15 +174,23 @@ def mkconfig(ctx: ArchBuilderContext) -> str:
def proc_targets(ctx: ArchBuilderContext, install: str):
Copy grub target folder directly
copies = [".mod", ".lst"]
folder = os.path.join(ctx.get_rootfs(), "usr/lib/grub")
for target in ctx.get("grub.targets", []):
# target name format: i386-pc, arm64-efi
if "/" in target: raise ArchBuilderConfigError(f"bad target {target}")
base = os.path.join(folder, target)
# at least we need linux.mod
if not os.path.exists(os.path.join(base, "linux.mod")):
raise ArchBuilderConfigError(f"target {target} not found")
dest = os.path.join(install, target)
os.makedirs(dest, mode=0o0755, exist_ok=True)
# copy grub target
for file in os.listdir(base):
if not any((file.endswith(name) for name in copies)):
@ -141,6 +202,9 @@ def proc_targets(ctx: ArchBuilderContext, install: str):
def proc_config(ctx: ArchBuilderContext, install: str):
Generate a full grub config for current rootfs and write to install folder
content = mkconfig(ctx)
cfg = os.path.join(install, "grub.cfg")
with open(cfg, "w") as f:
@ -149,6 +213,9 @@ def proc_config(ctx: ArchBuilderContext, install: str):
def efi_arch_name(target: str) -> str:
Map grub target name to UEFI arch name
match target:
case "arm64-efi": return "aa64"
case "x86_64-efi": return "x64"
@ -161,50 +228,97 @@ def efi_arch_name(target: str) -> str:
def efi_boot_name(target: str) -> str:
Map grub target name to UEFI default boot file name
name = efi_arch_name(target)
return f"boot{name}.efi"
def proc_mkimage_efi(ctx: ArchBuilderContext, target: str):
Create GRUB EFI image for boot
cmds = ["grub-mkimage"]
root = ctx.get_rootfs()
# allowed esp folders
efi_folders = ["/boot", "/boot/efi", "/efi", "/esp"]
# grub2 source folder in rootfs (WORKSPACE/TARGET/rootfs/usr/lib/grub/x86_64-efi)
base = os.path.join(root, "usr/lib/grub", target)
# install path in rootfs (/boot/grub)
install = ctx.get("grub.path", "/boot/grub")
# why this function called proc_mkimage_efi?
if not target.endswith("-efi"):
raise RuntimeError("mkimage efi only for *-efi")
esp: MountPoint | None = None
grub: MountPoint | None = None
fdir = install + "/"
# must ends with /
fdir = os.path.realpath(install) + "/"
# find out requires mount point
esp: MountPoint | None = None # UEFI system partition
grub: MountPoint | None = None # GRUB install folder
for mnt in ctx.fstab:
# esp must be fat
if fstype_to_mod(mnt.fstype) == "fat":
if in efi_folders:
esp = mnt
# add an end slash to avoid same prefix (likes /boot /bootfs)
tdir =
if not tdir.endswith("/"): tdir += "/"
# grub install folder
if fdir.startswith(tdir):
# find out the deepest mount point
# to avoid / but installed in /boot
if (not grub) or mnt.level >= grub.level:
grub = mnt
if esp is None: raise RuntimeError("efi partiton not found")
if esp is None: raise RuntimeError("efi partition not found")
if grub is None: raise RuntimeError("grub install folder not found")
esp_dest =
if esp_dest.startswith("/"): esp_dest = esp_dest[1:]
if not install.startswith("/"): install = "/" + install
# grub install target folder (/boot/grub)
if not install.startswith("/"):
install = "/" + install
# must in grub install folder
if not install.startswith(
raise RuntimeError("grub install prefix not found")
# get grub install path in target partition
# Mount GRUB Install Prefix
# /boot /boot/grub /grub
# / /boot/grub /boot/grub
prefix = install[len(]
if not prefix.startswith("/"): prefix = "/" + prefix
# get UUID of grub installed filesystem UUID
device = (ctx.fsmap[grub.source] if grub.source in ctx.fsmap else grub.source)
uuid = blkid.get_tag_value(None, "UUID", device)
if not uuid: raise RuntimeError(
"failed to detect uuid for grub install path"
# esp install target folder (boot/efi)
esp_dest =
if esp_dest.startswith("/"):
esp_dest = esp_dest[1:]
# esp install target folder in rootfs (WORKSPACE/TARGET/rootfs/boot/efi)
efi_folder = os.path.join(root, esp_dest)
# grub install target folder in rootfs (WORKSPACE/TARGET/rootfs/boot/grub)
grub_folder = os.path.join(root, install[1:])
# put builtin config into grub install folder
builtin = os.path.join(grub_folder, "grub.builtin.cfg")
with open(builtin, "w") as f:
f.write(f"search --no-floppy --fs-uuid --set=root {uuid}\n")
@ -213,62 +327,98 @@ def proc_mkimage_efi(ctx: ArchBuilderContext, target: str):
f.write("echo \"Failed to switch into normal mode\"\n")
f.write("sleep 5\n")
# efi boot image install folder (WORKSPACE/TARGET/rootfs/boot/efi/efi/boot)
efi = os.path.join(efi_folder, "efi/boot")
os.makedirs(efi, mode=0o0755, exist_ok=True)
# efi boot image (WORKSPACE/TARGET/rootfs/boot/efi/efi/boot/bootx64.efi)
out = os.path.join(efi, efi_boot_name(target))
if os.path.exists(out): os.remove(out)
# run grub-mkimage
ret = ctx.run_external(cmds)
if ret != 0: raise OSError("grub-mkimage failed")"generated grub {target} efi image {out}")
def proc_bootsec(ctx: ArchBuilderContext, target: str):
Install boot sector for x86 PC for grub-install
mods = []
cmds = ["grub-install"]
if target != "i386-pc":
raise RuntimeError("bootsec only for i386-pc")
mount = ctx.get_mount()
root = ctx.get_rootfs()
# get grub install base folder (boot)
install: str = ctx.get("grub.path", "/boot/grub")
if install.startswith("/"): install = install[1:]
if install.endswith("/grub"): install = install[:-5]
grub = os.path.join(root, "usr/lib/grub", target)
if install.endswith("/grub"): install = install[0:-5]
rootfs = ctx.fstab.find_target("/")
# grub install base folder in mount (WORKSPACE/TARGET/mount/boot)
mnt_install = os.path.join(mount, install)
# find out mount point of rootfs
rootfs = ctx.fstab.find_target("/")
if rootfs is None or len(rootfs) <= 0 or rootfs[0] is None:
raise RuntimeError("rootfs mount point not found")
rootfs = rootfs[0]
# add filesystem module for rootfs
if len(mods) > 0:
cmds.append("--modules=" + (" ".join(mods)))
# detect grub boot sector install device
device = ctx.get("grub.device", None)
if device is None:
source = rootfs.source
if source in ctx.fsmap:
source = ctx.fsmap[source]
# loop setup by builder.disk.image.ImageBuilder
if not source.startswith("/dev/loop"):
raise RuntimeError("no device to detect grub install")
# loop offset partition setup by
if loop_get_offset(source) <= 0:
raise RuntimeError("no loop part to detect grub install")
# backing as parent disk
device = loop_get_backing(source)
if device is None:
raise RuntimeError("no device for grub install")
# run grub-install
ret = ctx.run_external(cmds)
if ret != 0: raise OSError("grub-install failed")
# copy grub installed target from mount to rootfs
src = os.path.join(mnt_install, "grub")
dst = os.path.join(root, install, "grub")
shutil.copytree(src, dst, dirs_exist_ok=True)
def proc_install(ctx: ArchBuilderContext):
Process GRUB targets install
targets: list[str] = ctx.get("grub.targets", [])
for target in targets:
if target == "i386-pc":
@ -281,12 +431,18 @@ def proc_install(ctx: ArchBuilderContext):
def proc_grub(ctx: ArchBuilderContext):
Install GRUB bootloader
root = ctx.get_rootfs()
# get grub install folder in rootfs (WORKSPACE/TARGET/rootfs/boot/grub)
install: str = ctx.get("grub.path", "/boot/grub")
if install.startswith("/"):
install = install[1:]
install = os.path.join(root, install)
os.makedirs(install, mode=0o0755, exist_ok=True)
proc_config(ctx, install)
proc_targets(ctx, install)

View File

@ -8,49 +8,77 @@ log = getLogger(__name__)
def reset_locale(ctx: ArchBuilderContext):
Remove old locale settings
root = ctx.get_rootfs()
archive = os.path.join(root, "usr/lib/locale/locale-archive")
if os.path.exists(archive): os.remove(archive)
def enable_all(ctx: ArchBuilderContext):
Add all enabled locale for build locale-archive
root = ctx.get_rootfs()
# default none
locales = ctx.get("locale.enable", [])"setup enabled locale")
# create locale.gen
file = os.path.join(root, "etc/locale.gen")
with open_config(file) as f:
for line in locales:
log.debug(f"adding locale {line}")
if len(locales) == 0:
f.write("# No any locales enabled\n")
# run locale-gen
filesystem.chroot_run(ctx, "locale-gen")
def set_default(ctx: ArchBuilderContext):
Setup default locale
root = ctx.get_rootfs()
default = ctx.get("locale.default", None)
if default is None: default = "C"
# default to C
default = ctx.get("locale.default", "C")"default locale: {default}")
# default locale config (see man:locale.conf(5))
conf = os.path.join(root, "etc/locale.conf")
with open_config(conf) as f:
def set_timezone(ctx: ArchBuilderContext):
Setup tzdata timezone info
root = ctx.get_rootfs()
timezone = ctx.get("timezone", None)
if timezone is None: timezone = "UTC"
# default to UTC
timezone = ctx.get("timezone", "UTC")"timezone: {timezone}")
# tzdata install path
dst = os.path.join("/usr/share/zoneinfo", timezone)
real = os.path.join(root, dst[1:])
if not os.path.exists(real): raise ArchBuilderConfigError(
f"timezone {timezone} not found"
# localtime symbolic link (see man:tzset(3))
lnk = os.path.join(root, "etc/localtime")
if os.path.exists(lnk): os.remove(lnk)
os.symlink(dst, lnk)
# timezone file
conf = os.path.join(root, "etc/timezone")
with open(conf, "w") as f:
@ -58,6 +86,9 @@ def set_timezone(ctx: ArchBuilderContext):
def proc_locale(ctx: ArchBuilderContext):
Setup user locale settings

View File

@ -17,22 +17,37 @@ def add_values(ctx: ArchBuilderContext, key: str, arr: list[str]):
def gen_config(ctx: ArchBuilderContext):
Generate mkinitcpio.conf
modules: list[str] = []
binaries: list[str] = []
files: list[str] = []
hooks: list[str] = []
# add default hooks
# add microcode if x86_64 (amd-ucode, intel-ucode)
if ctx.cur_arch in ["x86_64", "i386"]:
# do not add keymap by default
if ctx.get("mkinitcpio.hooks.keymap", False):
hooks.extend(["kms", "keyboard", "keymap", "consolefont"])
hooks.extend(["block", "filesystems", "fsck"])
hooks.extend(["kms", "keymap", "consolefont"])
hooks.extend(["keyboard", "block", "filesystems", "fsck"])
# add others values
add_values(ctx, "mkinitcpio.modules", modules)
add_values(ctx, "mkinitcpio.binaries", binaries)
add_values(ctx, "mkinitcpio.files", files)
# write mkinitcpio.conf to rootfs
root = ctx.get_rootfs()
cfg = os.path.join(root, "etc/mkinitcpio.conf")
with open_config(cfg) as f:
@ -40,27 +55,47 @@ def gen_config(ctx: ArchBuilderContext):
f.write("BINARIES=(%s)\n" % (" ".join(binaries)))
f.write("FILES=(%s)\n" % (" ".join(files)))
f.write("HOOKS=(%s)\n" % (" ".join(hooks)))
# TODO: add more options
def recreate_initrd(ctx: ArchBuilderContext, path: str):
Really run mkinitcpio
chroot_run(ctx, ["mkinitcpio", "-p", path])
# do not check return value of mkinitcpio
def recreate_initrd_no_autodetect(ctx: ArchBuilderContext, path: str):
Create a full initramfs without autodetect
In build stage, mkinitcpio can not find out needs modules, it will cause unbootable.
tmp = os.path.join(ctx.get_rootfs(), "tmp")
with NamedTemporaryFile("w", dir=tmp) as temp:
# copy original preset
with open(path, "r") as f:
# skip autodetect
temp.write("\ndefault_options=\"-S autodetect\"\n")
# run mkinitcpio (with path in rootfs)
path = os.path.join("/tmp", os.path.basename(
recreate_initrd(ctx, path)
def recreate_initrds(ctx: ArchBuilderContext):
Regenerate all initramfs
root = ctx.get_rootfs()
no_autodetect = ctx.get("mkinitcpio.no_autodetect", True)
folder = os.path.join(root, "etc/mkinitcpio.d")
# scan all initramfs preset and regenerate them
for preset in os.listdir(folder):
if not preset.endswith(".preset"): continue
path = os.path.join(folder, preset)
@ -69,5 +104,8 @@ def recreate_initrds(ctx: ArchBuilderContext):
def proc_mkinitcpio(ctx: ArchBuilderContext):
Process mkinitcpio options

View File

@ -82,9 +82,11 @@ def init_mount(ctx: ArchBuilderContext):
real = os.path.realpath(os.path.join(root, target))
do_mount(ctx, source, real, fstype, options)
# ensure mount point is clean
mnts = MountTab.parse_mounts()
if any(mnts.find_folder(
raise RuntimeError("mount points not cleanup")
root_mount("proc", "proc", "proc", "nosuid,noexec,nodev")
root_mount("sys", "sys", "sysfs", "nosuid,noexec,nodev,ro")
root_mount("dev", "dev", "devtmpfs", "mode=0755,nosuid")
@ -92,6 +94,8 @@ def init_mount(ctx: ArchBuilderContext):
root_mount("shm", "dev/shm", "tmpfs", "mode=1777,nosuid,nodev")
root_mount("run", "run", "tmpfs", "nosuid,nodev,mode=0755")
root_mount("tmp", "tmp", "tmpfs", "mode=1777,strictatime,nodev,nosuid")
# symbolic links for some script tools (e.g. mkinitcpio)
symlink("/proc/self/fd", "dev", "fd")
symlink("/proc/self/fd/0", "dev", "stdin")
symlink("/proc/self/fd/1", "dev", "stdout")

View File

@ -7,6 +7,9 @@ log = getLogger(__name__)
def gen_machine_info(ctx: ArchBuilderContext):
Generate /etc/machine-info for systemd
root = ctx.get_rootfs()
file = os.path.join(root, "etc/machine-info")
cfg = ctx.get("sysconf")
@ -22,6 +25,9 @@ def gen_machine_info(ctx: ArchBuilderContext):
def gen_hosts(ctx: ArchBuilderContext):
Generate /etc/hosts
addrs: list[str] = []
root = ctx.get_rootfs()
file = os.path.join(root, "etc/hosts")
@ -33,6 +39,8 @@ def gen_hosts(ctx: ArchBuilderContext):
# not set, add for FQDN
name = ctx.get("sysconf.hostname")
if "" not in addrs and name:
f.write(f" {name}\n")
@ -40,6 +48,9 @@ def gen_hosts(ctx: ArchBuilderContext):
def gen_hostname(ctx: ArchBuilderContext):
Generate /etc/hostname
root = ctx.get_rootfs()
file = os.path.join(root, "etc/hostname")
name = ctx.get("sysconf.hostname")
@ -51,6 +62,9 @@ def gen_hostname(ctx: ArchBuilderContext):
def gen_environments(ctx: ArchBuilderContext):
Generate /etc/environments
root = ctx.get_rootfs()
file = os.path.join(root, "etc/environment")
envs: dict[str] = ctx.get("sysconf.environments", [])
@ -62,6 +76,9 @@ def gen_environments(ctx: ArchBuilderContext):
def proc_names(ctx: ArchBuilderContext):
Apply names for system configs

View File

@ -7,6 +7,9 @@ log = getLogger(__name__)
def install_all(ctx: ArchBuilderContext, pacman: Pacman):
Install all pacman packages
packages = ctx.get("pacman.install", [])
if len(packages) <= 0: return"installing packages: %s", " ".join(packages))
@ -14,15 +17,24 @@ def install_all(ctx: ArchBuilderContext, pacman: Pacman):
def install_all_keyring(ctx: ArchBuilderContext, pacman: Pacman):
Install all pacman keyring packages before normal packages
packages: list[str] = ctx.get("pacman.install", [])
if len(packages) <= 0: return
# find out all keyring packages
keyrings = [pkg for pkg in packages if pkg.endswith("-keyring")]
if len(keyrings) <= 0: return"installing keyrings: %s", " ".join(keyrings))
def uninstall_all(ctx: ArchBuilderContext, pacman: Pacman):
Remove all specified pacman packages
packages = ctx.get("pacman.uninstall", [])
if len(packages) <= 0: return"uninstalling packages: %s", " ".join(packages))
@ -30,6 +42,9 @@ def uninstall_all(ctx: ArchBuilderContext, pacman: Pacman):
def append_config(ctx: ArchBuilderContext, lines: list[str]):
Generate basic pacman.conf for rootfs
lines.append("HoldPkg = pacman glibc filesystem\n")
lines.append(f"Architecture = {ctx.tgt_arch}\n")
@ -43,6 +58,9 @@ def append_config(ctx: ArchBuilderContext, lines: list[str]):
def gen_config(ctx: ArchBuilderContext, pacman: Pacman):
Generate full pacman.conf for rootfs
conf = os.path.join(ctx.get_rootfs(), "etc/pacman.conf")
lines: list[str] = []
append_config(ctx, lines)
@ -53,17 +71,30 @@ def gen_config(ctx: ArchBuilderContext, pacman: Pacman):
def proc_pacman(ctx: ArchBuilderContext, pacman: Pacman):
Install or remove packages for rootfs, and generate pacman.conf
install_all(ctx, pacman)
uninstall_all(ctx, pacman)
gen_config(ctx, pacman)
def proc_pacman_keyring(ctx: ArchBuilderContext, pacman: Pacman):
Early install keyring packages
install_all_keyring(ctx, pacman)
def trust_all(ctx: ArchBuilderContext, pacman: Pacman):
Early trust keyring for database and keyring packages
if not ctx.gpgcheck: return
trust = ctx.get("", [])
# receive all keys now
# local sign keys
for key in trust: pacman.lsign_key(key)

View File

@ -6,12 +6,19 @@ log = getLogger(__name__)
def proc_systemd(ctx: ArchBuilderContext):
Enable or disable systemd units files, and set default target
systemd_comp.enable(ctx, ctx.get("systemd.enable", []))
systemd_comp.disable(ctx, ctx.get("systemd.disable", []))
systemd_comp.set_default(ctx, ctx.get("systemd.default", None))
def proc_machine_id(ctx: ArchBuilderContext):
Remove or set machine-id
Never duplicate machine id, it should generate when first boot
id = ctx.get("machine-id", "")
root = ctx.get_rootfs()
mid = os.path.join(root, "etc/machine-id")

View File

@ -6,61 +6,94 @@ log = getLogger(__name__)
def proc_user(ctx: ArchBuilderContext, cfg: dict):
Create a new user and set password
if "name" not in cfg: raise ArchBuilderConfigError("username not set")
name = cfg["name"]
cmds = []
if ctx.passwd.lookup_name(name) is None:
# user is not exists, create it
cmds.append("-m") # create home
action = "created"
# user is already exists, modify it
action = "modified"
# add all options
if "uid" in cfg: cmds.extend(["-u", str(cfg["uid"])])
if "gid" in cfg: cmds.extend(["-g", str(cfg["gid"])])
if "home" in cfg: cmds.extend(["-d", cfg["home"]])
if "shell" in cfg: cmds.extend(["-s", cfg["shell"]])
if "groups" in cfg: cmds.extend(["-G", str(cfg["groups"])])
# run useradd or usermod
ret = chroot_run(ctx, cmds)
if ret != 0: raise OSError(f"{cmds[0]} failed")
# we want to set a password for user
if "password" in cfg:
cmds = ["chpasswd"]
text = f"{name}:{cfg['password']}\n"
ret = chroot_run(ctx, cmds, stdin=text)
if ret != 0: raise OSError("chpasswd failed")
# reload user database
ctx.reload_passwd()"{action} user {name}")
def proc_group(ctx: ArchBuilderContext, cfg: dict):
Create a new group
if "name" not in cfg: raise ArchBuilderConfigError("groupname not set")
name = cfg["name"]
cmds = []
if ctx.passwd.lookup_name(name) is None:
# group is not exists, create it
action = "created"
# group is already exists, modify it
action = "modified"
# add all options
if "gid" in cfg: cmds.extend(["-g", str(cfg["gid"])])
# run groupadd or groupmod
ret = chroot_run(ctx, cmds)
if ret != 0: raise OSError(f"{name} failed")
# reload user database
ctx.reload_passwd()"{action} group {name}")
def proc_users(ctx: ArchBuilderContext):
Create all users
for user in ctx.get("sysconf.user", []):
proc_user(ctx, user)
def proc_groups(ctx: ArchBuilderContext):
Create all groups
for group in ctx.get("", []):
proc_group(ctx, group)
def proc_usergroup(ctx: ArchBuilderContext):
Create all users and groups
proc_groups(ctx) # create groups before users

View File

@ -39,6 +39,9 @@ class Pacman:
caches: list[str]
def append_repos(self, lines: list[str]):
Add all databases into config
for repo in self.databases:
db = self.databases[repo]
@ -47,6 +50,9 @@ class Pacman:
lines.append(f"Server = {server}\n")
def append_config(self, lines: list[str]):
Add basic pacman config for host
siglevel = ("Required DatabaseOptional" if self.ctx.gpgcheck else "Never")
for cache in self.caches:
@ -66,6 +72,9 @@ class Pacman:
def init_keyring(self):
Initialize pacman keyring
path = os.path.join(, "rootfs")
keyring = os.path.join(path, "etc/pacman.d/gnupg")
if not self.ctx.gpgcheck: return
@ -76,6 +85,9 @@ class Pacman:
def init_config(self):
Create host pacman.conf
config = os.path.join(, "pacman.conf")
if os.path.exists(config):
@ -88,6 +100,9 @@ class Pacman:
def pacman_key(self, args: list[str]):
Call pacman-key for rootfs
if not self.ctx.gpgcheck:
raise RuntimeError("GPG check disabled")
keyring = os.path.join(self.root, "etc/pacman.d/gnupg")
@ -100,6 +115,9 @@ class Pacman:
if ret != 0: raise OSError(f"pacman-key failed with {ret}")
def pacman(self, args: list[str]):
Call pacman for rootfs
config = os.path.join(, "pacman.conf")
cmds = ["pacman"]
@ -110,20 +128,32 @@ class Pacman:
if ret != 0: raise OSError(f"pacman failed with {ret}")
def add_database(self, repo: dict):
Add a database and update it
def resolve(url: str) -> str:
Replace pacman.conf variables
return (url
.replace("$arch", self.ctx.tgt_arch)
.replace("$repo", name))
if "name" not in repo:
raise ArchBuilderConfigError("repo name not set")
name = repo["name"]
# never add local into database
if name == "local" or "/" in name:
raise ArchBuilderConfigError("bad repo name")
# register database
if name not in self.databases:
self.databases[name] = self.handle.register_syncdb(
db = self.databases[name]
# add databases servers
servers: list[str] = []
if "server" in repo:
@ -131,10 +161,15 @@ class Pacman:
for server in repo["servers"]:
db.servers = servers
# update database now via pyalpm"updating database {name}")
def load_databases(self):
Add all databases and load them
cfg = self.config
if "repo" not in cfg:
raise ArchBuilderConfigError("no repos found in config")
@ -144,21 +179,33 @@ class Pacman:
def lookup_package(self, name: str) -> list[pyalpm.Package]:
Lookup pyalpm package by name
# pass a filename, load it directly
if ".pkg.tar." in name:
pkg = self.handle.load_pkg(name)
if pkg is None: raise RuntimeError(f"load package {name} failed")
return [pkg]
s = name.split("/")
if len(s) == 2:
if s[0] not in self.databases:
# use DATABASE/PACKAGE, find it in database
if s[0] not in self.databases and s[0] != "local":
raise ValueError(f"database {s[0]} not found")
db = (self.handle.get_localdb() if s[0] == "local" else self.databases[s[0]])
pkg = db.get_pkg(s[1])
if pkg: return [pkg]
raise ValueError(f"package {s[1]} not found")
elif len(s) == 1:
# use PACKAGE, find it in all databases or find as group
# try find it as group
pkg = pyalpm.find_grp_pkgs(self.databases.values(), name)
if len(pkg) > 0: return pkg
# try find it as package
for dbn in self.databases:
db = self.databases[dbn]
pkg = db.get_pkg(name)
@ -167,18 +214,27 @@ class Pacman:
raise ValueError(f"bad package name {name}")
def init_cache(self):
host_cache = "/var/cache/pacman/pkg"
work_cache = os.path.join(, "packages")
root_cache = os.path.join(self.root, "var/cache/pacman/pkg")
Initialize pacman cache folder
host_cache = "/var/cache/pacman/pkg" # host cache
work_cache = os.path.join(, "packages") # workspace cache
root_cache = os.path.join(self.root, "var/cache/pacman/pkg") # rootfs cache
# host cache is existing, use host cache folder
if os.path.exists(host_cache):
os.makedirs(work_cache, mode=0o0755, exist_ok=True)
os.makedirs(root_cache, mode=0o0755, exist_ok=True)
def __init__(self, ctx: ArchBuilderContext):
Initialize pacman context
self.ctx = ctx
if "pacman" not in ctx.config:
raise ArchBuilderConfigError("no pacman found in config")
@ -200,6 +256,9 @@ class Pacman:
def uninstall(self, pkgs: list[str]):
Uninstall packages via pacman
if len(pkgs) == 0: return
ps = " ".join(pkgs)"removing packages {ps}")
@ -215,6 +274,9 @@ class Pacman:
asdeps: bool = False,
nodeps: bool = False,
Install packages via pacman
if len(pkgs) == 0: return
core_db = "var/lib/pacman/sync/core.db"
if not os.path.exists(os.path.join(self.root, core_db)):
@ -229,6 +291,9 @@ class Pacman:
def download(self, pkgs: list[str]):
Download packages via pacman
if len(pkgs) == 0: return
core_db = "var/lib/pacman/sync/core.db"
if not os.path.exists(os.path.join(self.root, core_db)):
@ -239,6 +304,9 @@ class Pacman:
def install_local(self, files: list[str]):
Install a local packages via pacman
if len(files) == 0: return"installing local packages %s", " ".join(files))
args = ["--needed", "--upgrade"]
@ -246,12 +314,18 @@ class Pacman:
def refresh(self, /, force: bool = False):
Update local databases via pacman
""""refresh pacman database")
args = ["--sync", "--refresh"]
if force: args.append("--refresh")
def recv_keys(self, keys: str | list[str]):
Receive a key via pacman-key
args = ["--recv-keys"]
if type(keys) is str:
@ -262,6 +336,9 @@ class Pacman:
def lsign_key(self, key: str):
Local sign a key via pacman-key
self.pacman_key(["--lsign-key", key])
def pouplate_keys(
@ -269,6 +346,9 @@ class Pacman:
names: str | list[str] = None,
folder: str = None
Populate all keys via pacman-key
args = ["--populate"]
if folder: args.extend(["--populate-from", folder])
if names is None: pass
@ -278,31 +358,49 @@ class Pacman:
def find_package_file(self, pkg: pyalpm.Package) -> str | None:
Find out pacman package archive file in cache
for cache in self.caches:
p = os.path.join(cache, pkg.filename)
if os.path.exists(p): return p
return None
def trust_keyring_pkg(self, pkg: pyalpm.Package):
Trust a keyring package from file without install it
if not self.ctx.gpgcheck: return
names: list[str] = []
target = os.path.join(, "keyrings")
keyring = "usr/share/pacman/keyrings/"
# find out file path
path = self.find_package_file(pkg)
# cleanup keyring extract folder
if os.path.exists(target):
os.makedirs(target, mode=0o0755)
if path is None: raise RuntimeError(
f"package {} not found"
# open keyring package to extract
log.debug(f"processing keyring package {}")
with libarchive.file_reader(path) as archive:
for file in archive:
pn: str = file.pathname
if not pn.startswith(keyring): continue
# get the filename of file
fn = pn[len(keyring):]
if len(fn) <= 0: continue
# add keyring name to populate
if fn.endswith(".gpg"): names.append(fn[:-4])
# extract file
dest = os.path.join(target, fn)
log.debug(f"extracting {pn} to {dest}")
with open(dest, "wb") as f:
@ -311,9 +409,14 @@ class Pacman:
fd = f.fileno()
os.fchmod(fd, file.mode)
os.fchown(fd, file.uid, file.gid)
# trust extracted keyring
self.pouplate_keys(names, target)
def add_trust_keyring_pkg(self, pkgnames: list[str]):
Trust a keyring package from file without install it
if not self.ctx.gpgcheck: return
if len(pkgnames) <= 0: return

View File

@ -4,13 +4,18 @@ from builder.lib.context import ArchBuilderContext
def systemctl(ctx: ArchBuilderContext, args: list[str]):
Call systemctl in rootfs
path = ctx.get_rootfs()
full_args = ["systemctl"]
if utils.have_external("systemctl"):
# use host systemctl possible
ret = ctx.run_external(full_args)
# if host systemd is unavailable, use chroot run
ret = filesystem.chroot_run(ctx, full_args)
if ret != 0: raise OSError(
@ -20,6 +25,9 @@ def systemctl(ctx: ArchBuilderContext, args: list[str]):
def enable(ctx: ArchBuilderContext, units: list[str]):
Enable systemd units
if len(units) <= 0: return
args = ["enable", "--"]
@ -27,6 +35,9 @@ def enable(ctx: ArchBuilderContext, units: list[str]):
def disable(ctx: ArchBuilderContext, units: list[str]):
Disable systemd units
if len(units) <= 0: return
args = ["disable", "--"]
@ -34,5 +45,8 @@ def disable(ctx: ArchBuilderContext, units: list[str]):
def set_default(ctx: ArchBuilderContext, unit: str):
Set default boot target for systemd
if not unit: return
systemctl(ctx, ["set-default", "--", unit])

View File

@ -62,9 +62,13 @@ def parse_user_from(
node: dict,
default: tuple[int, int] = (0, -1)
) -> tuple[int, int]:
if "owner" in node: return parse_owner(ctx, node["owner"])
uid = parse_usergroup_from(ctx, node, False, default[0])
gid = parse_usergroup_from(ctx, node, True, default[1])
Parse user/group id from config
uid, gid = default
if "owner" in node: uid, gid = parse_owner(ctx, node["owner"])
uid = parse_usergroup_from(ctx, node, False, uid)
gid = parse_usergroup_from(ctx, node, True, gid)
if gid == -1:
user = ctx.passwd.lookup_uid(uid)
if user is None: raise ArchBuilderConfigError(

View File

@ -23,10 +23,16 @@ class Area(SerializableDict):
def reset(self) -> Self:
Remove all fields
self.set(-1, -1, -1)
return self
def from_dict(self, o: dict) -> Self:
Load all fields from config
if "start" in o: self.start = size_to_bytes(o["start"])
if "offset" in o: self.start = size_to_bytes(o["offset"])
@ -36,6 +42,9 @@ class Area(SerializableDict):
return self
def is_area_in(self, area: Self) -> bool:
Is another area full in this area
return (
@ -45,6 +54,9 @@ class Area(SerializableDict):
def fixup(self) -> Self:
Fill missing fields
if self.start >= 0 and self.end >= 0 and self.start > self.end + 1:
raise ValueError("start large than end")
if 0 <= self.end < self.size and self.size >= 0:
@ -52,17 +64,20 @@ class Area(SerializableDict):
if self.start >= 0 and self.end >= 0 and self.size >= 0:
if self.size != self.end - self.start + 1:
raise ValueError("bad size")
elif self.start >= 0 and self.end >= 0:
elif self.start >= 0 and self.end >= 0: # need size
self.size = self.end - self.start + 1
elif self.start >= 0 and self.size >= 0:
elif self.start >= 0 and self.size >= 0: # need end
self.end = self.start + self.size - 1
elif self.end >= 0 and self.size >= 0:
elif self.end >= 0 and self.size >= 0: # need start
self.start = self.end - self.size + 1
raise ValueError("missing value")
return self
def __init__(self, start: int = -1, end: int = -1, size: int = -1, area: Self = None):
Initialize a area
if area: start, end, size = area.to_tuple()
self.start, self.end, self.size = start, end, size
@ -78,9 +93,15 @@ def to_tuple(start: int = -1, end: int = -1, size: int = -1, area: Area = None)
class Areas(list[Area], SerializableList):
def is_area_in(self, area: Area) -> bool:
Is an area fully in this areas
return any(pool.is_area_in(area) for pool in self)
def merge(self) -> Self:
Merge all areas
idx = 0
self.sort(key=lambda x: (x.start, x.end))
while len(self) > 0:
@ -91,6 +112,7 @@ class Areas(list[Area], SerializableList):
if idx > 0:
last = self[idx - 1]
if last.end + 1 >= curr.start:
# last end equals to this start
ent = Area(last.start, curr.end)
@ -108,6 +130,9 @@ class Areas(list[Area], SerializableList):
size: int = -1,
area: Area = None,
) -> Area | None:
Lookup an area with fields
start, end, size = to_tuple(start, end, size, area)
for area in self:
if not (area.start <= start <= area.end): continue
@ -117,6 +142,9 @@ class Areas(list[Area], SerializableList):
return None
def align(self, align: int) -> Self:
Align all fields to value
self.sort(key=lambda x: (x.start, x.end))
for area in self:
start = round_up(area.start, align)
@ -136,6 +164,9 @@ class Areas(list[Area], SerializableList):
size: int = -1,
area: Area = None
) -> Area | None:
Add an area to this areas
if area: start, end, size = area.to_tuple()
cnt = (start >= 0) + (end >= 0) + (size >= 0)
if cnt < 2: raise ValueError("missing value")
@ -151,6 +182,9 @@ class Areas(list[Area], SerializableList):
size: int = -1,
area: Area = None,
) -> bool:
Remove a range from areas
start, end, size = to_tuple(start, end, size, area)
if len(self) <= 0: return False
rs = min(area.start for area in self)
@ -174,6 +208,9 @@ class Areas(list[Area], SerializableList):
area: Area = None,
biggest: bool = True,
) -> Area | None:
Find matched area
if area: start, end, size = area.to_tuple()
cnt = (start >= 0) + (end >= 0) + (size >= 0)
if cnt >= 2:

View File

@ -11,27 +11,45 @@ class CGroup:
def path(self) -> str:
Get full path of this cgroup
return os.path.join(self.fs,
def valid(self) -> bool:
Can read or write to this cgroup
return os.path.exists(self.path)
def create(self):
Create this cgroup now
if self.valid: return
def destroy(self):
Destroy the cgroup
if not self.valid: return
def add_pid(self, pid: int):
Add a pid to track
if not self.valid: return
procs = os.path.join(self.path, "cgroup.procs")
with open(procs, "w") as f:
def list_pid(self) -> list[int]:
List all tracked children progress id
ret: list[int] = []
if not self.valid: return ret
procs = os.path.join(self.path, "cgroup.procs")
@ -41,22 +59,36 @@ class CGroup:
return ret
def kill_all(self, sig: int = signal.SIGTERM, timeout: int = 10, kill: int = 8):
Kill all children process and wait them exit
if not self.valid: return
pids = self.list_pid()
remain = 0
while True:
# send a signal
for pid in pids:
log.debug(f"killing {pid}")
try: os.kill(pid, sig)
except: pass
# waitpid to clean zombie
try: os.waitpid(-1, os.WNOHANG)
except: pass
# check all children was exited
pids = self.list_pid()
if len(pids) <= 0: break
# set to SIGKILL when reached kill time
if 0 < kill <= remain:
sig = signal.SIGKILL
# timeoutd, throw out
if remain >= timeout:
raise TimeoutError("killing pids timedout")
# wait...
def __init__(self, name: str, fs: str = None):

View File

@ -12,6 +12,9 @@ class ArchBuilderConfigError(Exception):
def _dict_merge(dst: dict, src: dict):
Merge two dict with override
for key in src.keys():
st = type(src[key])
if key in dst and st is type(dst[key]):

View File

@ -89,6 +89,9 @@ class ArchBuilderContext:
group: GroupFile = GroupFile()
def get(self, key: str, default=None):
Get config value
try: return dict_get(key, self.config)
except: return default
@ -104,6 +107,9 @@ class ArchBuilderContext:
def cleanup(self):
Cleanup build context
from import undo_mounts
@ -139,6 +145,9 @@ class ArchBuilderContext:
return ret
def reload_passwd(self):
Reload user database
root = self.get_rootfs()
pf = os.path.join(root, "etc/passwd")
gf = os.path.join(root, "etc/group")
@ -148,9 +157,15 @@ class ArchBuilderContext:
if os.path.exists(gf):
def finish_config(self):
Done load configs
self.config_orig = deepcopy(self.config)
def resolve_subscript(self):
Run subscript replaces
ss = SubScript()
self.config = deepcopy(self.config_orig)

View File

@ -4,6 +4,9 @@ from builder.lib import serializable
class SerializableEncoder(json.JSONEncoder):
JSON implement of serializable interface
def default(self, o):
if isinstance(o, UUID):
return str(o)

View File

@ -33,6 +33,9 @@ class MountPoint(SerializableDict):
def virtual(self) -> bool:
Is current mount point a virtual filesystem
if self.fstype:
if self.fstype in virtual_fs: return True
if self.fstype in real_fs: return False
@ -45,6 +48,12 @@ class MountPoint(SerializableDict):
def level(self) -> int:
Get current target levels
/ => 1
/boot => 2
/usr/bin => 3
if is None: return 0
path = os.path.realpath(
cnt = path.count(os.sep)
@ -56,13 +65,22 @@ class MountPoint(SerializableDict):
def options(self):
Get options as string
return ",".join(self.option)
def options(self, val: str):
Set options from string
self.option = val.split(",")
def get_option(self, opt: str) -> str | None:
Get an option from string
if opt in self.option:
return opt
if "=" not in opt:
@ -72,6 +90,9 @@ class MountPoint(SerializableDict):
return None
def remove_option(self, opt: str | list[str]) -> Self:
Remove an option
if type(opt) is list[str]:
for o in opt:
@ -86,11 +107,17 @@ class MountPoint(SerializableDict):
return self
def exclusive_option(self, opt: str, opt1: str, opt2: str) -> Self:
Remove a exclusive option
if opt == opt1 or opt == opt2:
return self
def add_option(self, opt: str) -> Self:
Add an option
self.exclusive_option(opt, "ro", "rw")
self.exclusive_option(opt, "dev", "nodev")
self.exclusive_option(opt, "suid", "nosuid")
@ -102,10 +129,16 @@ class MountPoint(SerializableDict):
return self
def ro(self) -> Self:
Set mount point to read-only
return self
def rw(self) -> Self:
Set mount point to read-write
return self
@ -115,6 +148,9 @@ class MountPoint(SerializableDict):
def have_options(self) -> bool: return len(self.option) > 0
def update_device(self):
Update device field from source
if self.virtual or self.source is None: return
if self.source.startswith(os.sep):
self.device = self.source
@ -124,6 +160,9 @@ class MountPoint(SerializableDict):
def persist_source(self, tag: str = "UUID"):
Change source to persist source
if self.virtual: return
if self.device is None: self.update_device()
if self.device is None: return
@ -136,6 +175,9 @@ class MountPoint(SerializableDict):
def tolibmount(self) -> libmount.Context:
To util-linux libmount context
mnt = libmount.Context() =
if self.have_source(): mnt.source = self.source
@ -144,9 +186,15 @@ class MountPoint(SerializableDict):
return mnt
def ismount(self) -> bool:
Is current mount point mounted
return os.path.ismount(
def mount(self) -> Self:
Mount now
if not os.path.exists(
os.makedirs(, mode=0o0755)
if not os.path.ismount(
@ -161,6 +209,9 @@ class MountPoint(SerializableDict):
return self
def umount(self) -> Self:
UnMount now
if os.path.ismount(
lib = self.tolibmount()
@ -168,6 +219,9 @@ class MountPoint(SerializableDict):
return self
def from_mount_line(self, line: str) -> Self:
Load from mtab / fstab
d = line.split()
if len(d) != 6:
raise ValueError("bad mount line")
@ -180,6 +234,10 @@ class MountPoint(SerializableDict):
return self
def to_mount_line(self) -> str:
To mount tab line string
fields = [
@ -246,6 +304,9 @@ class MountPoint(SerializableDict):
class MountTab(list[MountPoint], SerializableList):
def find_folder(self, folder: str) -> Self:
Find mount point target starts with folder
root = os.path.realpath(folder)
return [mnt for mnt in self if]
@ -254,12 +315,18 @@ class MountTab(list[MountPoint], SerializableList):
def find_fstype(self, fstype: str) -> Self: return [mnt for mnt in self if mnt.fstype == fstype]
def clone(self) -> Self:
Fully clone a MountTab
mnts = MountTab()
for mnt in self:
return mnts
def mount_all(self, prefix: str = None, mkdir: bool = False) -> Self:
Mount all mount points
for mnt in self:
m = mnt.clone()
if prefix:
@ -271,9 +338,15 @@ class MountTab(list[MountPoint], SerializableList):
return self
def resort(self):
Sort mount points by path level
self.sort(key=lambda x: (x.level, len(,
def strip_virtual(self) -> Self:
Remove all virtual filesystem mount points
for mnt in self:
if mnt.virtual:
@ -288,6 +361,9 @@ class MountTab(list[MountPoint], SerializableList):
return self
def to_mount_file(self, linesep=os.linesep) -> str:
Convert to mount file (fstab)
ret = "# Source Target FS-Type Options FS-Freq FS-Dump"
ret += linesep
for point in self:

View File

@ -4,10 +4,16 @@ from builder.lib.serializable import SerializableDict, SerializableList
def zero2empty(num: int) -> str:
Convert number to string, when zero return empty
return str(num) if num != 0 else ""
def none2empty(val: str) -> str:
Return a string, when None return empty
return val if val else ""
@ -25,6 +31,9 @@ class UserFile(SerializableList):
def unload(self): pass
def load_str(self, content: str | list[str]) -> Self:
Load whole file as string
if type(content) is str:
content = content.split("\n")
for line in content:

View File

@ -13,6 +13,9 @@ def str_find_all(
start: typing.SupportsIndex | None = None,
end: typing.SupportsIndex | None = None,
) -> int:
Find the closest string with multiple key
if type(keys) is str: return orig.find(keys, start, end)
result: list[int] = [orig.find(key, start, end) for key in keys]
while -1 in result: result.remove(-1)
@ -20,6 +23,11 @@ def str_find_all(
def parse_cmd_args(cmd: str | list[str]) -> list[str]:
Parse command line to list
parse_cmd_args("ls -la /mnt") = ["ls", "-la", "/mnt"]
parse_cmd_args(["ls", "-la", "/mnt"]) = ["ls", "-la", "/mnt"]
if type(cmd) is str: return shlex.split(cmd)
elif type(cmd) is list: return cmd
else: raise TypeError("unknown type for cmd")
@ -49,16 +57,37 @@ def fd_get_path(fd: int) -> str | None:
fd_get_path(1) = "/dev/pts/0"
link = f"/proc/self/fd/{fd}"
# target is not exists?
if not os.path.exists(link): return None
# read link of fd
path = os.readlink(link)
# must starts with / (is an absolute path)
if not path.startswith("/"): return None
# do not use memfd
if path.startswith("/memfd:"): return None
# do not use a deleted file
if path.endswith(" (deleted)"): return None
# target file is not exists (should not happen)
if not os.path.exists(path): return None
return path
def size_to_bytes(value: str | int, alt_units: dict = None) -> int:
Convert human-readable size string to number
size_to_bytes("1MiB") = 1048576
size_to_bytes("4K") = 4096
size_to_bytes("64b") = 8
size_to_bytes(123) = 123
size_to_bytes("2048s", {'s': 512}) = 1048576
units = {
'b': 0.125, 'bit': 0.125, 'bits': 0.125, 'Bit': 0.125, 'Bits': 0.125,
'B': 1, 'Byte': 1, 'Bytes': 1, 'bytes': 1, 'byte': 1,
@ -71,45 +100,89 @@ def size_to_bytes(value: str | int, alt_units: dict = None) -> int:
'z': 10**15, 'zB': 10**15, 'zb': 10**15, 'Z': 2**50, 'ZB': 2**50, 'ZiB': 2**50,
'y': 10**15, 'yB': 10**15, 'yb': 10**15, 'Y': 2**50, 'YB': 2**50, 'YiB': 2**50,
if type(value) is int: return value
if type(value) is int:
# return number directly
return value
elif type(value) is str:
# add custom units
if alt_units: units.update(alt_units)
# find all matched units
matches = {unit: len(unit) for unit in units if value.endswith(unit)}
# find out the longest matched unit
max_unit = max(matches.values(), default=0)
# use the longest unit
unit = next((unit for unit in matches.keys() if matches[unit] == max_unit), None)
# get mul for target unit
mul = units[unit] if unit else 1.0
# convert string to target number
return int(float(value[:len(value)-max_unit].strip()) * mul)
else: raise TypeError("bad size value")
def bytes_pad(b: bytes, size: int, trunc: bool = False, pad: bytes = b'\0') -> bytes:
Padding a bytes to specified length
l = len(b)
if l > size and trunc: b = b[:size]
if l < size: b += pad * (size - l)
# if larger than specified size, truncate
if l > size and trunc:
b = b[:size]
# if smaller than specified size, padding
if l < size:
b += pad * (size - l)
return b
def round_up(value: int, align: int) -> int:
Align up a number
round_down(0x2000, 0x1000) = 0x2000
round_down(0x2001, 0x1000) = 0x3000
round_down(0x1FFF, 0x1000) = 0x2000
return (value + align - 1) & ~(align - 1)
def round_down(value: int, align: int) -> int:
Align down a number
round_down(0x2000, 0x1000) = 0x2000
round_down(0x2001, 0x1000) = 0x2000
round_down(0x1FFF, 0x1000) = 0x1000
return value & ~(align - 1)
def open_config(path: str, mode=0o0644) -> io.TextIOWrapper:
Open a config file for write
If original file is existing, move to FILE.dist
dist = f"{path}.dist"
have_dist = False
if os.path.exists(dist):
# dist file already exists, no move
have_dist = True
elif os.path.exists(path):
# target file already exists, rename to dist
shutil.move(path, dist)
have_dist = True
# FIXME: should not move previous write to dist
# open and truncate
flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd =, flags=flags, mode=mode)
if fd < 0: raise IOError(f"open {path} failed")
fp = os.fdopen(fd, "w")
# write a comment to tell user dist was renamed
fp.write("# This file is auto generated by arch-image-builder\n")
if have_dist:
fn = os.path.basename(dist)
@ -119,10 +192,18 @@ def open_config(path: str, mode=0o0644) -> io.TextIOWrapper:
# file close managed by parent function
return fp
def path_to_name(path: str) -> str:
Convert path to a identifier
path_to_name("") = "empty"
path_to_name("/") = "rootfs"
path_to_name("/boot") = "boot"
path_to_name("/etc/fstab") = "etc-fstab"
if path == "/": return "rootfs"
if path.startswith("/"): path = path[1:]
if len(path) <= 0: return "empty"

View File

@ -1,3 +1,4 @@
# Default user for ArchLinux ARM
- name: alarm

View File

@ -1,3 +1,4 @@
# Default user for ArchLinux
- name: arch

View File

@ -1,5 +1,7 @@
# Users in wheel group can run root commands without password
# polkit
- path: /etc/polkit-1/rules.d/99-wheel.rules
mode: 0640
content: |
@ -7,10 +9,12 @@ filesystem:
return polkit.Result.YES;
# sudo
- path: /etc/sudoers.d/wheel
mode: 0640
content: |
# Ensure sudo is installed
- sudo

View File

@ -1,3 +1,4 @@
# GNOME desktop
- gnome
@ -6,4 +7,5 @@ systemd:
- gdm.service
# Ensure NetworkManager is enabled
- packages/network-manager

View File

@ -2,20 +2,36 @@ name: AYN Odin 2
arch: aarch64
soc: qcs8550
device: ayn-odin2
# hypdtbo for platform ABL boot
device_suffix: -hypdtbo
# For regulatory.db
- wireless-regdb
# Qualcomm firmwares for AYN Odin2
- linux-firmware-ayn-odin2
# Mainline kernel for AYN Odin2
- linux-ayn-odin2-edge
# Adreno 740 GPU
- mesa-qcom-git
# Vulkan test tools
- vulkan-tools
# Vulkan dependency
- xcb-util-keysyms
# No modem in this device
- rmtfs.service
# GamePAD workaround
- path: /etc/udev/rules.d/99-${device}.rules
content: |
SUBSYSTEM=="input", ATTRS{name}=="Ayn Odin2 Gamepad", MODE="0666", ENV{ID_INPUT_MOUSE}="0", ENV{ID_INPUT_JOYSTICK}="1"
@ -41,6 +57,7 @@ filesystem:
chassis: handset
# Force to use zink driver for OpenGL
@ -53,6 +70,8 @@ kernel:
- allow_mismatched_32bit_el0
# Put these firmware to initramfs for boot
# TODO: do not put they into initramfs
- /usr/lib/firmware/qcom/sm8550/ayn/odin2/adsp.mbn
- /usr/lib/firmware/qcom/sm8550/ayn/odin2/adsp_dtb.mbn
- /usr/lib/firmware/qcom/sm8550/ayn/odin2/cdsp.mbn
@ -72,8 +91,18 @@ mkinitcpio:
- os/archlinuxarm
- repo/archlinuxcn
- repo/renegade-project
# Generic qualcomm related config
- device/qcom
# USB Gadget tools
- packages/systemd-gadget
# OpenSSH Server
- packages/openssh
# Text editor
- packages/editor
# Bluetooth related services
- packages/bluez

View File

@ -2,10 +2,19 @@ platform: qcom
# Android A/B Slot Switcher
- qbootctl
# Qualcomm IPC Router
- qrtr
# Modem remote filesystem
- rmtfs
# TFTP via Qualcomm IPC Router
- tqftpserv
# Protection Domain Mapper
- pd-mapper

View File

@ -1,31 +1,44 @@
# I18N for Chinese (Simplified)
- "zh_CN.UTF-8 UTF-8"
- "en_US.UTF-8 UTF-8"
default: zh_CN.UTF-8
# Wireless regulatory
- path: /etc/conf.d/wireless-regdom
content: |
# China accelerated DNS
- path: /etc/systemd/resolved.conf.d/cn-dns.conf
content: |
# China accelerated NTP Server
- path: /etc/systemd/timesyncd.conf.d/cn-ntp.conf
content: |
# Fonts for CJK language
- noto-fonts-cjk
- wqy-bitmapfont
- wqy-microhei
- wqy-microhei-lite
- wqy-zenhei
# Input method for Chinese
- ibus
- ibus-libpinyin

View File

@ -1,3 +1,4 @@
# Bluetooth
- bluez

View File

@ -1,10 +1,13 @@
# USB Gadget tools
- systemd-gadget
- dnsmasq
# Disable tty ACM to avoid bugs
- getty@ttyGS0.service
- usbgadget-func-acm.service
# Enable systemd-networkd for RNDIS
- systemd-networkd.service

View File

@ -1,5 +1,6 @@
# Network configs from archiso
- path: /etc/systemd/network/
content: |