mirror of
https://github.com/BigfootACA/arch-image-builder.git
synced 2024-11-14 15:03:33 +08:00
455 lines
10 KiB
Python
455 lines
10 KiB
Python
import io
|
|
import os
|
|
import libmount
|
|
from typing import Self
|
|
from logging import getLogger
|
|
from builder.lib.blkid import Blkid
|
|
from builder.lib.serializable import SerializableDict, SerializableList
|
|
log = getLogger(__name__)
|
|
|
|
virtual_fs = [
|
|
"sysfs", "tmpfs", "proc", "cgroup", "cgroup2", "hugetlbfs",
|
|
"devtmpfs", "binfmt_misc", "configfs", "debugfs", "tracefs", "cpuset",
|
|
"securityfs", "sockfs", "bpf", "pipefs", "ramfs", "binder", "bdev",
|
|
"devpts", "autofs", "efivarfs", "mqueue", "resctrl", "pstore", "fusectl",
|
|
]
|
|
|
|
real_fs = [
|
|
"reiserfs", "ext4", "ext3", "ext2", "cramfs", "squashfs", "minix", "vfat",
|
|
"msdos", "exfat", "iso9660", "hfsplus", "gfs2meta", "ecryptfs", "ntfs3", "ufs",
|
|
"jffs2", "ubifs", "affs", "romfs", "ocfs2_dlmfs", "omfs", "jfs", "xfs", "nilfs2",
|
|
"befs", "ocfs2", "btrfs", "hfs", "gfs2", "udf", "f2fs", "bcachefs", "erofs",
|
|
]
|
|
|
|
|
|
class MountPoint(SerializableDict):
|
|
device: str = None
|
|
source: str = None
|
|
target: str = None
|
|
fstype: str = None
|
|
option: list[str] = []
|
|
fs_freq: int = 0
|
|
fs_passno: int = 0
|
|
|
|
@property
|
|
def virtual(self) -> bool:
|
|
"""
|
|
Is current mount point a virtual filesystem
|
|
"""
|
|
if self.fstype:
|
|
if self.fstype in virtual_fs: return True
|
|
if self.fstype in real_fs: return False
|
|
if self.device:
|
|
if self.device.startswith(os.sep): return False
|
|
if self.source:
|
|
if self.source.startswith(os.sep): return False
|
|
if "=" in self.source: return False
|
|
return True
|
|
|
|
@property
|
|
def level(self) -> int:
|
|
"""
|
|
Get current target levels
|
|
/ => 1
|
|
/boot => 2
|
|
/usr/bin => 3
|
|
"""
|
|
if self.target is None: return 0
|
|
path = os.path.realpath(self.target)
|
|
cnt = path.count(os.sep)
|
|
if (
|
|
path.startswith(os.sep) and
|
|
not path.endswith(os.sep)
|
|
): cnt += 1
|
|
return cnt
|
|
|
|
@property
|
|
def options(self):
|
|
"""
|
|
Get options as string
|
|
"""
|
|
return ",".join(self.option)
|
|
|
|
@options.setter
|
|
def options(self, val: str):
|
|
"""
|
|
Set options from string
|
|
"""
|
|
self.option = val.split(",")
|
|
|
|
def get_option(self, opt: str) -> str | None:
|
|
"""
|
|
Get an option from string
|
|
"""
|
|
if opt in self.option:
|
|
return opt
|
|
if "=" not in opt:
|
|
start = f"{opt}="
|
|
values = (o for o in self.option if o.startswith(start))
|
|
return next(values, None)
|
|
return None
|
|
|
|
def remove_option(self, opt: str | list[str]) -> Self:
|
|
"""
|
|
Remove an option
|
|
"""
|
|
if type(opt) is list[str]:
|
|
for o in opt:
|
|
self.remove_option(o)
|
|
return
|
|
if opt in self.option:
|
|
self.option.remove(opt)
|
|
return
|
|
if "=" in opt: opt = opt[:opt.find("=")]
|
|
val = self.get_option(opt)
|
|
if val:
|
|
self.remove_option(val)
|
|
return self
|
|
|
|
def exclusive_option(self, opt: str, opt1: str, opt2: str) -> Self:
|
|
"""
|
|
Remove a exclusive option
|
|
"""
|
|
if opt == opt1 or opt == opt2:
|
|
self.remove_option(opt1)
|
|
return self
|
|
|
|
def add_option(self, opt: str) -> Self:
|
|
"""
|
|
Add an option
|
|
"""
|
|
self.exclusive_option(opt, "ro", "rw")
|
|
self.exclusive_option(opt, "dev", "nodev")
|
|
self.exclusive_option(opt, "suid", "nosuid")
|
|
self.exclusive_option(opt, "exec", "noexec")
|
|
self.exclusive_option(opt, "relatime", "noatime")
|
|
self.remove_option(opt)
|
|
if opt not in self.option:
|
|
self.option.append(opt)
|
|
return self
|
|
|
|
def ro(self) -> Self:
|
|
"""
|
|
Set mount point to read-only
|
|
"""
|
|
self.add_option("ro")
|
|
return self
|
|
|
|
def rw(self) -> Self:
|
|
"""
|
|
Set mount point to read-write
|
|
"""
|
|
self.add_option("rw")
|
|
return self
|
|
|
|
def have_source(self) -> bool: return self.source and self.source != "none"
|
|
def have_target(self) -> bool: return self.target and self.target != "none"
|
|
def have_fstype(self) -> bool: return self.fstype and self.fstype != "none"
|
|
def have_options(self) -> bool: return len(self.option) > 0
|
|
|
|
def update_device(self):
|
|
"""
|
|
Update device field from source
|
|
"""
|
|
if self.virtual or self.source is None: return
|
|
if self.source.startswith(os.sep):
|
|
self.device = self.source
|
|
return
|
|
if "=" in self.source:
|
|
self.device = Blkid().evaluate_tag(self.source)
|
|
return
|
|
|
|
def persist_source(self, tag: str = "UUID"):
|
|
"""
|
|
Change source to persist source
|
|
"""
|
|
if self.virtual: return
|
|
if self.device is None: self.update_device()
|
|
if self.device is None: return
|
|
tag = tag.upper()
|
|
if tag == "PATH":
|
|
self.source = self.device
|
|
return
|
|
self.source = Blkid().get_tag_value(
|
|
None, tag, self.device
|
|
)
|
|
|
|
def tolibmount(self) -> libmount.Context:
|
|
"""
|
|
To util-linux libmount context
|
|
"""
|
|
mnt = libmount.Context()
|
|
mnt.target = self.target
|
|
if self.have_source(): mnt.source = self.source
|
|
if self.have_fstype(): mnt.fstype = self.fstype
|
|
if self.have_options(): mnt.options = self.options
|
|
return mnt
|
|
|
|
def ismount(self) -> bool:
|
|
"""
|
|
Is current mount point mounted
|
|
"""
|
|
return os.path.ismount(self.target)
|
|
|
|
def mount(self) -> Self:
|
|
"""
|
|
Mount now
|
|
"""
|
|
if not os.path.exists(self.target):
|
|
os.makedirs(self.target, mode=0o0755)
|
|
if not os.path.ismount(self.target):
|
|
log.debug(
|
|
f"try mount {self.source} "
|
|
f"to {self.target} "
|
|
f"as {self.fstype} "
|
|
f"with {self.options}"
|
|
)
|
|
lib = self.tolibmount()
|
|
lib.mount()
|
|
return self
|
|
|
|
def umount(self) -> Self:
|
|
"""
|
|
UnMount now
|
|
"""
|
|
if os.path.ismount(self.target):
|
|
lib = self.tolibmount()
|
|
lib.umount()
|
|
log.debug(f"umount {self.target} successfuly")
|
|
return self
|
|
|
|
def from_mount_line(self, line: str) -> Self:
|
|
"""
|
|
Load from mtab / fstab
|
|
"""
|
|
d = line.split()
|
|
if len(d) != 6:
|
|
raise ValueError("bad mount line")
|
|
self.source = d[0]
|
|
self.target = d[1]
|
|
self.fstype = d[2]
|
|
self.options = d[3]
|
|
self.fs_freq = int(d[4])
|
|
self.fs_passno = int(d[5])
|
|
return self
|
|
|
|
def to_mount_line(self) -> str:
|
|
"""
|
|
To mount tab line string
|
|
PARTLABEL=
|
|
"""
|
|
self.fixup()
|
|
fields = [
|
|
self.source,
|
|
self.target,
|
|
self.fstype,
|
|
self.options,
|
|
str(self.fs_freq),
|
|
str(self.fs_passno),
|
|
]
|
|
return " ".join(fields)
|
|
|
|
def fixup(self) -> Self:
|
|
if not self.have_source(): self.source = "none"
|
|
if not self.have_target(): self.target = "none"
|
|
if not self.have_fstype(): self.fstype = "none"
|
|
if not self.have_options(): self.options = "defaults"
|
|
return self
|
|
|
|
def clone(self) -> Self:
|
|
mnt = MountPoint()
|
|
mnt.device = self.device
|
|
mnt.source = self.source
|
|
mnt.target = self.target
|
|
mnt.fstype = self.fstype
|
|
mnt.options = self.options
|
|
mnt.fs_freq = self.fs_freq
|
|
mnt.fs_passno = self.fs_passno
|
|
return mnt
|
|
|
|
def __init__(
|
|
self,
|
|
data: dict = None,
|
|
device: str = None,
|
|
source: str = None,
|
|
target: str = None,
|
|
fstype: str = None,
|
|
options: str = None,
|
|
option: list[str] = None,
|
|
fs_freq: int = None,
|
|
fs_passno: int = None,
|
|
):
|
|
super().__init__()
|
|
self.device = None
|
|
self.source = None
|
|
self.target = None
|
|
self.fstype = None
|
|
self.option = []
|
|
self.fs_freq = 0
|
|
self.fs_passno = 0
|
|
if data: self.from_dict(data)
|
|
if device: self.device = device
|
|
if source: self.source = source
|
|
if target: self.target = target
|
|
if fstype: self.fstype = fstype
|
|
if options: self.options = options
|
|
if option: self.option = option
|
|
if fs_freq: self.fs_freq = fs_freq
|
|
if fs_passno: self.fs_passno = fs_passno
|
|
|
|
@staticmethod
|
|
def parse_mount_line(line: str):
|
|
return MountPoint().from_mount_line(line)
|
|
|
|
|
|
class MountTab(list[MountPoint], SerializableList):
|
|
def find_folder(self, folder: str) -> Self:
|
|
"""
|
|
Find mount point target starts with folder
|
|
"""
|
|
root = os.path.realpath(folder)
|
|
return [mnt for mnt in self if mnt.target.startswith(root)]
|
|
|
|
def find_target(self, target: str) -> Self: return [mnt for mnt in self if mnt.target == target]
|
|
def find_source(self, source: str) -> Self: return [mnt for mnt in self if mnt.source == source]
|
|
def find_fstype(self, fstype: str) -> Self: return [mnt for mnt in self if mnt.fstype == fstype]
|
|
|
|
def clone(self) -> Self:
|
|
"""
|
|
Fully clone a MountTab
|
|
"""
|
|
mnts = MountTab()
|
|
for mnt in self:
|
|
mnts.append(mnt.clone())
|
|
return mnts
|
|
|
|
def mount_all(self, prefix: str = None, mkdir: bool = False) -> Self:
|
|
"""
|
|
Mount all mount points
|
|
"""
|
|
for mnt in self:
|
|
m = mnt.clone()
|
|
if prefix:
|
|
if m.target == "/": m.target = prefix
|
|
else: m.target = os.path.join(prefix, m.target[1:])
|
|
if mkdir and not os.path.exists(m.target):
|
|
os.makedirs(m.target, mode=0o0755)
|
|
m.mount()
|
|
return self
|
|
|
|
def resort(self):
|
|
"""
|
|
Sort mount points by path level
|
|
"""
|
|
self.sort(key=lambda x: (x.level, len(x.target), x.target))
|
|
|
|
def strip_virtual(self) -> Self:
|
|
"""
|
|
Remove all virtual filesystem mount points
|
|
"""
|
|
for mnt in self:
|
|
if mnt.virtual:
|
|
self.remove(mnt)
|
|
return self
|
|
|
|
def to_list(self) -> list:
|
|
return self
|
|
|
|
def from_list(self, o: list) -> Self:
|
|
self.clear()
|
|
for i in o: self.append(MountPoint().from_dict(i))
|
|
return self
|
|
|
|
def to_mount_file(self, linesep=os.linesep) -> str:
|
|
"""
|
|
Convert to mount file (fstab)
|
|
"""
|
|
ret = "# Source Target FS-Type Options FS-Freq FS-Dump"
|
|
ret += linesep
|
|
for point in self:
|
|
ret += point.to_mount_line()
|
|
ret += linesep
|
|
return ret
|
|
|
|
def write_mount_file(self, fp: io.TextIOWrapper):
|
|
fp.write(self.to_mount_file())
|
|
fp.flush()
|
|
|
|
def create_mount_file(self, path: str) -> Self:
|
|
with open(path, "w") as f:
|
|
self.write_mount_file(f)
|
|
return self
|
|
|
|
def load_mount_fp(self, fp: io.TextIOWrapper) -> Self:
|
|
for line in fp:
|
|
if line is None: break
|
|
line = line.strip()
|
|
if len(line) <= 0: continue
|
|
if line.startswith("#"): continue
|
|
mnt = MountPoint.parse_mount_line(line)
|
|
self.append(mnt)
|
|
return self
|
|
|
|
def load_mount_file(self, file: str) -> Self:
|
|
with open(file, "r") as f:
|
|
self.load_mount_fp(f)
|
|
return self
|
|
|
|
def load_fstab(self) -> Self:
|
|
self.load_mount_file("/etc/fstab")
|
|
return self
|
|
|
|
def load_mounts(self) -> Self:
|
|
self.load_mount_file("/proc/mounts")
|
|
return self
|
|
|
|
def load_mounts_pid(self, pid: int) -> Self:
|
|
path = f"/proc/{pid}/mounts"
|
|
self.load_mount_file(path)
|
|
return self
|
|
|
|
def from_mount_fp(self, fp: io.TextIOWrapper) -> Self:
|
|
self.clear()
|
|
self.load_mount_fp(fp)
|
|
return self
|
|
|
|
def from_mount_file(self, file: str) -> Self:
|
|
self.clear()
|
|
self.load_mount_file(file)
|
|
return self
|
|
|
|
def from_fstab(self, ) -> Self:
|
|
self.clear()
|
|
self.load_fstab()
|
|
return self
|
|
|
|
def from_mounts(self, ) -> Self:
|
|
self.clear()
|
|
self.load_mounts()
|
|
return self
|
|
|
|
def from_mounts_pid(self, pid: int) -> Self:
|
|
self.clear()
|
|
self.load_mounts_pid(pid)
|
|
return self
|
|
|
|
@staticmethod
|
|
def parse_mount_fp(fp: io.TextIOWrapper):
|
|
return MountTab().from_mount_fp(fp)
|
|
|
|
@staticmethod
|
|
def parse_mount_file(file: str):
|
|
return MountTab().from_mount_file(file)
|
|
|
|
@staticmethod
|
|
def parse_fstab():
|
|
return MountTab().from_fstab()
|
|
|
|
@staticmethod
|
|
def parse_mounts():
|
|
return MountTab().from_mounts()
|
|
|
|
@staticmethod
|
|
def parse_mounts_pid(pid: int):
|
|
return MountTab().from_mounts_pid(pid)
|