arch-image-builder/builder/lib/mount.py

455 lines
10 KiB
Python
Raw Normal View History

import io
import os
import libmount
from typing import Self
from logging import getLogger
from builder.lib.blkid import Blkid
from builder.lib.serializable import SerializableDict, SerializableList
log = getLogger(__name__)
virtual_fs = [
"sysfs", "tmpfs", "proc", "cgroup", "cgroup2", "hugetlbfs",
"devtmpfs", "binfmt_misc", "configfs", "debugfs", "tracefs", "cpuset",
"securityfs", "sockfs", "bpf", "pipefs", "ramfs", "binder", "bdev",
"devpts", "autofs", "efivarfs", "mqueue", "resctrl", "pstore", "fusectl",
]
real_fs = [
"reiserfs", "ext4", "ext3", "ext2", "cramfs", "squashfs", "minix", "vfat",
"msdos", "exfat", "iso9660", "hfsplus", "gfs2meta", "ecryptfs", "ntfs3", "ufs",
"jffs2", "ubifs", "affs", "romfs", "ocfs2_dlmfs", "omfs", "jfs", "xfs", "nilfs2",
"befs", "ocfs2", "btrfs", "hfs", "gfs2", "udf", "f2fs", "bcachefs", "erofs",
]
class MountPoint(SerializableDict):
device: str = None
source: str = None
target: str = None
fstype: str = None
option: list[str] = []
fs_freq: int = 0
fs_passno: int = 0
@property
def virtual(self) -> bool:
"""
Is current mount point a virtual filesystem
"""
if self.fstype:
if self.fstype in virtual_fs: return True
if self.fstype in real_fs: return False
if self.device:
if self.device.startswith(os.sep): return False
if self.source:
if self.source.startswith(os.sep): return False
if "=" in self.source: return False
return True
@property
def level(self) -> int:
"""
Get current target levels
/ => 1
/boot => 2
/usr/bin => 3
"""
if self.target is None: return 0
path = os.path.realpath(self.target)
cnt = path.count(os.sep)
if (
path.startswith(os.sep) and
not path.endswith(os.sep)
): cnt += 1
return cnt
@property
def options(self):
"""
Get options as string
"""
return ",".join(self.option)
@options.setter
def options(self, val: str):
"""
Set options from string
"""
self.option = val.split(",")
def get_option(self, opt: str) -> str | None:
"""
Get an option from string
"""
if opt in self.option:
return opt
if "=" not in opt:
start = f"{opt}="
values = (o for o in self.option if o.startswith(start))
return next(values, None)
return None
def remove_option(self, opt: str | list[str]) -> Self:
"""
Remove an option
"""
if type(opt) is list[str]:
for o in opt:
self.remove_option(o)
return
if opt in self.option:
self.option.remove(opt)
return
if "=" in opt: opt = opt[:opt.find("=")]
val = self.get_option(opt)
if val:
self.remove_option(val)
return self
def exclusive_option(self, opt: str, opt1: str, opt2: str) -> Self:
"""
Remove a exclusive option
"""
if opt == opt1 or opt == opt2:
self.remove_option(opt1)
return self
def add_option(self, opt: str) -> Self:
"""
Add an option
"""
self.exclusive_option(opt, "ro", "rw")
self.exclusive_option(opt, "dev", "nodev")
self.exclusive_option(opt, "suid", "nosuid")
self.exclusive_option(opt, "exec", "noexec")
self.exclusive_option(opt, "relatime", "noatime")
self.remove_option(opt)
if opt not in self.option:
self.option.append(opt)
return self
def ro(self) -> Self:
"""
Set mount point to read-only
"""
self.add_option("ro")
return self
def rw(self) -> Self:
"""
Set mount point to read-write
"""
self.add_option("rw")
return self
def have_source(self) -> bool: return self.source and self.source != "none"
def have_target(self) -> bool: return self.target and self.target != "none"
def have_fstype(self) -> bool: return self.fstype and self.fstype != "none"
def have_options(self) -> bool: return len(self.option) > 0
def update_device(self):
"""
Update device field from source
"""
if self.virtual or self.source is None: return
if self.source.startswith(os.sep):
self.device = self.source
return
if "=" in self.source:
self.device = Blkid().evaluate_tag(self.source)
return
def persist_source(self, tag: str = "UUID"):
"""
Change source to persist source
"""
if self.virtual: return
if self.device is None: self.update_device()
if self.device is None: return
tag = tag.upper()
if tag == "PATH":
self.source = self.device
return
self.source = Blkid().get_tag_value(
None, tag, self.device
)
def tolibmount(self) -> libmount.Context:
"""
To util-linux libmount context
"""
mnt = libmount.Context()
mnt.target = self.target
if self.have_source(): mnt.source = self.source
if self.have_fstype(): mnt.fstype = self.fstype
if self.have_options(): mnt.options = self.options
return mnt
def ismount(self) -> bool:
"""
Is current mount point mounted
"""
return os.path.ismount(self.target)
def mount(self) -> Self:
"""
Mount now
"""
if not os.path.exists(self.target):
os.makedirs(self.target, mode=0o0755)
if not os.path.ismount(self.target):
log.debug(
f"try mount {self.source} "
f"to {self.target} "
f"as {self.fstype} "
f"with {self.options}"
)
lib = self.tolibmount()
lib.mount()
return self
def umount(self) -> Self:
"""
UnMount now
"""
if os.path.ismount(self.target):
lib = self.tolibmount()
lib.umount()
log.debug(f"umount {self.target} successfuly")
return self
def from_mount_line(self, line: str) -> Self:
"""
Load from mtab / fstab
"""
d = line.split()
if len(d) != 6:
raise ValueError("bad mount line")
self.source = d[0]
self.target = d[1]
self.fstype = d[2]
self.options = d[3]
self.fs_freq = int(d[4])
self.fs_passno = int(d[5])
return self
def to_mount_line(self) -> str:
"""
To mount tab line string
PARTLABEL=
"""
self.fixup()
fields = [
self.source,
self.target,
self.fstype,
self.options,
str(self.fs_freq),
str(self.fs_passno),
]
return " ".join(fields)
def fixup(self) -> Self:
if not self.have_source(): self.source = "none"
if not self.have_target(): self.target = "none"
if not self.have_fstype(): self.fstype = "none"
if not self.have_options(): self.options = "defaults"
return self
def clone(self) -> Self:
mnt = MountPoint()
mnt.device = self.device
mnt.source = self.source
mnt.target = self.target
mnt.fstype = self.fstype
mnt.options = self.options
mnt.fs_freq = self.fs_freq
mnt.fs_passno = self.fs_passno
return mnt
def __init__(
self,
data: dict = None,
device: str = None,
source: str = None,
target: str = None,
fstype: str = None,
options: str = None,
option: list[str] = None,
fs_freq: int = None,
fs_passno: int = None,
):
super().__init__()
self.device = None
self.source = None
self.target = None
self.fstype = None
self.option = []
self.fs_freq = 0
self.fs_passno = 0
if data: self.from_dict(data)
if device: self.device = device
if source: self.source = source
if target: self.target = target
if fstype: self.fstype = fstype
if options: self.options = options
if option: self.option = option
if fs_freq: self.fs_freq = fs_freq
if fs_passno: self.fs_passno = fs_passno
@staticmethod
def parse_mount_line(line: str):
return MountPoint().from_mount_line(line)
class MountTab(list[MountPoint], SerializableList):
def find_folder(self, folder: str) -> Self:
"""
Find mount point target starts with folder
"""
root = os.path.realpath(folder)
return [mnt for mnt in self if mnt.target.startswith(root)]
def find_target(self, target: str) -> Self: return [mnt for mnt in self if mnt.target == target]
def find_source(self, source: str) -> Self: return [mnt for mnt in self if mnt.source == source]
def find_fstype(self, fstype: str) -> Self: return [mnt for mnt in self if mnt.fstype == fstype]
def clone(self) -> Self:
"""
Fully clone a MountTab
"""
mnts = MountTab()
for mnt in self:
mnts.append(mnt.clone())
return mnts
def mount_all(self, prefix: str = None, mkdir: bool = False) -> Self:
"""
Mount all mount points
"""
for mnt in self:
m = mnt.clone()
if prefix:
if m.target == "/": m.target = prefix
else: m.target = os.path.join(prefix, m.target[1:])
if mkdir and not os.path.exists(m.target):
os.makedirs(m.target, mode=0o0755)
m.mount()
return self
def resort(self):
"""
Sort mount points by path level
"""
self.sort(key=lambda x: (x.level, len(x.target), x.target))
def strip_virtual(self) -> Self:
"""
Remove all virtual filesystem mount points
"""
for mnt in self:
if mnt.virtual:
self.remove(mnt)
return self
def to_list(self) -> list:
return self
def from_list(self, o: list) -> Self:
self.clear()
for i in o: self.append(MountPoint().from_dict(i))
return self
def to_mount_file(self, linesep=os.linesep) -> str:
"""
Convert to mount file (fstab)
"""
ret = "# Source Target FS-Type Options FS-Freq FS-Dump"
ret += linesep
for point in self:
ret += point.to_mount_line()
ret += linesep
return ret
def write_mount_file(self, fp: io.TextIOWrapper):
fp.write(self.to_mount_file())
fp.flush()
def create_mount_file(self, path: str) -> Self:
with open(path, "w") as f:
self.write_mount_file(f)
return self
def load_mount_fp(self, fp: io.TextIOWrapper) -> Self:
for line in fp:
if line is None: break
line = line.strip()
if len(line) <= 0: continue
if line.startswith("#"): continue
mnt = MountPoint.parse_mount_line(line)
self.append(mnt)
return self
def load_mount_file(self, file: str) -> Self:
with open(file, "r") as f:
self.load_mount_fp(f)
return self
def load_fstab(self) -> Self:
self.load_mount_file("/etc/fstab")
return self
def load_mounts(self) -> Self:
self.load_mount_file("/proc/mounts")
return self
def load_mounts_pid(self, pid: int) -> Self:
path = f"/proc/{pid}/mounts"
self.load_mount_file(path)
return self
def from_mount_fp(self, fp: io.TextIOWrapper) -> Self:
self.clear()
self.load_mount_fp(fp)
return self
def from_mount_file(self, file: str) -> Self:
self.clear()
self.load_mount_file(file)
return self
def from_fstab(self, ) -> Self:
self.clear()
self.load_fstab()
return self
def from_mounts(self, ) -> Self:
self.clear()
self.load_mounts()
return self
def from_mounts_pid(self, pid: int) -> Self:
self.clear()
self.load_mounts_pid(pid)
return self
@staticmethod
def parse_mount_fp(fp: io.TextIOWrapper):
return MountTab().from_mount_fp(fp)
@staticmethod
def parse_mount_file(file: str):
return MountTab().from_mount_file(file)
@staticmethod
def parse_fstab():
return MountTab().from_fstab()
@staticmethod
def parse_mounts():
return MountTab().from_mounts()
@staticmethod
def parse_mounts_pid(pid: int):
return MountTab().from_mounts_pid(pid)